Python 7 min read

gRPC with Python: Build High-Performance Microservices

Master gRPC for Python microservices. Learn Protocol Buffers, streaming, authentication, and build efficient inter-service communication.

MR

Moshiour Rahman

Advertisement

What is gRPC?

gRPC is a high-performance RPC framework using Protocol Buffers for serialization. It’s faster than REST and ideal for microservices communication.

gRPC vs REST

FeaturegRPCREST
ProtocolHTTP/2HTTP/1.1
PayloadBinary (Protobuf)JSON/XML
StreamingBidirectionalLimited
Code GenAutomaticManual
PerformanceFasterSlower

Getting Started

Installation

pip install grpcio grpcio-tools

Define Service (Proto File)

// user_service.proto
syntax = "proto3";

package user;

service UserService {
    // Unary RPC
    rpc GetUser(GetUserRequest) returns (User);
    rpc CreateUser(CreateUserRequest) returns (User);
    rpc UpdateUser(UpdateUserRequest) returns (User);
    rpc DeleteUser(DeleteUserRequest) returns (DeleteResponse);

    // Server streaming
    rpc ListUsers(ListUsersRequest) returns (stream User);

    // Client streaming
    rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);

    // Bidirectional streaming
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message User {
    int32 id = 1;
    string name = 2;
    string email = 3;
    string created_at = 4;
}

message GetUserRequest {
    int32 id = 1;
}

message CreateUserRequest {
    string name = 1;
    string email = 2;
}

message UpdateUserRequest {
    int32 id = 1;
    string name = 2;
    string email = 3;
}

message DeleteUserRequest {
    int32 id = 1;
}

message DeleteResponse {
    bool success = 1;
    string message = 2;
}

message ListUsersRequest {
    int32 page = 1;
    int32 page_size = 2;
}

message CreateUsersResponse {
    int32 created_count = 1;
    repeated int32 ids = 2;
}

message ChatMessage {
    string user = 1;
    string message = 2;
    string timestamp = 3;
}

Generate Python Code

python -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    user_service.proto

Server Implementation

Basic Server

import grpc
from concurrent import futures
from datetime import datetime
import user_service_pb2
import user_service_pb2_grpc

# In-memory database
users_db = {}
user_counter = 0

class UserServicer(user_service_pb2_grpc.UserServiceServicer):

    def GetUser(self, request, context):
        user_id = request.id

        if user_id not in users_db:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {user_id} not found")
            return user_service_pb2.User()

        user = users_db[user_id]
        return user_service_pb2.User(
            id=user_id,
            name=user['name'],
            email=user['email'],
            created_at=user['created_at']
        )

    def CreateUser(self, request, context):
        global user_counter
        user_counter += 1

        user = {
            'name': request.name,
            'email': request.email,
            'created_at': datetime.now().isoformat()
        }
        users_db[user_counter] = user

        return user_service_pb2.User(
            id=user_counter,
            name=user['name'],
            email=user['email'],
            created_at=user['created_at']
        )

    def UpdateUser(self, request, context):
        if request.id not in users_db:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.id} not found")
            return user_service_pb2.User()

        users_db[request.id].update({
            'name': request.name,
            'email': request.email
        })

        user = users_db[request.id]
        return user_service_pb2.User(
            id=request.id,
            name=user['name'],
            email=user['email'],
            created_at=user['created_at']
        )

    def DeleteUser(self, request, context):
        if request.id in users_db:
            del users_db[request.id]
            return user_service_pb2.DeleteResponse(
                success=True,
                message=f"User {request.id} deleted"
            )

        return user_service_pb2.DeleteResponse(
            success=False,
            message=f"User {request.id} not found"
        )

    def ListUsers(self, request, context):
        """Server streaming - returns multiple users."""
        page = request.page or 1
        page_size = request.page_size or 10
        start = (page - 1) * page_size

        user_ids = list(users_db.keys())[start:start + page_size]

        for user_id in user_ids:
            user = users_db[user_id]
            yield user_service_pb2.User(
                id=user_id,
                name=user['name'],
                email=user['email'],
                created_at=user['created_at']
            )

    def CreateUsers(self, request_iterator, context):
        """Client streaming - receives multiple users."""
        created_ids = []

        for request in request_iterator:
            global user_counter
            user_counter += 1

            users_db[user_counter] = {
                'name': request.name,
                'email': request.email,
                'created_at': datetime.now().isoformat()
            }
            created_ids.append(user_counter)

        return user_service_pb2.CreateUsersResponse(
            created_count=len(created_ids),
            ids=created_ids
        )

    def Chat(self, request_iterator, context):
        """Bidirectional streaming - chat example."""
        for message in request_iterator:
            # Echo back with timestamp
            yield user_service_pb2.ChatMessage(
                user="Server",
                message=f"Received: {message.message}",
                timestamp=datetime.now().isoformat()
            )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

Client Implementation

Unary Calls

import grpc
import user_service_pb2
import user_service_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        # Create user
        response = stub.CreateUser(
            user_service_pb2.CreateUserRequest(
                name="John Doe",
                email="john@example.com"
            )
        )
        print(f"Created user: {response.id} - {response.name}")

        # Get user
        user = stub.GetUser(
            user_service_pb2.GetUserRequest(id=response.id)
        )
        print(f"Got user: {user.name} ({user.email})")

        # Update user
        updated = stub.UpdateUser(
            user_service_pb2.UpdateUserRequest(
                id=user.id,
                name="John Smith",
                email="john.smith@example.com"
            )
        )
        print(f"Updated: {updated.name}")

        # Delete user
        result = stub.DeleteUser(
            user_service_pb2.DeleteUserRequest(id=user.id)
        )
        print(f"Delete result: {result.message}")

