AsyncIO, AnyIO & Trio¶
zloop is an asyncio.AbstractEventLoop. Not "asyncio-like" - the real thing. So
the asyncio APIs you reach for day to day - tasks, futures, timers, streams,
executors, signals, TLS - work on top of it. And because
AnyIO runs on asyncio, AnyIO programs (Starlette,
FastAPI, HTTPX) run on zloop too - covered further down.
Let's walk through them, so you can see there are no surprises. (A handful of lower-level loop APIs aren't implemented yet; the compatibility matrix is the full picture.)
Reading the examples
The first example below is complete. The later snippets show just the
interesting async def main(): ... body - run any of them by dropping it
into the same skeleton:
import asyncio
import zloop
# async def main(): ... <- the snippet goes here
asyncio.run(main(), loop_factory=zloop.new_event_loop)
asyncio.Runner(loop_factory=...) works too if you want to reuse one loop
across several run() calls (see First steps); the body is
unchanged.
Tasks and futures¶
import asyncio
import zloop
async def work(n: int) -> int:
await asyncio.sleep(0.01 * n)
return n
async def main():
# create_task uses zloop's loop, but returns a normal asyncio.Task
task = asyncio.create_task(work(3))
# futures, gather, wait_for - all standard
results = await asyncio.gather(work(1), work(2), task)
return results
print(asyncio.run(main(), loop_factory=zloop.new_event_loop))
#> [1, 2, 3]
Why this works
zloop reuses CPython's own asyncio.Future and asyncio.Task - the same
C-accelerated objects the default loop uses. zloop only replaces the
engine (scheduling and I/O), not the coroutine machinery. More on that in
What zloop reuses.
Timers¶
async def main():
loop = asyncio.get_running_loop()
loop.call_soon(print, "now")
loop.call_later(0.1, print, "in 100ms")
loop.call_at(loop.time() + 0.2, print, "at an absolute time")
await asyncio.sleep(0.3)
Timers fire in (deadline, insertion order) - the ordering asyncio guarantees.
The heap that backs them lives in Zig.
Streams¶
The high-level streams API works unchanged:
async def main():
# an echo server
async def handle(reader, writer):
data = await reader.read(100)
writer.write(data)
await writer.drain()
writer.close()
server = await asyncio.start_server(handle, "127.0.0.1", 8888)
# ... and a client talking to it
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
writer.write(b"hello")
await writer.drain()
print(await reader.read(100)) #> b'hello'
writer.close()
server.close()
asyncio.run(main(), loop_factory=zloop.new_event_loop)
start_server, open_connection, StreamReader, StreamWriter - they're built
on the loop's create_server / create_connection, which zloop implements with
its Zig transports.
Running blocking code in threads¶
run_in_executor and asyncio.to_thread work, which means you can safely call
blocking code from async land:
import time
async def main():
loop = asyncio.get_running_loop()
# the classic
result = await loop.run_in_executor(None, time.sleep, 0.1)
# the modern shortcut
await asyncio.to_thread(time.sleep, 0.1)
This relies on a real detail
For executors to work, the loop must release the GIL while it waits for I/O - otherwise the worker threads could never run. zloop does exactly that around its blocking poll. (uvloop does too; it's easy to get wrong, so it's worth knowing it's handled.)
Signals¶
add_signal_handler works on the main thread, just like asyncio's default loop:
import signal
async def main():
loop = asyncio.get_running_loop()
stop = asyncio.Event()
loop.add_signal_handler(signal.SIGINT, stop.set)
loop.add_signal_handler(signal.SIGTERM, stop.set)
print("running - press Ctrl+C to stop")
await stop.wait()
print("shutting down gracefully ✨")
asyncio.run(main(), loop_factory=zloop.new_event_loop)
TLS¶
create_server(ssl=...) and create_connection(ssl=...) work, so anything that
speaks TLS over asyncio (HTTPS servers, secure clients) works on zloop.
import ssl
async def main():
ctx = ssl.create_default_context()
reader, writer = await asyncio.open_connection("example.com", 443, ssl=ctx)
writer.write(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
await writer.drain()
status_line = (await reader.read(200)).split(b"\r\n")[0]
print(status_line) # the HTTP status line from the TLS connection
writer.close()
asyncio.run(main(), loop_factory=zloop.new_event_loop)
Note
zloop reuses asyncio's own TLS state machine (asyncio.sslproto) on top of
its Zig transports, so the handshake and record framing behave like the
default loop. A couple of TLS timeout knobs aren't wired through yet - see
Compatibility.
With AnyIO¶
AnyIO is the structured-concurrency layer that sits on top of asyncio (and Trio). It's what Starlette, FastAPI, and HTTPX use internally - so making AnyIO run on zloop means a lot of the ecosystem runs on zloop.
The good news: AnyIO's asyncio backend accepts a loop factory, so this is a clean one-liner.
import anyio
import zloop
async def main():
async with anyio.create_task_group() as tg:
tg.start_soon(anyio.sleep, 0.1)
tg.start_soon(anyio.sleep, 0.2)
return "structured concurrency on a Zig loop ✨"
print(
anyio.run(
main,
backend="asyncio",
backend_options={"loop_factory": zloop.new_event_loop}, # (1)!
)
)
- AnyIO's asyncio backend forwards
loop_factorystraight to anasyncio.Runner. Same mechanism as everywhere else.
In tests (pytest + anyio)¶
If you test with the anyio pytest plugin, you select the backend with the
anyio_backend fixture. Return a tuple to pass options - including the loop
factory:
import pytest
import zloop
@pytest.fixture
def anyio_backend():
return ("asyncio", {"loop_factory": zloop.new_event_loop}) # (1)!
- Now every
@pytest.mark.anyiotest in your suite runs on zloop. This is exactly how we run uvicorn's own test suite against zloop.
import asyncio
import pytest
@pytest.mark.anyio
async def test_runs_on_zloop():
assert type(asyncio.get_running_loop()).__module__.startswith("zloop")
Tip
This fixture trick is the easiest way to run an existing asyncio/AnyIO test
suite on zloop and see what happens. Suites that stick to TCP/TLS sockets,
streams, tasks, and timers should pass unchanged; if a suite reaches for
UDP, subprocesses, pipes, or the sock_* helpers it'll hit zloop's
unimplemented APIs. 🙂
What about Trio?¶
Short answer: Trio code can't run on zloop directly, but Trio-style AnyIO code can.
Trio is not built on asyncio - it has its own
runtime and its own event loop. zloop is an asyncio loop, and an asyncio loop
can only drive asyncio tasks, so there's no way to plug zloop into Trio's runner.
Pure-Trio programs (trio.run(...), trio.open_nursery(), Trio channels) keep
using Trio's loop.
What does work is code written against AnyIO's API. AnyIO gives you one structured-concurrency API (task groups, cancel scopes, streams) that runs on either backend - so the same AnyIO code runs on Trio or on asyncio, and when you pick the asyncio backend, it runs on zloop:
import anyio
import zloop
async def main():
async with anyio.create_task_group() as tg: # Trio-style structured concurrency
tg.start_soon(anyio.sleep, 0.1)
return "AnyIO code, asyncio backend, Zig loop ✨"
# backend="trio" would use Trio's loop instead - and ignore zloop.
print(anyio.run(main, backend="asyncio", backend_options={"loop_factory": zloop.new_event_loop}))
Rule of thumb
If your code imports trio and calls trio.run, it's on Trio's loop, not
zloop. If it imports anyio and you select the asyncio backend, it's on
zloop. The trio-asyncio bridge can
host an asyncio loop inside a Trio program, but that's a niche setup and not
something zloop targets.
What about the low-level loop methods?¶
Methods like loop.add_reader, loop.remove_reader, loop.add_writer,
loop.create_future, and loop.create_task are all implemented. The handful of
rarely-used sock_* coroutine helpers (sock_recv, sock_sendall, …) raise
NotImplementedError, matching the behavior of an unsupported method - see
Compatibility for the full matrix.