RxPY Cohabitation with AsyncIO

As AsyncIO grows more popular and its ecosystem expands rapidly, questions regularly come up about how RxPY can cohabit with AsyncIO libraries. This post offers some answers.

The RxPY documentation contains an example that uses AsyncIO and RxPY together. However, the transition between the two frameworks can be generalized to cover several use cases.

Create an Observable from an asynchronous iterable

This is probably the most frequent need: you want to use an AsyncIO library that exposes an asynchronous iterable, and you want to consume it as an RxPY Observable. Let's see how to implement a factory operator that creates an Observable from an asynchronous iterable. It takes about 15 lines of code:

def from_aiter(iter, loop):
    def on_subscribe(observer, scheduler):
        async def _aio_sub():
            try:
                async for i in iter:
                    observer.on_next(i)
                loop.call_soon(
                    observer.on_completed)
            except Exception as e:
                loop.call_soon(
                    functools.partial(observer.on_error, e))

        task = asyncio.ensure_future(_aio_sub(), loop=loop)
        return Disposable(lambda: task.cancel())

    return rx.create(on_subscribe)

This operator has a classic structure: it uses the create operator, and items are emitted from the subscription callback. It takes two parameters: the asynchronous iterable to convert and the AsyncIO event loop.

The subscription callback schedules the _aio_sub coroutine that will emit the items. It returns a disposable that cancels the task so that the coroutine is cancelled when the Observable is disposed.

The _aio_sub coroutine iterates over the asynchronous iterable and emits each item it reads on the Observable. When the iterable is exhausted, the Observable completes. Catching exceptions makes it possible to forward any error to the Observable.

The on_completed and on_error methods are not called directly, but scheduled on the event loop so that the task finishes before Observers handle the completion or the error. This prevents AsyncIO runtime warnings if the event loop is stopped in the context of these calls.
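The effect of deferring these calls can be observed with the standard library alone. The following is a minimal sketch, not RxPY code: `producer` and `notify` are hypothetical stand-ins for _aio_sub and observer.on_completed. It records that the scheduled callback runs only after the task has finished.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    events = []

    def notify():
        # Stand-in for observer.on_completed: runs on a later loop iteration.
        events.append("notify, task done: {}".format(task.done()))

    async def producer():
        # Stand-in for _aio_sub: schedule the notification, do not call it.
        loop.call_soon(notify)
        events.append("producer returns")

    task = asyncio.ensure_future(producer())
    await task
    # Yield once more so that any pending callback has a chance to run.
    await asyncio.sleep(0)
    return events


events = asyncio.run(demo())
print(events)
# ['producer returns', 'notify, task done: True']
```

Had `notify` been called directly inside `producer`, it would have run while the task was still executing.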

Here is a full example using the from_aiter operator:

import asyncio
import functools
import rx
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.disposable import Disposable


def from_aiter(iter, loop):
    def on_subscribe(observer, scheduler):
        async def _aio_sub():
            try:
                async for i in iter:
                    observer.on_next(i)
                loop.call_soon(
                    observer.on_completed)
            except Exception as e:
                loop.call_soon(
                    functools.partial(observer.on_error, e))

        task = asyncio.ensure_future(_aio_sub(), loop=loop)
        return Disposable(lambda: task.cancel())

    return rx.create(on_subscribe)


async def ticker(delay, to):
    """Yield numbers from 0 to `to - 1`, one every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def main(loop):
    done = asyncio.Future()

    def on_completed():
        print("completed")
        done.set_result(0)

    disposable = from_aiter(ticker(0.1, 10), loop).subscribe(
        on_next=lambda i: print("next: {}".format(i)),
        on_error=lambda e: print("error: {}".format(e)),
        on_completed=on_completed,
    )

    await done
    disposable.dispose()

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

The ticker function is an asynchronous generator. It yields an item every 100 ms.
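The disposal path described earlier, where the Disposable cancels the task, can also be checked in isolation. This is a sketch using only the standard library: `consume` is a hypothetical stand-in for _aio_sub, and a plain list replaces the Observer. Cancelling the task interrupts the async for loop at its next suspension point.

```python
import asyncio


async def ticker(delay, to):
    """Yield numbers from 0 to `to - 1`, one every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def demo():
    received = []

    async def consume():
        # Stand-in for _aio_sub: forward each item to the "observer".
        async for i in ticker(0.01, 100):
            received.append(i)

    task = asyncio.ensure_future(consume())
    await asyncio.sleep(0.035)
    task.cancel()  # this is what the returned Disposable does on dispose
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received, task.cancelled()


received, cancelled = asyncio.run(demo())
print(cancelled, len(received) < 100)
```

Only the first few items are received. The exact count depends on timing, so the sketch does not rely on it.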

Converting an Observable to an asynchronous iterable

Now let's do the opposite: converting an Observable to an asynchronous iterable. While the former conversion was quite simple, this one is less obvious. Converting an Observable to an asynchronous iterable means implementing an asynchronous generator, as defined in PEP 525. The problem is that an asynchronous generator can only wait for its next item through an await expression, so it must await a coroutine or a future for each item that it yields. This does not match the behavior of an Observable: asynchronous generators work with coroutines and futures, while Observables work with subscription callbacks. As a consequence, the two cannot be bound together directly.

However, an indirect binding is possible. AsyncIO provides a class that helps here: Queue. An AsyncIO queue behaves like a regular queue, except that the receiver awaits each item. This makes it possible to convert an Observable to an asynchronous generator:

async def to_agen(obs, loop):
    queue = asyncio.Queue()

    def on_next(i):
        queue.put_nowait(i)

    disposable = obs.pipe(ops.materialize()).subscribe(
        on_next=on_next,
        scheduler=AsyncIOScheduler(loop=loop)
    )

    while True:
        i = await queue.get()
        if isinstance(i, OnNext):
            yield i.value
            queue.task_done()
        elif isinstance(i, OnError):
            disposable.dispose()
            # OnError carries the error in `exception`, not in `value`.
            raise i.exception
        else:
            disposable.dispose()
            break

to_agen is an asynchronous generator: an async function that yields items. It takes an Observable and the event loop as parameters. When it is first iterated, it creates a Queue and subscribes to the materialized Observable. The materialize operator is necessary to push completion and errors into the queue, not just the items.

Then the generator iterates over the queue and, depending on the type of each notification, either yields its value, raises an exception, or returns.

The to_agen generator can be used as follows:
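The same queue pattern can be sketched without RxPY, which makes the mechanism easier to see. In this illustration, `callbacks_to_agen`, `push_source` and `failing_source` are hypothetical names, and tagged tuples play the role of the materialized notifications.

```python
import asyncio


async def callbacks_to_agen(subscribe):
    """Bridge a callback-based source to an asynchronous generator."""
    queue = asyncio.Queue()
    # Every callback pushes a tagged notification into the queue.
    subscribe(
        lambda v: queue.put_nowait(("next", v)),
        lambda e: queue.put_nowait(("error", e)),
        lambda: queue.put_nowait(("completed", None)),
    )
    while True:
        kind, payload = await queue.get()
        if kind == "next":
            yield payload
        elif kind == "error":
            raise payload
        else:
            return


def push_source(on_next, on_error, on_completed):
    # Emits from event loop callbacks, as a push-based source would.
    loop = asyncio.get_running_loop()
    for i in (1, 2, 3):
        loop.call_soon(on_next, i)
    loop.call_soon(on_completed)


def failing_source(on_next, on_error, on_completed):
    on_next(1)
    on_error(ValueError("boom"))


async def demo():
    items = [i async for i in callbacks_to_agen(push_source)]
    error = None
    try:
        async for _ in callbacks_to_agen(failing_source):
            pass
    except ValueError as e:
        error = str(e)
    return items, error


items, error = asyncio.run(demo())
print(items, error)
# [1, 2, 3] boom
```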

import asyncio
import rx
import rx.operators as ops
from rx.core.notification import OnNext, OnError, OnCompleted
from rx.scheduler.eventloop import AsyncIOScheduler


async def to_agen(obs, loop):
    queue = asyncio.Queue()

    def on_next(i):
        queue.put_nowait(i)

    disposable = obs.pipe(ops.materialize()).subscribe(
        on_next=on_next,
        scheduler=AsyncIOScheduler(loop=loop)
    )

    while True:
        i = await queue.get()
        if isinstance(i, OnNext):
            yield i.value
            queue.task_done()
        elif isinstance(i, OnError):
            disposable.dispose()
            # OnError carries the error in `exception`, not in `value`.
            raise i.exception
        else:
            disposable.dispose()
            break


async def main(loop):
    gen = to_agen(rx.from_([1, 2, 3, 4]), loop)
    async for i in gen:
        print(i)

    print("done")


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

Wrapping an Observable in an asynchronous generator is convenient because it preserves the subscription behavior of Observables: generators execute lazily, starting at the first iteration. The Observable is therefore subscribed when the generator is first iterated, not when it is created.
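This laziness is a property of asynchronous generators themselves, and it can be verified without RxPY. In the sketch below, `lazy_gen` is a hypothetical generator whose first statement stands in for the call to obs.subscribe.

```python
import asyncio

log = []


async def lazy_gen():
    # Stand-in for the subscription done at the top of to_agen.
    log.append("subscribed")
    yield 1
    yield 2


async def demo():
    gen = lazy_gen()       # creating the generator runs none of its body
    log.append("created")
    async for i in gen:    # the body starts here, at the first iteration
        log.append(i)


asyncio.run(demo())
print(log)
# ['created', 'subscribed', 1, 2]
```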

Back-pressure on an Async Iterable

Finally, let's see a possible way to handle back-pressure. Back-pressure is a complex topic, and its support was removed in RxPY v3 so that dedicated add-on packages can be written. The major difficulty with reactive programming and back-pressure is that it is often implemented as bidirectional communication: the usual downstream direction is used to emit items, and an upstream direction lets the Observer control the emission. In other words, back-pressure is often implemented by turning push-based Observables into pull-based ones.

Alternatively, back-pressure can be implemented with feedback loops. In such solutions, the implementation of the Observables does not change, but applications are written as directed cyclic graphs, and the cycle is what controls the emission of items. While this requires more design work from the application developer, I think that it is a better fit for reactive programming. So let's see how to implement back-pressure in RxPY on a source that is an asynchronous iterable.

If you have an asynchronous iterable that emits items faster than your reactive code can consume them, back-pressure via a feedback loop can be implemented with a few modifications to the from_aiter operator that we created earlier. The idea is to pull an item from the iterable each time an item is received on a feedback Observable, instead of letting the iterable drive the emission.

Here is the new implementation that supports both immediate and feedback modes:

def from_aiter(iter, feedback, loop):
    def on_subscribe(observer, scheduler):
        async def _aio_sub():
            ...

        async def _aio_next():
            try:
                i = await iter.__anext__()
                observer.on_next(i)
            except StopAsyncIteration:
                observer.on_completed()
            except Exception as e:
                observer.on_error(e)

        if feedback is not None:
            return feedback.subscribe(
                on_next=lambda i: asyncio.ensure_future(
                    _aio_next(), loop=loop)
            )
        else:
            task = asyncio.ensure_future(_aio_sub(), loop=loop)
            return Disposable(lambda: task.cancel())

    return rx.create(on_subscribe)

If a feedback Observable is provided, then each item received on it triggers a request for the next item of the asynchronous iterable. That item is then emitted on the Observable.

The new from_aiter can be used as follows:
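The pull mechanism relies on calling __anext__ by hand instead of using async for. As a minimal sketch with the standard library only, where `numbers` is a hypothetical source and each loop turn stands in for one item received on the feedback Observable:

```python
import asyncio


async def numbers():
    for i in range(3):
        yield i


async def demo():
    source = numbers()
    out = []
    # Five requests are made, but the source only holds three items.
    for _ in range(5):
        try:
            out.append(await source.__anext__())
        except StopAsyncIteration:
            # Exhaustion is signalled by an exception, as in _aio_next.
            out.append("completed")
            break
    return out


out = asyncio.run(demo())
print(out)
# [0, 1, 2, 'completed']
```

Each request yields exactly one item, so the consumer decides when the iterable advances.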

import asyncio
import functools
import rx
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.disposable import Disposable
from rx.subject import Subject
import rx.operators as ops


def from_aiter(iter, feedback, loop):
    def on_subscribe(observer, scheduler):
        async def _aio_sub():
            try:
                async for i in iter:
                    observer.on_next(i)
                loop.call_soon(observer.on_completed)
            except Exception as e:
                loop.call_soon(functools.partial(
                    observer.on_error, e))

        async def _aio_next():
            try:
                i = await iter.__anext__()
                observer.on_next(i)
            except StopAsyncIteration:
                observer.on_completed()
            except Exception as e:
                observer.on_error(e)

        if feedback is not None:
            return feedback.subscribe(
                on_next=lambda i: asyncio.ensure_future(
                    _aio_next(), loop=loop)
            )
        else:
            task = asyncio.ensure_future(_aio_sub(), loop=loop)
            return Disposable(lambda: task.cancel())

    return rx.create(on_subscribe)


async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def main(loop):
    fb = Subject()
    done = asyncio.Future()

    def on_completed():
        print("completed")
        done.set_result(0)

    fb_bootstrap = rx.from_([True])
    obs = from_aiter(
        ticker(0.1, 5),
        rx.concat(fb_bootstrap, fb), loop).pipe(
            ops.share(),
            ops.delay(1.0),
        )

    disposable1 = obs.subscribe(fb,
        scheduler=AsyncIOScheduler(loop=loop))
    disposable2 = obs.subscribe(
        on_next=lambda i: print("next: {}".format(i)),
        on_error=lambda e: print("error: {}".format(e)),
        on_completed=on_completed,
        scheduler=AsyncIOScheduler(loop=loop),
    )

    await done
    disposable1.dispose()
    disposable2.dispose()

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

In this example, the feedback Observable is a Subject. An Observable is created from the ticker asynchronous iterable. We emulate slow processing with the delay operator. The feedback loop is bootstrapped with a single item, meaning that at most one item is processed at a time. More items can be processed concurrently (in terms of I/O multiplexing) by emitting more items in fb_bootstrap.
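The link between the number of bootstrap items and the number of items in flight can be illustrated with a stdlib-only sketch. This is not the RxPY implementation: `run`, `credits` and `process` are hypothetical names, an asyncio.Queue replaces the feedback Subject, and asyncio.sleep replaces the delay operator.

```python
import asyncio


async def numbers(n):
    for i in range(n):
        yield i


async def run(window, total=6):
    source = numbers(total)
    credits = asyncio.Queue()
    for _ in range(window):            # bootstrap items
        credits.put_nowait(True)

    in_flight = 0
    max_in_flight = 0
    processed = []

    async def process(item):
        nonlocal in_flight
        await asyncio.sleep(0.01)      # emulate slow processing
        processed.append(item)
        in_flight -= 1
        credits.put_nowait(True)       # feedback: allow one more item

    tasks = []
    while True:
        await credits.get()            # wait for a feedback item
        try:
            item = await source.__anext__()
        except StopAsyncIteration:
            break
        in_flight += 1
        max_in_flight = max(max_in_flight, in_flight)
        tasks.append(asyncio.ensure_future(process(item)))

    await asyncio.gather(*tasks)
    return max_in_flight, sorted(processed)


print(asyncio.run(run(window=1)))
print(asyncio.run(run(window=3)))
```

With one bootstrap item, at most one item is in flight at any time. With three, up to three are processed concurrently, and all items are still delivered.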

Conclusion

RxPY and AsyncIO complement each other quite easily. The examples presented here should cover many of the cases where a conversion between RxPY and AsyncIO is necessary.