asyncio
25 Feb 2019

At Faculty, we build a lot of our platform’s backend services in Scala. It’s a really nice programming language to work with, and I’ve found the functional programming model and strong typing really effective for writing well-tested, robust software. However, when it came to writing a lightweight agent for our new jobs feature, we decided that the computational resource demands of running the JVM were too high.
We decided instead to leverage the experience of the team in writing modern Python code, taking advantage of some of the new functionality added to recent versions, in particular asynchronous programming with coroutines and type hinting.
In this post I’ll share some of our experience in developing asynchronous code in Python with asyncio.
Coroutines and event loops
Before diving into some code examples, I’d like to explain a little about how asynchronous programming with coroutines works.
Coroutines are a form of cooperative concurrency. Each coroutine is a line of execution that can suspend its control of the program to allow another coroutine to run. This is particularly useful when a coroutine is waiting for something like a network request to complete - at that point its own execution cannot continue, but other coroutines could be doing useful work.
Implementing a coroutine system requires a way for code to release control of the program and be resumed at a later time (we will cover the Python syntax for this below), but it also requires a mechanism for managing the execution of the active coroutines. This is typically achieved with an event loop: the event loop keeps track of the active coroutines, and when one releases control, it passes control to another.
It’s useful to have a basic familiarity with these concepts, as it helps in understanding the syntax and purpose of the libraries we’re dealing with. I’ll refer to event loops and coroutines extensively throughout the post.
Writing coroutines in Python
Python 3.5 introduced the async/await syntax to the language. These keywords allow you to declare coroutines and use them. For example, use async def to declare a coroutine:
async def example():
    print("Example is running")
The above allows you to declare a coroutine, but to run it, it needs to be explicitly run on an event loop. Calling it will return a coroutine object which is not actually running:
>>> example()
<coroutine object example at 0x10b2bb4c0>
You’re free to use whichever event loop implementation you like, however the standard library provides asyncio, a popular choice that a lot of third party tools like aiohttp have been built on top of. To run our coroutine with asyncio, get an event loop and pass it the coroutine object we got above:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(example())
Example is running
>>> loop.close()
Note: Python 3.7 added asyncio.run(), which creates an event loop and runs a coroutine on it for you. In this case the above example becomes simply asyncio.run(example()). In the rest of the examples I’ll use asyncio.run(), assuming Python 3.7 or later, but you can adapt the code to create a loop and call its run_until_complete() method if using older Python versions.
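If you need to support both older and newer Python versions in the same codebase, one option is a small helper along these lines (a sketch; run() here is our own hypothetical function, not part of asyncio):

import asyncio

def run(coro):
    """Run a coroutine to completion, falling back to an explicit
    event loop on Python versions before 3.7."""
    if hasattr(asyncio, "run"):  # Python 3.7+
        return asyncio.run(coro)
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()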
Giving up control
We’ve covered writing basic coroutines, however as mentioned earlier they’re only really useful when they yield control of the event loop so other coroutines can do some work. To do this, you await another coroutine (or other awaitable objects provided by asyncio):
async def inner():
    print("inner coroutine")

async def example():
    await inner()
In this example, at the point where we await the inner coroutine, control of the event loop is given up, allowing other coroutines on the loop to be executed instead.
If the called coroutine returns something on completion, the await expression will evaluate to that value when the coroutine completes. In the following example, we bind the result of get_message and print it:
>>> async def get_message():
...     return "Coroutines are great"
...
>>> async def example():
...     message = await get_message()
...     print(message)
...
>>> asyncio.run(example())
Coroutines are great
A more complete example
Having coroutines yield control of the event loop is most useful when it’s anticipated that we’re going to have to wait idle for a while until some useful work can be done. We can emulate this case with asyncio.sleep, which simply waits for a specified number of seconds before completing:
import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Use asyncio.gather to run two coroutines concurrently:
    await asyncio.gather(
        print_after("world!", 2),
        print_after("Hello", 1)
    )

asyncio.run(main())
Running this example prints out:
Hello
world!
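As an aside, asyncio.gather() also collects the return values of the coroutines it runs, in the order they were passed in. A minimal sketch (the coroutine here is invented for illustration):

import asyncio

async def double(x, delay):
    # Pretend this is slow I/O rather than simple arithmetic
    await asyncio.sleep(delay)
    return x * 2

async def main():
    # Results come back in the order the coroutines were passed in,
    # regardless of which one finished first
    results = await asyncio.gather(double(1, 0.2), double(2, 0.1))
    print(results)  # [2, 4]

asyncio.run(main())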
When do coroutines start running?
A common pitfall when using coroutines with asyncio is that they sometimes need to be scheduled on the event loop explicitly. Consider the following example, where I’ve attempted to reproduce the same behaviour as when using asyncio.gather above:
import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = print_after("world!", 2)
    second_awaitable = print_after("Hello", 1)

    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())
However, when running this, I get the following output, despite expecting “Hello” to get printed first after its shorter delay:
world!
Hello
The reason for this becomes clearer after adapting the example to instead print at the start and end of the coroutine’s execution:
import asyncio

async def example(message):
    print("start of example():", message)
    await asyncio.sleep(1)
    print("end of example():", message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = example("First call")
    second_awaitable = example("Second call")

    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())
Running the above results in the following output:
start of example(): First call
end of example(): First call
start of example(): Second call
end of example(): Second call
The problem is that asyncio is not starting the execution of the second call to example() until the first one is finished. Surely this defeats the purpose of using coroutines in the first place!
Well, this all happens because asyncio does not start the execution of a coroutine until it is explicitly registered with the event loop (such as with asyncio.run()) or awaited in another coroutine. If we want to start multiple coroutines and have them run concurrently as above, we can either use asyncio.gather() as in the earlier example, or schedule them individually with asyncio.create_task():
import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = asyncio.create_task(print_after("world!", 2))
    second_awaitable = asyncio.create_task(print_after("Hello", 1))

    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())
The adapted snippet above starts running the coroutines immediately and waits for them to finish, resulting in “Hello” getting printed first as expected:
Hello
world!
Note: asyncio.create_task() was introduced in Python 3.7. In older versions of Python, use asyncio.ensure_future() instead.
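If your code needs to run on both sides of that version boundary, one approach (a sketch, assuming you only need to schedule coroutines on the already-running loop) is to pick the function once at import time:

import asyncio

# create_task() only exists from Python 3.7; ensure_future() also
# schedules a coroutine on the event loop and works on 3.5/3.6
if hasattr(asyncio, "create_task"):
    schedule = asyncio.create_task
else:
    schedule = asyncio.ensure_future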
Running commands in asyncio
The main responsibilities of the job agent we developed were to install the dependencies of a batch job and then run the job itself. Each of these steps requires running potentially long-lasting shell commands, while simultaneously reporting their output and the CPU and memory utilisation of the machine to a service over HTTP.
Using coroutine-based concurrency works really well with this model, as for most of the program’s execution time, it’s waiting for other processes or for network I/O. We therefore decided to implement our agent with Python coroutines and asyncio’s implementation of subprocess.
asyncio provides an interface for running commands that’s very similar to subprocess in the Python standard library:
import asyncio

async def echo(string):
    process = await asyncio.create_subprocess_exec("echo", string)
    await process.wait()

asyncio.run(echo("Hello, world!"))
Processes are created using asyncio.create_subprocess_exec() (which is itself a coroutine and so needs to be awaited). Some of the methods on the process object are also coroutines (like .wait() in the above example).
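Our agent also needed the output of those commands, for example to forward job logs. Here is a minimal sketch of one way to do that with asyncio’s subprocess support, reading stdout line by line (the command and function name are just for illustration):

import asyncio

async def stream_output(*command):
    """Run a command and print its stdout line by line as it arrives."""
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE
    )
    # process.stdout is a StreamReader; readline() is a coroutine, so
    # other coroutines can run while we wait for the next line
    while True:
        line = await process.stdout.readline()
        if not line:
            break
        print(line.decode().rstrip())
    await process.wait()

asyncio.run(stream_output("ls", "-l"))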
Asynchronous HTTP with aiohttp
Our job agent is also responsible for sending information back to a central tracking server, for example to determine the health of the job and allow the central service to take action when a job becomes unhealthy.
Making a network request is another I/O bound operation that fits well with a coroutine-based concurrency programming model. We used aiohttp, a popular HTTP library built on top of asyncio, to send monitoring information back to our tracking service.
To make an HTTP request with aiohttp:
import aiohttp
import asyncio
async def fetch_and_print(url):
async with aiohttp.ClientSession() as session:
response = await session.get(url)
print(await response.text())
asyncio.run(fetch_and_print("https://python.org/"))
The above example uses an aiohttp.ClientSession as an asynchronous context manager with the async with syntax. This works much in the same way as standard context managers in Python, except that the code that governs entering and exiting the context is implemented in coroutines and so can also be executed asynchronously. In this case, using the session as a context manager ensures that it is closed when we’re done with it.
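The same session can also be used to send data. For example, a periodic heartbeat like the one our agent sends might look roughly like this (the endpoint and payload are invented for illustration):

import asyncio
import aiohttp

async def send_heartbeats(url, interval):
    """Periodically POST a small JSON payload to the tracking server."""
    async with aiohttp.ClientSession() as session:
        while True:
            # aiohttp serialises the dict to JSON via the json= argument
            async with session.post(url, json={"status": "healthy"}) as response:
                response.raise_for_status()
            await asyncio.sleep(interval)

# For example (hypothetical endpoint):
# asyncio.run(send_heartbeats("http://localhost:8000/heartbeat", 5))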
Putting it all together
With the tools above, we can now put together a simple version of our job running agent, using asyncio to have everything running concurrently!
The agent will:
- Send a notification to a tracking server to indicate that it has started
- Periodically send a heartbeat to the tracking server so it knows the job is still healthy
- Run the job command
- Send a notification to the tracking server on completion of the job, indicating if it was successful or if it failed
I’ve put the complete example on GitHub, along with a simple backend you can test it against. See in particular agent.py for the example agent using coroutines to send heartbeats to the tracking server while running the command.
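For a flavour of how the pieces fit together, here is a stripped-down sketch of such an agent. It is not the actual agent.py from the repository, and the tracking URL and endpoints are invented for illustration:

import asyncio
import aiohttp

TRACKING_URL = "http://localhost:8000"  # hypothetical tracking server

async def notify(session, event):
    # Tell the tracking server about a state change
    async with session.post(f"{TRACKING_URL}/events", json={"event": event}):
        pass

async def heartbeat(session, interval=5):
    # Runs forever; cancelled once the job has finished
    while True:
        async with session.post(f"{TRACKING_URL}/heartbeat", json={"status": "healthy"}):
            pass
        await asyncio.sleep(interval)

async def run_job(command):
    process = await asyncio.create_subprocess_exec(*command)
    return await process.wait()

async def main(command):
    async with aiohttp.ClientSession() as session:
        await notify(session, "started")
        # Schedule the heartbeat so it runs alongside the job itself
        heartbeat_task = asyncio.create_task(heartbeat(session))
        returncode = await run_job(command)
        heartbeat_task.cancel()
        await notify(session, "succeeded" if returncode == 0 else "failed")

asyncio.run(main(["sleep", "10"]))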
Summary
Coroutines are a great way to achieve concurrency in Python programs performing I/O-bound tasks such as running system commands and handling network requests. I’ve demonstrated some of the basics of using them and provided a more complete example application, but I encourage you to try them out for yourself and see how they work for your needs.