Python coroutines with async and await

Posted on Tue 04 April 2017 in blog

This is a short introduction to the basic vocabulary and knowledge needed to get started with async and await. We'll start from scratch, not from the "let's use generators" era where it all began, but from a contemporary (Python 3.5) point of view, with native coroutines, async, and await.

Coroutine

A coroutine is a function whose execution can be suspended.

Coroutines are a great tool to avoid callback hell while still offering a non-blocking way to express ourselves.

When a function has to wait for something, suspending it instead of blocking lets the program do something else in the meantime. In other words, it permits concurrency (without involving threads or other processes).

Without coroutines there are two solutions: block or use callbacks.

A typical callback example in an imaginary language may look like:

function pong_handler(client)
{
    client.on('data', function (data)
    {
        client.on('data_written', function ()
        {
            client.close()
        });
        client.write(data)
        client.flush()
    });
}

With coroutines, it would look like:

async function pong_handler(client)
{
    client.write(await client.read())
    await client.flush()
    client.close()
}

Coroutines in Python

To be exhaustive, there are two kinds of coroutines in Python:

  • generator-based coroutines
  • native coroutines

Generator-based coroutines are the old-style ones, but you may still encounter some of them. They are written using the @types.coroutine decorator (or @asyncio.coroutine, which was deprecated in Python 3.8 and removed in 3.11):

import types


@types.coroutine
def get_then_print(url):
    ...

Native coroutines, the ones you should remember, are defined using the async keyword:

async def get_then_print(url):
    ...

A coroutine function, when called, returns a coroutine object:

>>> async def tum():
...     print("tum")
...
>>> tum()
<coroutine object tum at 0x7fa294538468>
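As a quick check, the standard inspect module can tell a coroutine function from the coroutine object it returns. This is only a minimal sketch; the explicit close() is there to discard the object without triggering the "never awaited" warning:

```python
import inspect


async def tum():
    print("tum")


coroutine_object = tum()  # nothing is printed: the body has not started yet

is_function = inspect.iscoroutinefunction(tum)      # about the function
is_object = inspect.iscoroutine(coroutine_object)   # about what it returned
type_name = type(coroutine_object).__name__
print(is_function, is_object, type_name)

coroutine_object.close()  # throw it away without ever running the body
```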

Through this coroutine object, the execution of the coroutine function can be driven:

>>> async def tum():
...     print("tum")
...
>>> a_coroutine_object = tum()
>>> a_coroutine_object.send(None)
tum
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling tum() did not execute the print("tum"), but calling .send(None) on the coroutine object did (see PEP 342). Also, send told us the coroutine is complete by raising StopIteration.

Coroutine invocation

As a coroutine just returns a coroutine object when called, at some point some code has to call send(None) on it. That's typically the role of a "main loop", which will also watch for StopIteration.

Getting result from a coroutine call

A coroutine gives its return value in a StopIteration exception, meaning "Job is done, stop trying to un-suspend me". Typically, though, a scheduler or event loop will do that for you and provide the result one way or another.

There's also a keyword dedicated to pulling a value from a coroutine: await. Given an awaitable, await tries to get a value from it; if the awaitable suspends, await also suspends the current coroutine, and so on, up to the .send() caller.

As long as no coroutine in the chain gets suspended, there is no need to call send(None) a second time to resume anything. So there's no need for a loop or a scheduler; simply calling send(None) once is enough:

async def two():
    return 2


async def four():
    return await two() + await two()


async def eight():
    return await four() + await four()


coro = eight()
coro.send(None)

This raises StopIteration: 8, in a completely synchronous way, despite the vocabulary used.

Suspending a coroutine

A coroutine simply cannot suspend itself. But the whole chain of awaits can be suspended using a yield in a "future-like object".

A future-like object is an object with an __await__ method. That method can yield a value, which traverses the whole await chain up to the caller of .send(None). At this point, the event loop, scheduler, or trampoline (whatever it's called) needs to understand why the future-like object suspended by reading the value returned by the send(None) call, and act accordingly: resume another coroutine, wait for a network event, and so on.
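To make that traversal concrete, here is a minimal sketch. The Ask class and the inner and outer coroutines are made up for the illustration; the point is only that the yielded value crosses two awaits on its way out, and that a value passed to send travels the same road back in:

```python
class Ask:
    """Future-like object: __await__ yields a request to whoever drives us."""
    def __init__(self, question):
        self.question = question

    def __await__(self):
        answer = yield self.question  # travels up to the .send() caller
        return answer                 # becomes the value of `await Ask(...)`


async def inner():
    return await Ask("what is six times seven?")


async def outer():
    return "inner said {}".format(await inner())


coro = outer()
request = coro.send(None)  # the yielded question crossed two `await`s
try:
    coro.send(42)          # resume the whole chain, handing back an answer
except StopIteration as stop:
    result = stop.value

print(request)  # what is six times seven?
print(result)   # inner said 42
```

Here the caller of send plays the role of the scheduler: it reads the request, decides what to do about it, and resumes the chain when it has something to give back.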

Awaitables

Awaitables are objects that can be awaited with the await keyword.

Basically a coroutine or an object with an __await__ method returning an iterator are both awaitables.
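If in doubt, inspect.isawaitable from the standard library answers the question. A small sketch, where the Ready class and coroutine_function are hypothetical names used only for the check:

```python
import inspect


class Ready:
    def __await__(self):
        return iter(())  # any iterator will do; this one is empty


async def coroutine_function():
    return None


coroutine_object = coroutine_function()
checks = {
    "coroutine object": inspect.isawaitable(coroutine_object),
    "object with __await__": inspect.isawaitable(Ready()),
    "plain integer": inspect.isawaitable(42),
}
coroutine_object.close()  # never run: close it to avoid a warning
print(checks)
```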

Managing coroutines

The Python standard library provides asyncio, which contains an event loop that takes the scheduler role between coroutines: it lets them suspend, works out why they did, and resumes them at the right time.
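For reference, this is roughly what handing a coroutine to asyncio looks like. Note that asyncio.run only exists since Python 3.7, so it is newer than the 3.5 point of view taken in this post, and the asyncio.sleep(0) is only there to force one real suspension:

```python
import asyncio


async def two():
    await asyncio.sleep(0)  # suspend once, letting the event loop take over
    return 2


async def four():
    return await two() + await two()


async def eight():
    return await four() + await four()


# The event loop calls send() for us and extracts the return value.
result = asyncio.run(eight())
print(result)  # 8
```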

Coroutine manager, step 1

The first step for a coroutine manager is probably to call send(None), which is enough to do some work:

async def two():
    return 2


async def four():
    return await two() + await two()


async def eight():
    return await four() + await four()

def coro_manager(coro):
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value

print(coro_manager(eight()))
# prints 8

But this coro_manager will not finish the work if a suspension occurs:

class Awaitable:
    def __await__(self):
        yield


async def wont_terminate_here():
    await Awaitable()
    print("Terminated")
    return 42

print(coro_manager(wont_terminate_here()))
# prints 'None', but no "Terminated" to be seen
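To see what happened to that coroutine, inspect.getcoroutinestate can be asked about its state between calls. A small sketch, reusing the same Awaitable class (the print is dropped to keep the output short):

```python
import inspect


class Awaitable:
    def __await__(self):
        yield


async def wont_terminate_here():
    await Awaitable()
    return 42


coro = wont_terminate_here()
states = [inspect.getcoroutinestate(coro)]      # before any send

coro.send(None)                                 # runs up to the yield
states.append(inspect.getcoroutinestate(coro))  # parked at the await

try:
    coro.send(None)                             # resumes after the await
except StopIteration as stop:
    result = stop.value
states.append(inspect.getcoroutinestate(coro))  # body has returned

print(states)  # ['CORO_CREATED', 'CORO_SUSPENDED', 'CORO_CLOSED']
print(result)  # 42
```

After a single send(None) the coroutine is merely suspended, which is why a manager that stops there never sees the end of it.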

So a coro_manager should probably resume a suspended coroutine by calling send(None) again, like this frenetic_coro_manager:

def frenetic_coro_manager(coro):
    try:
        while True:
            coro.send(None)
    except StopIteration as stop:
        return stop.value

We're now getting "Terminated" followed by "42".

But this frenetic_coro_manager can only execute a single coroutine, blindly resuming it whenever it suspends. Here is an implementation working with a list of coroutines instead of a single one, resuming a random one each time:

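import random


def frenetic_coro_manager(*coros):
    coros = list(coros)
    while coros:
        coro = random.choice(coros)  # pick any pending coroutine
        try:
            coro.send(None)  # run it until its next suspension
        except StopIteration:
            coros.remove(coro)  # finished: stop scheduling it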

This version still prints "Terminated" for wont_terminate_here(), but since it no longer returns anything, the 42 is lost. In exchange, it can work with multiple coroutines, like:

async def tum():
    while True:
        await Awaitable()
        print("Tum")


async def pak():
    while True:
        await Awaitable()
        print("Pak")

frenetic_coro_manager(tum(), pak())

This time, it frenetically prints "Tum"s and "Pak"s. But the interesting point is that both coroutines co-existed, by co-executing their while True loops.
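Random scheduling and infinite loops make that interleaving hard to observe precisely. As a sketch, here is a hypothetical round-robin variant (round_robin_coro_manager and say are names invented for this example) with finite coroutines, which also keeps the return values that the random version throws away:

```python
from collections import deque


class Awaitable:
    def __await__(self):
        yield


async def say(word, times, log):
    for _ in range(times):
        await Awaitable()  # hand control back to the manager
        log.append(word)
    return times


def round_robin_coro_manager(*coros):
    """Hypothetical variant: fixed turn order, return values kept."""
    queue = deque(enumerate(coros))
    results = [None] * len(coros)
    while queue:
        position, coro = queue.popleft()
        try:
            coro.send(None)
        except StopIteration as stop:
            results[position] = stop.value  # finished: keep its result
        else:
            queue.append((position, coro))  # suspended: back of the line
    return results


log = []
results = round_robin_coro_manager(say("Tum", 3, log), say("Pak", 2, log))
print(log)      # ['Tum', 'Pak', 'Tum', 'Pak', 'Tum']
print(results)  # [3, 2]
```

The first send on each coroutine only brings it to its first await, which is why nothing is logged until the second round.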

Here, await Awaitable() is only useful to hand control back to the frenetic_coro_manager, but it awaits nothing, which is semantically… empty.

But it's possible for the yield of the awaitable object to yield something useful to the coroutine manager, like an ETA before calling send again:

import heapq
from datetime import datetime, timedelta
from time import sleep


class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until


async def tum():
    while True:
        await Sleep(1)
        print("Tum")


async def pak():
    while True:
        await Sleep(2)
        print("Pak")


def measured_coro_manager(*coros):
    coros = [(datetime.now(), coro) for coro in coros]
    heapq.heapify(coros)
    while coros:
        exec_at, coro = heapq.heappop(coros)
        if exec_at > datetime.now():
            sleep((exec_at - datetime.now()).total_seconds())
        try:
            exec_at = coro.send(None)
        except StopIteration as stop:
            pass
        else:
            heapq.heappush(coros, (exec_at, coro))

print(measured_coro_manager(tum(), pak()))

Gives:

Tum
Pak
Tum
Tum
Pak
Tum
Tum
Pak
…

In this implementation, the "main loop" measured_coro_manager waited gracefully as specified by Sleep before calling send(None).
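One caveat about the heap used by measured_coro_manager above: its entries are (datetime, coroutine) tuples, so if two wake-up times are ever exactly equal, Python falls back to comparing the coroutine objects and raises TypeError. A minimal sketch of the problem and of one common fix, a counter used as tie-breaker (noop and same_instant are made up for the demonstration):

```python
import heapq
from datetime import datetime


async def noop():
    pass


same_instant = datetime(2017, 4, 4)
first, second = noop(), noop()

# Equal datetimes force the tuple comparison down to the coroutines.
try:
    (same_instant, first) < (same_instant, second)
except TypeError:
    comparable = False
else:
    comparable = True

# With a unique counter in second position, coroutines are never compared.
heap = []
for counter, coro in enumerate((first, second)):
    heapq.heappush(heap, (same_instant, counter, coro))
order = [heapq.heappop(heap)[2] for _ in range(2)]

print(comparable)                # False
print(order == [first, second])  # True: ties resolved by insertion order

first.close()   # never run: close both to avoid warnings
second.close()
```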

From this point we can go further by playing with the dining philosophers problem:

import heapq
from time import sleep
from datetime import datetime, timedelta


class Sleep:
    """Basic Sleep implementation, tightly paired with `measured_coro_manager`
    which expects each `yield` to give an absolute datetime of revival.
    """
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until


class ForkTaking:
    """Asynchronous context manager describing the action of taking a
    fork.  The fork may be a context manager by itself, but it's nice
    to get verbose logging by knowing who is taking what.

    """
    def __init__(self, fork, philosopher):
        self.philosopher = philosopher
        self.fork = fork

    async def __aenter__(self):
        self.philosopher.speak("Need fork {}".format(self.fork.number))
        while self.fork.held_by:
            # self.philosopher.speak("Need fork {}".format(self.fork.number))
            await Sleep(.1)
        self.fork.held_by = self.philosopher
        self.philosopher.speak("Took fork {}".format(self.fork.number))
        return self

    async def __aexit__(self, exc_type, exc, traceback):
        self.philosopher.speak("Release fork {}".format(self.fork.number))
        self.fork.held_by = None


class Fork:
    """Represent a fork a Philosopher can grab and release via
    an asynchronous context manager.
    """
    def __init__(self, number):
        self.number = number
        self.held_by = None

    def take(self, philosopher):
        return ForkTaking(self, philosopher)

    def __str__(self):
        return "<Fork {}>".format(self.number)


class Philosopher:
    """Having a reference to a left fork and a right fork so he can grab them,
    the philosopher thinks and eats.
    """
    def __init__(self, number, left_fork, right_fork):
        self.number = number
        print("Hello, I'm philosopher {} with forks {} and {}".format(
            number, left_fork, right_fork))
        self.left_fork = left_fork
        self.right_fork = right_fork
        self.hungry_since = None

    def __str__(self):
        return "<Philosopher {}>".format(self.number)

    def speak(self, message):
        print(' ' * self.number * 20, message)

    async def behave(self):
        """Basic implementation of the rules:
         - think
         - hungry, grab forks
         - eat
        in a loop.
        """
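        # Always take the higher-numbered fork first: a consistent global
        # order on forks is what prevents a circular wait (deadlock).
        if self.left_fork.number > self.right_fork.number: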
            first_fork, second_fork = self.left_fork, self.right_fork
        else:
            first_fork, second_fork = self.right_fork, self.left_fork
        while True:
            self.speak("*thinking*")
            await Sleep(.5)
            self.hungry_since = datetime.now()
            self.speak("I'm hungry!")
            async with first_fork.take(self):
                await Sleep(0)
                async with second_fork.take(self):
                    self.hungry_since = None
                    self.speak("*eating*")
                    await Sleep(.5)


class Table():
    """The table is responsible for creating Forks and Philosophers,
    and distributing forks to them, what a powerful table.
    """
    def __init__(self):
        self.forks = [Fork(i) for i in range(5)]
        self.philosophers = []
        for i in range(5):
            self.philosophers.append(Philosopher(
                i,
                self.forks[i - 1],
                self.forks[i]))


async def check_life(philosophers):
    while True:
        await Sleep(1)
        for philosopher in philosophers:
            if philosopher.hungry_since is None:
                continue
            if philosopher.hungry_since < datetime.now() - timedelta(seconds=60):
                print("Philosopher {} is dead".format(philosopher))


def measured_coro_manager(*coros):
    coros = [(datetime.now(), i, coro) for i, coro in enumerate(coros)]
    i = len(coros)
    heapq.heapify(coros)
    while coros:
        exec_at, _, coro = heapq.heappop(coros)
        now = datetime.now()
        if exec_at > now:
            sleep((exec_at - now).total_seconds())
        try:
            exec_at = coro.send(None)
        except StopIteration as stop:
            pass
        else:
            i += 1
            heapq.heappush(coros, (exec_at, i, coro))

table = Table()
measured_coro_manager(check_life(table.philosophers),
                      *[philosopher.behave() for
                        philosopher in
                        table.philosophers])

You can play with this a lot. Typically, in the behave method, you can remove the fork ordering and always start by taking the left fork, or you can add, remove, lengthen, or shorten the Sleeps, and so on.
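The listing above leans on async with, which is not covered earlier in this post. As a rough sketch of what it does: it awaits __aenter__ before the block and __aexit__ after it. The Logged class and the use coroutine below are hypothetical, and the coroutine is driven by hand with send(None) as in the earlier sections:

```python
class Logged:
    """Asynchronous context manager recording when it is entered and left."""
    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, traceback):
        self.log.append("exit")


async def use(log):
    async with Logged(log):  # awaits __aenter__ here...
        log.append("body")
    return log               # ...and __aexit__ was awaited before this line


coro = use([])
try:
    coro.send(None)  # nothing suspends, so one send is enough
except StopIteration as stop:
    events = stop.value

print(events)  # ['enter', 'body', 'exit']
```

Because __aenter__ is itself a coroutine, it is free to suspend, which is what ForkTaking does while waiting for a fork to be released.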

I/O

Now, the next step is almost obviously networking: why not rewrite measured_coro_manager so it can not only sleep but also use select or poll to wait for network events, and combine this with asynchronous read and write syscalls? That's exactly what asyncio is; see selector_events.py in the asyncio module. I hope the separation between asyncio, async, and await is now clearer.
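To give a taste of that direction, here is a toy sketch built on the standard selectors module. It is not how asyncio is actually written: WaitReadable, read_one_message, and io_coro_manager are invented for this example, and the manager assumes every suspension yields a socket to wait on for reading:

```python
import selectors
import socket


class WaitReadable:
    """Future-like object asking to be resumed once `sock` is readable."""
    def __init__(self, sock):
        self.sock = sock

    def __await__(self):
        yield self.sock


async def read_one_message(sock, received):
    await WaitReadable(sock)  # suspend until the manager sees data
    received.append(sock.recv(1024))


def io_coro_manager(*coros):
    selector = selectors.DefaultSelector()
    ready = list(coros)
    waiting = 0
    while ready or waiting:
        for coro in ready:
            try:
                sock = coro.send(None)
            except StopIteration:
                continue  # finished: nothing to wait for
            selector.register(sock, selectors.EVENT_READ, data=coro)
            waiting += 1
        ready = []
        if waiting:
            # Block until at least one registered socket has data.
            for key, _events in selector.select():
                selector.unregister(key.fileobj)
                waiting -= 1
                ready.append(key.data)
    selector.close()


left, right = socket.socketpair()
left.sendall(b"ping")
received = []
io_coro_manager(read_one_message(right, received))
left.close()
right.close()
print(received)  # [b'ping']
```

The manager no longer sleeps on a clock: it sleeps inside selector.select(), and the value yielded by the future-like object tells it what to wake up for.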