AWS Lambda Serverless Guide: Build Scalable Applications
Master AWS Lambda for serverless computing. Learn function deployment, API Gateway integration, event triggers, and best practices for production.
Moshiour Rahman
What is AWS Lambda?
AWS Lambda is a serverless compute service that runs your code in response to events. You pay only for the compute time consumed—no servers to manage.
Benefits
| Traditional | Serverless |
|---|---|
| Provision servers | Auto-scaling |
| Pay for idle time | Pay per request |
| Manage infrastructure | Focus on code |
| Manual scaling | Instant scaling |
Getting Started
Simple Lambda Function
```python
# lambda_function.py
import json

def lambda_handler(event, context):
    """
    event: Input data (JSON)
    context: Runtime information
    """
    name = event.get('name', 'World')
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': json.dumps({
            'message': f'Hello, {name}!'
        })
    }
```
Deploy with AWS CLI
```bash
# Create deployment package
zip function.zip lambda_function.py

# Create Lambda function
aws lambda create-function \
  --function-name my-function \
  --runtime python3.11 \
  --role arn:aws:iam::123456789:role/lambda-role \
  --handler lambda_function.lambda_handler \
  --zip-file fileb://function.zip

# Invoke function (AWS CLI v2 expects a base64 payload by default,
# so tell it the payload is raw JSON)
aws lambda invoke \
  --function-name my-function \
  --cli-binary-format raw-in-base64-out \
  --payload '{"name": "John"}' \
  response.json
```
Deploy with Serverless Framework
```yaml
# serverless.yml
service: my-service

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  memorySize: 256
  timeout: 30

functions:
  hello:
    handler: handler.hello
    events:
      - http:
          path: /hello
          method: get
  processData:
    handler: handler.process
    events:
      - s3:
          bucket: my-bucket
          event: s3:ObjectCreated:*
```

```bash
npm install -g serverless
serverless deploy
```
API Gateway Integration
REST API
```python
# handler.py
import json
import uuid
from datetime import datetime

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('users')

def get_users(event, context):
    response = table.scan()
    return {
        'statusCode': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(response['Items'], default=str)
    }

def get_user(event, context):
    user_id = event['pathParameters']['id']
    response = table.get_item(Key={'id': user_id})
    if 'Item' not in response:
        return {
            'statusCode': 404,
            'body': json.dumps({'error': 'User not found'})
        }
    return {
        'statusCode': 200,
        'body': json.dumps(response['Item'], default=str)
    }

def create_user(event, context):
    body = json.loads(event['body'])
    item = {
        'id': str(uuid.uuid4()),
        'name': body['name'],
        'email': body['email'],
        'created_at': datetime.now().isoformat()
    }
    table.put_item(Item=item)
    return {
        'statusCode': 201,
        'body': json.dumps(item)
    }
```
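The three handlers build the same response dictionary by hand each time. A small helper (the `json_response` name here is hypothetical, not part of any AWS API) keeps status codes and headers consistent across endpoints:

```python
import json

def json_response(status_code, payload):
    """Build an API Gateway proxy response with a JSON body."""
    return {
        'statusCode': status_code,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps(payload, default=str)
    }

# Example: the 404 branch of get_user becomes a one-liner
not_found = json_response(404, {'error': 'User not found'})
print(not_found['statusCode'])  # 404
```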
```yaml
# serverless.yml
functions:
  getUsers:
    handler: handler.get_users
    events:
      - http:
          path: /users
          method: get
  getUser:
    handler: handler.get_user
    events:
      - http:
          path: /users/{id}
          method: get
  createUser:
    handler: handler.create_user
    events:
      - http:
          path: /users
          method: post
```
Request Validation
```python
import json
import re

def create_user(event, context):
    try:
        body = json.loads(event['body'])
    except json.JSONDecodeError:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid JSON'})
        }

    # Validate required fields
    required = ['name', 'email']
    missing = [f for f in required if f not in body]
    if missing:
        return {
            'statusCode': 400,
            'body': json.dumps({
                'error': f'Missing fields: {", ".join(missing)}'
            })
        }

    # Validate email format
    if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', body['email']):
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid email format'})
        }

    # Continue with creation...
```
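These checks are easy to exercise locally before wiring anything to API Gateway. As a sketch, the same rules pulled into a standalone helper (the `validate_user` name is hypothetical) can be unit-tested with plain dictionaries:

```python
import re

EMAIL_RE = re.compile(r'^[\w\.-]+@[\w\.-]+\.\w+$')

def validate_user(body):
    """Return an error string for a bad payload, or None if it is valid."""
    missing = [f for f in ('name', 'email') if f not in body]
    if missing:
        return f'Missing fields: {", ".join(missing)}'
    if not EMAIL_RE.match(body['email']):
        return 'Invalid email format'
    return None

print(validate_user({'name': 'Ada'}))                              # Missing fields: email
print(validate_user({'name': 'Ada', 'email': 'bad'}))              # Invalid email format
print(validate_user({'name': 'Ada', 'email': 'ada@example.com'}))  # None
```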
Event Sources
S3 Trigger
```python
import boto3

def process_s3_event(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        print(f"Processing {key} from {bucket}")

        # Download and process file
        s3 = boto3.client('s3')
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read()
        # Process content...
    return {'statusCode': 200}
```
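One gotcha worth knowing: object keys in S3 notification events arrive URL-encoded (spaces become `+`, special characters become `%xx`), so keys should be decoded before calling `get_object`. A runnable sketch against a trimmed sample record (real events carry many more fields):

```python
from urllib.parse import unquote_plus

# A trimmed S3 notification record for illustration
event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'my-bucket'},
            'object': {'key': 'uploads/my+report+2024.csv'}
        }
    }]
}

for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    # Decode the URL-encoded key before using it with the S3 API
    key = unquote_plus(record['s3']['object']['key'])
    print(bucket, key)  # my-bucket uploads/my report 2024.csv
```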
SQS Trigger
```python
import json

def process_sqs_messages(event, context):
    for record in event['Records']:
        body = json.loads(record['body'])
        message_id = record['messageId']
        print(f"Processing message {message_id}: {body}")
        # Process message...
    return {'statusCode': 200}
```
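If one message in a batch raises, returning an error makes Lambda redeliver the entire batch. Lambda supports partial batch responses for SQS (enable `ReportBatchItemFailures` on the event source mapping) so only failed messages are retried; a sketch, assuming that setting is on:

```python
import json

def process_sqs_messages(event, context):
    failures = []
    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            # Process message...
        except Exception:
            # Report only this message for redelivery
            failures.append({'itemIdentifier': record['messageId']})
    return {'batchItemFailures': failures}

# A batch with one bad message reports only that message
event = {'Records': [
    {'messageId': 'm1', 'body': '{"ok": true}'},
    {'messageId': 'm2', 'body': 'not json'},
]}
print(process_sqs_messages(event, None))  # {'batchItemFailures': [{'itemIdentifier': 'm2'}]}
```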
DynamoDB Streams
```python
def process_dynamodb_stream(event, context):
    for record in event['Records']:
        event_name = record['eventName']  # INSERT, MODIFY, REMOVE
        if event_name == 'INSERT':
            new_item = record['dynamodb']['NewImage']
            # Process new item
        elif event_name == 'MODIFY':
            old_item = record['dynamodb']['OldImage']
            new_item = record['dynamodb']['NewImage']
            # Process update
        elif event_name == 'REMOVE':
            old_item = record['dynamodb']['OldImage']
            # Process deletion
    return {'statusCode': 200}
```
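Note that `NewImage`/`OldImage` arrive in DynamoDB's attribute-value format (`{'S': ...}`, `{'N': ...}`), not as plain Python values. boto3 ships `boto3.dynamodb.types.TypeDeserializer` for the full conversion; the minimal version below handles only strings, numbers, and booleans, just to show the shape:

```python
from decimal import Decimal

def deserialize(image):
    """Convert a DynamoDB stream image (S/N/BOOL only) to plain values."""
    out = {}
    for name, attr in image.items():
        if 'S' in attr:
            out[name] = attr['S']
        elif 'N' in attr:
            out[name] = Decimal(attr['N'])  # numbers arrive as strings
        elif 'BOOL' in attr:
            out[name] = attr['BOOL']
    return out

new_image = {'id': {'S': 'u1'}, 'age': {'N': '42'}, 'active': {'BOOL': True}}
print(deserialize(new_image))  # {'id': 'u1', 'age': Decimal('42'), 'active': True}
```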
Scheduled Events (Cron)
```yaml
functions:
  dailyReport:
    handler: handler.daily_report
    events:
      - schedule: cron(0 9 * * ? *)  # 9 AM UTC daily
  hourlyCleanup:
    handler: handler.cleanup
    events:
      - schedule: rate(1 hour)
```
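Scheduled invocations receive an EventBridge event whose `time` field is the ISO-8601 trigger timestamp, which is handy for report ranges. A sketch with a trimmed sample event (real events also carry account, region, and resources):

```python
from datetime import datetime

def daily_report(event, context):
    """Scheduled events carry an ISO-8601 'time' field from EventBridge."""
    fired_at = datetime.fromisoformat(event['time'].replace('Z', '+00:00'))
    print(f"Report run triggered at {fired_at:%Y-%m-%d %H:%M} UTC")
    return {'statusCode': 200}

# A trimmed scheduled event for illustration
event = {'source': 'aws.events', 'time': '2024-05-01T09:00:00Z'}
daily_report(event, None)  # prints: Report run triggered at 2024-05-01 09:00 UTC
```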
Layers
Create Layer
```bash
# Create layer directory structure
mkdir -p python/lib/python3.11/site-packages

# Install dependencies
pip install requests -t python/lib/python3.11/site-packages/

# Zip layer
zip -r layer.zip python

# Create layer
aws lambda publish-layer-version \
  --layer-name my-dependencies \
  --zip-file fileb://layer.zip \
  --compatible-runtimes python3.11
```
Use Layer
```yaml
# serverless.yml
functions:
  myFunction:
    handler: handler.main
    layers:
      - arn:aws:lambda:us-east-1:123456789:layer:my-dependencies:1
```
Environment Variables
```yaml
# serverless.yml
provider:
  environment:
    STAGE: ${opt:stage, 'dev'}
    TABLE_NAME: users-${self:provider.stage}

functions:
  myFunction:
    handler: handler.main
    environment:
      SECRET_KEY: ${env:SECRET_KEY}
      API_URL: https://api.example.com
```

```python
import os

def lambda_handler(event, context):
    table_name = os.environ['TABLE_NAME']
    secret_key = os.environ['SECRET_KEY']
    # Use environment variables...
```
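`os.environ['NAME']` raises `KeyError` when the variable is missing, which surfaces as a runtime error on every invocation. Reading configuration once at module load, with explicit defaults where a default is safe, fails fast at cold start instead. A sketch (variable names match the config above; the defaults are illustrative):

```python
import os

# Read once at import time (cold start), not on every invocation
STAGE = os.environ.get('STAGE', 'dev')          # safe default
TABLE_NAME = os.environ.get('TABLE_NAME', f'users-{STAGE}')

def lambda_handler(event, context):
    return {'stage': STAGE, 'table': TABLE_NAME}

print(lambda_handler({}, None))  # e.g. {'stage': 'dev', 'table': 'users-dev'}
```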
Cold Start Optimization
Provisioned Concurrency
```yaml
functions:
  api:
    handler: handler.api
    provisionedConcurrency: 5
```
Optimize Imports
```python
# Bad - imports inside handler
def lambda_handler(event, context):
    import boto3
    import pandas as pd  # Slow import

# Good - imports at module level
import boto3

# Initialize outside handler
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('my-table')

def lambda_handler(event, context):
    # Handler code
    pass
```
Connection Reuse
```python
import boto3
from botocore.config import Config

# Reuse connections
config = Config(
    connect_timeout=5,
    read_timeout=30,
    retries={'max_attempts': 3}
)

# Initialize once, reuse across invocations
dynamodb = boto3.resource('dynamodb', config=config)
s3 = boto3.client('s3', config=config)
```
Error Handling
```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class ValidationError(Exception):
    pass

def lambda_handler(event, context):
    try:
        # Validate input
        if 'id' not in event:
            raise ValidationError('Missing required field: id')

        # Process
        result = process_data(event)
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except ValidationError as e:
        logger.warning(f"Validation error: {str(e)}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }
```
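When several handlers need the same try/except shape, a decorator can centralize the error mapping so each handler only returns its payload or raises. The `api_handler` name below is hypothetical, not an AWS construct:

```python
import functools
import json

class ValidationError(Exception):
    pass

def api_handler(func):
    """Wrap a handler so validation and unexpected errors map to 400/500."""
    @functools.wraps(func)
    def wrapper(event, context):
        try:
            return {'statusCode': 200, 'body': json.dumps(func(event, context))}
        except ValidationError as e:
            return {'statusCode': 400, 'body': json.dumps({'error': str(e)})}
        except Exception:
            return {'statusCode': 500, 'body': json.dumps({'error': 'Internal server error'})}
    return wrapper

@api_handler
def get_item(event, context):
    if 'id' not in event:
        raise ValidationError('Missing required field: id')
    return {'id': event['id']}

print(get_item({}, None)['statusCode'])            # 400
print(get_item({'id': 'u1'}, None)['statusCode'])  # 200
```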
Testing
Local Testing
```python
# test_handler.py
import json
from handler import lambda_handler

def test_hello():
    event = {'name': 'John'}
    context = {}
    response = lambda_handler(event, context)
    assert response['statusCode'] == 200
    body = json.loads(response['body'])
    assert body['message'] == 'Hello, John!'

def test_hello_default():
    event = {}
    context = {}
    response = lambda_handler(event, context)
    body = json.loads(response['body'])
    assert body['message'] == 'Hello, World!'
```
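Passing an empty dict works while the handler ignores `context`, but code that reads `context.aws_request_id` or calls `get_remaining_time_in_millis()` needs a stand-in. A minimal fake, loosely mirroring the real context object's attributes (the `make_fake_context` helper is hypothetical):

```python
from types import SimpleNamespace

def make_fake_context(request_id='test-request', remaining_ms=30000):
    """Build a stand-in for the Lambda context object used in unit tests."""
    return SimpleNamespace(
        aws_request_id=request_id,
        function_name='my-function',
        memory_limit_in_mb=256,
        get_remaining_time_in_millis=lambda: remaining_ms,
    )

ctx = make_fake_context()
print(ctx.aws_request_id)                  # test-request
print(ctx.get_remaining_time_in_millis())  # 30000
```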
SAM Local
```bash
# Install SAM CLI
pip install aws-sam-cli

# Test locally
sam local invoke MyFunction -e event.json

# Start local API
sam local start-api
```
Monitoring
CloudWatch Logs
```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(f"Request ID: {context.aws_request_id}")
    logger.info(f"Event: {json.dumps(event)}")
    # Your code...
    logger.info(f"Remaining time: {context.get_remaining_time_in_millis()}ms")
```
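CloudWatch Logs Insights can query JSON log lines by field, so emitting one JSON object per line is often more useful than free-form text. A sketch (the `log_event` helper and its field names are illustrative, not a library API):

```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def log_event(level, message, **fields):
    """Emit one JSON object per log line for CloudWatch Logs Insights queries."""
    line = json.dumps({'level': level, 'message': message, **fields})
    logger.log(getattr(logging, level), line)
    return line  # returned only to make the sketch easy to inspect

line = log_event('INFO', 'order processed', order_id='o-123', duration_ms=42)
print(line)  # {"level": "INFO", "message": "order processed", "order_id": "o-123", "duration_ms": 42}
```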
X-Ray Tracing
```yaml
provider:
  tracing:
    lambda: true
    apiGateway: true
```

```python
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()  # Instrument AWS SDK calls

@xray_recorder.capture('process_data')
def process_data(data):
    # Your code...
    pass
```
Summary
| Feature | Use Case |
|---|---|
| API Gateway | REST APIs |
| S3 Trigger | File processing |
| SQS Trigger | Queue processing |
| DynamoDB Streams | Data sync |
| Scheduled | Cron jobs |
| Layers | Shared dependencies |
AWS Lambda enables building scalable, cost-effective serverless applications.