Apache Kafka Tutorial: Complete Guide from Beginner to Expert

Master Apache Kafka with this comprehensive tutorial covering installation, configuration, producers, consumers, Kafka Streams, Connect, and production deployment. Learn streaming platform concepts, real-world examples, and best practices for distributed systems.

🎯 Learning Path Overview

flowchart TD
    A[🟒 Beginner] --> B[🟑 Intermediate]
    B --> C[🟠 Advanced]
    C --> D[πŸ”΄ Expert]

    A1["πŸ“š Concepts & Setup"] --> A
    A2["βš™οΈ Basic Operations"] --> A

    B1["πŸ”§ Configuration"] --> B
    B2["πŸ“Š Monitoring"] --> B

    C1["πŸ”Œ Connect & Streams"] --> C
    C2["πŸ”’ Security"] --> C

    D1["☁️ Production & K8s"] --> D
    D2["πŸ—οΈ Architecture Patterns"] --> D

πŸ“‹ Quick Start Checklist

  • Java 11+ installed
  • Docker & Docker Compose setup
  • 8GB+ RAM available
  • Basic command line knowledge
  • Text editor/IDE ready

πŸ“š Table of Contents


🟒 Beginner Level

What is Apache Kafka?

Apache Kafka is a distributed streaming platform that acts as a high-throughput message broker for real-time data pipelines. It stores streams of records in fault-tolerant, scalable topics that can be consumed by multiple applications.

Core Concepts

Core Components

  • Broker: A Kafka server that stores and serves messages
  • Topic: A category or feed name to which messages are published
  • Partition: An ordered, immutable sequence of messages within a topic
  • Offset: A unique identifier for each message within a partition

Applications

  • Producer: An application that publishes messages to Kafka topics
  • Consumer: An application that subscribes to topics and processes messages
  • Consumer Group: A group of consumers that work together to consume a topic (see the Java sketch below)
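
These roles map directly onto the Java client API. Below is a minimal, illustrative producer sketch; the broker address and topic name are placeholders for a local setup, and kafka-clients is assumed to be on the classpath.

// MinimalProducer.java: illustrative sketch, assumes a broker on localhost:9092
// and a "greetings" topic (auto-created with default broker settings)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The key "user-1" keeps all of this user's messages in one partition (ordering per key)
            producer.send(new ProducerRecord<>("greetings", "user-1", "Hello Kafka"));
            producer.flush();
        }
    }
}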

Why Use Kafka?

Kafka decouples data producers from consumers, enabling scalable real-time data processing and reliable message delivery. It provides durability, high availability, and can handle millions of messages per second across distributed systems.

Performance Benchmarks

  • Throughput: 2M+ messages/sec per broker
  • Latency: <10ms end-to-end
  • Storage: Petabyte-scale retention
  • Availability: 99.99% uptime in production

Kafka Architecture Overview

flowchart LR
    P1[Producer 1] --> K[Kafka Cluster]
    P2[Producer 2] --> K
    P3[Producer 3] --> K

    K --> C1[Consumer 1]
    K --> C2[Consumer 2]
    K --> C3[Consumer Group]

    subgraph K ["Kafka Cluster"]
        T1[Topic A]
        T2[Topic B]
        T3[Topic C]
    end

    subgraph T1 ["Topic Partitions"]
        P0[Partition 0]
        P1_1[Partition 1]
        P2_1[Partition 2]
    end

Common Use Cases

mindmap
  root((Kafka Use Cases))
    Real-time Analytics
      Monitoring Dashboards
      Live Metrics
      Fraud Detection
    Event Sourcing
      Microservices
      Audit Logs
      State Management
    Data Integration
      ETL Pipelines
      CDC (Change Data Capture)
      Multi-system Sync
    IoT & Streaming
      Sensor Data
      Location Tracking
      Real-time Processing

Key Components Overview

Kafka Connect - Data Integration Framework: A framework for connecting Kafka with external systems like databases, file systems, and cloud services. It provides pre-built connectors and handles data ingestion/export without writing custom code.

Kafka Streams - Stream Processing Library: A Java library for building real-time stream processing applications that read from and write to Kafka topics. It enables transformations, aggregations, and joins on streaming data with exactly-once processing guarantees.

KRaft - Consensus Protocol: Kafka's consensus protocol that eliminates the dependency on Apache ZooKeeper for metadata management. It simplifies deployment, improves scalability, and reduces operational complexity by making Kafka self-managing.

βš™οΈ Prerequisites Checker

System Requirements

  • Java 11+ installed (Java 8 deprecated)

    java -version  # Should show 11 or higher

  • 8GB+ RAM recommended for production
  • Docker & Docker Compose for containerized setup
  • Basic command line knowledge

Resource Calculator

Environment | Brokers | RAM/Broker | Storage/Broker | Network
Development | 1 | 2GB | 10GB | 1Gbps
Testing | 3 | 4GB | 100GB | 1Gbps
Production | 3-5 | 16GB+ | 1TB+ | 10Gbps

πŸš€ Installation Options

Download and Setup:

# Download latest Kafka
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

Initialize KRaft:

# Generate cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo "Generated Cluster ID: $KAFKA_CLUSTER_ID"

# Format storage
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Start Kafka:

# Start Kafka in KRaft mode
bin/kafka-server-start.sh config/kraft/server.properties

Verification:

# Check if Kafka is running
netstat -an | grep 9092
# Should show: tcp46      0      0  *.9092                 *.*                    LISTEN

Create docker-compose.yml:

# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: apache/kafka:3.8.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka-data:/tmp/kraft-combined-logs
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  kafka-data:

Start and Verify:

# Start services
docker-compose up -d

# Check health
docker-compose ps
docker-compose logs kafka

🚫 Common Installation Issues

flowchart TD
    A[Installation Issue] --> B{Port 9092 in use?}
    B -->|Yes| C["Kill process: lsof -ti:9092 | xargs kill"]
    B -->|No| D{Java version?}
    D -->|< 11| E["Update Java: brew install openjdk@11"]
    D -->|>= 11| F{Memory issue?}
    F -->|Yes| G["Increase heap: export KAFKA_HEAP_OPTS='-Xmx2G -Xms2G'"]
    F -->|No| H[Check logs for specific error]

πŸ“Š Installation Verification

# Test installation
echo "test message" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

βš™οΈ Basic Operations

Service Management

Start Kafka:

# Start Kafka (KRaft mode - no Zookeeper needed)
bin/kafka-server-start.sh config/kraft/server.properties

# Start in background
nohup bin/kafka-server-start.sh config/kraft/server.properties > kafka.log 2>&1 &

Stop and Monitor:

# Stop Kafka
bin/kafka-server-stop.sh

# Check status
ps aux | grep kafka

πŸ“ Topic Management

Create and Configure Topics:

# Create topic with custom configuration
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 3 --topic my-topic \
  --config retention.ms=86400000 --config compression.type=snappy

# List all topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic (shows partitions, replicas, configs)
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-topic

Modify and Delete Topics:

# Modify topic (increase partitions)
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 \
  --topic my-topic --partitions 6

# Delete topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my-topic

# Topic configuration
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --describe
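
The CLI tools above are built on the Admin API, so the same management can be scripted in Java. A small sketch (topic name and settings mirror the CLI examples; assumes kafka-clients on the classpath):

// TopicAdminDemo.java: create and describe a topic programmatically (sketch)
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;

public class TopicAdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create "my-topic" with 3 partitions and replication factor 1 (single-broker dev setup)
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 3, (short) 1)))
                 .all().get();

            // Describe it: partitions, leaders, replicas
            TopicDescription description = admin.describeTopics(Collections.singleton("my-topic"))
                 .allTopicNames().get().get("my-topic");
            System.out.println(description);
        }
    }
}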

Topic Partition Strategy

flowchart LR
    subgraph "Topic: orders"
        P0["Partition 0<br/>user-1, user-4"]
        P1["Partition 1<br/>user-2, user-5"]
        P2["Partition 2<br/>user-3, user-6"]
    end

    K1["Key: user-1"] --> P0
    K2["Key: user-2"] --> P1
    K3["Key: user-3"] --> P2
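
In code, the producer's default partitioner applies exactly this key-based routing; the RecordMetadata returned from send() shows which partition each key landed on. A hedged sketch assuming a local broker and an "orders" topic with three partitions, as in the diagram:

// PartitionRoutingDemo.java: observe key-based partition assignment (sketch)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PartitionRoutingDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String user : new String[]{"user-1", "user-2", "user-3", "user-1"}) {
                RecordMetadata meta = producer.send(
                        new ProducerRecord<>("orders", user, "order for " + user)).get();
                // The same key always hashes to the same partition, so "user-1" lands
                // on one partition both times; different keys may share a partition.
                System.out.printf("key=%s -> partition=%d, offset=%d%n",
                        user, meta.partition(), meta.offset());
            }
        }
    }
}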

πŸ“€ Producer & Consumer

Basic Producer:

# Basic producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

Producer with Key-Value Pairs:

# Producer with key-value pairs
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --property "parse.key=true" --property "key.separator=:"
# Input: user1:Hello World

Producer with Headers:

# Producer with headers
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --property "parse.headers=true" --property "headers.separator=," \
  --property "headers.key.separator=:"
# Input: header1:value1,header2:value2\tMessage content (a tab separates the headers from the value)

Consumer Examples:

# Consumer from beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning

# Consumer with group (load balancing)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --group my-group

# Consumer showing keys and headers
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --property print.key=true --property print.headers=true

# Consumer from specific offset
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --partition 0 --offset 100

πŸ“Š Message Flow Visualization

sequenceDiagram
    participant P as Producer
    participant K as Kafka Broker
    participant C1 as Consumer 1
    participant C2 as Consumer 2

    P->>K: Send message (key: user1)
    K->>K: Route to partition based on key
    K->>C1: Deliver to consumer group member
    P->>K: Send message (key: user2)
    K->>C2: Deliver to different consumer

    Note over K: Messages persisted to disk
    Note over C1,C2: Consumers commit offsets
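
The consumer side of this flow in the Java API: join a group, poll, process, then commit offsets. A minimal sketch with manual commits (group id and topic name are placeholders):

// GroupConsumerDemo.java: poll loop with manual offset commits (sketch)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GroupConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");            // joins the consumer group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");     // commit manually after processing
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {                                                // run until the process is stopped
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // mark these offsets as processed for the group
            }
        }
    }
}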

πŸŽ† Quick Win: First Message

# Terminal 1: Start consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello-world

# Terminal 2: Send message
echo "Hello Kafka!" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello-world

🟑 Intermediate Level

Apache Kafka Ecosystem 🌐

Kafka is more than just a message broker; it's a complete streaming platform with a rich ecosystem of tools and frameworks:

flowchart LR
    %% Data Sources
    DB[("πŸ—„οΈ<br/>Databases<br/>(MySQL, PostgreSQL)")]
    API["🌐<br/>REST APIs<br/>(Microservices)"]
    FILES["πŸ“„<br/>Log Files<br/>(Application Logs)"]
    IOT["πŸ“‘<br/>IoT Sensors<br/>(Real-time Data)"]

    %% Kafka Connect
    SC["πŸ“₯<br/>Source<br/>Connectors"]
    SINK["πŸ“€<br/>Sink<br/>Connectors"]

    %% Kafka Core - Central
    BROKER["⚑<br/>KAFKA<br/>BROKERS<br/>(Core Platform)"]
    KRAFT["πŸ”§<br/>KRaft<br/>(Consensus)"]

    %% Stream Processing
    KS["🌊<br/>Kafka Streams<br/>(Java Library)"]
    KSQL["πŸ’Ύ<br/>ksqlDB<br/>(SQL Engine)"]
    FLINK["⚑<br/>Apache Flink<br/>(Stream Processing)"]

    %% Schema & Management
    SR["πŸ“‹<br/>Schema<br/>Registry"]
    UI["πŸ–₯️<br/>Kafka UI<br/>(Management)"]

    %% Target Systems
    DW[("🏒<br/>Data Warehouse<br/>(Snowflake, BigQuery)")]
    ELASTIC["πŸ”<br/>Elasticsearch<br/>(Search & Analytics)"]
    S3["☁️<br/>Cloud Storage<br/>(S3, GCS, Azure)"]
    DASH["πŸ“Š<br/>Dashboards<br/>(Grafana, Tableau)"]

    %% Connections - Left to Right Flow
    DB --> SC
    API --> SC
    FILES --> SC
    IOT --> SC

    SC --> BROKER
    KRAFT -.-> BROKER
    SR --> BROKER
    UI --> BROKER

    BROKER --> SINK
    BROKER --> KS
    BROKER --> KSQL
    BROKER --> FLINK

    SINK --> DW
    SINK --> ELASTIC
    SINK --> S3
    KS --> DASH
    KSQL --> DASH
    FLINK --> DASH

    %% Styling
    classDef coreKafka fill:#ff6b35,stroke:#333,stroke-width:4px,color:#fff,font-size:14px
    classDef ecosystem fill:#4a90e2,stroke:#333,stroke-width:3px,color:#fff,font-size:12px
    classDef external fill:#7ed321,stroke:#333,stroke-width:2px,color:#fff,font-size:12px

    class BROKER,KRAFT coreKafka
    class SC,SINK,KS,KSQL,FLINK,SR,UI ecosystem
    class DB,API,FILES,IOT,DW,ELASTIC,S3,DASH external

Ecosystem Components

Core Platform:

  • Kafka Brokers: Message storage and distribution
  • KRaft: Self-managing consensus (replaces Zookeeper)

Data Integration:

  • Kafka Connect: Source and sink connectors for databases, files, and cloud services

Stream Processing:

  • Kafka Streams: Java library embedded in your applications
  • ksqlDB: SQL engine for streaming queries
  • Apache Flink: External engine for large-scale stream processing

Schema & Governance:

  • Schema Registry: Centralized schema storage and compatibility checking

Monitoring & Operations:

  • JMX Metrics: Built-in monitoring via Java Management Extensions
  • Prometheus + Grafana: Modern monitoring stack
  • Kafka UI: Web-based cluster management

Core Concepts Deep Dive

Partitions & Replication

# Multi-partition topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 3 --partitions 6 --topic orders

# Producer with key (ensures ordering per key)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders --property "parse.key=true" --property "key.separator=:"

Consumer Groups

# Check consumer group status
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --describe

# Reset consumer offset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic --reset-offsets --to-earliest --execute
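
The same information is available programmatically through the Admin API. A minimal sketch that prints each partition's committed offset for the group (comparing these against the log end offsets gives the lag):

// GroupOffsetCheck.java: read committed offsets for a consumer group (sketch)
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class GroupOffsetCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets per partition for the group "my-group"
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("my-group")
                         .partitionsToOffsetAndMetadata()
                         .get();
            offsets.forEach((tp, om) ->
                    System.out.printf("%s-%d committed offset=%d%n",
                            tp.topic(), tp.partition(), om.offset()));
        }
    }
}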

Configuration Tuning

Producer Configuration

# producer.properties
acks=all
retries=2147483647
batch.size=16384
linger.ms=5
compression.type=snappy
max.in.flight.requests.per.connection=5
enable.idempotence=true

Consumer Configuration

# consumer.properties
auto.offset.reset=earliest
enable.auto.commit=false
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=3000

Broker Configuration

# server.properties
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

Monitoring & Observability

Modern Monitoring with Prometheus

# Add JMX exporter to Kafka
kafka:
  image: apache/kafka:3.8.0
  environment:
    KAFKA_JMX_PORT: 9999
    KAFKA_OPTS: '-javaagent:/opt/jmx_prometheus_javaagent.jar=7071:/opt/kafka-jmx-config.yml'
  volumes:
    - ./jmx_prometheus_javaagent.jar:/opt/jmx_prometheus_javaagent.jar
    - ./kafka-jmx-config.yml:/opt/kafka-jmx-config.yml

Key Metrics to Monitor

  • Throughput: MessagesInPerSec, BytesInPerSec
  • Latency: ProduceRequestLatency, FetchRequestLatency
  • Consumer Lag: records-lag-max
  • Broker Health: UnderReplicatedPartitions, OfflinePartitionsCount (a JMX sketch follows this list)
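
These broker metrics are exposed over JMX. Assuming the JMX port from the compose snippet above (9999) and the standard broker MBean names, a small probe might look like this sketch:

// BrokerMetricsProbe.java: read a broker metric over JMX (sketch)
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerMetricsProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with KAFKA_JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName messagesIn = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            // Meter-type MBeans expose Count plus rate attributes such as OneMinuteRate
            Object rate = conn.getAttribute(messagesIn, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1-min rate): " + rate);
        }
    }
}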

🟠 Advanced Level

Kafka Connect

Setup Distributed Mode

# connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# For a single-broker dev cluster, keep Connect's internal topics at replication factor 1
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

# Start Connect
bin/connect-distributed.sh config/connect-distributed.properties

# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "file-source",
    "config": {
      "connector.class": "FileStreamSource",
      "tasks.max": "1",
      "file": "/tmp/input.txt",
      "topic": "file-topic"
    }
  }'
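
The same REST call can be made from plain Java with the JDK's built-in HTTP client, which is convenient when automating connector deployment; the connector config below mirrors the curl example:

// CreateConnector.java: POST a connector config to the Connect REST API (sketch)
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateConnector {
    public static void main(String[] args) throws Exception {
        // Same payload as the curl example above
        String connectorJson = "{\"name\":\"file-source\",\"config\":{"
                + "\"connector.class\":\"FileStreamSource\",\"tasks.max\":\"1\","
                + "\"file\":\"/tmp/input.txt\",\"topic\":\"file-topic\"}}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}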

Kafka Streams

Basic Stream Processing

// StreamsExample.java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

source.filter((key, value) -> value.length() > 5)
      .mapValues(value -> value.toUpperCase())
      .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Stateful Operations

// Word count example
KStream<String, String> textLines = builder.stream("text-input");
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(Materialized.as("counts-store"));

wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

Schema Management

Schema Registry (Docker)

# Add to docker-compose.yml
schema-registry:
  image: confluentinc/cp-schema-registry:7.5.0
  hostname: schema-registry
  depends_on:
    - kafka
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'

# Register schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/user-value/versions

Modern Serialization (JSON Schema)

// Preferred over Avro for simpler use cases
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, User> producer = new KafkaProducer<>(props);

Security Configuration

SSL Setup

# Generate keystore
keytool -keystore kafka.server.keystore.jks -alias localhost \
  -validity 365 -genkey -keyalg RSA

# Create truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

# server.properties
listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=password
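
Clients then need matching TLS settings. A hedged sketch of the client side, where the truststore path and passwords are placeholders:

// SslClientExample.java: producer connecting to the SSL listener (sketch)
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SslClientExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); // SSL listener
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/kafka.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) now runs over TLS
        }
    }
}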

πŸ”΄ Expert Level

Production Architecture

Multi-Datacenter Replication

# mm2.properties (MirrorMaker 2.0)
clusters=primary,backup
primary.bootstrap.servers=primary-cluster:9092
backup.bootstrap.servers=backup-cluster:9092

primary->backup.enabled=true
primary->backup.topics=.*
backup->primary.enabled=false

replication.factor=3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

Performance Tuning

# High-throughput producer
acks=1
compression.type=lz4
batch.size=65536
linger.ms=10
buffer.memory=67108864

# High-throughput consumer
fetch.min.bytes=50000
fetch.max.wait.ms=500
max.partition.fetch.bytes=2097152

Custom Development

Custom Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (key instanceof String) {
            String keyStr = (String) key;
            if (keyStr.startsWith("VIP")) {
                return 0; // VIP messages go to partition 0
            }
        }

        // Default behaviour: hash the key bytes onto the available partitions
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
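
The partitioner is activated through producer configuration. A short usage sketch (assumes the CustomPartitioner class above is on the classpath and an "orders" topic exists):

// VipProducerExample.java: plugging the custom partitioner into a producer (sketch)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class VipProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Keys starting with "VIP" are routed to partition 0 by CustomPartitioner
            producer.send(new ProducerRecord<>("orders", "VIP-1001", "priority order"));
        }
    }
}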

Custom Serializer

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerializer implements Serializer<CustomObject> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, CustomObject data) {
        if (data == null) return null;

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }
}
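
It is wired in the same way, via the serializer config; CustomObject and its no-arg constructor are placeholders carried over from the example above:

// Sketch: registering the custom serializer for record values
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", CustomSerializer.class.getName());

Producer<String, CustomObject> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("custom-objects", "key-1", new CustomObject()));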

Kubernetes Deployment

Strimzi Operator (Current Best Practice)

# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.8.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      # KRaft mode: enabled with the strimzi.io/kraft annotation and KafkaNodePool
      # resources; the operator manages process roles and the controller quorum
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
  # No zookeeper section: KRaft-based clusters are managed entirely by the operator
  entityOperator:
    topicOperator: {}
    userOperator: {}

Cloud-Native Patterns

Event Sourcing with Kafka

@Component
public class EventStore {
    private final KafkaTemplate<String, Event> kafkaTemplate;

    public EventStore(KafkaTemplate<String, Event> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, Event>> saveEvent(String aggregateId, Event event) {
        ProducerRecord<String, Event> record =
            new ProducerRecord<>("events", aggregateId, event);
        return kafkaTemplate.send(record);
    }

    @KafkaListener(topics = "events")
    public void handleEvent(Event event, @Header("kafka_receivedMessageKey") String aggregateId) {
        // Process event
    }
}

CQRS with Spring Boot

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody CreateOrderCommand command) {
        OrderCreatedEvent event = new OrderCreatedEvent(command.getOrderId());
        kafkaTemplate.send("order-events", command.getOrderId(), event);
        return ResponseEntity.accepted().build();
    }
}

Troubleshooting

Modern Debugging Tools

# Check cluster metadata (KRaft)
bin/kafka-metadata-shell.sh --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log

# Performance testing
bin/kafka-producer-perf-test.sh --topic perf-test --num-records 1000000 \
  --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=localhost:9092

# Consumer lag monitoring
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

# Log analysis
bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/my-topic-0/00000000000000000000.log --print-data-log

πŸ› οΈ Tools & Utilities

Kafka UI Tools Comparison

Tool | Type | Features | Best For
AKHQ | Web UI | Topics, consumers, schema registry | Development
Kafdrop | Web UI | Lightweight, topic browsing | Quick debugging
Confluent Control Center | Enterprise | Full monitoring, alerting | Production
Kafka Tool | Desktop | GUI client, message inspection | Local development

# Add to docker-compose.yml
akhq:
  image: tchiotludo/akhq:latest
  ports:
    - "8080:8080"
  environment:
    AKHQ_CONFIGURATION: |
      akhq:
        connections:
          docker-kafka-server:
            properties:
              bootstrap.servers: "kafka:29092"

CLI Shortcuts & Aliases

# Add to ~/.bashrc or ~/.zshrc
alias kt='kafka-topics.sh --bootstrap-server localhost:9092'
alias kp='kafka-console-producer.sh --bootstrap-server localhost:9092'
alias kc='kafka-console-consumer.sh --bootstrap-server localhost:9092'
alias kcg='kafka-consumer-groups.sh --bootstrap-server localhost:9092'

# Usage examples
kt --list
kp --topic test
kc --topic test --from-beginning
kcg --group my-group --describe

Monitoring Dashboard Templates

Grafana Dashboard JSON

{
  "dashboard": {
    "title": "Kafka Monitoring",
    "panels": [
      {
        "title": "Messages per Second",
        "type": "graph",
        "targets": [{
          "expr": "rate(kafka_server_brokertopicmetrics_messagesinpersec_total[5m])"
        }]
      }
    ]
  }
}

🌍 Real-World Examples

E-commerce Order Processing Pipeline

flowchart LR
    UI[Web UI] --> API[Order API]
    API --> K1[orders-created]
    K1 --> INV[Inventory Service]
    K1 --> PAY[Payment Service]
    INV --> K2[inventory-updated]
    PAY --> K3[payment-processed]
    K2 --> FUL[Fulfillment]
    K3 --> FUL
    FUL --> K4[order-shipped]
    K4 --> NOT[Notification Service]

Implementation

Spring Boot Order Service:

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getItems(),
            Instant.now()
        );

        kafkaTemplate.send("orders-created", order.getId(), event);
        return ResponseEntity.accepted().body(order.getId());
    }
}

@Component
public class PaymentEventListener {
    @Autowired
    private OrderService orderService;

    @Autowired
    private KafkaTemplate<String, FulfillmentEvent> kafkaTemplate;

    @KafkaListener(topics = "payment-processed")
    public void handlePaymentProcessed(PaymentEvent event) {
        // Update order status
        orderService.markAsPaid(event.getOrderId());

        // Trigger fulfillment
        FulfillmentEvent fulfillmentEvent = new FulfillmentEvent(event.getOrderId());
        kafkaTemplate.send("fulfillment-requested", event.getOrderId(), fulfillmentEvent);
    }
}

IoT Sensor Data Pipeline

flowchart TD
    S1[Temperature Sensors] --> K[Kafka]
    S2[Humidity Sensors] --> K
    S3[Motion Sensors] --> K

    K --> KS[Kafka Streams]
    KS --> A1[Real-time Alerts]
    KS --> A2[Anomaly Detection]
    KS --> DB[(Time Series DB)]

    DB --> D[Dashboard]

Kafka Streams Processing

IoT Data Processing:

StreamsBuilder builder = new StreamsBuilder();

// Temperature anomaly detection
KStream<String, SensorReading> temperatureStream = builder
    .stream("sensor-readings")
    .filter((key, reading) -> "temperature".equals(reading.getType()));

KTable<Windowed<String>, Double> avgTemperature = temperatureStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> new TemperatureAggregate(),
        (key, reading, aggregate) -> aggregate.add(reading.getValue()),
        Materialized.as("temperature-averages")
    )
    .mapValues(TemperatureAggregate::getAverage);

// Alert on high temperature
avgTemperature
    .toStream()
    .filter((window, avgTemp) -> avgTemp > 35.0)
    .mapValues(temp -> new Alert("HIGH_TEMPERATURE", temp))
    .to("temperature-alerts");

Microservices Event-Driven Architecture

C4Context
    title Event-Driven Microservices with Kafka

    Person(customer, "Customer")
    System(ui, "Web UI")

    System_Boundary(services, "Microservices") {
        System(order, "Order Service")
        System(inventory, "Inventory Service")
        System(payment, "Payment Service")
        System(notification, "Notification Service")
    }

    System(kafka, "Kafka Event Bus")

    Rel(customer, ui, "Places order")
    Rel(ui, order, "HTTP")
    Rel(order, kafka, "Publishes events")
    Rel(kafka, inventory, "Consumes events")
    Rel(kafka, payment, "Consumes events")
    Rel(kafka, notification, "Consumes events")

πŸ”— Resources

Community & Learning

Books & Courses

Tools & Extensions

Conference Talks & Videos

Next Steps & Modern Ecosystem


This comprehensive guide reflects current Kafka best practices as of 2024. KRaft mode is now production-ready and recommended for all new deployments. πŸš€

Found this helpful? ⭐ Star the repository and share with your team!

This post is licensed under CC BY 4.0 by the author.