In the first post of this series, I looked at how to achieve parallel execution in Python using multiprocessing and discussed why this approach is unsuitable for WSGI-based web frameworks: WSGI only allows the web server, not the framework, to create new processes. At the end, I mentioned several alternative Python HTTP servers which use asynchronous I/O with an event-loop-based scheduler to handle parallelism.

In this post, we will look at how asynchronous I/O works in general, and specifically how it works in Python.

Event loops and asyncio

Event-loop-based concurrency generally works by putting many small tasks into a scheduler; the tasks take turns using the CPU and waiting in the queue. A task keeps executing until it hits I/O, at which point it gives control back to the scheduler, along with a file descriptor and some other information for resuming the task. When the I/O operation is detected to be complete, the task resumes executing where it left off until it finishes or hits another I/O operation. This means your thread can keep doing work while it is waiting on I/O, and each task uses only a tiny amount of memory while it's not executing (relative to a process or an OS thread). It's a very efficient way to use a single core.
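The scheme above can be sketched with plain generators and the select module. This is only a toy illustration of the idea, not how any real event loop (including asyncio's) is implemented: tasks are generators that yield a socket they want to read from, and the loop resumes each task once select() reports its socket is ready.

```python
import select
import socket

def toy_loop(tasks):
    runnable = list(tasks)
    waiting = {}  # socket -> task waiting on that socket
    while runnable or waiting:
        for task in runnable:
            try:
                sock = next(task)   # run the task until it hits I/O
                waiting[sock] = task
            except StopIteration:
                pass                # the task finished
        runnable = []
        if waiting:
            # block until at least one awaited socket is readable
            ready, _, _ = select.select(list(waiting), [], [])
            runnable = [waiting.pop(sock) for sock in ready]

# demo: one task waiting on a socket that already has data pending
results = []
a, b = socket.socketpair()
a.send(b"ping")

def reader():
    yield b                       # give up control until b is readable
    results.append(b.recv(4))

toy_loop([reader()])
print(results)  # [b'ping']
```

A real scheduler also handles timers, writes, and exceptions, but the core shape is the same: park a task on a file descriptor, and resume it when the kernel says the descriptor is ready.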

Callbacks, monads and async/await

Traditionally, the way a scheduler resumes a task is with a callback function which would take the result of the I/O operation as input. This works well, but dealing with a lot of deeply nested callbacks is not easy for programmers, and is often referred to as “callback hell”.

Consider the following Python program:

print("what is your name? ")
name = input()
print(f"hello, {name}.")

The question is printed, we wait for user input, and we print a greeting.

However, if we need to return control to the scheduler for each I/O operation with callbacks, our program will look like this:

async_print(
    "what is your name? ",
    callback=lambda _: async_input(
        callback=lambda name: async_print(
            f"hello, {name}.",
            callback=None
        )
    )
)

This code is ridiculous in Python, but JavaScript that handles web requests often has a similar structure, because you don't want your page to freeze while content loads in the background. It's also how every I/O function works in Haskell. Why?

Haskell is a purely functional, lazy programming language. Each program can be thought of as a mathematical expression rather than a list of steps, and sub-expressions aren't evaluated until their values are needed (i.e. lazy evaluation). However, the order of evaluation is often important, especially when I/O is involved! How does Haskell make sure I/O happens at the right time?

The answer is monads!

One way to think about monads is as a wrapper around a value. You can only get to the value by giving a callback function to the monad where the unwrapped value is bound to an argument and so available inside the body of that function.

In Python, it might look like this:

class IOMonad:
    def __init__(self, value):
        self._value = value

    def bind(self, callback) -> "IOMonad":
        # In Haskell, there would be some runtime magic
        # here to ensure the expression producing the value
        # is evaluated, but Python doesn't need this.

        return callback(self._value)


def monadic_print(message):
    return IOMonad(print(message))


def monadic_input():
    return IOMonad(input())


monadic_print("what is your name?").bind(
    lambda _: monadic_input().bind(
        lambda name: monadic_print(f"hello, {name}")
    )
)

We see that this monadic code looks similar to the callback version we had earlier. However, because this pattern is so prevalent in Haskell, its designers created a special syntax called do notation, which allows the monadic callback code at the end of the previous example to be written like this (in a world where Python had do notation):

do:
    monadic_print("what is your name?")
    name <- monadic_input()
    monadic_print(f"hello, {name}")

This is just a simple syntax transformation in the parser, but it’s much prettier than callback code.

F#, a functional language for .NET, doesn't use lazy evaluation or need monads for I/O. Still, the F# developers were inspired by Haskell to use monads to implement asynchronous I/O in a way that looks more like sequential code than callbacks, and they created async blocks to perform a syntax transformation similar to Haskell's do notation.

Seeing this from the other side of Microsoft, the C# developers were inspired in turn and added async and await keywords to the language, which signal to the compiler to convert functions written sequentially into the same kind of monadic callback code we've seen above.

There is a progression from old-style callback code like in C and JavaScript to monadic callbacks in Haskell and F#, and finally to async and await keywords which signal syntax transformations in C#. This approach was adopted in TypeScript and JavaScript as well.

The Python community also wanted a way to write asynchronous code without explicit callbacks. However, Python got rid of the callbacks altogether. Python already had resumable functions in the form of generators. Generators allow efficient transfer of control flow between the calling context and the function itself at specified breakpoints using the yield keyword. While it looks different and doesn't use a callback, yield provides functionality similar to the monadic bind function.
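The resemblance is easiest to see with a plain generator: the caller decides when to resume it, and the value passed to send() becomes the result of the yield expression, much as bind hands the unwrapped value to its callback. A small sketch:

```python
def greeter():
    # execution pauses at each yield; the caller resumes the generator
    # with send(), and the sent value becomes the yield expression's result
    name = yield "what is your name? "
    yield f"hello, {name}."

g = greeter()
prompt = next(g)           # run up to the first yield
greeting = g.send("Ada")   # resume, binding "Ada" to name
print(prompt)    # what is your name?
print(greeting)  # hello, Ada.
```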

Python’s coroutines were originally implemented in terms of these generators (check out this video to see how). Today, we also use async and await keywords to create coroutines in Python, but they are implemented in the same way as generator functions. The async keyword simply signals that the function definition which follows returns an awaitable coroutine rather than a normal value. The await keyword is the breakpoint where control flow will be returned to the scheduler to keep the event loop chugging away.
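The generator machinery is still visible if you drive a coroutine by hand, which is roughly what the event loop does for you: a coroutine speaks the same send() protocol as a generator, and its return value arrives wrapped in a StopIteration exception.

```python
async def get_greeting():
    return "hello, world."

coro = get_greeting()
try:
    coro.send(None)        # start the coroutine, like priming a generator
except StopIteration as stop:
    result = stop.value    # the return value rides on StopIteration
print(result)  # hello, world.
```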

This monadic approach to asynchronous I/O has some drawbacks. Only coroutines can await, so you get into a situation where older I/O libraries which don’t use async are not usable (from a practical standpoint) in new async code. You end up in the “What color is your function?” situation. It all gets very annoying when you didn’t await something you should have or you try to await something that doesn’t work that way. Still, it is a useful way to retrofit asynchronous I/O into a language which wasn’t designed for it. Newer languages like Go have a simpler way to handle this because the scheduler was baked into the runtime and all the I/O functions from the beginning.

Still, whether or not async and await are the ideal way to solve this problem, they are better than writing callbacks, and they are what we have at our disposal in Python and a number of other languages.

Using asyncio

So we have these pseudo-monadic async and await keywords for defining coroutines in Python, but what can we do with them? By themselves, not very much!

async def get_greeting():
    return "hello, world."

print(get_greeting())

<coroutine object get_greeting at 0x7f009fab7940>
example.py:6: RuntimeWarning: coroutine 'get_greeting' was never awaited
  print(get_greeting())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Hm. Not great. We defined a get_greeting coroutine, but nothing happened because we never await. Let’s try:

async def get_greeting():
    return "hello, world."

print(await get_greeting())

  File "example.py", line 6
    print(await get_greeting())
          ^
SyntaxError: 'await' outside function

It didn’t work the first time because I didn’t await the coroutine, and in the second attempt, it wouldn’t let me await the coroutine?

Well, that’s because all of this is supposed to be executed in the context of an event loop. There are several available for Python. The one distributed with the standard library is in the asyncio module. There are a few ways to get the loop running and put things into it. The one you should start with these days is asyncio.run, which creates the loop and runs a coroutine in it, returning the output.

import asyncio

async def get_greeting():
    return "hello, world."

print(asyncio.run(get_greeting()))

# works!

Nice. Let’s see what we can do with this!

import asyncio
import datetime


async def wait_on_io():
    # it's a lie. We're just sleeping, not waiting on I/O at all.
    await asyncio.sleep(1)
    return datetime.datetime.now()


async def main():
    for i in range(6):
        time = await wait_on_io()
        print(f"work from {i} finished at", time)


asyncio.run(main())
work from 0 finished at 2021-10-14 16:44:14.702228
work from 1 finished at 2021-10-14 16:44:15.704024
work from 2 finished at 2021-10-14 16:44:16.705294
work from 3 finished at 2021-10-14 16:44:17.706283
work from 4 finished at 2021-10-14 16:44:18.707198
work from 5 finished at 2021-10-14 16:44:19.708191

Hm. This was slow and didn’t seem to do anything concurrently. Just writing async functions doesn’t make your code automatically concurrent. Yes, it is technically yielding control flow to the scheduler, but it goes right back to where it left off because there is nothing else scheduled.

To bring it all together, we create tasks with asyncio.create_task, which takes a coroutine as input. When you create the task, it is scheduled, and it will run concurrently with the other jobs in the scheduler. You can await the task later to block in the running coroutine until the result is ready. (Note that it only blocks in the current coroutine. The other tasks can still run.) We update our two functions accordingly…

async def wait_on_io(i):
    await asyncio.sleep(1)
    return i, datetime.datetime.now()

async def main():
    tasks = [asyncio.create_task(wait_on_io(i)) for i in range(6)]
    for task in tasks:
        i, time = await task
        print(f"work from {i} finished at", time)

… and we get something like this:

work from 0 finished at 2021-10-14 16:46:41.748214
work from 1 finished at 2021-10-14 16:46:41.748263
work from 2 finished at 2021-10-14 16:46:41.748277
work from 3 finished at 2021-10-14 16:46:41.748290
work from 4 finished at 2021-10-14 16:46:41.748302
work from 5 finished at 2021-10-14 16:46:41.748313

All the jobs complete at more or less the same time and the program is fast! If the order of the output doesn’t matter, we can tweak our main function a little more:

async def main():
    tasks = {asyncio.create_task(wait_on_io(i)) for i in range(10)}
    for task in asyncio.as_completed(tasks):
        i, time = await task
        print(f"work from {i} finished at", time)

Using asyncio.as_completed yields the tasks in the order they finish, so a task that is already done isn't held up behind a slower one that was created first, though it makes no difference in our trivial example.
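If you only need all the results once everything is done, asyncio.gather is often simpler still: it wraps the coroutines in tasks for you and returns their results in argument order. Here is a sketch using the same wait_on_io coroutine:

```python
import asyncio
import datetime

async def wait_on_io(i):
    await asyncio.sleep(0.1)
    return i, datetime.datetime.now()

async def main():
    # gather schedules all the coroutines and awaits them together,
    # returning results in the order the coroutines were passed in
    return await asyncio.gather(*(wait_on_io(i) for i in range(6)))

results = asyncio.run(main())
for i, time in results:
    print(f"work from {i} finished at", time)
```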

The weakness of event loops

I’ve said a lot so far about how great using an event loop as a mechanism for concurrency is because of the speed and high density of tasks it allows. However, there is one major drawback. Asynchronous I/O is a form of cooperative multitasking. This means it is always explicit where control flow is given up to the scheduler, and one task never interrupts another to use the CPU. This is in contrast to OS threads, which can interrupt each other while executing.

The consequence of this in a language like Python or JavaScript where execution is always on a single core is that, if you get stuck in a point where control is never returned to the scheduler, everything else has to wait on it. This is especially a problem if you’re doing CPU-intensive work—exactly the case with my Revrit API!
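You can see the stall directly. In this sketch, one coroutine calls the blocking time.sleep instead of asyncio.sleep, so it never yields to the loop, and a quick task that only asked to sleep for 0.2 seconds can't finish until the blocker lets go about a second later:

```python
import asyncio
import time

async def blocker():
    # time.sleep never yields to the event loop, so nothing else can run
    time.sleep(1)

async def quick(finished):
    await asyncio.sleep(0.2)          # should take about 0.2 seconds...
    finished.append(time.monotonic())

async def main():
    start = time.monotonic()
    finished = []
    await asyncio.gather(blocker(), quick(finished))
    return finished[0] - start

elapsed = asyncio.run(main())
# quick() only finishes after blocker() releases the loop,
# roughly a full second after it was scheduled
print(round(elapsed, 1))
```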

The third and final post of this series will explore how to combine multiprocessing and asyncio to finally achieve the goal of having a responsive web server together with CPU-intensive parallel execution.