gRPC with Python: Build High-Performance Microservices
Master gRPC for Python microservices. Learn Protocol Buffers, streaming, authentication, and build efficient inter-service communication.
Moshiour Rahman
Advertisement
What is gRPC?
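gRPC is a high-performance RPC framework that uses Protocol Buffers for serialization and HTTP/2 for transport. Its compact binary payloads and multiplexed connections typically make it faster than JSON-over-HTTP REST APIs, which makes it well suited to service-to-service communication in a microservices architecture.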
gRPC vs REST
| Feature | gRPC | REST |
|---|---|---|
| Protocol | HTTP/2 | HTTP/1.1 (typically) |
| Payload | Binary (Protobuf) | JSON/XML |
| Streaming | Bidirectional | Limited |
| Code Gen | Automatic | Manual |
| Performance | Faster | Slower |
Getting Started
Installation
# grpcio-health-checking provides the grpc_health package used in the Health Checks section
pip install grpcio grpcio-tools grpcio-health-checking
Define Service (Proto File)
// user_service.proto
// Schema for the example user service: one service with unary and streaming
// RPCs; the request/response messages it references are declared below.
syntax = "proto3";
// Generated names are scoped under "user", so the service's full name is
// user.UserService.
package user;
// CRUD-style user API that demonstrates all four gRPC call shapes.
service UserService {
// Unary RPC: one request message in, one response message out.
rpc GetUser(GetUserRequest) returns (User);
rpc CreateUser(CreateUserRequest) returns (User);
rpc UpdateUser(UpdateUserRequest) returns (User);
rpc DeleteUser(DeleteUserRequest) returns (DeleteResponse);
// Server streaming: one request in, a stream of User messages out.
rpc ListUsers(ListUsersRequest) returns (stream User);
// Client streaming: a stream of create requests in, one summary out.
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
// Bidirectional streaming: both sides send a stream of ChatMessage.
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
// A stored user as returned by the service.
message User {
int32 id = 1;
string name = 2;
string email = 3;
// Creation time carried as a string; the example server fills it with
// datetime.now().isoformat().
string created_at = 4;
}
// Request for GetUser: selects the user by id.
message GetUserRequest {
int32 id = 1;
}
// Request for CreateUser, and the element type of the CreateUsers stream.
message CreateUserRequest {
string name = 1;
string email = 2;
}
// Request for UpdateUser: id selects the user; the example server overwrites
// both name and email with the values sent.
message UpdateUserRequest {
int32 id = 1;
string name = 2;
string email = 3;
}
// Request for DeleteUser: selects the user by id.
message DeleteUserRequest {
int32 id = 1;
}
// Outcome of DeleteUser: a success flag plus a human-readable message.
message DeleteResponse {
bool success = 1;
string message = 2;
}
// Paging parameters for ListUsers. The example server treats an unset (0)
// page as 1 and an unset (0) page_size as 10.
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
// Summary returned by CreateUsers once the client finishes its stream.
message CreateUsersResponse {
int32 created_count = 1;
// Ids assigned to the created users, in the order they were received.
repeated int32 ids = 2;
}
// One chat line exchanged over the bidirectional Chat stream.
message ChatMessage {
string user = 1;
string message = 2;
// Set by the example server on its replies; the example client leaves it unset.
string timestamp = 3;
}
Generate Python Code
# Compile user_service.proto from the current directory (-I.) and write the
# generated Python modules next to it; the server and client examples import
# them as user_service_pb2 and user_service_pb2_grpc.
python -m grpc_tools.protoc \
-I. \
--python_out=. \
--grpc_python_out=. \
user_service.proto
Server Implementation
Basic Server
import threading
from concurrent import futures
from datetime import datetime

import grpc

import user_service_pb2
import user_service_pb2_grpc
# In-memory database: maps an integer user id to a record dict with the keys
# 'name', 'email' and 'created_at'. Contents are lost when the process exits.
users_db = {}
# Last id handed out; incremented before each insert, so the first user gets id 1.
user_counter = 0
class UserServicer(user_service_pb2_grpc.UserServiceServicer):
    """In-memory implementation of the ``UserService`` gRPC service.

    Records live in the module-level ``users_db`` dict and IDs come from the
    module-level ``user_counter``. Handlers are executed concurrently on the
    server's thread pool, so every mutation of that shared state goes through
    ``_db_lock``.
    """

    # Serializes ID allocation and writes to users_db across worker threads.
    _db_lock = threading.Lock()

    @staticmethod
    def _to_message(user_id, record):
        """Build a ``User`` message from a stored record dict."""
        return user_service_pb2.User(
            id=user_id,
            name=record['name'],
            email=record['email'],
            created_at=record['created_at'],
        )

    def _insert(self, name, email):
        """Allocate the next ID and store a new record as one atomic step.

        Returns:
            A ``(user_id, record)`` tuple for the newly stored user.
        """
        global user_counter
        record = {
            'name': name,
            'email': email,
            'created_at': datetime.now().isoformat(),
        }
        with self._db_lock:
            # Increment and insert under one lock so two concurrent creates
            # can never observe (and reuse) the same counter value.
            user_counter += 1
            user_id = user_counter
            users_db[user_id] = record
        return user_id, record

    def GetUser(self, request, context):
        """Return the user with ``request.id``.

        Sets ``NOT_FOUND`` on the context and returns an empty ``User`` when
        the ID is unknown.
        """
        # Single lookup: a check-then-index pair could raise KeyError if the
        # user is deleted by another worker in between.
        record = users_db.get(request.id)
        if record is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.id} not found")
            return user_service_pb2.User()
        return self._to_message(request.id, record)

    def CreateUser(self, request, context):
        """Store a new user and return it with its assigned ID."""
        user_id, record = self._insert(request.name, request.email)
        return self._to_message(user_id, record)

    def UpdateUser(self, request, context):
        """Overwrite name and email of the user with ``request.id``.

        Sets ``NOT_FOUND`` on the context and returns an empty ``User`` when
        the ID is unknown.
        """
        snapshot = None
        with self._db_lock:
            record = users_db.get(request.id)
            if record is not None:
                record.update({
                    'name': request.name,
                    'email': request.email,
                })
                # Copy while still holding the lock so the response reflects
                # exactly this update.
                snapshot = dict(record)
        if snapshot is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.id} not found")
            return user_service_pb2.User()
        return self._to_message(request.id, snapshot)

    def DeleteUser(self, request, context):
        """Remove the user with ``request.id`` and report whether it existed."""
        with self._db_lock:
            removed = users_db.pop(request.id, None)
        if removed is not None:
            return user_service_pb2.DeleteResponse(
                success=True,
                message=f"User {request.id} deleted"
            )
        return user_service_pb2.DeleteResponse(
            success=False,
            message=f"User {request.id} not found"
        )

    def ListUsers(self, request, context):
        """Server streaming - yield one page of users.

        ``page`` defaults to 1 and ``page_size`` to 10 when unset; non-positive
        values fall back to the same defaults instead of producing a negative
        slice.
        """
        page = request.page if request.page > 0 else 1
        page_size = request.page_size if request.page_size > 0 else 10
        start = (page - 1) * page_size
        # Snapshot the page under the lock so streaming is unaffected by
        # concurrent inserts or deletes.
        with self._db_lock:
            page_items = list(users_db.items())[start:start + page_size]
        for user_id, record in page_items:
            yield self._to_message(user_id, record)

    def CreateUsers(self, request_iterator, context):
        """Client streaming - store every received user and return the new IDs."""
        created_ids = [
            self._insert(request.name, request.email)[0]
            for request in request_iterator
        ]
        return user_service_pb2.CreateUsersResponse(
            created_count=len(created_ids),
            ids=created_ids
        )

    def Chat(self, request_iterator, context):
        """Bidirectional streaming - acknowledge each incoming chat message."""
        for message in request_iterator:
            # Echo back with timestamp
            yield user_service_pb2.ChatMessage(
                user="Server",
                message=f"Received: {message.message}",
                timestamp=datetime.now().isoformat()
            )
def serve():
    """Run the UserService on an insecure port and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    servicer = UserServicer()
    user_service_pb2_grpc.add_UserServiceServicer_to_server(servicer, grpc_server)
    # '[::]' listens on all interfaces.
    grpc_server.add_insecure_port('[::]:50051')
    grpc_server.start()
    print("Server started on port 50051")
    grpc_server.wait_for_termination()


if __name__ == '__main__':
    serve()
Client Implementation
Unary Calls
import grpc
import user_service_pb2
import user_service_pb2_grpc
def run():
    """Walk one user through create, get, update and delete over unary RPCs."""
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        # Create user
        create_request = user_service_pb2.CreateUserRequest(
            name="John Doe",
            email="john@example.com",
        )
        response = stub.CreateUser(create_request)
        print(f"Created user: {response.id} - {response.name}")

        # Get user (by the id the server just assigned)
        get_request = user_service_pb2.GetUserRequest(id=response.id)
        user = stub.GetUser(get_request)
        print(f"Got user: {user.name} ({user.email})")

        # Update user
        update_request = user_service_pb2.UpdateUserRequest(
            id=user.id,
            name="John Smith",
            email="john.smith@example.com",
        )
        updated = stub.UpdateUser(update_request)
        print(f"Updated: {updated.name}")

        # Delete user
        delete_request = user_service_pb2.DeleteUserRequest(id=user.id)
        result = stub.DeleteUser(delete_request)
        print(f"Delete result: {result.message}")


if __name__ == '__main__':
    run()
Streaming Calls
import grpc
import user_service_pb2
import user_service_pb2_grpc
def stream_examples():
    """Exercise the server-, client- and bidirectional-streaming RPCs in turn."""
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        # Server streaming - list users
        print("\n--- Server Streaming: List Users ---")
        listing = user_service_pb2.ListUsersRequest(page=1, page_size=10)
        for user in stub.ListUsers(listing):
            print(f"User: {user.id} - {user.name}")

        # Client streaming - create multiple users
        print("\n--- Client Streaming: Create Users ---")
        new_users = [
            ("Alice", "alice@example.com"),
            ("Bob", "bob@example.com"),
            ("Charlie", "charlie@example.com"),
        ]
        # Lazy generator: requests are built as the call consumes them.
        create_requests = (
            user_service_pb2.CreateUserRequest(name=name, email=email)
            for name, email in new_users
        )
        response = stub.CreateUsers(create_requests)
        print(f"Created {response.created_count} users: {list(response.ids)}")

        # Bidirectional streaming - chat
        print("\n--- Bidirectional Streaming: Chat ---")
        outgoing = (
            user_service_pb2.ChatMessage(user="Client", message=text)
            for text in ["Hello", "How are you?", "Goodbye"]
        )
        for reply in stub.Chat(outgoing):
            print(f"[{reply.user}]: {reply.message}")


if __name__ == '__main__':
    stream_examples()
Authentication
Token-Based Auth
import grpc
from grpc import StatusCode
class AuthInterceptor(grpc.ServerInterceptor):
    """Server interceptor that admits only calls carrying a known token.

    The token is read from the ``authorization`` metadata entry and must be a
    member of ``valid_tokens``.
    """

    def __init__(self, valid_tokens):
        self.valid_tokens = valid_tokens

    def intercept_service(self, continuation, handler_call_details):
        """Pass authorized calls on; answer all others with UNAUTHENTICATED."""
        headers = dict(handler_call_details.invocation_metadata)
        if headers.get('authorization') in self.valid_tokens:
            return continuation(handler_call_details)

        def reject(ignored_request, context):
            context.abort(StatusCode.UNAUTHENTICATED, 'Invalid token')

        # NOTE(review): the rejection handler is always unary-unary, whatever
        # the streaming shape of the intercepted method.
        return grpc.unary_unary_rpc_method_handler(reject)
# Server with interceptor
def serve_with_auth():
    """Run a server whose calls are gated by ``AuthInterceptor``.

    Only requests carrying the ``valid-token-123`` authorization token reach
    the registered servicers. Blocks until the server terminates.
    """
    interceptor = AuthInterceptor({'valid-token-123'})
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[interceptor]
    )
    # ... add servicers
    # Bind the address the example clients dial; without a bound port the
    # server accepts no connections.
    server.add_insecure_port('[::]:50051')
    server.start()
    # Keep the server referenced and running; returning here would drop the
    # only reference to it.
    server.wait_for_termination()
# Client with auth metadata
def client_with_auth():
    """Call GetUser with the authorization token attached as call metadata."""
    auth_metadata = [('authorization', 'valid-token-123')]
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)
        request = user_service_pb2.GetUserRequest(id=1)
        # Metadata is supplied per call, not per channel.
        response = stub.GetUser(request, metadata=auth_metadata)
SSL/TLS
# Server with SSL
def serve_with_ssl():
    """Run a TLS-secured server using ``server.key`` / ``server.crt``.

    Both files are read from the current working directory. Blocks until the
    server terminates.

    Raises:
        OSError: if either the key or the certificate file cannot be read.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    with open('server.key', 'rb') as f:
        private_key = f.read()
    with open('server.crt', 'rb') as f:
        certificate_chain = f.read()
    server_credentials = grpc.ssl_server_credentials([
        (private_key, certificate_chain)
    ])
    # NOTE(review): no servicers are registered in this snippet; add them
    # before start() as in the basic server.
    server.add_secure_port('[::]:50051', server_credentials)
    server.start()
    # Keep the server referenced and running; returning here would drop the
    # only reference to it.
    server.wait_for_termination()
# Client with SSL
def client_with_ssl():
    """Open a TLS channel that trusts the CA certificate stored in ``ca.crt``."""
    with open('ca.crt', 'rb') as ca_file:
        trusted_roots = ca_file.read()
    tls_credentials = grpc.ssl_channel_credentials(trusted_roots)
    with grpc.secure_channel('localhost:50051', tls_credentials) as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)
        # ... make calls
Error Handling
import grpc
class UserServicer(user_service_pb2_grpc.UserServiceServicer):
    """UserService servicer that reports failures through gRPC status codes."""

    def GetUser(self, request, context):
        """Return the user with ``request.id``.

        Terminates the RPC with ``INVALID_ARGUMENT`` for a non-positive ID,
        ``NOT_FOUND`` for an unknown ID, and ``INTERNAL`` if building the
        response fails unexpectedly.
        """
        # context.abort() ends the RPC by raising, so the validation aborts
        # must stay outside the try/except below: a blanket
        # `except Exception` would catch that exception and overwrite the
        # intended status with INTERNAL.
        if request.id <= 0:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "User ID must be positive"
            )
        # One lookup instead of membership test + index, so a concurrent
        # delete cannot slip in between.
        user = users_db.get(request.id)
        if user is None:
            context.abort(
                grpc.StatusCode.NOT_FOUND,
                f"User {request.id} not found"
            )
        try:
            return user_service_pb2.User(
                id=request.id,
                name=user['name'],
                email=user['email'],
                created_at=user['created_at']
            )
        except Exception as e:
            # Only genuinely unexpected failures are mapped to INTERNAL.
            context.abort(
                grpc.StatusCode.INTERNAL,
                f"Internal error: {str(e)}"
            )
# Client error handling
def client_with_error_handling():
    """Request a missing user and print the status the server reports."""
    # Friendly hint for the status codes this example singles out.
    hints = {
        grpc.StatusCode.NOT_FOUND: "User not found",
        grpc.StatusCode.INVALID_ARGUMENT: "Invalid request",
    }
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)
        request = user_service_pb2.GetUserRequest(id=999)
        try:
            user = stub.GetUser(request)
        except grpc.RpcError as rpc_error:
            print(f"gRPC Error: {rpc_error.code()}")
            print(f"Details: {rpc_error.details()}")
            hint = hints.get(rpc_error.code())
            if hint is not None:
                print(hint)
Async gRPC
import grpc
import asyncio
import user_service_pb2
import user_service_pb2_grpc
class AsyncUserServicer(user_service_pb2_grpc.UserServiceServicer):
    """Coroutine-based servicer for use with a ``grpc.aio`` server."""

    async def GetUser(self, request, context):
        """Fetch one user; abort the RPC with NOT_FOUND when nothing is returned."""
        # Async database call
        record = await get_user_from_db(request.id)
        if not record:
            await context.abort(grpc.StatusCode.NOT_FOUND, "Not found")
        # Record keys are expanded directly into User fields.
        return user_service_pb2.User(**record)

    async def ListUsers(self, request, context):
        """Stream each record yielded by ``fetch_users_stream`` as a User."""
        async for record in fetch_users_stream():
            yield user_service_pb2.User(**record)
async def serve_async():
    """Run the asyncio gRPC server on port 50051 until it terminates."""
    aio_server = grpc.aio.server()
    servicer = AsyncUserServicer()
    user_service_pb2_grpc.add_UserServiceServicer_to_server(servicer, aio_server)
    aio_server.add_insecure_port('[::]:50051')
    await aio_server.start()
    await aio_server.wait_for_termination()
# Async client
async def async_client():
    """Make one unary and one server-streaming call over an asyncio channel."""
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = user_service_pb2_grpc.UserServiceStub(channel)

        # Unary call
        get_request = user_service_pb2.GetUserRequest(id=1)
        response = await stub.GetUser(get_request)

        # Streaming
        list_request = user_service_pb2.ListUsersRequest()
        async for user in stub.ListUsers(list_request):
            print(user.name)


if __name__ == '__main__':
    asyncio.run(serve_async())
Health Checks
from grpc_health.v1 import health_pb2, health_pb2_grpc
from grpc_health.v1.health import HealthServicer
def serve_with_health():
    """Run the UserService together with the standard gRPC health service.

    The service is reported as SERVING under both its bare name and its
    fully-qualified name. Blocks until the server terminates.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # Add health service
    health_servicer = HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
    # Set service status
    serving = health_pb2.HealthCheckResponse.SERVING
    health_servicer.set('UserService', serving)
    # Health checkers conventionally query "<package>.<Service>"; the proto
    # declares `package user`, so register that name as well.
    health_servicer.set('user.UserService', serving)
    # Add your services
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    # Keep the server referenced and running; returning here would drop the
    # only reference to it.
    server.wait_for_termination()
Summary
| Feature | gRPC Method |
|---|---|
| Single request/response | Unary RPC |
| Server sends stream | Server streaming |
| Client sends stream | Client streaming |
| Both stream | Bidirectional streaming |
| Type safety | Protocol Buffers |
gRPC provides high-performance, type-safe communication for building scalable microservices.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial: Build Modern Python APIs
Master FastAPI for building high-performance Python APIs. Learn async endpoints, validation, authentication, database integration, and deployment.
Python · FastAPI Tutorial Part 5: Dependency Injection - Share Logic Across Endpoints
Master FastAPI dependency injection for clean, reusable code. Learn database sessions, authentication, pagination, and complex dependency chains.
Python · FastAPI Tutorial Part 20: Building a Production-Ready API
Build a complete production-ready FastAPI application. Combine all concepts into a real-world e-commerce API with authentication, database, and deployment.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.