
Apache Kafka: Event Streaming for Modern Applications

Master Apache Kafka for event-driven architectures. Learn about producers, consumers, topics, partitions, and Kafka Streams, and build scalable data pipelines.


Moshiour Rahman


What is Apache Kafka?

Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. It’s designed for high-throughput, fault-tolerant, and scalable messaging.

Use Cases

Use Case          | Example
------------------|-----------------------------
Messaging         | Microservices communication
Activity Tracking | User clickstreams
Metrics           | Application monitoring
Log Aggregation   | Centralized logging
Stream Processing | Real-time analytics
Event Sourcing    | Order processing

Core Concepts

Architecture Overview

Producers → Topics (Partitions) → Consumers
                  │
       Kafka Cluster (Brokers)
                  │
        ZooKeeper / KRaft

Key Components

  • Producer: Publishes messages to topics
  • Consumer: Subscribes and reads messages
  • Topic: Category/feed of messages
  • Partition: Ordered, immutable message log
  • Broker: Kafka server node
  • Consumer Group: Set of consumers sharing workload

Getting Started

Docker Setup

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

Node.js with KafkaJS

npm install kafkajs

// kafka.ts
import { Kafka, Producer, Consumer, logLevel } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  logLevel: logLevel.INFO
});

export const producer: Producer = kafka.producer();
export const consumer: Consumer = kafka.consumer({ groupId: 'my-group' });

export async function connectKafka() {
  await producer.connect();
  await consumer.connect();
  console.log('Kafka connected');
}

export async function disconnectKafka() {
  await producer.disconnect();
  await consumer.disconnect();
  console.log('Kafka disconnected');
}
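
In a real service these helpers are usually tied to the process lifecycle so the client disconnects cleanly on shutdown. A minimal sketch (the entry-point file name is illustrative):

// index.ts
import { connectKafka, disconnectKafka } from './kafka';

async function main() {
  await connectKafka();
  // start consumers, HTTP server, etc.
}

// Disconnecting cleanly lets the consumer group rebalance promptly
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, async () => {
    await disconnectKafka();
    process.exit(0);
  });
}

main().catch(async (err) => {
  console.error(err);
  await disconnectKafka();
  process.exit(1);
});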

Producers

Basic Producer

import { producer } from './kafka';

interface OrderEvent {
  orderId: string;
  userId: string;
  items: { productId: string; quantity: number }[];
  total: number;
  timestamp: Date;
}

async function sendOrder(order: OrderEvent) {
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.orderId,
        value: JSON.stringify(order),
        headers: {
          'event-type': 'order-created',
          'source': 'order-service'
        }
      }
    ]
  });
  console.log(`Order ${order.orderId} sent`);
}
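
Calling it with a hypothetical order:

await sendOrder({
  orderId: 'ord-1001',
  userId: 'user-123',
  items: [{ productId: 'sku-42', quantity: 2 }],
  total: 59.98,
  timestamp: new Date()
});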

Batch Producer

async function sendBatch(orders: OrderEvent[]) {
  const messages = orders.map(order => ({
    key: order.orderId,
    value: JSON.stringify(order)
  }));

  await producer.send({
    topic: 'orders',
    messages
  });

  console.log(`Sent ${orders.length} orders`);
}

// With compression (CompressionTypes is imported from 'kafkajs';
// `messages` is the array built in sendBatch above)
await producer.send({
  topic: 'orders',
  compression: CompressionTypes.GZIP,
  messages
});

Producer with Acknowledgments

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer({
  allowAutoTopicCreation: true,
  transactionTimeout: 30000
});

// Idempotent producer: retries cannot introduce duplicates
// (exactly-once delivery per partition; full exactly-once needs transactions)
const idempotentProducer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 1
});

async function sendWithAck(topic: string, message: any) {
  const result = await producer.send({
    topic,
    messages: [{ value: JSON.stringify(message) }],
    acks: -1  // Wait for all replicas
  });

  console.log('Message sent:', result);
  return result;
}
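
For atomic writes across topics, KafkaJS also supports transactions. A sketch (the transactionalId and the payments topic are illustrative):

const txProducer = kafka.producer({
  transactionalId: 'order-tx-producer',
  idempotent: true,
  maxInFlightRequests: 1
});
await txProducer.connect();

// Either both sends become visible to read-committed consumers, or neither does
const transaction = await txProducer.transaction();
try {
  await transaction.send({ topic: 'orders', messages: [{ value: '...' }] });
  await transaction.send({ topic: 'payments', messages: [{ value: '...' }] });
  await transaction.commit();
} catch (err) {
  await transaction.abort();
  throw err;
}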

Consumers

Basic Consumer

import { consumer } from './kafka';

async function startConsumer() {
  await consumer.subscribe({
    topic: 'orders',
    fromBeginning: false  // Start from latest
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const order = JSON.parse(message.value!.toString());

      console.log({
        topic,
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        order
      });

      // Process order
      await processOrder(order);
    }
  });
}

async function processOrder(order: OrderEvent) {
  console.log(`Processing order ${order.orderId}`);
  // Business logic here
}
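
When a downstream dependency struggles, the consumer can be paused and resumed per topic. A sketch of simple backpressure (the one-second backoff is arbitrary):

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      await processOrder(JSON.parse(message.value!.toString()));
    } catch (err) {
      // Back off briefly instead of hammering a failing dependency
      consumer.pause([{ topic }]);
      setTimeout(() => consumer.resume([{ topic }]), 1000);
      throw err;  // rethrow so the offset is not committed
    }
  }
});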

Batch Consumer

await consumer.run({
  eachBatch: async ({
    batch,
    resolveOffset,
    heartbeat,
    isRunning,
    isStale
  }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;

      const order = JSON.parse(message.value!.toString());
      await processOrder(order);

      resolveOffset(message.offset);
      await heartbeat();
    }
  }
});

Multiple Topics

await consumer.subscribe({
  topics: ['orders', 'payments', 'shipments'],
  fromBeginning: false
});

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    switch (topic) {
      case 'orders':
        await handleOrder(message);
        break;
      case 'payments':
        await handlePayment(message);
        break;
      case 'shipments':
        await handleShipment(message);
        break;
    }
  }
});

Consumer Groups

// Multiple consumers in same group share partitions
const consumer1 = kafka.consumer({ groupId: 'order-processors' });
const consumer2 = kafka.consumer({ groupId: 'order-processors' });

// Different groups receive all messages independently
const analyticsConsumer = kafka.consumer({ groupId: 'analytics' });
const notificationConsumer = kafka.consumer({ groupId: 'notifications' });
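
To watch a group share work, run the same consumer in several processes and log the partitions each member receives after a rebalance. A sketch (the exact assignment shape in the event payload may vary by KafkaJS version):

const worker = kafka.consumer({ groupId: 'order-processors' });

// Log which partitions this member owns after each rebalance
worker.on('consumer.group_join', (event) => {
  console.log('Assigned partitions:', event.payload.memberAssignment);
});

await worker.connect();
await worker.subscribe({ topics: ['orders'] });
await worker.run({
  eachMessage: async ({ partition, message }) => {
    console.log(`partition=${partition} offset=${message.offset}`);
  }
});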

Topics and Partitions

Topic Management

const admin = kafka.admin();

async function createTopics() {
  await admin.connect();

  await admin.createTopics({
    topics: [
      {
        topic: 'orders',
        numPartitions: 6,
        replicationFactor: 3,
        configEntries: [
          { name: 'retention.ms', value: '604800000' },  // 7 days
          { name: 'cleanup.policy', value: 'delete' }
        ]
      },
      {
        topic: 'users',
        numPartitions: 3,
        replicationFactor: 3,
        configEntries: [
          { name: 'cleanup.policy', value: 'compact' }  // Keep latest per key
        ]
      }
    ]
  });

  await admin.disconnect();
}

async function listTopics() {
  await admin.connect();
  const topics = await admin.listTopics();
  console.log('Topics:', topics);
  await admin.disconnect();
}
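
The admin client can also inspect existing topics, for example to confirm partition counts:

async function describeTopic(topic: string) {
  await admin.connect();
  const metadata = await admin.fetchTopicMetadata({ topics: [topic] });
  for (const t of metadata.topics) {
    console.log(`${t.name}: ${t.partitions.length} partitions`);
  }
  await admin.disconnect();
}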

Partition Assignment

// Messages with same key go to same partition
await producer.send({
  topic: 'orders',
  messages: [
    { key: 'user-123', value: JSON.stringify(order1) },
    { key: 'user-123', value: JSON.stringify(order2) },  // Same partition
    { key: 'user-456', value: JSON.stringify(order3) }   // Different partition
  ]
});

// Custom partitioner
const keyedProducer = kafka.producer({
  createPartitioner: () => {
    return ({ topic, partitionMetadata, message }) => {
      // Custom logic to determine the partition
      const numPartitions = partitionMetadata.length;
      const key = message.key?.toString() ?? '';
      return Math.abs(hashCode(key)) % numPartitions;
    };
  }
});

// Simple string hash used by the partitioner above
function hashCode(str: string): number {
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = (hash * 31 + str.charCodeAt(i)) | 0;
  }
  return hash;
}

Error Handling

Producer Error Handling

producer.on('producer.connect', () => {
  console.log('Producer connected');
});

producer.on('producer.disconnect', () => {
  console.log('Producer disconnected');
});

producer.on('producer.network.request_timeout', (event) => {
  console.error('Request timeout:', event.payload);
});

async function sendWithRetry(topic: string, message: any, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await producer.send({
        topic,
        messages: [{ value: JSON.stringify(message) }]
      });
    } catch (error) {
      console.error(`Attempt ${attempt} failed:`, error);
      if (attempt === retries) throw error;
      await new Promise(r => setTimeout(r, 1000 * attempt));
    }
  }
}

Consumer Error Handling

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
    } catch (error) {
      console.error('Processing error:', error);

      // Send to dead letter queue
      await producer.send({
        topic: `${topic}.dlq`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
          'error': error instanceof Error ? error.message : String(error),
            'original-topic': topic,
            'failed-at': new Date().toISOString()
          }
        }]
      });
    }
  }
});

// Handle crashes -- KafkaJS emits an instrumentation event whose payload
// includes the error and whether the consumer will be restarted automatically
consumer.on('consumer.crash', ({ payload }) => {
  console.error('Consumer crashed:', payload.error);
});

Event-Driven Microservices

Order Service

// order-service.ts
class OrderService {
  // Repository injected via DI; type definitions omitted in this sketch
  constructor(private orderRepository: OrderRepository) {}

  async createOrder(orderData: CreateOrderDTO): Promise<Order> {
    const order = await this.orderRepository.create(orderData);

    // Publish event
    await producer.send({
      topic: 'orders',
      messages: [{
        key: order.id,
        value: JSON.stringify({
          type: 'ORDER_CREATED',
          data: order,
          timestamp: new Date().toISOString()
        })
      }]
    });

    return order;
  }
}

Inventory Service

// inventory-service.ts
async function startInventoryConsumer() {
  await consumer.subscribe({ topic: 'orders' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value!.toString());

      switch (event.type) {
        case 'ORDER_CREATED':
          await reserveInventory(event.data);
          break;
        case 'ORDER_CANCELLED':
          await releaseInventory(event.data);
          break;
      }
    }
  });
}

async function reserveInventory(order: Order) {
  for (const item of order.items) {
    await inventoryRepository.reserve(item.productId, item.quantity);
  }

  // Publish inventory reserved event
  await producer.send({
    topic: 'inventory',
    messages: [{
      key: order.id,
      value: JSON.stringify({
        type: 'INVENTORY_RESERVED',
        orderId: order.id,
        timestamp: new Date().toISOString()
      })
    }]
  });
}
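
The cancellation path mirrors this. A sketch of the releaseInventory handler referenced above (the repository's release method is assumed):

async function releaseInventory(order: Order) {
  // Undo the reservation for each line item
  for (const item of order.items) {
    await inventoryRepository.release(item.productId, item.quantity);
  }

  await producer.send({
    topic: 'inventory',
    messages: [{
      key: order.id,
      value: JSON.stringify({
        type: 'INVENTORY_RELEASED',
        orderId: order.id,
        timestamp: new Date().toISOString()
      })
    }]
  });
}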

Saga Pattern

// saga-orchestrator.ts
interface SagaStep {
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

class OrderSaga {
  private steps: SagaStep[] = [];
  private completedSteps: SagaStep[] = [];

  async run(order: Order) {
    this.steps = [
      {
        execute: () => this.reserveInventory(order),
        compensate: () => this.releaseInventory(order)
      },
      {
        execute: () => this.processPayment(order),
        compensate: () => this.refundPayment(order)
      },
      {
        execute: () => this.createShipment(order),
        compensate: () => this.cancelShipment(order)
      }
    ];

    try {
      for (const step of this.steps) {
        await step.execute();
        this.completedSteps.push(step);
      }
    } catch (error) {
      console.error('Saga failed, compensating...', error);
      await this.compensate();
      throw error;
    }
  }

  private async compensate() {
    for (const step of this.completedSteps.reverse()) {
      await step.compensate();
    }
  }
}
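
Driving the saga from the order consumer might look like this (the step methods are assumed to wrap the service calls shown earlier):

const saga = new OrderSaga();

try {
  await saga.run(order);
  console.log(`Order ${order.id} completed`);
} catch (err) {
  // Every completed step has been compensated at this point
  console.error(`Order ${order.id} rolled back`, err);
}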

Kafka Streams (Java)

// For reference - Kafka Streams is Java-based
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default key/value Serdes (e.g. a String serde and a custom Order serde)
// must also be configured for this pipeline to run

StreamsBuilder builder = new StreamsBuilder();

// Read from topic
KStream<String, Order> orders = builder.stream("orders");

// Process stream
KTable<String, Long> orderCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .count();

// Write to topic
orderCounts.toStream().to("order-counts");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Monitoring

Consumer Lag

const admin = kafka.admin();

async function getConsumerLag(groupId: string) {
  await admin.connect();

  // fetchOffsets returns one entry per topic,
  // each carrying the group's committed offset per partition
  const [{ partitions: committed }] = await admin.fetchOffsets({
    groupId,
    topics: ['orders']
  });

  const topicOffsets = await admin.fetchTopicOffsets('orders');

  const lag = committed.map(c => {
    const topicOffset = topicOffsets.find(
      t => t.partition === c.partition
    );
    return {
      partition: c.partition,
      currentOffset: Number(c.offset),
      latestOffset: Number(topicOffset?.offset ?? '0'),
      lag: Number(topicOffset?.offset ?? '0') - Number(c.offset)
    };
  });

  await admin.disconnect();
  return lag;
}

Metrics

// KafkaJS instrumentation events are emitted by the
// consumer/producer instances, not the Kafka client itself
consumer.on('consumer.group_join', (event) => {
  console.log('Consumer joined group:', event.payload);
});

consumer.on('consumer.fetch', (event) => {
  console.log('Fetch metrics:', event.payload);
});

// Prometheus metrics
import { Registry, Counter, Gauge } from 'prom-client';

const register = new Registry();

const messagesProduced = new Counter({
  name: 'kafka_messages_produced_total',
  help: 'Total messages produced',
  labelNames: ['topic']
});

const consumerLag = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Consumer lag',
  labelNames: ['topic', 'partition', 'group']
});

register.registerMetric(messagesProduced);
register.registerMetric(consumerLag);
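
Tying the pieces together, the lag gauge can be refreshed from getConsumerLag and exposed over HTTP. A sketch using Express (the port and refresh interval are arbitrary):

import express from 'express';

const app = express();

// Periodically refresh lag using the admin helper defined earlier
setInterval(async () => {
  const lag = await getConsumerLag('my-group');
  for (const p of lag) {
    consumerLag.set(
      { topic: 'orders', partition: String(p.partition), group: 'my-group' },
      p.lag
    );
  }
}, 15000);

app.get('/metrics', async (_req, res) => {
  res.set('Content-Type', register.contentType);
  res.send(await register.metrics());
});

app.listen(9464, () => console.log('Metrics available on :9464'));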

Summary

Concept        | Purpose
---------------|---------------------
Producer       | Send messages
Consumer       | Receive messages
Topic          | Message category
Partition      | Parallel processing
Consumer Group | Load balancing
Offset         | Message position

Apache Kafka enables building scalable, fault-tolerant event-driven systems for real-time data processing.
