Python 7 min read

Python AsyncIO: Master Asynchronous Programming

Learn Python async/await for concurrent programming. Master coroutines, tasks, event loops, and build high-performance async applications.

MR

Moshiour Rahman

Advertisement

What is AsyncIO?

AsyncIO is Python’s built-in library for writing concurrent code using the async/await syntax. It enables efficient handling of I/O-bound operations without multi-threading complexity.

Sync vs Async

Synchronous | Asynchronous
Blocking I/O | Non-blocking I/O
One task at a time | Multiple tasks concurrently
Simpler code | More scalable
Threads for concurrency | Single-threaded concurrency

Getting Started

Basic Async Function

import asyncio

async def hello():
    """Print "Hello", pause for one second, then print "World"."""
    pause_seconds = 1
    print("Hello")
    # asyncio.sleep suspends only this coroutine, not the whole thread.
    await asyncio.sleep(pause_seconds)
    print("World")

# Run the coroutine
asyncio.run(hello())

Coroutines and Tasks

import asyncio

async def fetch_data(delay: int, data: str) -> str:
    """Simulate a fetch: announce it, sleep ``delay`` seconds, return ``data``.

    Args:
        delay: Seconds to sleep, standing in for I/O latency.
        data: Label that is printed and then returned unchanged.

    Returns:
        The ``data`` argument.
    """
    print(f"Fetching {data}...")
    # Stand-in for the real I/O wait; other coroutines may run meanwhile.
    await asyncio.sleep(delay)
    print(f"Got {data}")
    return data

async def main():
    """Contrast awaiting fetches one by one with running them through gather."""
    # Sequential: every await completes before the next fetch is started,
    # so two 2-second fetches take about 4 seconds.
    for label in ("data1", "data2"):
        await fetch_data(2, label)

    # Concurrent: gather runs all three together (about 2 seconds in total)
    # and returns the results in argument order.
    labels = ("data1", "data2", "data3")
    results = await asyncio.gather(*(fetch_data(2, label) for label in labels))
    print(results)  # ['data1', 'data2', 'data3']

asyncio.run(main())

Creating Tasks

import asyncio

async def background_task(name: str):
    """Print a heartbeat line once per second until the task is cancelled.

    Args:
        name: Label included in every heartbeat line.
    """
    heartbeat = f"Task {name} running"
    while True:
        print(heartbeat)
        # The loop only ends when cancellation is raised at this await.
        await asyncio.sleep(1)

async def main():
    """Run a background worker for three seconds, then cancel it and await it."""
    # create_task schedules the coroutine on the running loop; it gets its
    # first turn once main() yields control at the next await.
    worker = asyncio.create_task(background_task("worker"))

    # main() carries on with other work while the worker runs.
    await asyncio.sleep(3)

    # cancel() only requests cancellation; awaiting the task afterwards lets
    # the CancelledError surface here so it can be handled.
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        print("Task cancelled")

asyncio.run(main())

Async Context Managers

async with

import asyncio
import aiofiles

