Skip to content

AsyncIO, AnyIO & Trio

zloop is an asyncio.AbstractEventLoop. Not "asyncio-like" - the real thing. So the asyncio APIs you reach for day to day - tasks, futures, timers, streams, executors, signals, TLS - work on top of it. And because AnyIO runs on asyncio, AnyIO programs (Starlette, FastAPI, HTTPX) run on zloop too - covered further down.

Let's walk through them, so you can see there are no surprises. (A handful of lower-level loop APIs aren't implemented yet; the compatibility matrix is the full picture.)

Reading the examples

The first example below is complete. The later snippets show just the interesting async def main(): ... body - run any of them by dropping it into the same skeleton:

import asyncio

import zloop

# async def main(): ...  <- the snippet goes here

asyncio.run(main(), loop_factory=zloop.new_event_loop)

asyncio.Runner(loop_factory=...) works too if you want to reuse one loop across several run() calls (see First steps); the body is unchanged.

Tasks and futures

import asyncio

import zloop


async def work(n: int) -> int:
    await asyncio.sleep(0.01 * n)
    return n


async def main():
    # create_task uses zloop's loop, but returns a normal asyncio.Task
    task = asyncio.create_task(work(3))

    # futures, gather, wait_for - all standard
    results = await asyncio.gather(work(1), work(2), task)
    return results


print(asyncio.run(main(), loop_factory=zloop.new_event_loop))
#> [1, 2, 3]

Why this works

zloop reuses CPython's own asyncio.Future and asyncio.Task - the same C-accelerated objects the default loop uses. zloop only replaces the engine (scheduling and I/O), not the coroutine machinery. More on that in What zloop reuses.

Timers

async def main():
    loop = asyncio.get_running_loop()

    loop.call_soon(print, "now")
    loop.call_later(0.1, print, "in 100ms")
    loop.call_at(loop.time() + 0.2, print, "at an absolute time")

    await asyncio.sleep(0.3)

Timers fire in (deadline, insertion order) - the ordering asyncio guarantees. The heap that backs them lives in Zig.

Streams

The high-level streams API works unchanged:

async def main():
    # an echo server
    async def handle(reader, writer):
        data = await reader.read(100)
        writer.write(data)
        await writer.drain()
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 8888)

    # ... and a client talking to it
    reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
    writer.write(b"hello")
    await writer.drain()
    print(await reader.read(100))  #> b'hello'

    writer.close()
    server.close()


asyncio.run(main(), loop_factory=zloop.new_event_loop)

start_server, open_connection, StreamReader, StreamWriter - they're built on the loop's create_server / create_connection, which zloop implements with its Zig transports.

Running blocking code in threads

run_in_executor and asyncio.to_thread work, which means you can safely call blocking code from async land:

import time


async def main():
    loop = asyncio.get_running_loop()

    # the classic
    result = await loop.run_in_executor(None, time.sleep, 0.1)

    # the modern shortcut
    await asyncio.to_thread(time.sleep, 0.1)

This relies on a real detail

