Async Python has a reputation for being hard, and most of that reputation is earned in the wrong situations. People reach for asyncio because it sounds fast, hit a wall when their CPU-bound script doesn’t speed up, and conclude the whole thing is a trap. It isn’t a trap — it’s a tool with a narrow blade. Today we figure out where the blade actually cuts: I/O-bound work, and specifically I/O-bound work where you have many things waiting on the network at the same time.
This is the first of three lessons closing Module 7 — data engineering. After this we look at orchestrators (Airflow, Prefect, Dagster), and then we build a real pipeline.
The one-sentence summary
asyncio is for I/O concurrency: many operations that mostly sit waiting for something to come back over a socket. Fetching 100 URLs, reading from a Postgres pool, talking to Redis, scraping a slow API. If your bottleneck is waiting, async helps. If your bottleneck is computing — a tight loop multiplying numbers, parsing JSON in a hot path, image processing — async won’t help, and you want multiprocessing or, in 3.13, free-threaded Python (we’ll come back to this).
The reason async helps for I/O is mechanical. When code calls requests.get(url), the OS thread sits blocked until the response arrives. With async, await client.get(url) suspends the coroutine and hands control back to the event loop while the kernel waits on the socket; the loop runs other coroutines until one of them has data. One thread, one process, hundreds of in-flight requests.
The basics: async def, await, asyncio.run
import asyncio
import httpx

async def fetch(url: str) -> int:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.status_code

async def main() -> None:
    code = await fetch("https://example.com")
    print(code)

asyncio.run(main())
Three things to notice. async def declares a coroutine function; calling it returns a coroutine object and does not run the body yet. await is what actually drives it, and it is also the suspension point where the event loop can switch to another task. asyncio.run(main()) is the entry point: it creates an event loop, runs main to completion, and tears the loop down. This has been the canonical way since Python 3.7 and still is in 3.13; you almost never create a loop manually anymore.
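The first point is easy to check without any network. The sketch below is illustrative only; answer and main are made-up names, not part of any library.

```python
import asyncio
import inspect


async def answer() -> int:
    return 42


async def main() -> int:
    coro = answer()                   # nothing has run yet
    assert inspect.iscoroutine(coro)  # it is just a coroutine object
    return await coro                 # the body executes here


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 42
```

Forgetting the await leaves you holding the coroutine object, and Python warns that it was never awaited.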
The example above is async but not concurrent — there’s only one fetch. The win comes when you have many.
asyncio.gather: concurrent I/O
async def fetch_many(urls: list[str]) -> list[int]:
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.status_code for r in responses]
gather schedules all the coroutines, waits for all of them, and returns their results in order. If you have 100 URLs, you’ll fire 100 requests roughly at once, each waiting on its own socket, and the whole thing finishes in roughly the time of the slowest single request — not the sum.
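Both claims, results in input order and total time close to the slowest call, can be seen in a small sketch. It assumes asyncio.sleep as a stand-in for a network wait so it runs offline; fake_fetch and the finished list are hypothetical names for the demo.

```python
import asyncio
import time

finished: list[str] = []  # records the order in which calls complete


async def fake_fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for waiting on a socket
    finished.append(name)
    return name


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_fetch("slow", 0.3),
        fake_fetch("fast", 0.1),
        fake_fetch("medium", 0.2),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)   # input order: ['slow', 'fast', 'medium']
    print(finished)  # completion order: ['fast', 'medium', 'slow']
    print(f"{elapsed:.2f}s")  # near the slowest delay, well under the 0.6s sum
```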
This is the moment people fall in love with async. It’s also the moment they accidentally DDoS an API. Which brings us to the next pattern.
asyncio.Semaphore: limit your concurrency
Almost every API has a rate limit. Almost every database has a connection pool ceiling. “Fire 1000 requests at once” is a great way to get banned, throttled, or to exhaust file descriptors. Use a semaphore.
async def fetch_with_limit(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> int:
    async with sem:
        response = await client.get(url)
        return response.status_code

async def fetch_many(urls: list[str], concurrency: int = 10) -> list[int]:
    sem = asyncio.Semaphore(concurrency)
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch_with_limit(client, sem, url) for url in urls]
        return await asyncio.gather(*tasks)
async with sem lets at most concurrency coroutines into the protected block at once; the rest wait. Now you can hand it 1000 URLs and only 10 are in flight at any moment. Tune concurrency to whatever your target service tolerates — 10 is conservative, 50 is reasonable for a friendly API, 100+ if you really know what’s on the other side.
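One way to convince yourself that the cap holds is to count how many tasks are inside the protected block at once. This is a sketch under assumptions: asyncio.sleep replaces the HTTP call, and worker, measure_peak, and the state dict are illustrative names.

```python
import asyncio


async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the network call
        state["in_flight"] -= 1


async def measure_peak(n_tasks: int, limit: int) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"in_flight": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n_tasks)))
    return state["peak"]


if __name__ == "__main__":
    # 50 tasks are scheduled, but the peak never exceeds the limit of 5.
    print(asyncio.run(measure_peak(50, 5)))
```

No lock is needed around the counters because everything runs on one thread and there is no await between reading and writing them.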
asyncio.Queue: producer / consumer
When the work isn’t a fixed list — a stream of jobs arriving over time, or work generated by other work — a queue fits better than gather.
NUM_WORKERS = 10

async def producer(queue: asyncio.Queue, urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)  # waits here while the queue is full
    for _ in range(NUM_WORKERS):
        await queue.put(None)  # one sentinel per worker

async def consumer(queue: asyncio.Queue, client: httpx.AsyncClient, results: list) -> None:
    while True:
        url = await queue.get()
        if url is None:  # sentinel: no more work
            queue.task_done()
            break
        response = await client.get(url)
        results.append((url, response.status_code))
        queue.task_done()

async def main(urls: list[str]) -> list[tuple[str, int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: list = []
    async with httpx.AsyncClient() as client:
        # Start the consumers first so they drain the queue while the producer fills it.
        workers = [asyncio.create_task(consumer(queue, client, results)) for _ in range(NUM_WORKERS)]
        await producer(queue, urls)
        await asyncio.gather(*workers)
    return results
The pattern: a producer puts work onto a bounded queue; N consumers pull from it. Bounded so the producer can’t fill memory faster than consumers drain it. The None sentinels are the simplest way to tell consumers to stop; in production you’d use cancellation, but the sentinel pattern is fine for scripts.
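For comparison, here is one possible shape of the cancellation-based shutdown mentioned above, not the only one. It assumes the work can be faked by doubling an integer; worker and run are illustrative names. The idea is to wait on queue.join() until every item is marked done, then cancel the idle workers.

```python
import asyncio


async def worker(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()  # cancellation lands here once the queue is empty
        try:
            await asyncio.sleep(0)  # stand-in for real I/O
            results.append(item * 2)
        finally:
            queue.task_done()  # always mark the item finished


async def run(items: list[int], num_workers: int = 3) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    results: list[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]
    for item in items:
        await queue.put(item)  # waits while the queue is full
    await queue.join()  # returns once every item has been processed
    for w in workers:
        w.cancel()  # no sentinels needed
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run(list(range(25)))))
```

Compared with sentinels, this keeps the producer unaware of how many workers exist.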
Async context managers and async iterators
Lesson 6 covered with and iterators. Async has the equivalents. Anything that does I/O on entry or exit becomes async with:
async with httpx.AsyncClient() as client:
    ...

async with asyncpg.create_pool(dsn) as pool:
    ...
Anything that streams data lazily becomes async for:
async with httpx.AsyncClient() as client:
    async with client.stream("GET", url) as response:
        async for chunk in response.aiter_bytes():
            process(chunk)
These aren’t separate concepts; they’re the async-flavored versions of patterns you already know. The keyword async in front means “this can suspend the current coroutine and yield to the event loop while we’re entering / exiting / iterating.”
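To see both protocols without a real server, the sketch below builds a toy async context manager and a toy async iterator. fake_connection and fake_stream are hypothetical stand-ins, and asyncio.sleep(0) marks the places where real code would wait on I/O.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def fake_connection(log: list[str]):
    await asyncio.sleep(0)  # pretend to open a socket
    log.append("open")
    try:
        yield "conn"
    finally:
        await asyncio.sleep(0)  # pretend to close it
        log.append("close")


async def fake_stream(n: int):
    for i in range(n):
        await asyncio.sleep(0)  # pretend to wait for the next chunk
        yield i


async def main() -> tuple[list[str], list[int]]:
    log: list[str] = []
    chunks: list[int] = []
    async with fake_connection(log):
        async for chunk in fake_stream(3):
            chunks.append(chunk)
    return log, chunks


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['open', 'close'], [0, 1, 2])
```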
The libraries that matter
You can’t await requests.get(...) — requests is sync. The whole point of async is that everything in the chain is non-blocking. So the ecosystem has a parallel set of libraries:
- HTTP: httpx (sync and async in one library, the modern default), aiohttp (async-first, older, still excellent for servers).
- Postgres: asyncpg is the fastest, full stop. Or psycopg 3 with the async API.
- Redis: redis-py has built-in async since 4.x.
- MongoDB: motor, the official async driver.
- S3 / cloud: aioboto3, aiobotocore.
- Files: aiofiles, but disk I/O is usually not where you want async (see the next section).
If a library you need doesn’t have an async version, you have two options: run the sync version in a worker thread, or skip async entirely.
The sync / async mixing trap
This is where people get hurt. Two rules.
Rule one: don’t call blocking sync code from inside a coroutine. If your async function calls time.sleep(5) or requests.get(url), the entire event loop stops for those 5 seconds. Every other in-flight task pauses. The whole illusion collapses.
# Bad: blocks the event loop
async def fetch(url):
    return requests.get(url).text

# Good: uses an async client
async def fetch(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).text
Rule two: if you absolutely must call sync code, push it to a thread.
import asyncio

def expensive_sync_thing(path: str) -> bytes:
    with open(path, "rb") as f:  # blocking file I/O
        return f.read()

async def main():
    data = await asyncio.to_thread(expensive_sync_thing, "big.bin")
asyncio.to_thread runs the function in the default thread pool and gives you back an awaitable. The event loop keeps running other tasks while the thread does its blocking work. Use this for: filesystem operations, sync database drivers you can’t replace, CPU work that’s small (bigger CPU work should go to multiprocessing or a process pool).
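The two rules can be observed side by side with a heartbeat task. In this sketch time.sleep plays the role of the blocking call, and heartbeat and count_ticks are illustrative names; the exact tick count in the threaded case depends on the machine.

```python
import asyncio
import time


async def heartbeat(ticks: list[float]) -> None:
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def count_ticks(blocking: bool) -> int:
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    if blocking:
        time.sleep(0.2)  # blocks the loop: the heartbeat cannot run
    else:
        await asyncio.to_thread(time.sleep, 0.2)  # loop stays free
    during = len(ticks) - before
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return during


if __name__ == "__main__":
    print("blocking:", asyncio.run(count_ticks(True)))    # 0 ticks while blocked
    print("to_thread:", asyncio.run(count_ticks(False)))  # many ticks
```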
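Going the other direction, calling async from sync, needs asyncio.run, but only if no loop is already running. Inside an existing loop you don’t call asyncio.run again; you await directly. Calling it anyway raises “RuntimeError: asyncio.run() cannot be called from a running event loop”, and the older loop.run_until_complete route raises “RuntimeError: This event loop is already running”. Confusion here is responsible for half the questions about those errors on Stack Overflow.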
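A minimal reproduction of the mistake and its fix, with inner and outer as made-up names:

```python
import asyncio


async def inner() -> str:
    return "done"


async def outer() -> tuple[str, str]:
    coro = inner()
    message = ""
    try:
        asyncio.run(coro)  # wrong: a loop is already running here
    except RuntimeError as exc:
        message = str(exc)
        coro.close()  # avoid a "never awaited" warning
    result = await inner()  # right: inside a coroutine, just await
    return message, result


if __name__ == "__main__":
    message, result = asyncio.run(outer())
    print(message)
    print(result)  # done
```

The same error is what trips people up in notebooks, where a loop is already running behind the scenes.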
asyncio vs threading vs multiprocessing vs no-GIL
For 2026, the picture is:
- asyncio: I/O concurrency, single-threaded. Best when you have many concurrent network operations and a fully-async stack.
- threading: I/O concurrency without the async coloring; works with sync libraries. The GIL means it doesn’t help CPU-bound code in standard CPython.
- multiprocessing: True parallelism for CPU-bound work, at the cost of process overhead and serialization.
- Free-threaded Python (3.13+, opt-in): A no-GIL build. Threads can run Python bytecode in parallel for real. Experimental in 3.13 and officially supported, though still optional, from 3.14; it still has performance trade-offs on the single-threaded path, but it’s landing. For CPU-bound work in 2026, this is the future; for I/O concurrency, it doesn’t change the calculus much: async is still cleaner.
Rule of thumb: I/O-bound and lots of it, async. CPU-bound, processes (or no-GIL when stable). I/O-bound and small, just write sync code and stop worrying.
When NOT to use asyncio
Most Python scripts. A nightly batch job that hits one API and writes a CSV doesn’t need async. A web scraper that fetches 5 pages doesn’t need async. A data pipeline that reads a Parquet file and writes to Postgres doesn’t need async.
The async tax is real: it colors your function signatures, complicates testing, makes stack traces harder to read, and forces every library you use to either be async or bridgeable to async. Pay that tax when the concurrency is the point. Skip it when it isn’t.
A real example: 1000 endpoints, politely
To close, the canonical script:
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> dict:
    async with sem:
        try:
            response = await client.get(url, timeout=10)
            return {"url": url, "status": response.status_code, "len": len(response.content)}
        except httpx.HTTPError as e:
            # Record the failure instead of letting it propagate to gather.
            return {"url": url, "status": None, "error": str(e)}

async def main(urls: list[str]) -> list[dict]:
    sem = asyncio.Semaphore(20)
    # http2=True needs the optional extra: pip install "httpx[http2]"
    async with httpx.AsyncClient(http2=True) as client:
        tasks = [fetch(client, sem, url) for url in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
    results = asyncio.run(main(urls))
    ok = sum(1 for r in results if r["status"] == 200)
    print(f"{ok}/{len(results)} succeeded")
That’s the shape. Semaphore for politeness, single client for connection reuse, exceptions caught per-task so one failure doesn’t sink the whole batch, results aggregated at the end. With at most 20 requests in flight, the best case is about a 20x speedup over a sequential loop: if each request takes a little over a second, 1000 URLs would take about 20 minutes one at a time and roughly a minute here.
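An alternative to the try/except inside each task is to let gather collect the exceptions for you with return_exceptions=True. The sketch below assumes a fake task, flaky, that fails for every third item; it is illustrative and not a replacement for the script above.

```python
import asyncio


async def flaky(i: int) -> int:
    await asyncio.sleep(0)  # stand-in for the request
    if i % 3 == 0:
        raise ValueError(f"item {i} failed")
    return i


async def main() -> tuple[list[int], list[str]]:
    outcomes = await asyncio.gather(
        *(flaky(i) for i in range(6)),
        return_exceptions=True,  # exceptions come back as values, in order
    )
    ok = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [str(o) for o in outcomes if isinstance(o, BaseException)]
    return ok, errors


if __name__ == "__main__":
    ok, errors = asyncio.run(main())
    print(ok)      # [1, 2, 4, 5]
    print(errors)  # ['item 0 failed', 'item 3 failed']
```

Catching inside the task, as the script does, keeps the URL attached to the error; return_exceptions is less code but leaves you to match failures back to inputs by position.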
Next lesson: when even a clever async script isn’t enough, and you need a real orchestrator. Airflow, Prefect, Dagster, and how to choose.
Citations: asyncio docs, httpx async docs, PEP 703 — making the GIL optional. Retrieved 2026-05-01.