class AsyncResource:
    """Toy resource whose acquire and release steps are both awaitable."""

    # Simulated latency (seconds) of acquiring and of releasing the resource.
    _LATENCY = 0.1

    async def __aenter__(self):
        """Announce and simulate acquisition; return the resource itself."""
        print("Acquiring resource")
        await asyncio.sleep(self._LATENCY)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Announce and simulate release; exceptions from the body propagate."""
        print("Releasing resource")
        await asyncio.sleep(self._LATENCY)

    async def do_work(self):
        """Print a progress message."""
        print("Working...")

async def main():
    """Use ``async with`` on a custom resource and on an aiofiles file handle."""
    managed = AsyncResource()
    async with managed as resource:
        await resource.do_work()

    # aiofiles.open is used as an async context manager; the write is awaited.
    async with aiofiles.open("file.txt", "w") as handle:
        await handle.write("Hello, async!")

asyncio.run(main())

Async Iterators

async for

import asyncio

class AsyncCounter:
    """Async iterator yielding 1, 2, ..., ``stop`` with a 0.1 s pause per item."""

    def __init__(self, stop: int):
        self.current = 0  # last value handed out; 0 means nothing yet
        self.stop = stop

    def __aiter__(self):
        # The object is its own iterator, so it can be consumed only once.
        return self

    async def __anext__(self):
        """Return the next number, or raise StopAsyncIteration when finished."""
        exhausted = self.current >= self.stop
        if not exhausted:
            await asyncio.sleep(0.1)
            self.current += 1
            return self.current
        raise StopAsyncIteration

async def main():
    """Print every value produced by an AsyncCounter that stops at 5."""
    counter = AsyncCounter(5)
    # async for awaits __anext__ for each item until StopAsyncIteration.
    async for value in counter:
        print(value)

asyncio.run(main())

Async Generators

import asyncio

async def async_range(start: int, stop: int):
    """Async generator yielding ``range(start, stop)``, 0.1 s before each item.

    Args:
        start: First value to yield.
        stop: Exclusive upper bound.
    """
    values = range(start, stop)
    for value in values:
        # Pause before each item, standing in for awaiting real I/O.
        await asyncio.sleep(0.1)
        yield value

async def main():
    """Consume async_range with ``async for`` and with an async comprehension."""
    async for value in async_range(1, 6):
        print(value)

    # An async comprehension drains a fresh generator into a list.
    collected = [value async for value in async_range(1, 6)]
    print(collected)

asyncio.run(main())

Concurrent Execution

asyncio.gather

import asyncio

async def fetch_user(user_id: int) -> dict:
    """Simulate a one-second user lookup.

    Args:
        user_id: Identifier placed in the returned record.

    Returns:
        A dict with keys ``"id"`` and ``"name"``.
    """
    await asyncio.sleep(1)
    display_name = f"User {user_id}"
    record = {"id": user_id, "name": display_name}
    return record

async def main():
    """Fetch users 1, 2 and 3 concurrently and print the combined result list."""
    lookups = (fetch_user(uid) for uid in (1, 2, 3))
    # return_exceptions=True places a raised exception in the result list
    # instead of propagating the first failure out of gather.
    users = await asyncio.gather(*lookups, return_exceptions=True)
    print(users)

asyncio.run(main())

asyncio.wait

import asyncio

async def task(name: str, delay: int):
    """Sleep ``delay`` seconds, then report completion.

    Args:
        name: Label used in the returned message.
        delay: Seconds to sleep before finishing.

    Returns:
        The string ``"Task <name> completed"``.
    """
    await asyncio.sleep(delay)
    message = f"Task {name} completed"
    return message

async def main():
    """Start three tasks, print whichever finish first, and cancel the rest."""
    specs = [("A", 2), ("B", 1), ("C", 3)]
    tasks = [asyncio.create_task(task(name, delay)) for name, delay in specs]

    # FIRST_COMPLETED makes wait() return as soon as any task is done; the
    # result is a pair of sets: finished tasks and still-running tasks.
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )

    for finished_task in finished:
        print(finished_task.result())

    # The remaining tasks are no longer needed.
    for leftover in unfinished:
        leftover.cancel()

asyncio.run(main())

asyncio.as_completed

import asyncio

async def fetch(url: str) -> str:
    """Simulate fetching ``url`` with a 1-3 second delay derived from its length.

    Args:
        url: Identifier used to derive the delay and embedded in the result.

    Returns:
        The string ``"Data from <url>"``.
    """
    # Deterministic pseudo-latency: 1, 2 or 3 seconds depending on len(url).
    latency = 1 + len(url) % 3
    await asyncio.sleep(latency)
    return f"Data from {url}"

async def main():
    """Fetch three URLs and print each result in completion order."""
    urls = ["url1", "url2", "url3"]

    # as_completed hands back awaitables in the order the fetches finish,
    # which need not match the order of ``urls``.
    for next_finished in asyncio.as_completed([fetch(url) for url in urls]):
        print(await next_finished)

asyncio.run(main())

Timeouts

import asyncio

async def slow_operation():
    """Sleep for ten seconds and return ``"Done"``."""
    duration_seconds = 10
    await asyncio.sleep(duration_seconds)
    return "Done"

async def main():
    """Demonstrate three ways to put a time limit on an awaited operation.

    Each example has its own ``try``/``except`` so that one timing out does
    not prevent the following examples from running.
    """
    # 1) wait_for: wraps a single awaitable with a relative timeout.
    try:
        await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out")

    # 2) timeout() context manager (Python 3.11+): on expiry, a TimeoutError
    #    is raised from the ``async with`` statement, so it is caught outside.
    try:
        async with asyncio.timeout(2.0):
            await slow_operation()
    except TimeoutError:
        print("Operation timed out")

    # 3) timeout_at() (Python 3.11+): absolute deadline on the loop's clock.
    #    get_running_loop() is the supported way to reach the loop from
    #    inside a coroutine.
    deadline = asyncio.get_running_loop().time() + 2.0
    try:
        async with asyncio.timeout_at(deadline):
            await slow_operation()
    except TimeoutError:
        print("Operation timed out")

asyncio.run(main())

Synchronization Primitives

Lock

import asyncio

class Counter:
    """Counter whose read-modify-write increment is serialized by a lock."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.value = 0

    async def increment(self):
        """Add one to ``value`` while holding the lock."""
        await self._lock.acquire()
        try:
            snapshot = self.value
            # This await is a suspension point; without the lock another
            # increment could read the same snapshot here and lose an update.
            await asyncio.sleep(0.01)  # Simulate work
            self.value = snapshot + 1
        finally:
            self._lock.release()

