Async fan-out without a bound is just overload with cleaner syntax. Context boundaries and backpressure keep async services honest.
Core answer
Use async context managers to make acquisition and cleanup visible. Use semaphores, queues, or bounded worker patterns when downstream capacity is finite.

# [CURRENT - 3.10-3.14] Works on Python 3.10+
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True, slots=True)
class FetchJob:
    url: str


async def fetch(job: FetchJob, gate: asyncio.Semaphore) -> str:
    async with gate:
        await asyncio.sleep(0.01)
        return f"ok:{job.url}"


async def main() -> None:
    gate = asyncio.Semaphore(2)
    jobs = [FetchJob(f"/orders/{index}") for index in range(4)]
    print(await asyncio.gather(*(fetch(job, gate) for job in jobs)))


asyncio.run(main())

Why this design exists
Awaitable I/O makes concurrency cheap enough to overuse. Async context managers keep cleanup tied to scope; backpressure protects connection pools, file descriptors, memory, and downstream rate limits.
Mechanics and CPython internals
async with awaits __aenter__() on entry and __aexit__() on exit, whether the block finishes normally or raises. An asyncio.Semaphore suspends tasks once its permits are exhausted; it bounds how many tasks are inside the guarded block at once and does not create CPU parallelism. asyncio.as_completed lets consumers react in completion order when input order is not the required output contract.
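The completion-order behavior of asyncio.as_completed can be sketched as follows. This is a minimal illustration, not a production pattern: fetch, its delay parameter, and the labels are hypothetical stand-ins for real I/O calls.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for an I/O call; the delay simulates latency.
    await asyncio.sleep(delay)
    return name


async def completion_order() -> list[str]:
    # Submitted slowest-first so input order and completion order differ.
    work = [fetch("slow", 0.06), fetch("fast", 0.01), fetch("medium", 0.03)]
    finished: list[str] = []
    for next_done in asyncio.as_completed(work):
        # Each yielded item is an awaitable for the next result to finish.
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(completion_order()))
```

Under these assumed delays the results arrive fastest-first, regardless of submission order. If callers depend on input order, gather is the better fit.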

# [CURRENT - 3.10-3.14] Works on Python 3.10+
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from collections.abc import AsyncIterator


@dataclass(frozen=True, slots=True)
class Lease:
    name: str


@asynccontextmanager
async def lease(name: str) -> AsyncIterator[Lease]:
    await asyncio.sleep(0)
    try:
        yield Lease(name)
    finally:
        await asyncio.sleep(0)


async def main() -> None:
    async with lease("pool-slot") as slot:
        print(slot)


asyncio.run(main())

Complexity and tradeoffs
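Bounding concurrency limits in-flight work and downstream pressure. A semaphore inside gather still creates one coroutine per input, so it bounds the operations running at once, not the memory held by pending work. It can increase tail latency under load because excess work waits for a permit. That delay is often the safer tradeoff compared with unbounded retries and collapse.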
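One way to make the bound observable is to count how many jobs sit inside the guarded section at once. The sketch below assumes a simulated downstream call; run_bounded, job, and the counters are hypothetical names introduced for illustration.

```python
import asyncio


async def run_bounded(total: int, limit: int) -> int:
    """Run `total` simulated jobs and return the peak number in flight."""
    gate = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with gate:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated downstream call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(total)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_bounded(total=6, limit=2)))
```

The peak never exceeds the limit, so the remaining jobs wait their turn. That waiting is the tail-latency cost described above.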
Idiomatic patterns and refactoring
Refactor unbounded task creation into bounded collection before the first traffic spike proves the point.

# [CURRENT - 3.10-3.14] Works on Python 3.10+
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True, slots=True)
class Page:
    number: int


async def load(page: Page) -> str:
    await asyncio.sleep(0.01)
    return f"page-{page.number}"


async def bounded(pages: list[Page], limit: int) -> list[str]:
    gate = asyncio.Semaphore(limit)

    async def one(page: Page) -> str:
        async with gate:
            return await load(page)

    return await asyncio.gather(*(one(page) for page in pages))


print(asyncio.run(bounded([Page(1), Page(2), Page(3)], 2)))

Common mistakes and edge cases
Do not use gather over an unbounded input just because each item is awaitable. Do not swallow cancellation during cleanup. Use asyncio.to_thread() for blocking synchronous boundaries, but remember CPU-bound Python work still contends with the GIL.
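The cancellation point can be illustrated with a context manager that cleans up in a finally block and has no except clause, so CancelledError keeps propagating after cleanup. This is a sketch under assumptions: connection, worker, and the log list are hypothetical names, and the long sleep stands in for a slow downstream call.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


@asynccontextmanager
async def connection(log: list[str]) -> AsyncIterator[None]:
    log.append("open")
    try:
        yield
    finally:
        # Runs on normal exit, error, and cancellation alike.
        # No except clause here, so CancelledError is not swallowed.
        log.append("close")


async def worker(log: list[str]) -> None:
    async with connection(log):
        await asyncio.sleep(10)  # cancelled long before this finishes


async def main() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker enter the context
    task.cancel()
    await asyncio.wait([task])  # wait for the task to settle without re-raising
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Cleanup still runs, and the task ends in the cancelled state. A bare `except Exception` in the cleanup path would also let cancellation through, since CancelledError derives from BaseException, but catching BaseException without re-raising would hide it.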
When to use / When NOT to use
Use async contexts and concurrency bounds whenever resource lifetime or capacity matters.
Do not add semaphores as decoration around work whose real flow-control contract belongs in a queue, upstream rate limit, or database pool.
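Where the flow-control contract belongs in a queue, a bounded asyncio.Queue with a fixed worker pool is one possible shape. The sketch below is illustrative only: run_pipeline, worker, the doubling step, and the depth parameter are hypothetical, and the short sleep simulates a downstream call.

```python
import asyncio


async def worker(queue: asyncio.Queue[int], results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)  # simulated downstream call
            results.append(item * 2)
        finally:
            queue.task_done()


async def run_pipeline(items: list[int], workers: int, depth: int) -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=depth)
    results: list[int] = []
    pool = [asyncio.create_task(worker(queue, results)) for _ in range(workers)]
    for item in items:
        # put() suspends the producer while the queue is full: the backpressure.
        await queue.put(item)
    await queue.join()  # wait until every queued item has been processed
    for task in pool:
        task.cancel()  # workers are idle in get() by now; stop them
    await asyncio.gather(*pool, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([1, 2, 3, 4, 5], workers=2, depth=2)))
```

Unlike a semaphore around gather, this shape bounds both the work in flight (the worker count) and the work waiting (the queue depth), because the producer itself is slowed down.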