Apache Kafka: Event Streaming for Modern Applications
Master Apache Kafka for event-driven architectures. Learn producers, consumers, topics, partitions, Kafka Streams, and build scalable data pipelines.
Moshiour Rahman
What is Apache Kafka?
Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. It’s designed for high-throughput, fault-tolerant, and scalable messaging.
Use Cases
| Use Case | Example |
|---|---|
| Messaging | Microservices communication |
| Activity Tracking | User clickstreams |
| Metrics | Application monitoring |
| Log Aggregation | Centralized logging |
| Stream Processing | Real-time analytics |
| Event Sourcing | Order processing |
Core Concepts
Architecture Overview
Producers → Topics (Partitions) → Consumers
                    ↓
          Kafka Cluster (Brokers)
                    ↓
            ZooKeeper / KRaft
Key Components
- Producer: Publishes messages to topics
- Consumer: Subscribes and reads messages
- Topic: Category/feed of messages
- Partition: Ordered, immutable message log
- Broker: Kafka server node
- Consumer Group: Set of consumers sharing workload
Getting Started
Docker Setup
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
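With this file in place, docker compose up -d starts ZooKeeper and a single Kafka broker reachable at localhost:9092. The advertised listener points at localhost, so this setup is intended for local development only.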
Node.js with KafkaJS
npm install kafkajs
// kafka.ts
import { Kafka, Producer, Consumer, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
logLevel: logLevel.INFO
});
export const producer: Producer = kafka.producer();
export const consumer: Consumer = kafka.consumer({ groupId: 'my-group' });
export async function connectKafka() {
await producer.connect();
await consumer.connect();
console.log('Kafka connected');
}
export async function disconnectKafka() {
await producer.disconnect();
await consumer.disconnect();
console.log('Kafka disconnected');
}
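A common way to use these helpers is to wire them into the process lifecycle so the consumer group can rebalance promptly on shutdown. A minimal sketch, assuming a plain Node.js entry point (the file name index.ts is an assumption):
// index.ts (sketch) - connect on startup, disconnect cleanly on shutdown
import { connectKafka, disconnectKafka } from './kafka';

async function main() {
  await connectKafka();
  // start HTTP server, consumers, etc. here
}

for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, async () => {
    await disconnectKafka();
    process.exit(0);
  });
}

main().catch(async (err) => {
  console.error(err);
  await disconnectKafka();
  process.exit(1);
});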
Producers
Basic Producer
import { producer } from './kafka';
interface OrderEvent {
orderId: string;
userId: string;
items: { productId: string; quantity: number }[];
total: number;
timestamp: Date;
}
async function sendOrder(order: OrderEvent) {
await producer.send({
topic: 'orders',
messages: [
{
key: order.orderId,
value: JSON.stringify(order),
headers: {
'event-type': 'order-created',
'source': 'order-service'
}
}
]
});
console.log(`Order ${order.orderId} sent`);
}
Batch Producer
async function sendBatch(orders: OrderEvent[]) {
const messages = orders.map(order => ({
key: order.orderId,
value: JSON.stringify(order)
}));
await producer.send({
topic: 'orders',
messages
});
console.log(`Sent ${orders.length} orders`);
}
// With compression (CompressionTypes is exported by kafkajs)
await producer.send({
  topic: 'orders',
  compression: CompressionTypes.GZIP,
  messages  // the same batch built in sendBatch above
});
Producer with Acknowledgments
import { Kafka, CompressionTypes } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const producer = kafka.producer({
allowAutoTopicCreation: true,
transactionTimeout: 30000
});
// Idempotent producer: retried sends cannot create duplicates (full exactly-once still needs transactions)
const idempotentProducer = kafka.producer({
idempotent: true,
maxInFlightRequests: 1
});
async function sendWithAck(topic: string, message: any) {
const result = await producer.send({
topic,
messages: [{ value: JSON.stringify(message) }],
acks: -1 // Wait for all in-sync replicas (acks=all)
});
console.log('Message sent:', result);
return result;
}
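Since transactionTimeout is configured above, it is worth noting that KafkaJS also supports transactional sends. A minimal sketch, assuming a producer created with a transactional id (the id order-service-tx is an assumption):
// Transactional producer (sketch): all sends in the transaction commit or abort together
const txProducer = kafka.producer({
  transactionalId: 'order-service-tx',
  idempotent: true,
  maxInFlightRequests: 1
});

async function sendTransactionally(order: OrderEvent) {
  await txProducer.connect();
  const transaction = await txProducer.transaction();
  try {
    await transaction.send({
      topic: 'orders',
      messages: [{ key: order.orderId, value: JSON.stringify(order) }]
    });
    await transaction.commit();
  } catch (error) {
    await transaction.abort();
    throw error;
  }
}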
Consumers
Basic Consumer
import { consumer } from './kafka';
async function startConsumer() {
await consumer.subscribe({
topic: 'orders',
fromBeginning: false // With no committed offset, start from the latest message
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value!.toString());
console.log({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
order
});
// Process order
await processOrder(order);
}
});
}
async function processOrder(order: OrderEvent) {
console.log(`Processing order ${order.orderId}`);
// Business logic here
}
Batch Consumer
await consumer.run({
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
isRunning,
isStale
}) => {
for (const message of batch.messages) {
if (!isRunning() || isStale()) break;
const order = JSON.parse(message.value!.toString());
await processOrder(order);
resolveOffset(message.offset);
await heartbeat();
}
}
});
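In the batch loop, resolveOffset() marks a message as processed so its offset can be committed, and heartbeat() keeps the consumer from being considered dead while a long batch is still being worked through.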
Multiple Topics
await consumer.subscribe({
topics: ['orders', 'payments', 'shipments'],
fromBeginning: false
});
await consumer.run({
eachMessage: async ({ topic, message }) => {
switch (topic) {
case 'orders':
await handleOrder(message);
break;
case 'payments':
await handlePayment(message);
break;
case 'shipments':
await handleShipment(message);
break;
}
}
});
Consumer Groups
// Multiple consumers in same group share partitions
const consumer1 = kafka.consumer({ groupId: 'order-processors' });
const consumer2 = kafka.consumer({ groupId: 'order-processors' });
// Different groups receive all messages independently
const analyticsConsumer = kafka.consumer({ groupId: 'analytics' });
const notificationConsumer = kafka.consumer({ groupId: 'notifications' });
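To see the load balancing in action, the two members of order-processors can be run side by side; the group coordinator splits the topic's partitions between them. A minimal sketch (Consumer is the type exported by kafkajs):
// Both workers share groupId 'order-processors', so each handles a subset of partitions
async function startWorker(worker: Consumer, name: string) {
  await worker.connect();
  await worker.subscribe({ topic: 'orders' });
  await worker.run({
    eachMessage: async ({ partition, message }) => {
      console.log(`${name} handled partition=${partition} offset=${message.offset}`);
    }
  });
}

await startWorker(consumer1, 'worker-1');
await startWorker(consumer2, 'worker-2');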
Topics and Partitions
Topic Management
const admin = kafka.admin();
async function createTopics() {
await admin.connect();
await admin.createTopics({
topics: [
{
topic: 'orders',
numPartitions: 6,
replicationFactor: 3,
configEntries: [
{ name: 'retention.ms', value: '604800000' }, // 7 days
{ name: 'cleanup.policy', value: 'delete' }
]
},
{
topic: 'users',
numPartitions: 3,
replicationFactor: 3,
configEntries: [
{ name: 'cleanup.policy', value: 'compact' } // Keep latest per key
]
}
]
});
await admin.disconnect();
}
async function listTopics() {
await admin.connect();
const topics = await admin.listTopics();
console.log('Topics:', topics);
await admin.disconnect();
}
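Note that replicationFactor: 3 requires at least three brokers; against the single-broker Docker setup above, use a replication factor of 1.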
Partition Assignment
// Messages with same key go to same partition
await producer.send({
topic: 'orders',
messages: [
{ key: 'user-123', value: JSON.stringify(order1) },
{ key: 'user-123', value: JSON.stringify(order2) }, // Same partition
{ key: 'user-456', value: JSON.stringify(order3) } // Different partition
]
});
// Custom partitioner
const producer = kafka.producer({
  createPartitioner: () => {
    return ({ topic, partitionMetadata, message }) => {
      // Custom logic to determine the partition
      const numPartitions = partitionMetadata.length;
      const key = message.key?.toString() || '';
      return Math.abs(hashCode(key)) % numPartitions;
    };
  }
});

// Simple string hash used by the partitioner above (illustrative only)
function hashCode(s: string): number {
  let h = 0;
  for (let i = 0; i < s.length; i++) {
    h = (h * 31 + s.charCodeAt(i)) | 0;
  }
  return h;
}
Error Handling
Producer Error Handling
producer.on('producer.connect', () => {
console.log('Producer connected');
});
producer.on('producer.disconnect', () => {
console.log('Producer disconnected');
});
producer.on('producer.network.request_timeout', (payload) => {
console.error('Request timeout:', payload);
});
async function sendWithRetry(topic: string, message: any, retries = 3) {
for (let attempt = 1; attempt <= retries; attempt++) {
try {
return await producer.send({
topic,
messages: [{ value: JSON.stringify(message) }]
});
} catch (error) {
console.error(`Attempt ${attempt} failed:`, error);
if (attempt === retries) throw error;
await new Promise(r => setTimeout(r, 1000 * attempt));
}
}
}
Consumer Error Handling
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
await processMessage(message);
} catch (error) {
console.error('Processing error:', error);
// Send to dead letter queue
await producer.send({
topic: `${topic}.dlq`,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'error': error instanceof Error ? error.message : String(error),
'original-topic': topic,
'failed-at': new Date().toISOString()
}
}]
});
}
}
});
// Handle crashes
consumer.on('consumer.crash', ({ payload: { error, restart } }) => {
  console.error('Consumer crashed:', error);
  // `restart` is a flag indicating whether KafkaJS will restart the consumer automatically
});
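Messages parked in the dead letter queue can be inspected or retried by a separate consumer. A minimal sketch of a DLQ re-processor (the group id orders-dlq-retry is an assumption, and processMessage is the same handler used above):
const dlqConsumer = kafka.consumer({ groupId: 'orders-dlq-retry' });

async function startDlqConsumer() {
  await dlqConsumer.connect();
  await dlqConsumer.subscribe({ topic: 'orders.dlq', fromBeginning: true });
  await dlqConsumer.run({
    eachMessage: async ({ message }) => {
      const originalTopic = message.headers?.['original-topic']?.toString();
      const reason = message.headers?.['error']?.toString();
      console.log(`Retrying message from ${originalTopic} (failed with: ${reason})`);
      // Re-attempt processing; if it fails again, route to manual review instead of looping forever
      await processMessage(message);
    }
  });
}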
Event-Driven Microservices
Order Service
// order-service.ts
class OrderService {
async createOrder(orderData: CreateOrderDTO): Promise<Order> {
const order = await this.orderRepository.create(orderData);
// Publish event
await producer.send({
topic: 'orders',
messages: [{
key: order.id,
value: JSON.stringify({
type: 'ORDER_CREATED',
data: order,
timestamp: new Date().toISOString()
})
}]
});
return order;
}
}
Inventory Service
// inventory-service.ts
async function startInventoryConsumer() {
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value!.toString());
switch (event.type) {
case 'ORDER_CREATED':
await reserveInventory(event.data);
break;
case 'ORDER_CANCELLED':
await releaseInventory(event.data);
break;
}
}
});
}
async function reserveInventory(order: Order) {
for (const item of order.items) {
await inventoryRepository.reserve(item.productId, item.quantity);
}
// Publish inventory reserved event
await producer.send({
topic: 'inventory',
messages: [{
key: order.id,
value: JSON.stringify({
type: 'INVENTORY_RESERVED',
orderId: order.id,
timestamp: new Date().toISOString()
})
}]
});
}
Saga Pattern
// saga-orchestrator.ts
interface SagaStep {
execute: () => Promise<void>;
compensate: () => Promise<void>;
}
class OrderSaga {
private steps: SagaStep[] = [];
private completedSteps: SagaStep[] = [];
async run(order: Order) {
this.steps = [
{
execute: () => this.reserveInventory(order),
compensate: () => this.releaseInventory(order)
},
{
execute: () => this.processPayment(order),
compensate: () => this.refundPayment(order)
},
{
execute: () => this.createShipment(order),
compensate: () => this.cancelShipment(order)
}
];
try {
for (const step of this.steps) {
await step.execute();
this.completedSteps.push(step);
}
} catch (error) {
console.error('Saga failed, compensating...', error);
await this.compensate();
throw error;
}
}
private async compensate() {
for (const step of this.completedSteps.reverse()) {
await step.compensate();
}
}
}
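A minimal usage sketch (the step methods such as reserveInventory and processPayment are assumed to be implemented on OrderSaga):
const saga = new OrderSaga();
try {
  await saga.run(order);
  console.log(`Order ${order.id} completed`);
} catch (error) {
  console.log(`Order ${order.id} rolled back`, error);
}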
Kafka Streams (Java)
// For reference - Kafka Streams is Java-based
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
// Read from topic
KStream<String, Order> orders = builder.stream("orders");
// Process stream
KTable<String, Long> orderCounts = orders
.groupBy((key, order) -> order.getUserId())
.count();
// Write to topic
orderCounts.toStream().to("order-counts");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
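There is no official Kafka Streams client for Node.js. A rough KafkaJS approximation of the topology above (count orders per user and publish the running totals) might look like the sketch below; it keeps its state in memory only, whereas Kafka Streams provides fault-tolerant state stores:
// Count orders per user and publish the running total, like the KTable above
const counts = new Map<string, number>();
const counterConsumer = kafka.consumer({ groupId: 'order-counter' });

async function startOrderCounter() {
  await counterConsumer.connect();
  await counterConsumer.subscribe({ topic: 'orders' });
  await counterConsumer.run({
    eachMessage: async ({ message }) => {
      const order = JSON.parse(message.value!.toString());
      const next = (counts.get(order.userId) ?? 0) + 1;
      counts.set(order.userId, next);
      // Re-key by userId and emit the count, roughly toStream().to("order-counts")
      await producer.send({
        topic: 'order-counts',
        messages: [{ key: order.userId, value: String(next) }]
      });
    }
  });
}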
Monitoring
Consumer Lag
const admin = kafka.admin();
async function getConsumerLag(groupId: string) {
await admin.connect();
const [orderOffsets] = await admin.fetchOffsets({
  groupId,
  topics: ['orders']
});
const topicOffsets = await admin.fetchTopicOffsets('orders');
// fetchOffsets groups results by topic; each entry holds the committed offset per partition
const lag = orderOffsets.partitions.map(o => {
  const topicOffset = topicOffsets.find(
    to => to.partition === o.partition
  );
  return {
    partition: o.partition,
    currentOffset: parseInt(o.offset, 10),
    latestOffset: parseInt(topicOffset?.offset || '0', 10),
    lag: parseInt(topicOffset?.offset || '0', 10) - parseInt(o.offset, 10)
  };
});
await admin.disconnect();
return lag;
}
Metrics
// Instrumentation events are emitted by consumer/producer instances, not the Kafka client itself
consumer.on('consumer.group_join', (event) => {
  console.log('Consumer joined group:', event.payload);
});
consumer.on('consumer.fetch', (event) => {
  console.log('Fetch metrics:', event.payload);
});
// Prometheus metrics
import { Registry, Counter, Gauge } from 'prom-client';
const register = new Registry();
const messagesProduced = new Counter({
name: 'kafka_messages_produced_total',
help: 'Total messages produced',
labelNames: ['topic']
});
const consumerLag = new Gauge({
name: 'kafka_consumer_lag',
help: 'Consumer lag',
labelNames: ['topic', 'partition', 'group']
});
register.registerMetric(messagesProduced);
register.registerMetric(consumerLag);
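A sketch of how these metrics might be wired into the application, reusing sendOrder and getConsumerLag from earlier; the scrape port 9464 and the 30-second refresh interval are assumptions:
import http from 'http';

// Count every produced order
async function recordOrderSent(order: OrderEvent) {
  await sendOrder(order);
  messagesProduced.inc({ topic: 'orders' });
}

// Periodically refresh the lag gauge from the admin API
setInterval(async () => {
  const lag = await getConsumerLag('my-group');
  for (const p of lag) {
    consumerLag.set({ topic: 'orders', partition: String(p.partition), group: 'my-group' }, p.lag);
  }
}, 30_000);

// Expose /metrics for Prometheus to scrape
http.createServer(async (_req, res) => {
  res.setHeader('Content-Type', register.contentType);
  res.end(await register.metrics());
}).listen(9464);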
Summary
| Concept | Purpose |
|---|---|
| Producer | Send messages |
| Consumer | Receive messages |
| Topic | Message category |
| Partition | Parallel processing |
| Consumer Group | Load balancing |
| Offset | Message position |
Apache Kafka enables building scalable, fault-tolerant event-driven systems for real-time data processing.