async def main():
    """Run 100 concurrent increments on one Counter and print the total."""
    counter = Counter()

    # The lock inside Counter.increment keeps the concurrent increments from
    # overwriting each other; without it this would be a race condition.
    increments = (counter.increment() for _ in range(100))
    await asyncio.gather(*increments)
    print(f"Counter: {counter.value}")  # 100

asyncio.run(main())

Semaphore

import asyncio

async def fetch_with_limit(url: str, semaphore: asyncio.Semaphore):
    """Simulate a one-second fetch of ``url`` while holding ``semaphore``.

    Args:
        url: Identifier printed and embedded in the result.
        semaphore: Shared semaphore that caps how many fetches run at once.

    Returns:
        The string ``"Data from <url>"``.
    """
    # The semaphore slot is held for the whole fetch and released on exit.
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        payload = f"Data from {url}"
        return payload

async def main():
    """Fetch ten URLs with at most three fetches in flight at a time."""
    # Every fetch shares this semaphore, which caps concurrency at 3.
    limiter = asyncio.Semaphore(3)
    urls = [f"url{i}" for i in range(10)]

    results = await asyncio.gather(
        *(fetch_with_limit(url, limiter) for url in urls)
    )
    print(results)

asyncio.run(main())

Event

import asyncio

async def waiter(event: asyncio.Event, name: str):
    """Block until ``event`` is set, printing a line before and after.

    Args:
        event: Event to wait on.
        name: Label included in both printed lines.
    """
    print(f"{name} waiting for event")
    # Returns straight away if the event is already set.
    await event.wait()
    print(f"{name} got the event!")

async def setter(event: asyncio.Event):
    """Wait two seconds, then set ``event`` to release everything waiting on it.

    Args:
        event: Event to set.
    """
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)
    print("Setting event")
    event.set()

async def main():
    """Start two waiters and one setter that share a single Event."""
    event = asyncio.Event()

    # Both waiters block on the same event; the setter releases them together.
    waiters = [waiter(event, label) for label in ("Task 1", "Task 2")]
    await asyncio.gather(*waiters, setter(event))

asyncio.run(main())

Queue

import asyncio

async def producer(queue: asyncio.Queue, n: int):
    """Put ``n`` items on ``queue`` followed by a single ``None`` sentinel.

    Args:
        queue: Destination queue; ``put`` blocks while the queue is full.
        n: Number of ``"item-<i>"`` strings to produce.
    """
    for index in range(n):
        await asyncio.sleep(0.1)
        item = f"item-{index}"
        await queue.put(item)
        print(f"Produced {item}")
    # One sentinel marks the end of the stream.
    await queue.put(None)  # Sentinel to stop consumer

async def consumer(queue: asyncio.Queue, name: str):
    """Consume items from ``queue`` until a ``None`` sentinel is received.

    Args:
        queue: Source queue; every item taken is acknowledged with task_done().
        name: Label included in each printed line.
    """
    while (item := await queue.get()) is not None:
        print(f"{name} consumed {item}")
        await asyncio.sleep(0.2)
        queue.task_done()
    # The sentinel was taken from the queue too, so acknowledge it as well.
    queue.task_done()

async def main():
    """Run one producer and two consumers over a bounded queue, then shut down.

    The producer enqueues only one ``None`` sentinel, so only one consumer
    ever sees it. Gathering all three coroutines would leave the other
    consumer blocked on ``queue.get()`` forever. Shutdown is therefore driven
    from here: wait for the producer, wait until every queued item has been
    acknowledged, then cancel whichever consumers are still waiting.
    """
    queue = asyncio.Queue(maxsize=5)

    # Consumers run as tasks so they process items while the producer works.
    consumers = [
        asyncio.create_task(consumer(queue, "Consumer 1")),
        asyncio.create_task(consumer(queue, "Consumer 2")),
    ]

    await producer(queue, 10)

    # join() returns once task_done() has been called for every item put.
    await queue.join()

    # Any consumer that did not receive the sentinel is idle on get().
    for worker in consumers:
        worker.cancel()
    # Collect the cancelled tasks so none is left pending at exit.
    await asyncio.gather(*consumers, return_exceptions=True)

asyncio.run(main())

HTTP Requests with aiohttp

import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """GET ``url`` through ``session`` and return the decoded JSON body.

    Args:
        session: Shared client session used to issue the request.
        url: Address to request.

    Returns:
        Whatever ``response.json()`` yields for the response.
    """
    request = session.get(url)
    async with request as response:
        payload = await response.json()
        return payload

async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently over one shared client session.

    Args:
        urls: Addresses to request.

    Returns:
        The gathered results, in the same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        # One session is shared by all requests in this batch.
        requests = (fetch(session, url) for url in urls)
        return await asyncio.gather(*requests)

