This is the second part of the series. In the first part, we talked about the general idea of concurrency, how it differs from parallelism, and how Python handles concurrency.
Part 1: Talking Concurrency -1
In this second part, we will look at the modern solution to the problem: the asyncio module.
Import Asyncio
In the last post, we looked at a basic code snippet showing how to write concurrent code. We also discussed some of the basic terminology used with the asyncio module. If you don't remember it, take a quick recap, as we will look at those concepts in a bit more detail.
Before looking at some code, let's go over some basic terminology that will help in understanding the code better.
Event loop: an infinite loop that keeps track of all the running tasks. It manages all the suspended functions and executes them when the time is right. These functions are stored in a queue called the task queue; the event loop constantly polls the task queue and runs whichever tasks are ready. When a task is handed to the event loop, a future object is returned.
Future: a future is an indirect reference to a forthcoming result. It can loosely be described as a promise to do something once a condition is met: when the condition is met, the future can “call back” whoever is waiting on it. Since everything is an object in Python, a future is also an object; it implements the __await__() method, and its job is to hold a state and a result. The state can be one of three things:
Pending: it does not have a result or exception yet.
Cancelled: it was cancelled.
Finished: it finished, either with a result or with an exception.
Futures also have a method called add_done_callback(). It registers a function to be called as soon as the future is done. The callback receives the future itself, from which it can read the result as a Python object, or get the exception raised if the work failed.
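To make the three states and the callback a bit more concrete, here is a minimal sketch. It creates a bare future by hand with loop.create_future(), which application code rarely needs to do; the value 42 and the 0.05 second delay are made up for illustration.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()        # a brand new future starts out pending
    states = [future.done()]             # False: no result or exception yet

    seen = []
    # The callback is handed the future itself once it is done.
    future.add_done_callback(lambda fut: seen.append(fut.result()))

    # Stand-in for "the condition is met": set the result a little later.
    loop.call_later(0.05, future.set_result, 42)

    result = await future                # main() is suspended until the result is set
    await asyncio.sleep(0)               # let any callbacks still queued run
    states.append(future.done())         # True: finished with a result

    other = loop.create_future()
    other.cancel()
    states.append(other.cancelled())     # True: this one was cancelled instead
    return result, seen, states


print(asyncio.run(main()))
```

The done callback and the await both get the same value; the callback is simply another way of being told that the future has finished.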
- Tasks: a task executes a coroutine in an event loop. In a program, asyncio.create_task(coroutine) wraps the coroutine into a task, schedules its execution, and returns the task object. Every time the coroutine awaits a future, the future is passed back up to the task, which binds itself to the future by calling add_done_callback() on it. From then on, when the future's state changes to cancelled or finished (with a result or an exception), the task is woken up and resumes the coroutine where it left off.
Since a typical program has multiple tasks to execute concurrently, we normally create them with asyncio.create_task(coroutine) and wait for all of them together with asyncio.gather().
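As a small sketch of the point above, the snippet below checks what asyncio.create_task() hands back. The greet() coroutine and its tiny delay are hypothetical, used only to have something to schedule.

```python
import asyncio


async def greet(name):
    await asyncio.sleep(0.01)            # pretend to wait on something
    return f"hello {name}"


async def main():
    # create_task() wraps the coroutine and schedules it right away.
    task = asyncio.create_task(greet("asyncio"))
    is_task = isinstance(task, asyncio.Task)
    is_future = isinstance(task, asyncio.Future)   # a Task is a kind of Future
    done_before = task.done()            # scheduled, but it has not run yet
    result = await task                  # suspend main() until the task finishes
    return is_task, is_future, done_before, task.done(), result


print(asyncio.run(main()))
```

Because a task is itself a future, everything said about states and add_done_callback() applies to tasks as well.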
- Coroutine: asyncio was introduced in Python 3.4. It started off with decorator-based coroutines, marked with @asyncio.coroutine, which used the yield from expression. Later, Python 3.5 introduced the async and await keywords, which made concurrent code much easier to write and read. I won't go into detail on how coroutines evolved into the new async def syntax, because I am planning to write a separate blog on that.
As we looked into the basic definition of coroutines in the last blog, we can loosely describe them as restartable functions.
You make a coroutine with the async def keyword, and you suspend it with the await keyword. Every time you await, the function is suspended while whatever you asked it to wait on happens; when that has finished, the event loop wakes the function up again and resumes it from the await expression, passing any result back in. Since coroutines evolved from generators, and generators are iterators with an __iter__() method, coroutines similarly have an __await__() method, which is what allows them to be suspended and resumed at each await.
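The "restartable function" idea can be seen without any event loop at all. The sketch below drives a coroutine by hand with send(), which is roughly the job the event loop does for us; the Suspend class and the values passed around are invented for this illustration and are not part of asyncio.

```python
class Suspend:
    """A hypothetical awaitable that pauses the coroutine exactly once."""

    def __await__(self):
        # The yield hands control back to whoever is driving the coroutine.
        value = yield "paused"
        return value                     # becomes the result of `await Suspend()`


async def restartable():
    got = await Suspend()                # the coroutine is suspended here
    return got * 2


coro = restartable()
first = coro.send(None)                  # run until the first suspension point
try:
    coro.send(21)                        # resume, passing a value back in
except StopIteration as stop:
    final = stop.value                   # the coroutine's return value

print(first, final)
```

In real programs you never call send() yourself; the event loop and the task do it each time the awaited future completes.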
At each step, a coroutine does one of three things:
- It awaits a future
- It awaits another coroutine
- It returns a result.
Before moving forward, I want to talk about await. In Python, anything that can be awaited, i.e. used with the await keyword, is called an awaitable object. The most common awaitables you will use are coroutines, futures and tasks. Thus anything that would otherwise block is handed over to the event loop with await, and the waiting coroutine is added to the list of paused coroutines.
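Here is a short sketch of the three common awaitables side by side. The answer() coroutine and the value 42 are placeholders; inspect.isawaitable() from the standard library is used only to check each object.

```python
import asyncio
import inspect


async def answer():
    return 42


async def main():
    loop = asyncio.get_running_loop()

    coro = answer()                          # a coroutine object
    task = asyncio.create_task(answer())     # a task wrapping another coroutine
    future = loop.create_future()            # a bare future, completed by hand
    future.set_result(42)

    checks = [inspect.isawaitable(obj) for obj in (coro, task, future)]
    results = [await coro, await task, await future]
    return checks, results


print(asyncio.run(main()))
```

Note that the function answer itself is not awaitable; only the coroutine object you get by calling it is.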
Now let's look at a very basic async program to understand how everything fits together.
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    # Suspend print_sum() until compute() has produced its result.
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

asyncio.run(print_sum(1, 2))
The sequence diagram below describes the flow of the above program.
Now that we know all the basic terminology used in an async program, let's look at slightly more complex code to get a better understanding of all the jargon we learned above.
import asyncio

async def compute(x, y):
    """
    A coroutine that takes in two values and returns the sum.
    """
    print(f"Computing the value of {x} and {y}")
    await asyncio.sleep(1)
    return x + y

async def print_sum():
    """
    A coroutine that creates tasks.
    """
    value1 = asyncio.create_task(compute(1, 0))
    value2 = asyncio.create_task(compute(1, 0))
    value3 = asyncio.create_task(compute(1, 0))
    print(sum(await asyncio.gather(value1, value2, value3)))

asyncio.run(print_sum())
async def print_sum() and async def compute() are the two coroutines in the above program. async def print_sum() plays the role that the main function plays in sync programming: the main function drives the entire program and all the functions related to it. The same approach is followed here: one coroutine awaits all the other coroutines.
This can easily be misunderstood, though. Written as follows, the program would run just fine, but in a more or less sequential manner.
async def print_sum():
    # Anti-pattern: each task is awaited before the next one is even created.
    value1 = await asyncio.create_task(compute(1, 0))
    value2 = await asyncio.create_task(compute(1, 0))
    value3 = await asyncio.create_task(compute(1, 0))
    print(sum([value1, value2, value3]))
The above code is a good example of how not to write async code. By awaiting each task right after creating it, we do not even create the next task until the previous one has finished, which makes the calls effectively sync and the program sequential. To avoid this, the program uses asyncio.gather() to gather all the tasks, value1, value2 and value3, and await them together.
Finally, because all the tasks are created first and then gathered, they run concurrently.
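The difference between the two styles is easy to measure. The following is a rough sketch that times both; the 0.2 second delay and the helper names run_sequentially() and run_concurrently() are made up for the comparison, and the exact timings will vary from machine to machine.

```python
import asyncio
import time


async def compute(x, y):
    await asyncio.sleep(0.2)             # stand-in for a slow I/O call
    return x + y


async def run_sequentially():
    start = time.perf_counter()
    total = 0
    for _ in range(3):
        # Awaiting right away: the next task is not created until this one ends.
        total += await asyncio.create_task(compute(1, 0))
    return total, time.perf_counter() - start


async def run_concurrently():
    start = time.perf_counter()
    # Create all the tasks first, then wait for them together.
    tasks = [asyncio.create_task(compute(1, 0)) for _ in range(3)]
    total = sum(await asyncio.gather(*tasks))
    return total, time.perf_counter() - start


print("sequential:", asyncio.run(run_sequentially()))
print("concurrent:", asyncio.run(run_concurrently()))
```

Both versions compute the same total, but the sequential one takes roughly three sleeps while the gathered one takes roughly one.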
Sync-Async-Sync
A lot of the time you may find yourself having to call a sync function (def) from a coroutine (async def), or a coroutine (async def) from a sync function (def). Ideally, you “shouldn't” use sync functions for calls that can be async, like a database call, because that is a place where async could provide further optimization. But there is nothing wrong with using a synchronous library for the database and an async library for HTTP, and gradually moving things to async.
Calling a sync function (def) from a coroutine (async def): in that case, you run the sync function in a different thread using a thread pool executor. The run_in_executor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a future that can be awaited to wait for the function to finish its work and return something.
import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

# The guard matters here: worker processes may re-import this module.
if __name__ == '__main__':
    asyncio.run(main())
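Calling coroutines from a normal sync function: you have to get an event loop manually (get_event_loop(), or new_event_loop() on newer Python versions, where calling get_event_loop() without a running loop is deprecated), create the tasks with create_task() and call the asyncio.gather() function. Since you cannot await inside a sync function, one thing you can do is create a queue with asyncio.Queue() and use that queue to pass data around between the different coroutines.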
import asyncio

async def compute(x, y, data):
    print(f"Computing the value of {x} and {y}")
    result = x + y
    await data.put(result)

async def process(n, data):
    processed, sumx = 0, 0
    while processed < n:
        item = await data.get()
        print(item)
        processed += 1
        value = item
        sumx += value
    print(f"The sum is:{sumx}")
    await asyncio.sleep(.5)

def main():
    # get_event_loop() is deprecated on newer Python versions when no loop
    # is running, so create a loop explicitly and make it the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    data = asyncio.Queue()
    sum1 = loop.create_task(compute(1, 4, data))
    sum2 = loop.create_task(compute(0, 0, data))
    sum3 = loop.create_task(process(2, data))
    final_task = asyncio.gather(sum1, sum2, sum3)
    loop.run_until_complete(final_task)
    loop.close()

if __name__ == '__main__':
    main()
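When the sync function only needs the final results, a shorter route is to let asyncio.run() create and close the event loop. This is a minimal sketch of that alternative rather than a replacement for the queue example; compute_all() and sync_total() are hypothetical helper names, and it assumes no event loop is already running in the calling thread, since asyncio.run() refuses to start in that case.

```python
import asyncio


async def compute(x, y):
    await asyncio.sleep(0.01)            # stand-in for real async work
    return x + y


async def compute_all(pairs):
    # gather() accepts coroutines directly and wraps them in tasks for us.
    return await asyncio.gather(*(compute(x, y) for x, y in pairs))


def sync_total(pairs):
    # A plain function: asyncio.run() starts a loop, runs the coroutine
    # to completion, closes the loop and returns the result.
    return sum(asyncio.run(compute_all(pairs)))


print(sync_total([(1, 4), (0, 0)]))
```

The manual loop handling shown earlier is still useful when you need to keep the loop around or schedule tasks on it step by step.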
What now?
- To get a better understanding of all the new syntax you learned, you can try out the sample problem mentioned below.
Write a program that reads log files and refires those URLs that have a 5xx status code. Once the refiring is done, add &retry=True to the end of the URL and store them in a separate log file.
The log file will be a text file, you can check out a sample file here.
- As I am still exploring concurrency, I don't know exactly which best practices to follow and which pitfalls to avoid while writing async code, but I highly recommend you check out asyncio: We Did It Wrong – roguelynn. That article can be a good follow-up once you are done with this one and are comfortable with the syntax of asyncio.
Before ending the blog, I would like to thank maxking and Jason Braganza for helping me out with it.
In the next part of the series, I will be talking about threads and finally will conclude the series with asyncio based frameworks such as quart and aiohttp.
Happy Coding!