Simple Stream Processing With I/O Operations
Suppose we need to process a data stream in a sequence of steps (or “operations” or “functions”). If all the steps are CPU-bound, we just chain them up: each data item goes through the CPU-bound functions one by one, and the CPU is fully utilized. Now, if one or more of the steps involve I/O, typical examples being disk I/O or HTTP service calls, things get interesting.
The interesting part is that I/O-bound operations spend much of their time waiting on something external to the CPU. If we naively call the I/O function and wait for its result, much of that time is wasted, because while we wait, the CPU could very well be doing something else useful. The solution, of course, is concurrency: let multiple calls to the I/O function be active at the same time, so that their waiting periods overlap. Nowadays a good way to do I/O concurrency is using asyncio. In this post, I’ll develop a few utilities to make this task nice and easy.
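To put a number on the overlap, here is a minimal sketch. It assumes asyncio.sleep is a fair stand-in for an I/O wait. fake_io, sequential, and concurrent are made-up names. The point is only that ten 50 ms waits take about 0.5 s one after another, but roughly 0.05 s when they run together with asyncio.gather.

```python
import asyncio
import time


async def fake_io(x: int) -> int:
    # Hypothetical I/O call: the CPU is idle during the sleep.
    await asyncio.sleep(0.05)
    return x * 10


async def sequential(items):
    # Each call starts only after the previous one has finished.
    return [await fake_io(x) for x in items]


async def concurrent(items):
    # All calls are in flight at once, so their waits overlap.
    return list(await asyncio.gather(*(fake_io(x) for x in items)))


def timed(coro_fn, items):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(items))
    return result, time.perf_counter() - start


items = list(range(10))
seq_result, seq_time = timed(sequential, items)
con_result, con_time = timed(concurrent, items)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```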
Async generators and async iterators
Since much of the code will run in an async context, I’ll require the data stream to be an async iterable. That is, we’re going to consume the data with async for ... whenever we can. As it turns out, we need a little more than an async iterable: we need the data stream to be an async iterator, so that we don’t have to be inside an async for ... loop. Instead, we can request “the next item” by calling data.__anext__() directly. That gives the algorithm a lot of extra flexibility and, fortunately, doesn’t require much extra effort.
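Here is a minimal sketch of what pulling items by hand looks like. numbers and take_pairs are made-up names for illustration. The caller decides exactly when to request the next item, and StopAsyncIteration signals the end. On Python 3.10+, the built-in anext(it) is an equivalent spelling of it.__anext__().

```python
import asyncio


async def numbers():
    for i in range(5):
        yield i


async def take_pairs(it):
    # Request items by hand: the caller decides when to ask for the next one.
    pairs = []
    while True:
        try:
            a = await it.__anext__()
        except StopAsyncIteration:
            break  # the stream is exhausted
        try:
            b = await it.__anext__()
        except StopAsyncIteration:
            pairs.append((a,))  # odd item left over
            break
        pairs.append((a, b))
    return pairs


result = asyncio.run(take_pairs(numbers()))
print(result)  # [(0, 1), (2, 3), (4,)]
```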
In fact, the preferred way to create async iterators is with async generators (analogous to the recommendation of using generators to produce iterators). An async generator is an async function that contains yield statements. For example, below is a small convenience function that turns a (sync) iterable into an async iterator:
```python
import asyncio
from collections.abc import Iterable, AsyncIterator


async def stream(x: Iterable) -> AsyncIterator:
    for xx in x:
        yield xx
```
Let’s verify the types of things:
```
In : x = range(10)
In : y = stream(x)
In : type(y)
Out: async_generator
In : isinstance(y, AsyncIterator)
Out: True
```
Note that although stream is an async function, we don’t call it with await stream(...). Instead we directly call stream(...), and the result is an async generator, which satisfies the interface of AsyncIterator. A little puzzling, I know. Just get used to it. In fact, AsyncGenerator inherits from AsyncIterator.
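Both claims can be checked directly with a small sketch using collections.abc and inspect. plain is a made-up coroutine function for contrast. Calling an async generator function gives an object that is not awaitable. Calling a regular async function gives a coroutine, which is awaitable.

```python
import inspect
from collections.abc import AsyncGenerator, AsyncIterator


async def stream(x):
    for xx in x:
        yield xx


async def plain():
    return 1


# In collections.abc, AsyncGenerator is a subclass of AsyncIterator.
gen_is_iterator = issubclass(AsyncGenerator, AsyncIterator)

# Calling an async generator function runs none of its body yet;
# it just builds the generator object, which is not awaitable.
g = stream(range(3))
g_is_asyncgen = inspect.isasyncgen(g)
g_is_awaitable = inspect.isawaitable(g)

# Calling a regular async function, by contrast, gives a coroutine
# that is meant to be awaited.
c = plain()
c_is_awaitable = inspect.isawaitable(c)
c.close()  # we never await it here, so close it to avoid a warning

print(gen_is_iterator, g_is_asyncgen, g_is_awaitable, c_is_awaitable)
# True True False True
```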
Sure enough, we can consume the elements of an AsyncIterator (or just an AsyncIterable) with async for:

```
In : import asyncio
In : async def foo():
...:     async for z in y:
...:         print(z)
...:
In : asyncio.run(foo())
0
1
2
3
4
5
6
7
8
9
```
I’m going to call the mechanism that handles I/O operations a transformer. It takes data items out of an AsyncIterator, makes concurrent calls to an async I/O function, and outputs the results in the same order that the data items come in.
```python
import typing
from typing import Callable, Awaitable, Any, TypeVar

T = TypeVar('T')


async def transform(
    in_stream: typing.AsyncIterator[T],
    func: Callable[[T], Awaitable[Any]],
    *,
    workers: int,
    **func_args,
):
    if workers < 2:
        async for x in in_stream:
            y = await func(x, **func_args)
            yield y
        return

    # to be continued ...
```
func represents an async I/O operation. (It doesn’t have to be “I/O”. For example, it can be the async API to a service that is running in other processes, such as the model service I described previously.) workers specifies how many concurrent calls to func are allowed. The case where workers < 2 is simple and has no concurrency at all, so we get it out of the way first. The interesting part comes next.
Because the input could be an infinite stream, the design must take care to control input consumption, concurrent calls, and output production so that no component starves or gets overwhelmed. A good tool for such control is a size-capped queue. The design revolves around four questions:
- How is input consumed?
- How is output returned?
- How are input items processed concurrently?
- How is the order of output items controlled to follow that of the input?
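The size-capped queue is what provides this control. Here is a minimal sketch of the mechanism; the producer and the sizes are made up. With maxsize=2, a producer that runs ahead of its consumer is suspended in put until the consumer takes something out, so it can never flood memory.

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=2)  # size-capped queue
    produced = []

    async def producer():
        for i in range(5):
            await queue.put(i)  # suspends while the queue is full
            produced.append(i)

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # the consumer is "busy"; the producer runs ahead
    snapshot = list(produced)  # only 2 items fit before the producer stalled
    consumed = [await queue.get() for _ in range(5)]
    await task
    return snapshot, consumed


snapshot, consumed = asyncio.run(main())
print(snapshot, consumed)  # [0, 1] [0, 1, 2, 3, 4]
```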
```python
    # ... continuing
    out_stream = asyncio.Queue(workers * 8)  # size-capped output queue
    lock = asyncio.Lock()
    finished = False
    NO_MORE_DATA = object()  # sentinel marking the end of the output

    async def _process(in_stream, lock, out_stream, func, **kwargs):
        nonlocal finished
        while not finished:
            async with lock:
                if finished:
                    break
                try:
                    x = await in_stream.__anext__()
                except StopAsyncIteration:
                    finished = True
                    await out_stream.put(NO_MORE_DATA)
                    break
                # Reserve the output spot for x while still holding the lock.
                fut = asyncio.get_running_loop().create_future()
                await out_stream.put(fut)
            # The slow part runs outside the lock, concurrently with other tasks.
            y = await func(x, **kwargs)
            fut.set_result(y)

    t_workers = [
        asyncio.create_task(_process(
            in_stream, lock, out_stream, func, **func_args))
        for _ in range(workers)
    ]

    while True:
        fut = await out_stream.get()
        if fut is NO_MORE_DATA:
            break
        yield await fut

    for t in t_workers:
        await t
```
Concurrency is achieved by workers “background” tasks. Each of them independently and repeatedly fetches items from the input stream, processes them, and places results in an output queue. As such, the input stream is not consumed in an async for ... loop. Rather, each task directly calls __anext__ of the input stream (which is an AsyncIterator) to request the next item whenever it is ready to work on one. The task ends when __anext__ indicates, by raising StopAsyncIteration, that the input stream has run out.
The tasks are launched by asyncio.create_task and scheduled by the event loop to run “soon”. If workers is 4, then 4 copies of the function _process are running concurrently. At the end, we take care to await each task to make sure they finish properly.
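What “soon” means can be seen in a tiny sketch; worker and log are made-up names. create_task only schedules the coroutine. It starts running once the current coroutine gives control back to the event loop, here at the first await.

```python
import asyncio


async def main():
    log = []

    async def worker(name):
        log.append(f"{name} started")

    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    log.append("tasks created")  # no worker has run yet
    for t in tasks:
        await t  # suspending here lets the event loop run the tasks
    return log


log = asyncio.run(main())
print(log)  # ['tasks created', '0 started', '1 started', '2 started']
```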
Once one task sees the end of in_stream, it needs to prevent other tasks from calling in_stream.__anext__. This is achieved by an indicator variable finished and a lock.
Please look closely at what is protected by the lock. Although the loop is controlled by while not finished, the indicator finished is checked again after the lock is acquired. The reason is that finished may have been set by another task while this task was waiting for the lock. Once this check finds finished to be False, finished won’t be changed by another task, because the code that changes finished is in the locked block.
Also in the locked scope is the fetching of the next item from in_stream, as well as the handling of StopAsyncIteration. This means that no two tasks try to fetch from in_stream at the same time. If in_stream has run out, the first task to find out sets finished to True. When another task then acquires the lock and intends to fetch from in_stream, it sees finished and quits.
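The lock matters for another reason too, at least when in_stream is an async generator. In CPython 3.8 and later, calling __anext__ on an async generator while a previous __anext__ is still suspended inside it raises RuntimeError. The small sketch below shows the failure without a lock and the fix with one; slow_numbers is a made-up generator.

```python
import asyncio


async def slow_numbers(n):
    for i in range(n):
        await asyncio.sleep(0.01)  # the generator suspends mid-step here
        yield i


async def unlocked():
    g = slow_numbers(2)
    # Two overlapping __anext__ calls: the second one is rejected.
    results = await asyncio.gather(
        g.__anext__(), g.__anext__(), return_exceptions=True)
    await g.aclose()
    return results


async def locked():
    g = slow_numbers(2)
    lock = asyncio.Lock()

    async def fetch():
        async with lock:  # only one __anext__ in flight at a time
            return await g.__anext__()

    results = await asyncio.gather(fetch(), fetch())
    await g.aclose()
    return results


print(asyncio.run(unlocked()))  # first entry 0, second a RuntimeError
print(asyncio.run(locked()))    # [0, 1]
```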
The order of output is maintained by Future objects. Specifically, once an item is fetched from in_stream, a Future object is immediately created and placed in the output queue to “hold the spot” for the item. After this, the item is processed by func. This can take as long as needed, since the spot for the result is already secured. Once done, the result is set on the Future.
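The “hold the spot” trick can be seen in isolation in a minimal sketch, where square_later is a made-up stand-in for func. Futures go into a FIFO queue in input order and the work finishes in random order. Even so, awaiting the futures in queue order returns the results in input order.

```python
import asyncio
import random


async def main(items):
    loop = asyncio.get_running_loop()
    out = asyncio.Queue()

    async def square_later(x, fut):
        await asyncio.sleep(random.random() * 0.01)  # finishes in random order
        fut.set_result(x * x)

    tasks = []
    for x in items:
        fut = loop.create_future()
        await out.put(fut)  # the spot is reserved in input order
        tasks.append(asyncio.create_task(square_later(x, fut)))

    results = []
    for _ in items:
        fut = await out.get()
        results.append(await fut)  # wait for this spot's result
    await asyncio.gather(*tasks)
    return results


results = asyncio.run(main(list(range(10))))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```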
Please note that placement of the Future object in the output queue also happens under the lock. Otherwise the order of the outputs cannot be guaranteed. The lock also guarantees that the indicator NO_MORE_DATA is the very last element placed in out_stream.
Finally, all these background tasks are set up and doing their job. All that remains is to wait on the output queue and yield the results as they become available. One caveat is that the elements in the queue are not results, but rather Future objects. When a Future object is taken off the queue, it may not contain a result yet, so we await it and yield the result.
Let’s verify it works.
```python
import asyncio
import random

import pytest


async def f1(x):
    await asyncio.sleep(random.random() * 0.01)
    return x + 3.8


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_transform():
    x = list(range(10000))
    expected = [v + 3.8 for v in x]
    s = transform(stream(x), f1, workers=10)
    got = [v async for v in s]
    assert got == expected
```
In transform, a queue holds Future objects in order to maintain the order of the elements. If a later element finishes processing sooner, its result waits for its turn to be dequeued and yielded, because the queue is first-in first-out. This could be suboptimal if the application does not need the output to be in the same order as the input. Such applications may use the “unordered” version below.
```python
async def unordered_transform(
    in_stream: typing.AsyncIterator[T],
    func: Callable[[T], Awaitable[Any]],
    *,
    workers: int,
    **func_args,
):
    if workers is None or workers < 2:
        raise ValueError('unordered_transform requires workers >= 2')
    out_stream = asyncio.Queue(workers * 8)
    lock = asyncio.Lock()
    finished = False
    NO_MORE_DATA = object()
    n_active_workers = workers

    async def _process(in_stream, lock, out_stream, func, **kwargs):
        nonlocal finished, n_active_workers
        while not finished:
            async with lock:
                if finished:
                    break
                try:
                    x = await in_stream.__anext__()
                except StopAsyncIteration:
                    finished = True
                    break
            y = await func(x, **kwargs)
            await out_stream.put(y)  # results enter the queue as they finish
        n_active_workers -= 1
        if n_active_workers == 0:
            # Only the last worker to shut down signals the end of output.
            await out_stream.put(NO_MORE_DATA)

    t_workers = [
        asyncio.create_task(_process(
            in_stream, lock, out_stream, func, **func_args,
        ))
        for _ in range(workers)
    ]

    while True:
        y = await out_stream.get()
        if y is NO_MORE_DATA:
            break
        yield y

    for t in t_workers:
        await t
```
This differs from transform in two places.

First, it does not put Future objects in the output queue to “hold spots”, because there is no need to maintain a particular order of the elements. The input item is processed, and only the result is placed in the output queue.

Second, the indicator NO_MORE_DATA is not placed in the output queue as soon as a task sees the end of the input stream. The reason is that at that moment, other tasks may very well still be processing items. When they are done and their results enter the queue, they would appear after NO_MORE_DATA! The solution is to have the very last task that shuts down enqueue NO_MORE_DATA just before it shuts down.
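The effect of choosing which worker signals the end can be reproduced in a toy sketch; run and its parameter are hypothetical. Three workers each finish one item at different times and then find no more input. In the naive version, the first worker to finish enqueues the end marker, so results from the slower workers land behind it and are never read.

```python
import asyncio


async def run(last_one_signals: bool):
    queue = asyncio.Queue()
    DONE = object()
    n_active = 3
    sent_done = False

    async def worker(i):
        nonlocal n_active, sent_done
        await asyncio.sleep(0.01 * (i + 1))  # worker 0 finishes first
        await queue.put(i)
        n_active -= 1
        # From here on this worker finds no more input.
        if last_one_signals:
            if n_active == 0:  # the last worker to shut down
                await queue.put(DONE)
        elif not sent_done:  # naive: the first worker to finish
            sent_done = True
            await queue.put(DONE)

    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    results = []
    while (item := await queue.get()) is not DONE:
        results.append(item)
    await asyncio.gather(*tasks)
    return results


naive = asyncio.run(run(last_one_signals=False))
correct = asyncio.run(run(last_one_signals=True))
print(naive, correct)  # [0] [0, 1, 2]
```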
A slight variant of the “transformer” is a “sink”, which processes data but does not produce results. Or, more accurately, we don’t care to receive the results. Examples include writing data to files, inserting into databases, sending out emails, etc.
The verb for a “sink” is “drain”, so that’s the name of the function. It simply uses unordered_transform (or transform, when fewer than 2 workers are requested) and ignores the output.
```python
async def drain(
    in_stream: typing.AsyncIterator[T],
    func: Callable[[T], Awaitable[Any]],
    *,
    workers: int = None,
    log_every: int = 1000,
    **func_args,
) -> int:
    if workers is None or workers < 2:
        # Sequential processing; workers=None is treated as no concurrency.
        trans = transform(in_stream, func, workers=1, **func_args)
    else:
        trans = unordered_transform(
            in_stream, func, workers=workers, **func_args,
        )
    n = 0
    async for _ in trans:
        n += 1  # count every element, even when logging is turned off
        if log_every and n % log_every == 0:
            print('drained', n)
    return n
```
By default, the function reports progress every 1000 elements (set by log_every). This is not needed in transform or unordered_transform, because their downstream consumer can track progress in its own way.
Batch and un-batch
Some operations can take advantage of “vectorization”, i.e., processing many items at once at higher efficiency than processing them one by one. A small convenience function can bundle up input elements ahead of such vectorized functions.
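```python
from collections.abc import AsyncIterable


async def batch(in_stream: AsyncIterable, batch_size: int):
    assert 0 < batch_size <= 10000
    batch_ = []
    n = 0
    async for x in in_stream:
        batch_.append(x)
        n += 1
        if n >= batch_size:
            yield batch_
            batch_ = []  # start a fresh list; the yielded one now belongs to the consumer
            n = 0
    if n:
        yield batch_  # the final, possibly partial, batch
```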
Suppose a vectorized function takes a list as input and produces a list of results, but the function coming next prefers to process one item at a time. The following small utility unbundles the stream for it.
```python
async def unbatch(in_stream: AsyncIterable):
    async for batch in in_stream:
        for x in batch:
            yield x
```
batch and unbatch do not have to be used in pairs. It all depends on the actual needs of the pipeline.
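Here is a quick round trip with compact copies of stream, batch, and unbatch. This batch uses len() instead of a counter and drops the size assertion. square_all is a hypothetical vectorized function that handles a whole list per call.

```python
import asyncio
from collections.abc import AsyncIterable, Iterable


async def stream(x: Iterable):
    for xx in x:
        yield xx


async def batch(in_stream: AsyncIterable, batch_size: int):
    batch_ = []
    async for x in in_stream:
        batch_.append(x)
        if len(batch_) >= batch_size:
            yield batch_
            batch_ = []
    if batch_:
        yield batch_  # final partial batch


async def unbatch(in_stream: AsyncIterable):
    async for b in in_stream:
        for x in b:
            yield x


async def square_all(xs):
    # Hypothetical "vectorized" function: one call handles a whole list.
    return [x * x for x in xs]


async def main():
    batches = [b async for b in batch(stream(range(7)), 3)]
    squared = [await square_all(b) async for b in batch(stream(range(7)), 3)]
    flat = [x async for x in unbatch(stream(squared))]
    return batches, flat


batches, flat = asyncio.run(main())
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
print(flat)     # [0, 1, 4, 9, 16, 25, 36]
```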
Look at the function batch. It does not consider situations like this one: the next item has not arrived after a long wait, so go ahead and produce the items collected so far as a smaller batch. It always collects the specified number of items, with the only exception of the final batch, which may be partial. This works fine if the input stream always has an abundant supply. Otherwise there could be scenarios of glaring inefficiency, for example, a half-filled batch waiting for a slow upstream while the vectorized function downstream sits idle. Such scenarios could be mitigated by having a “buffer” ahead of it.
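For comparison, the situation described above can also be handled inside the batching step itself. The sketch below shows a hypothetical batch_with_timeout, which is not part of the original utilities. It pumps the input into a small queue in a background task. That way, waiting for the next item can be abandoned after timeout seconds without cancelling in_stream itself, and a partial batch is emitted when the wait runs out.

```python
import asyncio
from collections.abc import AsyncIterable


async def batch_with_timeout(in_stream: AsyncIterable, batch_size: int, timeout: float):
    # Hypothetical variant of batch: emit a partial batch if no new item
    # arrives within `timeout` seconds once the batch has started.
    queue = asyncio.Queue(maxsize=batch_size)
    DONE = object()

    async def pump():
        # Cancelling a queue.get() is safe; cancelling in_stream.__anext__()
        # could close an async generator, hence this indirection.
        async for x in in_stream:
            await queue.put(x)
        await queue.put(DONE)

    task = asyncio.create_task(pump())
    batch_ = []
    while True:
        if batch_:
            try:
                x = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                yield batch_  # upstream is slow: flush what we have
                batch_ = []
                continue
        else:
            x = await queue.get()  # nothing collected yet: wait as long as needed
        if x is DONE:
            break
        batch_.append(x)
        if len(batch_) >= batch_size:
            yield batch_
            batch_ = []
    if batch_:
        yield batch_
    await task


async def bursty():
    for i in range(5):
        if i == 2:
            await asyncio.sleep(0.2)  # a long pause in the middle of the stream
        yield i


async def main():
    return [b async for b in batch_with_timeout(bursty(), 3, timeout=0.05)]


result = asyncio.run(main())
print(result)  # [[0, 1], [2, 3, 4]]
```

One caveat of this sketch: if the consumer stops iterating early, the background pump task is left pending.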
A “buffer” is a speed stabilizer. The idea is simple: proactively fetch items from the input stream. If the downstream function does not need them yet, or consumes them more slowly than they are being fetched, just store them temporarily in a private pool.
```python
async def buffer(in_stream: AsyncIterable, buffer_size: int):
    out_stream = asyncio.Queue(maxsize=buffer_size)
    NO_MORE_DATA = object()

    async def buff(in_stream, out_stream):
        # Runs in the background, fetching ahead of the consumer.
        async for x in in_stream:
            await out_stream.put(x)  # blocks once the buffer is full
        await out_stream.put(NO_MORE_DATA)

    t = asyncio.create_task(buff(in_stream, out_stream))

    while True:
        x = await out_stream.get()
        if x is NO_MORE_DATA:
            break
        yield x

    await t
```
If the downstream consumer is slow, the function buff will proactively collect items from in_stream and fill out_stream to its capacity. Once out_stream is full, buff waits in put, so the pool never holds more than buffer_size items.
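Here is a rough sketch of the effect. It assumes a slow source and a slow consumer that each take 20 ms per item; slow_source and consume are made-up names, and buffer is copied from above. Without a buffer the two delays add up to roughly 0.4 s for 10 items. With a buffer they overlap, and the total comes to roughly 0.22 s.

```python
import asyncio
import time
from collections.abc import AsyncIterable


async def buffer(in_stream: AsyncIterable, buffer_size: int):
    # Same logic as the buffer function above.
    out_stream = asyncio.Queue(maxsize=buffer_size)
    NO_MORE_DATA = object()

    async def buff():
        async for x in in_stream:
            await out_stream.put(x)
        await out_stream.put(NO_MORE_DATA)

    t = asyncio.create_task(buff())
    while True:
        x = await out_stream.get()
        if x is NO_MORE_DATA:
            break
        yield x
    await t


async def slow_source(n, delay):
    for i in range(n):
        await asyncio.sleep(delay)  # e.g. a slow upstream I/O step
        yield i


async def consume(stream, delay):
    out = []
    async for x in stream:
        await asyncio.sleep(delay)  # e.g. a slow downstream step
        out.append(x)
    return out


async def main():
    start = time.perf_counter()
    plain = await consume(slow_source(10, 0.02), 0.02)
    t_plain = time.perf_counter() - start

    start = time.perf_counter()
    buffered = await consume(buffer(slow_source(10, 0.02), 5), 0.02)
    t_buffered = time.perf_counter() - start
    return plain, t_plain, buffered, t_buffered


plain, t_plain, buffered, t_buffered = asyncio.run(main())
print(f"no buffer: {t_plain:.2f}s, with buffer: {t_buffered:.2f}s")
```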
Put it all together
Let’s cook up an example to show off all these utilities.
```python
import asyncio

# Assumes stream, batch, unbatch, buffer, transform,
# unordered_transform, and drain from above are in scope.

x = [1, 5, 6, 3, 7, 9, 2, 4, 4, 5, 1]


async def diff_shoot(batch):
    return range(max(batch) - min(batch))


async def scale(x):
    return x * 2


class Sink:
    def __init__(self):
        self.sum = 0

    async def __call__(self, x):
        self.sum += x


batches = batch(stream(x), 3)
shots = transform(batches, diff_shoot, workers=3)
flat = buffer(unbatch(shots), 10)
doubled = unordered_transform(flat, scale, workers=2)

mysink = Sink()

n = asyncio.run(drain(doubled, mysink, workers=1))
print('processed', n, 'elements')
print('sum is', mysink.sum)
```
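Before running it, let’s figure out what the result should be.

- batch bundles the 11 input numbers into four lists: [1, 5, 6], [3, 7, 9], [2, 4, 4], and [5, 1].
- diff_shoot finds the ranges of these bundles to be 5 (= 6 - 1), 6 (= 9 - 3), 2 (= 4 - 2), and 4 (= 5 - 1), and produces range(5), range(6), range(2), and range(4).
- unbatch flattens these out into the stream 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 0, 1, 2, 3. These are 17 numbers, and buffer does not change this. (Note that the upstream of unbatch does not have to yield list objects; the range objects from diff_shoot work just as well, since unbatch only iterates over each one, and that’s fine.)
- scale doubles each of these numbers. Because it runs in unordered_transform, the order may change, but the sum does not.
- mysink adds them up.

All in all, the flattened stream sums to 10 + 15 + 1 + 6 = 32, so the final sum should be 2 × 32 = 64.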
Run this in IPython:

```
In : x = [1, 5, 6, 3, 7, 9, 2, 4, 4, 5, 1]
...:
...: async def diff_shoot(batch):
...:     return list(range(max(batch) - min(batch)))
...:
...: async def scale(x):
...:     return x * 2
...:
...: class Sink:
...:     def __init__(self):
...:         self.sum = 0
...:
...:     async def __call__(self, x):
...:         self.sum += x
...:
...: batches = batch(stream(x), 3)
...: shots = transform(batches, diff_shoot, workers=3)
...: flat = buffer(unbatch(shots), 10)
...: doubled = unordered_transform(flat, scale, workers=2)
...:
...: mysink = Sink()
...:
...: n = asyncio.run(drain(doubled, mysink, workers=1))
...: print('processed', n, 'elements')
...: print('sum is', mysink.sum)
...:
processed 17 elements
sum is 64
```
A very nice feature of these functions is that the AsyncIterator type unites them all: the output of one can be the input of another. As a result, they can be used in a “pipe” fashion in flexible ways. transform and unordered_transform are like “mappers”, but they don’t have to take one input and generate one output. They can aggregate and expand, with the help of batch and unbatch. The final example shows that drain could behave like a “reducer”. But not quite, as drain cannot produce an output stream. That’s something to think about!
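One possible direction is sketched here as an assumption, not as part of the original toolkit. A hypothetical reduce_stream folds the stream with an async func and then yields the accumulated value. The result is itself an async iterator with a single element, so it can feed further stages.

```python
import asyncio
from collections.abc import AsyncIterable
from typing import Any, AsyncIterator, Awaitable, Callable


async def reduce_stream(
    in_stream: AsyncIterable,
    func: Callable[[Any, Any], Awaitable[Any]],
    initial: Any,
) -> AsyncIterator:
    # Hypothetical reducer: fold the whole stream, then emit the result
    # as a one-element async iterator so it can be piped onward.
    acc = initial
    async for x in in_stream:
        acc = await func(acc, x)
    yield acc


async def stream(x):
    for xx in x:
        yield xx


async def add(a, b):
    return a + b


async def main():
    totals = reduce_stream(stream(range(1, 11)), add, 0)
    return [t async for t in totals]


result = asyncio.run(main())
print(result)  # [55]
```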