if __name__ == '__main__':
    run()

Streaming Calls

import grpc
import user_service_pb2
import user_service_pb2_grpc

def stream_examples():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        # Server streaming - list users
        print("\n--- Server Streaming: List Users ---")
        for user in stub.ListUsers(
            user_service_pb2.ListUsersRequest(page=1, page_size=10)
        ):
            print(f"User: {user.id} - {user.name}")

        # Client streaming - create multiple users
        print("\n--- Client Streaming: Create Users ---")

        def generate_users():
            users = [
                ("Alice", "alice@example.com"),
                ("Bob", "bob@example.com"),
                ("Charlie", "charlie@example.com")
            ]
            for name, email in users:
                yield user_service_pb2.CreateUserRequest(
                    name=name,
                    email=email
                )

        response = stub.CreateUsers(generate_users())
        print(f"Created {response.created_count} users: {list(response.ids)}")

        # Bidirectional streaming - chat
        print("\n--- Bidirectional Streaming: Chat ---")

        def generate_messages():
            messages = ["Hello", "How are you?", "Goodbye"]
            for msg in messages:
                yield user_service_pb2.ChatMessage(
                    user="Client",
                    message=msg
                )

        for response in stub.Chat(generate_messages()):
            print(f"[{response.user}]: {response.message}")

if __name__ == '__main__':
    stream_examples()

Authentication

Token-Based Auth

import grpc
from grpc import StatusCode

class AuthInterceptor(grpc.ServerInterceptor):
    def __init__(self, valid_tokens):
        self.valid_tokens = valid_tokens

    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization')

        if token not in self.valid_tokens:
            def abort(ignored_request, context):
                context.abort(StatusCode.UNAUTHENTICATED, 'Invalid token')
            return grpc.unary_unary_rpc_method_handler(abort)

        return continuation(handler_call_details)

# Server with interceptor
def serve_with_auth():
    interceptor = AuthInterceptor({'valid-token-123'})

    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[interceptor]
    )
    # ... add servicers
    server.start()

# Client with auth metadata
def client_with_auth():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        metadata = [('authorization', 'valid-token-123')]

        response = stub.GetUser(
            user_service_pb2.GetUserRequest(id=1),
            metadata=metadata
        )

SSL/TLS

# Server with SSL
def serve_with_ssl():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    with open('server.key', 'rb') as f:
        private_key = f.read()
    with open('server.crt', 'rb') as f:
        certificate_chain = f.read()

    server_credentials = grpc.ssl_server_credentials([
        (private_key, certificate_chain)
    ])

    server.add_secure_port('[::]:50051', server_credentials)
    server.start()

# Client with SSL
def client_with_ssl():
    with open('ca.crt', 'rb') as f:
        root_certificates = f.read()

    credentials = grpc.ssl_channel_credentials(root_certificates)

    with grpc.secure_channel('localhost:50051', credentials) as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)
        # ... make calls

Error Handling

import grpc

class UserServicer(user_service_pb2_grpc.UserServiceServicer):

    def GetUser(self, request, context):
        try:
            if request.id <= 0:
                context.abort(
                    grpc.StatusCode.INVALID_ARGUMENT,
                    "User ID must be positive"
                )

            if request.id not in users_db:
                context.abort(
                    grpc.StatusCode.NOT_FOUND,
                    f"User {request.id} not found"
                )

            # ... return user

        except Exception as e:
            context.abort(
                grpc.StatusCode.INTERNAL,
                f"Internal error: {str(e)}"
            )

# Client error handling
def client_with_error_handling():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        try:
            user = stub.GetUser(
                user_service_pb2.GetUserRequest(id=999)
            )
        except grpc.RpcError as e:
            print(f"gRPC Error: {e.code()}")
            print(f"Details: {e.details()}")

            if e.code() == grpc.StatusCode.NOT_FOUND:
                print("User not found")
            elif e.code() == grpc.StatusCode.INVALID_ARGUMENT:
                print("Invalid request")

Async gRPC

import grpc
import asyncio
import user_service_pb2
import user_service_pb2_grpc

class AsyncUserServicer(user_service_pb2_grpc.UserServiceServicer):

    async def GetUser(self, request, context):
        # Async database call
        user = await get_user_from_db(request.id)

        if not user:
            await context.abort(grpc.StatusCode.NOT_FOUND, "Not found")

        return user_service_pb2.User(**user)

    async def ListUsers(self, request, context):
        async for user in fetch_users_stream():
            yield user_service_pb2.User(**user)

async def serve_async():
    server = grpc.aio.server()
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        AsyncUserServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    await server.start()
    await server.wait_for_termination()

# Async client
async def async_client():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        # Unary call
        response = await stub.GetUser(
            user_service_pb2.GetUserRequest(id=1)
        )

        # Streaming
        async for user in stub.ListUsers(
            user_service_pb2.ListUsersRequest()
        ):
            print(user.name)

if __name__ == '__main__':
    asyncio.run(serve_async())

Health Checks

from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer

def serve_with_health():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # Add health service
    health_servicer = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)

    # Set service status
    health_servicer.set('UserService', health_pb2.HealthCheckResponse.SERVING)

    # Add your services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(), server
    )

    server.add_insecure_port('[::]:50051')
    server.start()

Summary

FeaturegRPC Method
Single request/responseUnary RPC
Server sends streamServer streaming
Client sends streamClient streaming
Both streamBidirectional streaming
Type safetyProtocol Buffers

gRPC provides high-performance, type-safe communication for building scalable microservices.

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.