Python AsyncIO: Master Asynchronous Programming
Learn Python async/await for concurrent programming. Master coroutines, tasks, event loops, and build high-performance async applications.
Moshiour Rahman
What is AsyncIO?
AsyncIO is Python's built-in library for writing concurrent code with the async/await syntax. It lets a single thread handle many I/O-bound operations efficiently, without the complexity of multi-threading.
Sync vs Async
| Synchronous | Asynchronous |
|---|---|
| Blocking I/O | Non-blocking I/O |
| One task at a time | Multiple tasks concurrently |
| Simpler code | More scalable |
| Threads for concurrency | Single-threaded concurrency |
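To make the contrast concrete, here is a minimal sketch (the function names and one-second delays are illustrative): the synchronous version waits three times in sequence, while the async version overlaps the waits on a single thread.

```python
import asyncio
import time

def fetch_sync(n: int) -> str:
    time.sleep(1)           # Blocking: the whole thread waits
    return f"result {n}"

async def fetch_async(n: int) -> str:
    await asyncio.sleep(1)  # Non-blocking: other coroutines run while waiting
    return f"result {n}"

# Synchronous: the waits add up (~3 seconds)
print([fetch_sync(n) for n in range(3)])

async def main():
    # Asynchronous: the waits overlap (~1 second), still one thread
    print(await asyncio.gather(*(fetch_async(n) for n in range(3))))

asyncio.run(main())
```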
Getting Started
Basic Async Function
```python
import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # Non-blocking sleep
    print("World")

# Run the coroutine
asyncio.run(hello())
```
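Note that calling a coroutine function does not run it; it only creates a coroutine object, which executes once it is awaited or handed to the event loop:

```python
coro = hello()      # Nothing runs yet; this just creates a coroutine object
print(coro)         # <coroutine object hello at 0x...>
asyncio.run(coro)   # The event loop actually executes it
```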
Coroutines and Tasks
```python
import asyncio

async def fetch_data(delay: int, data: str) -> str:
    print(f"Fetching {data}...")
    await asyncio.sleep(delay)
    print(f"Got {data}")
    return data

async def main():
    # Sequential execution
    result1 = await fetch_data(2, "data1")
    result2 = await fetch_data(2, "data2")
    # Total time: 4 seconds

    # Concurrent execution with gather
    results = await asyncio.gather(
        fetch_data(2, "data1"),
        fetch_data(2, "data2"),
        fetch_data(2, "data3")
    )
    # Total time: 2 seconds
    print(results)  # ['data1', 'data2', 'data3']

asyncio.run(main())
```
Creating Tasks
```python
import asyncio

async def background_task(name: str):
    while True:
        print(f"Task {name} running")
        await asyncio.sleep(1)

async def main():
    # Create a task (scheduled to run as soon as the loop gets control)
    task = asyncio.create_task(background_task("worker"))

    # Do other work
    await asyncio.sleep(3)

    # Cancel task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled")

asyncio.run(main())
```
Async Context Managers
async with
```python
import asyncio
import aiofiles

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def do_work(self):
        print("Working...")

async def main():
    async with AsyncResource() as resource:
        await resource.do_work()

    # File operations with aiofiles
    async with aiofiles.open("file.txt", "w") as f:
        await f.write("Hello, async!")

asyncio.run(main())
```
Async Iterators
async for
```python
import asyncio

class AsyncCounter:
    def __init__(self, stop: int):
        self.stop = stop
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(num)

asyncio.run(main())
```
Async Generators
```python
import asyncio

async def async_range(start: int, stop: int):
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for num in async_range(1, 6):
        print(num)

    # Async comprehension
    numbers = [num async for num in async_range(1, 6)]
    print(numbers)

asyncio.run(main())
```
Concurrent Execution
asyncio.gather
```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # Run multiple coroutines concurrently
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        return_exceptions=True  # Don't fail on first exception
    )
    print(users)

asyncio.run(main())
```
asyncio.wait
```python
import asyncio

async def task(name: str, delay: int):
    await asyncio.sleep(delay)
    return f"Task {name} completed"

async def main():
    tasks = [
        asyncio.create_task(task("A", 2)),
        asyncio.create_task(task("B", 1)),
        asyncio.create_task(task("C", 3))
    ]

    # Wait for the first to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    for t in done:
        print(t.result())

    # Cancel remaining
    for t in pending:
        t.cancel()

asyncio.run(main())
```
asyncio.as_completed
```python
import asyncio

async def fetch(url: str) -> str:
    delay = int(url[-1])  # url1 -> 1s, url2 -> 2s, url3 -> 3s
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    urls = ["url1", "url2", "url3"]
    coros = [fetch(url) for url in urls]

    # Process results as they complete
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(result)

asyncio.run(main())
```
Timeouts
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    # Using wait_for
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
    except asyncio.TimeoutError:
        print("Operation timed out")

    # Using the timeout context manager (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")

    # Using timeout_at for an absolute deadline
    deadline = asyncio.get_running_loop().time() + 2.0
    try:
        async with asyncio.timeout_at(deadline):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```
Synchronization Primitives
Lock
```python
import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work
            self.value = current + 1

async def main():
    counter = Counter()
    # Without lock: race condition
    # With lock: safe concurrent access
    await asyncio.gather(*[counter.increment() for _ in range(100)])
    print(f"Counter: {counter.value}")  # 100

asyncio.run(main())
```
Semaphore
```python
import asyncio

async def fetch_with_limit(url: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main():
    # Limit concurrent operations to 3
    semaphore = asyncio.Semaphore(3)
    urls = [f"url{i}" for i in range(10)]
    tasks = [fetch_with_limit(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
```
Event
```python
import asyncio

async def waiter(event: asyncio.Event, name: str):
    print(f"{name} waiting for event")
    await event.wait()
    print(f"{name} got the event!")

async def setter(event: asyncio.Event):
    await asyncio.sleep(2)
    print("Setting event")
    event.set()

async def main():
    event = asyncio.Event()
    await asyncio.gather(
        waiter(event, "Task 1"),
        waiter(event, "Task 2"),
        setter(event)
    )

asyncio.run(main())
```
Queue
```python
import asyncio

async def producer(queue: asyncio.Queue, n: int, num_consumers: int):
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(f"item-{i}")
        print(f"Produced item-{i}")
    # One sentinel per consumer, so every consumer shuts down
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10, num_consumers=2),
        consumer(queue, "Consumer 1"),
        consumer(queue, "Consumer 2")
    )

asyncio.run(main())
```
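An alternative to sentinel values is to track completion with `queue.join()` and cancel the workers once every item has been processed. The sketch below is one way to structure that; the `worker` function and names are illustrative, not part of the example above.

```python
import asyncio

async def worker(queue: asyncio.Queue, name: str):
    # Runs until cancelled by the coordinator once the queue is drained
    while True:
        item = await queue.get()
        print(f"{name} consumed {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    workers = [
        asyncio.create_task(worker(queue, f"Worker {i}"))
        for i in range(2)
    ]
    for i in range(10):
        await queue.put(f"item-{i}")
    await queue.join()  # Blocks until every item is marked task_done()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
```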
HTTP Requests with aiohttp
```python
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://api.example.com/users/1",
        "https://api.example.com/users/2",
        "https://api.example.com/users/3"
    ]
    results = await fetch_all(urls)
    print(results)

asyncio.run(main())
```
Database Operations
```python
import asyncio
import asyncpg

async def main():
    # Connect to PostgreSQL
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='user',
        password='password',
        min_size=5,
        max_size=20
    )

    async with pool.acquire() as conn:
        # Execute query
        rows = await conn.fetch("SELECT * FROM users LIMIT 10")
        for row in rows:
            print(dict(row))

        # Insert data
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "John", "john@example.com"
        )

        # Transaction
        async with conn.transaction():
            await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
            await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")

    await pool.close()

asyncio.run(main())
```
Web Server with FastAPI
```python
from fastapi import FastAPI
import asyncio

app = FastAPI()

async def fetch_user_data(user_id: int) -> dict:
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_user_orders(user_id: int) -> list:
    await asyncio.sleep(0.1)
    return [{"order_id": 1}, {"order_id": 2}]

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Fetch data concurrently
    user, orders = await asyncio.gather(
        fetch_user_data(user_id),
        fetch_user_orders(user_id)
    )
    return {"user": user, "orders": orders}

@app.get("/users")
async def get_users():
    user_ids = [1, 2, 3, 4, 5]
    users = await asyncio.gather(*[
        fetch_user_data(uid) for uid in user_ids
    ])
    return users
```
Best Practices
Error Handling
```python
import asyncio

async def risky_operation():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong")

async def main():
    # Handle individual task errors
    try:
        result = await risky_operation()
    except ValueError as e:
        print(f"Error: {e}")

    # Handle errors in gather
    results = await asyncio.gather(
        risky_operation(),
        asyncio.sleep(1),
        return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")
        else:
            print(f"Task succeeded: {result}")

asyncio.run(main())
```
Task Groups (Python 3.11+)
```python
import asyncio

async def fetch_data(n: int) -> str:
    await asyncio.sleep(1)
    return f"data{n}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1))
        task2 = tg.create_task(fetch_data(2))
        task3 = tg.create_task(fetch_data(3))
    # Leaving the block guarantees all tasks completed successfully
    print(task1.result(), task2.result(), task3.result())

asyncio.run(main())
```
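Unlike `asyncio.gather`, a `TaskGroup` cancels the remaining tasks as soon as one fails and re-raises the failures as an `ExceptionGroup`, which you catch with `except*` (also Python 3.11+). A minimal sketch; the failing `boom` coroutine here is illustrative:

```python
import asyncio

async def boom():
    await asyncio.sleep(0.1)
    raise ValueError("boom")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(boom())
            tg.create_task(asyncio.sleep(10))  # Cancelled as soon as boom() fails
    except* ValueError as eg:
        print(f"Caught: {eg.exceptions}")

asyncio.run(main())
```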
Summary
| Concept | Purpose |
|---|---|
| `async def` | Define a coroutine |
| `await` | Pause and wait for a result |
| `asyncio.gather` | Run coroutines concurrently |
| `asyncio.create_task` | Schedule a coroutine for execution |
| `asyncio.Lock` | Synchronize access to shared state |
| `asyncio.Semaphore` | Limit concurrency |
| `asyncio.Queue` | Producer-consumer pattern |
AsyncIO enables efficient, scalable Python applications for I/O-bound workloads without multi-threading complexity.