asyncio without accidental overload

Use TaskGroup, gather, queues, and semaphores with clear failure boundaries

`TaskGroup` (PEP 654, Python 3.11) provides structured concurrency: if one task raises an unhandled exception, all sibling tasks are cancelled, and an `ExceptionGroup` is raised once all tasks complete. This is fundamentally different from `asyncio.gather()`, which collects results or individual exceptions but does not cancel siblings on failure. `TaskGroup` uses an `async with` block as the failure boundary. Tasks created within the group are owned by it. On exit, the group awaits all tasks. On exception, it cancels remaining tasks. For concurrency limits, pair `TaskGroup` with `asyncio.Semaphore`. For blocking work, use `asyncio.to_thread()`. <a href="/async-context-backpressure">Learn backpressure and semaphore patterns</a>. <a href="/async-foundations-awaitables">Review awaitable basics first</a>. <a href="/runtime-gil-performance">Understand threading vs async tradeoffs</a>.

Understand.
Visualize.
Master.

Python in Depth

An interactive engineering reference for Python internals

Quick note

Tasks need an owner or they become cleanup debt.

:)
Python version

Targets Python 3.10–3.14. Python 3.9 and below are End-of-Life.

TABLE OF CONTENTS
6.7asyncio without accidental overload

Use TaskGroup, gather, queues, and semaphores with clear failure boundaries

TaskGroup gives related child tasks one failure boundary. That is different from merely collecting awaitables into a result list.

Core answer

Use TaskGroup in Python 3.11+ when subtasks belong to one parent operation and should finish or fail together.

# [CURRENT - 3.11-3.14] Requires Python 3.11+ [PEP 654]
import asyncio
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class ProfilePage:
user: str
orders: tuple[str, ...]
async def user(user_id: str) -> str:
await asyncio.sleep(0.01)
return user_id
async def orders(user_id: str) -> tuple[str, ...]:
await asyncio.sleep(0.01)
return (f"{user_id}:ORD-1",)
async def page(user_id: str) -> ProfilePage:
async with asyncio.TaskGroup() as group:
user_task = group.create_task(user(user_id))
order_task = group.create_task(orders(user_id))
return ProfilePage(user_task.result(), order_task.result())
print(asyncio.run(page("U-7")))

Why this design exists

Structured concurrency makes child-task ownership visible. TaskGroup uses cancellation and exception grouping so failures do not silently leave sibling work detached. PEP 654 supplies the ExceptionGroup and except* machinery that makes multiple child failures representable.

Mechanics and CPython internals

The task group is an asynchronous context manager. On non-cancellation child failure, it cancels siblings, waits for shutdown, and raises grouped exceptions after the boundary. Cancellation is ordinary async control flow, so cleanup belongs in try/finally.

# [CURRENT - 3.11-3.14] Requires Python 3.11+ [PEP 654]
import asyncio
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Dependency:
name: str
fail: bool
async def call(dep: Dependency) -> str:
await asyncio.sleep(0.01)
if dep.fail:
raise OSError(dep.name)
return dep.name
async def main() -> None:
try:
async with asyncio.TaskGroup() as group:
group.create_task(call(Dependency("profile", True)))
group.create_task(call(Dependency("orders", False)))
except* OSError as failures:
print(len(failures.exceptions))
asyncio.run(main())

Complexity and tradeoffs

Task creation and cancellation have overhead, but the key tradeoff is failure semantics. gather is useful for aggregation; TaskGroup is useful for ownership. A task group does not bound fan-out by itself, so pair it with semaphores or queues when capacity matters.

Idiomatic patterns and refactoring

Refactor fire-and-forget sibling tasks into one context boundary.

# [CURRENT - 3.11-3.14] Requires Python 3.11+ [PEP 654]
import asyncio
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Audit:
order_id: str
async def write_audit(audit: Audit) -> None:
await asyncio.sleep(0)
print(audit.order_id)
async def detached(audit: Audit) -> None:
asyncio.create_task(write_audit(audit))
await asyncio.sleep(0)
async def owned(audit: Audit) -> None:
async with asyncio.TaskGroup() as group:
group.create_task(write_audit(audit))
asyncio.run(owned(Audit("ORD-7")))

Common mistakes and edge cases

Do not swallow CancelledError without a reason and cleanup plan. Do not expect TaskGroup to preserve input result order as an aggregation API. Do not create unbounded child tasks and call it structured enough.

When to use / When NOT to use

Use TaskGroup for one operation with related child tasks and explicit failure ownership.

Do not use it when a sequential await is clearer or when aggregation semantics from gather are the actual requirement.

Further reading

  • Official docs: task groups
  • Official docs: cancellation
  • PEP 654: exception groups and except*
  • CPython source: task groups
BOARD NOTESContext
WHY NO BENCHMARK?

This topic is better taught with structure, semantics, and cross-references than with a synthetic chart.

Tasks need an owner or they become cleanup debt.

RELATED GUIDES
NEXT CHECKS
Contribute