async def main():
    """Fetch three example user endpoints concurrently and print the results."""
    urls = [
        "https://api.example.com/users/1",
        "https://api.example.com/users/2",
        "https://api.example.com/users/3",
    ]
    print(await fetch_all(urls))

asyncio.run(main())

Database Operations

import asyncio
import asyncpg

async def main():
    """Query, insert and run a transaction on PostgreSQL through a pool.

    The pool is closed in a ``finally`` block, so it is shut down even when
    one of the statements below raises.
    """
    # Connect to PostgreSQL
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='user',
        password='password',
        min_size=5,
        max_size=20
    )

    try:
        # The connection is taken from the pool for this block only.
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch("SELECT * FROM users LIMIT 10")
            for row in rows:
                print(dict(row))

            # Insert data; $1/$2 are bound parameters, not string formatting.
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "John", "john@example.com"
            )

            # Transaction: both updates are issued inside one transaction block.
            async with conn.transaction():
                await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
                await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    finally:
        # Runs on success and on failure alike, so the pool is never leaked.
        await pool.close()

asyncio.run(main())

Web Server with FastAPI

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def fetch_user_data(user_id: int) -> dict:
    """Simulate a 0.1-second profile lookup.

    Args:
        user_id: Identifier placed in the returned record.

    Returns:
        A dict with keys ``"id"`` and ``"name"``.
    """
    await asyncio.sleep(0.1)
    display_name = f"User {user_id}"
    return {"id": user_id, "name": display_name}

async def fetch_user_orders(user_id: int) -> list:
    """Simulate a 0.1-second order lookup.

    Args:
        user_id: Accepted for symmetry with the profile lookup; the canned
            result does not depend on it.

    Returns:
        A list of two order dicts with ``order_id`` 1 and 2.
    """
    await asyncio.sleep(0.1)
    orders = [{"order_id": 1}, {"order_id": 2}]
    return orders

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return one user's profile and orders, fetched concurrently.

    Args:
        user_id: Identifier taken from the URL path.

    Returns:
        A dict with keys ``"user"`` and ``"orders"``.
    """
    profile_lookup = fetch_user_data(user_id)
    orders_lookup = fetch_user_orders(user_id)
    # The two lookups are independent, so they are awaited together.
    profile, order_list = await asyncio.gather(profile_lookup, orders_lookup)
    return {"user": profile, "orders": order_list}

@app.get("/users")
async def get_users():
    """Return the simulated records for users 1 through 5, fetched concurrently."""
    user_ids = [1, 2, 3, 4, 5]
    lookups = (fetch_user_data(uid) for uid in user_ids)
    return await asyncio.gather(*lookups)

Best Practices

Error Handling

import asyncio

async def risky_operation():
    """Sleep one second, then always fail.

    Raises:
        ValueError: Always, with the message "Something went wrong".
    """
    await asyncio.sleep(1)
    failure = ValueError("Something went wrong")
    raise failure

async def main():
    """Handle a failure from a single awaited call and from a gather batch."""
    # A coroutine's exception surfaces at the await, so try/except is enough.
    try:
        await risky_operation()
    except ValueError as exc:
        print(f"Error: {exc}")

    # With return_exceptions=True, gather returns exceptions as ordinary
    # entries of the result list instead of raising the first one.
    outcomes = await asyncio.gather(
        risky_operation(),
        asyncio.sleep(1),
        return_exceptions=True,
    )

    for outcome in outcomes:
        if not isinstance(outcome, Exception):
            print(f"Task succeeded: {outcome}")
            continue
        print(f"Task failed: {outcome}")

asyncio.run(main())

Task Groups (Python 3.11+)

import asyncio

async def main():
    """Run three fetches inside a TaskGroup and print their results.

    Leaving the ``async with`` block waits for every task in the group. If a
    task fails, the block raises instead and the final print is not reached.
    """
    # NOTE(review): assumes fetch_data is the (delay, data) coroutine defined
    # earlier in this article; it needs both arguments.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1, "data1"))
        task2 = tg.create_task(fetch_data(2, "data2"))
        task3 = tg.create_task(fetch_data(3, "data3"))

    # All tasks completed successfully
    print(task1.result(), task2.result(), task3.result())

asyncio.run(main())

Summary

Concept | Purpose
async def | Define coroutine
await | Pause and wait for result
asyncio.gather | Run coroutines concurrently
asyncio.create_task | Schedule coroutine execution
asyncio.Lock | Synchronize access
asyncio.Semaphore | Limit concurrency
asyncio.Queue | Producer-consumer pattern

AsyncIO enables efficient, scalable Python applications for I/O-bound workloads without multi-threading complexity.

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.