For executors to work, the loop must release the GIL while it waits for I/O - otherwise the worker threads could never run. zloop does exactly that around its blocking poll. (uvloop does too; it's easy to get wrong, so it's worth knowing it's handled.)

Signals

add_signal_handler works on the main thread, just like asyncio's default loop:

import signal


async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    loop.add_signal_handler(signal.SIGINT, stop.set)
    loop.add_signal_handler(signal.SIGTERM, stop.set)

    print("running - press Ctrl+C to stop")
    await stop.wait()
    print("shutting down gracefully ✨")


asyncio.run(main(), loop_factory=zloop.new_event_loop)

TLS

create_server(ssl=...) and create_connection(ssl=...) work, so anything that speaks TLS over asyncio (HTTPS servers, secure clients) works on zloop.

import ssl


async def main():
    ctx = ssl.create_default_context()
    reader, writer = await asyncio.open_connection("example.com", 443, ssl=ctx)
    writer.write(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
    await writer.drain()
    status_line = (await reader.read(200)).split(b"\r\n")[0]
    print(status_line)  # the HTTP status line from the TLS connection
    writer.close()


asyncio.run(main(), loop_factory=zloop.new_event_loop)

Note

zloop reuses asyncio's own TLS state machine (asyncio.sslproto) on top of its Zig transports, so the handshake and record framing behave like the default loop. A couple of TLS timeout knobs aren't wired through yet - see Compatibility.

With AnyIO

AnyIO is the structured-concurrency layer that sits on top of asyncio (and Trio). It's what Starlette, FastAPI, and HTTPX use internally - so making AnyIO run on zloop means a lot of the ecosystem runs on zloop.

The good news: AnyIO's asyncio backend accepts a loop factory, so this is a clean one-liner.

main.py
import anyio

import zloop


async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(anyio.sleep, 0.1)
        tg.start_soon(anyio.sleep, 0.2)
    return "structured concurrency on a Zig loop ✨"


print(
    anyio.run(
        main,
        backend="asyncio",
        backend_options={"loop_factory": zloop.new_event_loop},  # (1)!
    )
)
  1. AnyIO's asyncio backend forwards loop_factory straight to an asyncio.Runner. Same mechanism as everywhere else.

In tests (pytest + anyio)

If you test with the anyio pytest plugin, you select the backend with the anyio_backend fixture. Return a tuple to pass options - including the loop factory:

conftest.py
import pytest

import zloop


@pytest.fixture
def anyio_backend():
    return ("asyncio", {"loop_factory": zloop.new_event_loop})  # (1)!
  1. Now every @pytest.mark.anyio test in your suite runs on zloop. This is exactly how we run uvicorn's own test suite against zloop.
test_something.py
import asyncio

import pytest


@pytest.mark.anyio
async def test_runs_on_zloop():
    assert type(asyncio.get_running_loop()).__module__.startswith("zloop")

Tip

This fixture trick is the easiest way to run an existing asyncio/AnyIO test suite on zloop and see what happens. Suites that stick to TCP/TLS sockets, streams, tasks, and timers should pass unchanged; if a suite reaches for UDP, subprocesses, pipes, or the sock_* helpers it'll hit zloop's unimplemented APIs. 🙂

What about Trio?

Short answer: Trio code can't run on zloop directly, but Trio-style AnyIO code can.

Trio is not built on asyncio - it has its own runtime and its own event loop. zloop is an asyncio loop, and an asyncio loop can only drive asyncio tasks, so there's no way to plug zloop into Trio's runner. Pure-Trio programs (trio.run(...), trio.open_nursery(), Trio channels) keep using Trio's loop.

What does work is code written against AnyIO's API. AnyIO gives you one structured-concurrency API (task groups, cancel scopes, streams) that runs on either backend - so the same AnyIO code runs on Trio or on asyncio, and when you pick the asyncio backend, it runs on zloop:

anyio_on_zloop.py
import anyio

import zloop


async def main():
    async with anyio.create_task_group() as tg:  # Trio-style structured concurrency
        tg.start_soon(anyio.sleep, 0.1)
    return "AnyIO code, asyncio backend, Zig loop ✨"


# backend="trio" would use Trio's loop instead - and ignore zloop.
print(anyio.run(main, backend="asyncio", backend_options={"loop_factory": zloop.new_event_loop}))

Rule of thumb

If your code imports trio and calls trio.run, it's on Trio's loop, not zloop. If it imports anyio and you select the asyncio backend, it's on zloop. The trio-asyncio bridge can host an asyncio loop inside a Trio program, but that's a niche setup and not something zloop targets.

What about the low-level loop methods?

Methods like loop.add_reader, loop.remove_reader, loop.add_writer, loop.create_future, and loop.create_task are all implemented. The handful of rarely-used sock_* coroutine helpers (sock_recv, sock_sendall, …) raise NotImplementedError, matching the behavior of an unsupported method - see Compatibility for the full matrix.