In the first post of this series, I looked at how to achieve parallel execution in Python using multiprocessing pools, and discussed why this doesn't fit WSGI-based web frameworks: under WSGI, only the web server may create new processes, not the framework. At the end, I mentioned several alternative Python HTTP servers which use asynchronous I/O with an event-loop-based scheduler to handle parallelism, and which do work with multiprocessing.

In the second post, we looked at how asynchronous I/O works in general and how it works in Python specifically: how callback code can be rewritten in terms of functions with the async and await keywords, an approach inspired by monadic I/O in functional programming languages.

How will we combine these two things? First, I’m going to show you the wrong way to do it!

import asyncio
import multiprocessing

# define anything you need in the worker processes before you create
# the pool.
def fibonacci(n: int):
    """A function which finds the n-th element in the Fibonacci sequence
    with catastrophically bad performance as n increases.
    """
    if n < 2:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

pool = multiprocessing.Pool()

async def fib_task(n: int):
    """compute fibonacci n-th element in the fibonacci sequence in another
    process in the background while polling for the result.

    DO NOT DO THIS!!!
    """
    result = pool.apply_async(fibonacci, [n])
    while not result.ready():
        await asyncio.sleep(0)
    return result.get()

async def main():
    my_fib_task = asyncio.create_task(fib_task(33))

    # do some other work here. maybe talk to some sockets.

    print(await my_fib_task)

asyncio.run(main())

This works, and it's not horrible, but it keeps the CPU in the main process busy spinning through that polling loop. Normal async tasks wait on I/O internally using select (or a similar mechanism), so the main process isn't wasting CPU cycles jumping in and out of the scheduler while waiting on a result.
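
You can dampen the spinning by sleeping for a nonzero interval, but that only trades CPU for latency. A sketch of this (still not recommended) variant:

async def fib_task_less_bad(n: int):
    result = pool.apply_async(fibonacci, [n])
    while not result.ready():
        # Sleeping 10 ms instead of 0 stops the tight busy loop, but every
        # result is now delayed by up to 10 ms, and we are still polling.
        await asyncio.sleep(0.01)
    return result.get()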

Luckily, asyncio provides its own way to communicate with a process pool. Each asyncio event loop has a run_in_executor method, which is the main "escape hatch" for running CPU-intensive code in an asyncio application.

However, there is one additional module we need to bring it all together, because this method doesn't work with multiprocessing.Pool, but rather with subclasses of concurrent.futures.Executor. There are two subclasses which may be used: ProcessPoolExecutor and ThreadPoolExecutor. We won't use threads for this because, as discussed in the first post in this series, each Python process is locked to one core, so jobs running in a different thread will still take turns using the same core. A thread pool might be appropriate, however, for a library function that does blocking I/O and has no asyncio alternative, as sketched below.
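
For instance, here is a minimal sketch (not part of the original example) of pushing a blocking urllib call onto a ThreadPoolExecutor so it doesn't stall the event loop:

import asyncio
import concurrent.futures
import urllib.request

thread_pool = concurrent.futures.ThreadPoolExecutor()

def fetch(url: str) -> bytes:
    # urllib.request blocks the calling thread and has no asyncio interface.
    with urllib.request.urlopen(url) as response:
        return response.read()

async def main():
    loop = asyncio.get_running_loop()
    # The blocking call occupies a worker thread instead of the event loop;
    # threads are fine here because the thread waits rather than computes.
    body = await loop.run_in_executor(thread_pool, fetch, "https://example.com")
    print(len(body))

asyncio.run(main())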

However, in our case, the goal really is to do work on multiple cores in the background while the web server handles requests, so we will take the ProcessPoolExecutor.

The code we wrote earlier would instead be written like this:

import asyncio
import concurrent.futures

def fibonacci(n: int):
    if n < 2:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

pool = concurrent.futures.ProcessPoolExecutor()

async def main():
    loop = asyncio.get_running_loop()
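    # run_in_executor schedules fibonacci(33) on the pool and immediately
    # returns an awaitable future; nothing blocks here.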
    my_fib_task = loop.run_in_executor(pool, fibonacci, 33)

    # do work here

    print(await my_fib_task)

asyncio.run(main())

Just to go over this:

  • First, define all the code that will be used in the worker processes. Worker processes are created with fork (… uh, on Unix-like operating systems…), which means all the memory of the currently running process is copied into the workers, so only objects defined before the pool is created will exist in the workers.
  • Next, create your executor. Here, we retain the name pool from the previous example. You can specify options when you create the executor (see the sketch after this list), but without any options, it will create one worker for each core the machine makes available to it.
  • When we finally want to use the code, we first get the running event loop from asyncio (using asyncio.get_running_loop), and then use its run_in_executor method to turn this background work into something awaitable.
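
For example, here is a sketch of creating the executor with a couple of its options (max_workers and initializer are real parameters of ProcessPoolExecutor; the values and the init_worker function are only illustrative):

import concurrent.futures

def init_worker():
    # Hypothetical per-worker setup (e.g. opening a database connection);
    # runs once in each worker process as it starts.
    pass

# With no arguments, ProcessPoolExecutor creates one worker per available
# core; max_workers caps that explicitly.
pool = concurrent.futures.ProcessPoolExecutor(
    max_workers=4,
    initializer=init_worker,
)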

As a final step, let's put all of this in the context of a web framework which supports async and await. I'll use Tornado because I know it, but it could be any asynchronous web framework, including one that runs on ASGI.

import asyncio
import concurrent.futures
import tornado.web

def fibonacci(n: int):
    if n < 2:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)

pool = concurrent.futures.ProcessPoolExecutor()

class FibHandler(tornado.web.RequestHandler):
    async def get(self, number):
        n = int(number)
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(pool, fibonacci, n)
        self.write(str(await task))

async def main():
    # this line defines a route to our fibonacci handler.
    # Input must be an integer or the server will return 404.
    app = tornado.web.Application([(r"/(\d+)", FibHandler)])
    app.listen(8888)
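    # keep the server running forever; this event is never set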
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

Nothing too surprising here. We simply combine what we've already done with an existing web framework, and we get an HTTP server that you can shoot numbers at; it will send back the n-th Fibonacci number (computed with the very slow implementation).
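
To try it out, you can hit the server from another process; a hypothetical client session, assuming the server above is running locally:

import urllib.request

# Ask the running server for the 33rd Fibonacci number.
with urllib.request.urlopen("http://localhost:8888/33") as response:
    print(response.read().decode())  # prints 3524578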

Limitations

One thing to be aware of is that we are not making the computation go away here; we simply move it out of the web server's process. This does not speed up the response time of any individual request. The only real solution for that would be a faster algorithm or a faster programming language. In many cases, caching results may also improve performance, as sketched below.
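
For instance, memoizing the function with functools.lru_cache (a sketch; not part of the original code) turns the exponential recursion into a linear one:

import functools

@functools.lru_cache(maxsize=None)
def fibonacci(n: int) -> int:
    # Each worker process builds up its own cache; results are not shared
    # between workers or across restarts.
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)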

Likewise, we are limited by the number of cores on our server. When every worker is busy with a computation, additional requests will have to wait for a worker to become available.
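
If you would rather fail fast than let requests queue indefinitely, one option (a sketch, not from the original post) is to put a deadline on the await with asyncio.wait_for:

async def fib_with_deadline(n: int, timeout: float = 30.0) -> int:
    loop = asyncio.get_running_loop()
    task = loop.run_in_executor(pool, fibonacci, n)
    try:
        return await asyncio.wait_for(task, timeout)
    except asyncio.TimeoutError:
        # Cancellation keeps an unstarted job off the pool's queue, but a
        # job already running in a worker will still run to completion.
        raise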

What we have done is allow the web server itself to keep accepting requests and putting jobs into the queue. This is much better than the server becoming unresponsive because all of its threads are blocked.

The other thing we have done is make all the machine's cores available to do work. A normal WSGI server does not make it possible for the user to distribute work to multiple processes, and there's no way to ensure that the server will do it in a way that makes sense for CPU-intensive workloads, since most HTTP services are not designed that way (for good reason!) and spend most of their time waiting around on a database.