Talking Concurrency: Asyncio

This is the second part of the series, in the first part we talked about the general idea of concurrency, how it's different from parallelism and saw how Python handles concurrency.

Part 1: Talking Concurrency -1

In the second part of the blog, we will look into the modern solution towards the problem using the new Asyncio module.

Import Asyncio

In the last post, we looked into a basic code snippet on how can we write concurrently. We also discussed some of the basic terminology used while using the Asyncio module. If you don't remember you should quickly take a recap as we would look at those concepts in a bit detailed manner.

Before looking at some code, let's understand some basic terminologies that would help in understanding the code better.

Pending: it does not have a result or exception yet. Cancelled: it was canceled Finished: it was finished either with a result or exception.

Futures also have a method called the add_done_callback() this is method allows the function to be called as soon as the task is completed with its process and is returned with a result. Which is the python object that would be returned with the expected result or raise an exception when the task is finished.

Since a typical program will have multiple tasks to be executed concurrently, we create normally with asyncio.create_task(coroutine) but we run them with asyncio.gather().

As we looked into the basic definition of coroutines in the last blog, we can loosely describe them as restartable functions.

You make a coroutine with the help of the async def keyword and you can suspend the coroutine with the await keyword. Every time when you await the function gets suspended while whatever you asked to wait on happens, and then when it's finished, the event loop will wake the function up again and resume it from the await call, passing any result out. Since coroutines evolved from generators and generators are iterators with __iter__() method, coroutines also have __await__() which allows them to continue every time await is called.

At each step a coroutine does three things:

Before moving forward, I want to talk about await. In Python, anything that can be awaited i.e used with the await keyword is called an awaitable object. The most common awaitable that you would use would be coroutines, futures and tasks. Thus anything is blocking get's put to the event loop using the await and added to the list of paused coroutines.

Now let's look at a very basic async program to understand how everything fits in together.

import asyncio

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

asyncio.run(print_sum())

The sequence diagram below describes the flow of the above program.

tulip_coro.png

Now that we know all the basic terminology used in an async program let's look at a slightly complex code below for getting a better understanding all the jargons we learned above.

import asyncio


async def compute(x, y):
    """
    A coroutine that takes in two values and returns the sum.
    """
    print(f"Computing the value of {x} and {y}")
    await asyncio.sleep(1)
    return x + y


async def print_sum():
    """
    A coroutine that creates tasks.
    """
    value1 = asyncio.create_task(compute(1, 0))
    value2 = asyncio.create_task(compute(1, 0))
    value3 = asyncio.create_task(compute(1, 0))
    print(sum(await asyncio.gather(value1, value2, value3)))

asyncio.run(print_sum())

async def print_sum() and async def compute() are the two coroutines in the above program, the async def print_sum() as the main function used in the sync programming. The main function executes the entire program and all the functions related to it. The same approach is followed here, one coroutine awaits all the other coroutine.

Though this can be easily miss-understood, in that case, the program would just fine but would run in more like a sequential manner.

    value1 = await asyncio.create_task(compute(1, 0))
    value2 = await asyncio.create_task(compute(1, 0))
    value3 = await asyncio.create_task(compute(1, 0))
    print(sum(value1, value2, value3))

The above code can be a good example of how not to write async code, here using await on every task we are making all the calls sync thus making the program sequential. To avoid this asyncio.gather() is used in the program. To gather all the tasks in the program, value1, value2 and value3.

Finally, when all the tasks are gathered together, they are run concurrently.

Sync-Async-Sync

A lot of time you might be in a situation where you might have to call a sync function def from coroutine async def or have to call coroutine async def from sync function def. Ideally, you “shouldn't” use sync functions for calls that can be async like a database call because that is something that could provide further optimization. But there is nothing wrong with using a synchronous library for database, an async library for HTTP and gradually move things to async.

Calling a sync function def from a coroutine async def. In that case, you run the sync function in a different thread using the threadpool executor. The runinexecutor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a Future that can be used to wait for the function to finish its work and return something.

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

When you have to call coroutines from the normal sync function. You just have to manually get_event_loo() , create tasks() and call the asyncio.gather() function. Since you can await, one thing you can do is create a queue with asyncio.queue() and use that queue to pass around the data between different coroutines.


import asyncio


async def compute(x, y, data):
    print(f"Computing the value of {x} and {y}")
    result = x + y
    await data.put(result)


async def process(n, data):
    processed, sumx = 0, 0
    while processed < n:
        item = await data.get()
        print(item)
        processed += 1
        value = item
        sumx += value
    print(f"The sum is:{sumx}")
    await asyncio.sleep(.5)


def main():
    loop = asyncio.get_event_loop()
    data = asyncio.Queue()
    sum1 = loop.create_task(compute(1, 4, data))
    sum2 = loop.create_task(compute(0, 0, data))
    sum3 = loop.create_task(process(2, data))
    final_task = asyncio.gather(sum1, sum2, sum3)
    loop.run_until_complete(final_task)


if __name__ == '__main__':
    main()

What now?

Write a program that reads log files and refires those URLs that have a 5xx status code. Once the refiring is done just add the &retry=True in the prefix of the URL and store them in a separate log file.

The log file will be a text file, you can check out a sample file here.

Just before ending the blog I would like to thank maxking and Jason Braganza for helping me out in the blog.

In the next part of the series, I will be talking about threads and finally will conclude the series with asyncio based frameworks such as quart and aiohttp.

Happy Coding!