Python coroutines with async and await
Posted on Tue 04 April 2017 in blog
This is a short introduction to the basic vocabulary and knowledge
needed to get started with async and await. We'll start from scratch,
not from the "let's use generators" era when everything began, but
from a contemporary (Python 3.5) point of view with native coroutines,
async, and await.
Coroutine
A coroutine is a function whose execution can be suspended.
Coroutines are a great tool to avoid callback hell while still offering a non-blocking way to express ourselves.
When a function has to wait for something, suspending it instead of blocking lets the program do something else in the meantime. In other words, it permits concurrency (without involving threads or other processes).
Without coroutines there are two solutions: block, or use callbacks.
A typical callback example in an imaginary language may look like:
function pong_handler(client)
{
    client.on('data', function (data)
    {
        client.on('data_written', function ()
        {
            client.close()
        });
        client.write(data)
        client.flush()
    });
}
With coroutines, it would look like:
async function pong_handler(client)
{
    client.write(await client.read())
    await client.flush()
    client.close()
}
Coroutines in Python
To be exhaustive, there are two kinds of coroutines in Python:
- generator-based coroutines
- native coroutines
Generator-based coroutines are the old-style ones, but you may
encounter some of them. They are written using the @types.coroutine
decorator (or the legacy @asyncio.coroutine, which was removed in
Python 3.11):
@types.coroutine
def get_then_print(url):
    ...
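As a minimal sketch of how an old-style coroutine still cooperates with a native one: the names suspend_once and native below are made up for illustration and are not part of any library. The decorated generator can be awaited, and its yield hands a value to whoever called send().

```python
import types


@types.coroutine
def suspend_once(marker):
    # Generator-based coroutine: this yield suspends the whole await chain
    # and hands `marker` to the caller of send().
    received = yield marker
    return received


async def native():
    # A native coroutine can await a generator-based one.
    return await suspend_once("paused")


coro = native()
first = coro.send(None)        # runs until the yield inside suspend_once
try:
    coro.send("resumed")       # the sent value becomes the result of the yield
except StopIteration as stop:
    result = stop.value
```

Here first holds the yielded marker and result holds the value that was sent back in.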
Native coroutines, the ones you should remember, are defined using the
async keyword:
async def get_then_print(url):
    ...
A coroutine function, when called, returns a coroutine object:
>>> async def tum():
...     print("tum")
...
>>> tum()
<coroutine object tum at 0x7fa294538468>
Through this coroutine object, the execution of the coroutine function
can be driven:
>>> async def tum():
...     print("tum")
...
>>> a_coroutine_object = tum()
>>> a_coroutine_object.send(None)
tum
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
As you can see, calling tum() did not execute the print("tum"),
but calling .send(None) on the coroutine object did (see PEP 342).
Also, send told us the coroutine is complete by raising a
StopIteration.
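To make the "created but not yet run" state visible, here is a small sketch using inspect.getcoroutinestate from the standard library. The variable names before and after are only for illustration.

```python
import inspect


async def tum():
    print("tum")


coro = tum()
before = inspect.getcoroutinestate(coro)   # nothing has run yet

try:
    coro.send(None)                        # this is what actually prints "tum"
except StopIteration:
    pass                                   # the coroutine ran to completion

after = inspect.getcoroutinestate(coro)
print(before, after)
```

The state is inspect.CORO_CREATED before the first send and inspect.CORO_CLOSED once StopIteration has been raised.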
Coroutine invocation
As a coroutine function just returns a coroutine object when called,
at some point some code has to call send(None) on it. That's typically
the role of a "main loop". The main loop will also watch for
StopIteration.
Getting result from a coroutine call
A coroutine delivers its return value in a StopIteration exception,
meaning "Job is done, stop trying to un-suspend me", but typically a
scheduler / event loop will catch it for you and provide the result
one way or another.
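Done by hand, retrieving the result looks like the following sketch (the coroutine name answer is arbitrary): the return value travels in the value attribute of the StopIteration exception.

```python
async def answer():
    return 42


coro = answer()
try:
    coro.send(None)
except StopIteration as stop:
    # The value given to `return` is carried by the exception.
    result = stop.value

print(result)
```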
There's also a keyword dedicated to pulling a value from a coroutine:
await. Given an awaitable, await tries to get a value from it,
and if the awaitable suspends, await also suspends the current
coroutine, and so on, up to the .send() caller.
As long as nothing in the await chain suspends, there is no need to
call send(None) a second time to resume the coroutines. So there's no
need for a loop or a scheduler; simply calling send(None) once is
enough:
async def two():
    return 2

async def four():
    return await two() + await two()

async def eight():
    return await four() + await four()

coro = eight()
coro.send(None)
Which raises StopIteration: 8, in a completely synchronous way, despite
the vocabulary used.
Suspending a coroutine
It is simply not possible for a native coroutine to suspend itself
directly. But the whole chain of await can be suspended using a yield
in a "future-like object".
A future-like object is an object with an __await__ method, which
can yield a value. That value traverses the whole await chain up to
the caller of .send(None). At this point, an event loop, a scheduler,
a trampoline, whatever it's called, needs to understand why the
future-like object suspended by reading the value received from the
send(None) call, and act accordingly: resume another coroutine,
wait for an event on the network, and so on.
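A small sketch of that round trip, assuming a made-up future-like class named Request (not a real library type): the yielded reason climbs the await chain up to the send() caller, which answers with a second send().

```python
class Request:
    """Hypothetical future-like object: suspends and tells the caller why."""

    def __init__(self, reason):
        self.reason = reason

    def __await__(self):
        # The yielded value travels up the await chain to the send() caller;
        # whatever the caller sends back becomes the result of the await.
        answer = yield self.reason
        return answer


async def inner():
    return await Request("need-a-number")


async def outer():
    return await inner() * 2


coro = outer()
reason = coro.send(None)       # goes through outer -> inner -> Request
try:
    coro.send(21)              # the "manager" answers the request
except StopIteration as stop:
    result = stop.value
```

The caller of send() sees "need-a-number", decides what to do, and the value it sends back resumes the innermost await.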
Awaitables
Awaitables are objects that can be awaited with the await keyword.
Basically, a coroutine object or an object with an __await__ method
returning an iterator are both awaitables.
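One way to check this, as a sketch, is inspect.isawaitable from the standard library. The class name Pause and the dictionary keys are illustrative only.

```python
import inspect


class Pause:
    def __await__(self):
        yield


async def tum():
    return "tum"


coro = tum()
checks = {
    "coroutine object": inspect.isawaitable(coro),
    "object with __await__": inspect.isawaitable(Pause()),
    "plain integer": inspect.isawaitable(42),
}
coro.close()   # we never ran it; closing avoids a "never awaited" warning
print(checks)
```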
Managing coroutines
The Python standard library provides asyncio, which contains an event
loop that takes the scheduler role between coroutines: it lets them
suspend, works out why they did, and resumes them at the right
time.
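For comparison with the hand-written managers in the rest of this post, here is a minimal sketch of asyncio doing that job. It assumes Python 3.7 or later, since asyncio.run did not exist in 3.5, and the coroutine names are arbitrary.

```python
import asyncio


async def tum():
    await asyncio.sleep(0.01)   # suspends; the event loop resumes us later
    return "tum"


async def pak():
    await asyncio.sleep(0.02)
    return "pak"


async def main():
    # gather runs both coroutines concurrently and keeps argument order.
    return await asyncio.gather(tum(), pak())


results = asyncio.run(main())
print(results)
```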
Coroutine manager, step 1
The first step for a coroutine manager is probably to call
send(None), which is enough to do some work:
async def two():
    return 2

async def four():
    return await two() + await two()

async def eight():
    return await four() + await four()

def coro_manager(coro):
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value

print(coro_manager(eight()))
# prints 8
But this coro_manager will not finish the work if a
suspension occurs:
class Awaitable:
    def __await__(self):
        yield

async def wont_terminate_here():
    await Awaitable()
    print("Terminated")
    return 42

print(coro_manager(wont_terminate_here()))
# prints 'None', but no "Terminated" to be seen
So a coro_manager should probably resume a suspended coroutine by
calling send(None) again, like this frenetic_coro_manager:
def frenetic_coro_manager(coro):
    try:
        while True:
            coro.send(None)
    except StopIteration as stop:
        return stop.value
We're now getting "Terminated" followed by "42".
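To see how many times the manager has to call send(None), here is a sketch with a counting variant. The names counting_coro_manager and three_pauses are made up for this example.

```python
class Awaitable:
    def __await__(self):
        yield


async def three_pauses():
    for _ in range(3):
        await Awaitable()      # each await suspends once
    return "done"


def counting_coro_manager(coro):
    sends = 0
    while True:
        sends += 1
        try:
            coro.send(None)
        except StopIteration as stop:
            return stop.value, sends


value, sends = counting_coro_manager(three_pauses())
print(value, sends)
```

Three suspensions need four calls: one to start the coroutine and reach the first yield, then one per suspension to resume it.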
But this frenetic_coro_manager can only execute a single coroutine,
blindly resuming it whenever it suspends. Here is an implementation
working with a list of coroutines instead of a single one, resuming a
random one each time:
import random

def frenetic_coro_manager(*coros):
    coros = list(coros)
    while coros:
        coro = random.choice(coros)
        try:
            coro.send(None)
        except StopIteration as stop:
            # This coroutine is done: stop scheduling it.
            coros.remove(coro)
Given wont_terminate_here(), it still prints "Terminated" (this version discards return values, so it no longer reports the 42), but it can now work with multiple coroutines, like:
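If the return values matter, a manager can keep them. The following is only a sketch of one possible variant, round-robin instead of random so that its behaviour is reproducible. The names round_robin_coro_manager and count_to are hypothetical.

```python
from collections import deque


class Awaitable:
    def __await__(self):
        yield


async def count_to(name, n, log):
    for i in range(n):
        await Awaitable()          # hand control back to the manager
        log.append((name, i))
    return name


def round_robin_coro_manager(*coros):
    queue = deque(enumerate(coros))
    results = [None] * len(coros)
    while queue:
        index, coro = queue.popleft()
        try:
            coro.send(None)
        except StopIteration as stop:
            results[index] = stop.value    # keep the return value
        else:
            queue.append((index, coro))    # still alive: back of the queue
    return results


log = []
results = round_robin_coro_manager(count_to("tum", 2, log),
                                   count_to("pak", 1, log))
print(results, log)
```

The log shows the two coroutines interleaving, and results lists the return values in the order the coroutines were given.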
async def tum():
    while True:
        await Awaitable()
        print("Tum")

async def pak():
    while True:
        await Awaitable()
        print("Pak")

frenetic_coro_manager(tum(), pak())
This time, it frenetically prints "Tum"s and "Pak"s. But the
interesting point is that both coroutines co-existed, co-executing
their while True loops.
Here, await Awaitable() is only useful to hand control back to the
frenetic_coro_manager, but it awaits nothing, which is semantically… empty.
But it's possible for the yield of the awaitable object to yield
something useful to the coroutine manager, like an ETA before calling
send again:
import heapq
from datetime import datetime, timedelta
from time import sleep

class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until

async def tum():
    while True:
        await Sleep(1)
        print("Tum")

async def pak():
    while True:
        await Sleep(2)
        print("Pak")

def measured_coro_manager(*coros):
    # Caveat: if two entries share the exact same datetime, heapq falls back
    # to comparing coroutine objects and raises TypeError; the full listing
    # further down adds a tie-breaking counter.
    coros = [(datetime.now(), coro) for coro in coros]
    heapq.heapify(coros)
    while coros:
        exec_at, coro = heapq.heappop(coros)
        if exec_at > datetime.now():
            sleep((exec_at - datetime.now()).total_seconds())
        try:
            exec_at = coro.send(None)
        except StopIteration as stop:
            pass
        else:
            heapq.heappush(coros, (exec_at, coro))

measured_coro_manager(tum(), pak())
Gives:
Tum
Pak
Tum
Tum
Pak
Tum
Tum
Pak
…
In this implementation, the "main loop" measured_coro_manager waited
gracefully as specified by Sleep before calling send(None).
From this point we can go further by playing with the dining philosophers problem:
import heapq
from time import sleep
from datetime import datetime, timedelta

class Sleep:
    """Basic Sleep implementation, tightly paired with `measured_coro_manager`
    which expects each `yield` to give an absolute datetime of revival.
    """
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until
class ForkTaking:
    """Asynchronous context manager describing the action of taking a
    fork. The fork may be a context manager by itself, but it's nice
    to get verbose logging by knowing which is taking what.
    """
    def __init__(self, fork, philosopher):
        self.philosopher = philosopher
        self.fork = fork

    async def __aenter__(self):
        self.philosopher.speak("Need fork {}".format(self.fork.number))
        while self.fork.held_by:
            # self.philosopher.speak("Need fork {}".format(self.fork.number))
            await Sleep(.1)
        self.fork.held_by = self.philosopher
        self.philosopher.speak("Took fork {}".format(self.fork.number))
        return self

    async def __aexit__(self, exc_type, exc, traceback):
        self.philosopher.speak("Release fork {}".format(self.fork.number))
        self.fork.held_by = None
class Fork:
    """Represent a fork a Philosopher can grab and release via
    an asynchronous context manager.
    """
    def __init__(self, number):
        self.number = number
        self.held_by = None

    def take(self, philosopher):
        return ForkTaking(self, philosopher)

    def __str__(self):
        return "<Fork {}>".format(self.number)
class Philosopher:
    """Having a reference to a left fork and a right fork so he can grab them,
    the philosopher thinks and eats.
    """
    def __init__(self, number, left_fork, right_fork):
        self.number = number
        print("Hello, I'm philosopher {} with forks {} and {}".format(
            number, left_fork, right_fork))
        self.left_fork = left_fork
        self.right_fork = right_fork
        self.hungry_since = None

    def __str__(self):
        return "<Philosopher {}>".format(self.number)

    def speak(self, message):
        print(' ' * self.number * 20, message)

    async def behave(self):
        """Basic implementation of the rules:
        - think
        - hungry, grab forks
        - eat
        in a loop.
        """
        # Always take the highest-numbered fork first: a global ordering
        # on forks is what prevents the philosophers from deadlocking.
        if self.left_fork.number > self.right_fork.number:
            first_fork, second_fork = self.left_fork, self.right_fork
        else:
            first_fork, second_fork = self.right_fork, self.left_fork
        while True:
            self.speak("*thinking*")
            await Sleep(.5)
            self.hungry_since = datetime.now()
            self.speak("I'm hungry!")
            async with first_fork.take(self):
                await Sleep(0)
                async with second_fork.take(self):
                    self.hungry_since = None
                    self.speak("*eating*")
                    await Sleep(.5)
class Table():
    """The table is responsible for creating Forks and Philosophers,
    and distributing forks to them, what a powerful table.
    """
    def __init__(self):
        self.forks = [Fork(i) for i in range(5)]
        self.philosophers = []
        for i in range(5):
            self.philosophers.append(Philosopher(
                i,
                self.forks[i - 1],
                self.forks[i]))
async def check_life(philosophers):
    while True:
        await Sleep(1)
        for philosopher in philosophers:
            if philosopher.hungry_since is None:
                continue
            if philosopher.hungry_since < datetime.now() - timedelta(seconds=60):
                print("Philosopher {} is dead".format(philosopher))
def measured_coro_manager(*coros):
    # The integer in each tuple is a tie-breaker: when two datetimes are
    # equal, heapq compares it instead of the (unorderable) coroutines.
    coros = [(datetime.now(), i, coro) for i, coro in enumerate(coros)]
    i = len(coros)
    heapq.heapify(coros)
    while coros:
        exec_at, _, coro = heapq.heappop(coros)
        now = datetime.now()
        if exec_at > now:
            sleep((exec_at - now).total_seconds())
        try:
            exec_at = coro.send(None)
        except StopIteration as stop:
            pass
        else:
            i += 1
            heapq.heappush(coros, (exec_at, i, coro))
table = Table()
measured_coro_manager(check_life(table.philosophers),
                      *[philosopher.behave() for
                        philosopher in
                        table.philosophers])
And you can play with this a lot, typically in the behave method:
you can remove the fork ordering and always start by taking the left
fork, add or remove Sleeps, lengthen or shorten them, and so
on...
I/O
Now, the next step is almost obviously networking: why not rewrite
measured_coro_manager so it can not only sleep but also use select
or poll to wait for network events, and combine this with non-blocking
read and write syscalls? That's exactly what asyncio is; see
selector_events.py in the asyncio module. I hope the separation
between asyncio, async, and await is now clearer.
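As a very small sketch of that idea (not how asyncio is actually written), the manager below waits for a socket to become readable using the standard selectors module. WaitReadable, read_message, and io_coro_manager are hypothetical names, and the manager only handles a single coroutine.

```python
import selectors
import socket


class WaitReadable:
    """Hypothetical awaitable: asks the manager to resume us once
    `sock` has data to read."""

    def __init__(self, sock):
        self.sock = sock

    def __await__(self):
        yield self.sock


async def read_message(sock):
    await WaitReadable(sock)
    return sock.recv(1024)


def io_coro_manager(coro):
    selector = selectors.DefaultSelector()
    try:
        while True:
            try:
                sock = coro.send(None)     # the coroutine tells us what it waits for
            except StopIteration as stop:
                return stop.value
            selector.register(sock, selectors.EVENT_READ)
            selector.select()              # blocks until the socket is readable
            selector.unregister(sock)
    finally:
        selector.close()


left, right = socket.socketpair()
left.sendall(b"ping")
message = io_coro_manager(read_message(right))
left.close()
right.close()
print(message)
```

A real event loop registers many sockets at once, combines the select timeout with pending sleeps, and resumes whichever coroutine is ready first.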