🎯 Learning Path Overview
```mermaid
flowchart TD
    A[🟢 Beginner] --> B[🟡 Intermediate]
    B --> C[🟠 Advanced]
    C --> D[🔴 Expert]

    A1["Concepts & Setup"] --> A
    A2["Basic Operations"] --> A
    B1["Configuration"] --> B
    B2["Monitoring"] --> B
    C1["Connect & Streams"] --> C
    C2["Security"] --> C
    D1["Production & K8s"] --> D
    D2["Architecture Patterns"] --> D
```
Quick Start Checklist
- Java 11+ installed
- Docker & Docker Compose setup
- 8GB+ RAM available
- Basic command line knowledge
- Text editor/IDE ready
Table of Contents
🟢 Beginner Level
What is Apache Kafka?
Apache Kafka is a distributed streaming platform that acts as a high-throughput message broker for real-time data pipelines. It stores streams of records in fault-tolerant, scalable topics that can be consumed by multiple applications.
Core Concepts
Core Components
- Broker: A Kafka server that stores and serves messages
- Topic: A category or feed name to which messages are published
- Partition: An ordered, immutable sequence of messages within a topic
- Offset: A unique identifier for each message within a partition
Applications
- Producer: An application that publishes messages to Kafka topics
- Consumer: An application that subscribes to topics and processes messages
- Consumer Group: A group of consumers that work together to consume a topic
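
To make these terms concrete, here is a minimal Java producer sketch (broker address, topic name, and key are illustrative): it publishes a keyed record and prints the partition and offset the broker assigned to it.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class HelloProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // The key decides which partition of the topic the record lands on
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", "user-1", "Hello Kafka");
            producer.send(record, (RecordMetadata metadata, Exception e) -> {
                if (e == null) {
                    System.out.printf("topic=%s partition=%d offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```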
Why Use Kafka?
Kafka decouples data producers from consumers, enabling scalable real-time data processing and reliable message delivery. It provides durability, high availability, and can handle millions of messages per second across distributed systems.
- Throughput: 2M+ messages/sec per broker
- Latency: <10ms end-to-end
- Storage: Petabyte-scale retention
- Availability: 99.99% uptime in production
Kafka Architecture Overview
```mermaid
flowchart LR
    P1[Producer 1] --> K[Kafka Cluster]
    P2[Producer 2] --> K
    P3[Producer 3] --> K
    K --> C1[Consumer 1]
    K --> C2[Consumer 2]
    K --> C3[Consumer Group]

    subgraph K ["Kafka Cluster"]
        T1[Topic A]
        T2[Topic B]
        T3[Topic C]
    end

    subgraph T1 ["Topic Partitions"]
        P0[Partition 0]
        P1_1[Partition 1]
        P2_1[Partition 2]
    end
```
Common Use Cases
```mermaid
mindmap
  root((Kafka Use Cases))
    Real-time Analytics
      Monitoring Dashboards
      Live Metrics
      Fraud Detection
    Event Sourcing
      Microservices
      Audit Logs
      State Management
    Data Integration
      ETL Pipelines
      CDC (Change Data Capture)
      Multi-system Sync
    IoT & Streaming
      Sensor Data
      Location Tracking
      Real-time Processing
```
Key Components Overview
Kafka Connect - Data Integration Framework
A framework for connecting Kafka with external systems like databases, file systems, and cloud services. It provides pre-built connectors and handles data ingestion/export without writing custom code.
Kafka Streams - Stream Processing Library
A Java library for building real-time stream processing applications that read from and write to Kafka topics. It enables transformations, aggregations, and joins on streaming data with exactly-once processing guarantees.
KRaft - Consensus Protocol
Kafka's new consensus protocol that eliminates the dependency on Apache Zookeeper for metadata management. It simplifies deployment, improves scalability, and reduces operational complexity by making Kafka self-managing.
Prerequisites Checker
System Requirements
- Java 11+ installed (Java 8 deprecated)
```bash
java -version   # Should show 11 or higher
```
- 8GB+ RAM recommended for production
- Docker & Docker Compose for containerized setup
- Basic command line knowledge
Resource Calculator
| Environment | Brokers | RAM/Broker | Storage/Broker | Network |
|---|---|---|---|---|
| Development | 1 | 2GB | 10GB | 1Gbps |
| Testing | 3 | 4GB | 100GB | 1Gbps |
| Production | 3-5 | 16GB+ | 1TB+ | 10Gbps |
Installation Options
Option 1: KRaft Mode (Recommended - No Zookeeper)
Download and Setup:
```bash
# Download latest Kafka
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
```
Initialize KRaft:
```bash
# Generate cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo "Generated Cluster ID: $KAFKA_CLUSTER_ID"

# Format storage
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
```
Start Kafka:
```bash
# Start Kafka in KRaft mode
bin/kafka-server-start.sh config/kraft/server.properties
```
Verification:
```bash
# Check if Kafka is running
netstat -an | grep 9092
# Should show: tcp46 0 0 *.9092 *.* LISTEN
```
Option 2: Docker with KRaft (Recommended) 🐳
Create docker-compose.yml:
```yaml
# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: apache/kafka:3.8.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka-data:/tmp/kraft-combined-logs
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list"]
      interval: 30s
      timeout: 10s
      retries: 3
volumes:
  kafka-data:
```
Start and Verify:
```bash
# Start services
docker-compose up -d

# Check health
docker-compose ps
docker-compose logs kafka
```
🚫 Common Installation Issues
```mermaid
flowchart TD
    A[Installation Issue] --> B{Port 9092 in use?}
    B -->|Yes| C["Kill process: lsof -ti:9092 | xargs kill"]
    B -->|No| D{Java version?}
    D -->|< 11| E["Update Java: brew install openjdk@11"]
    D -->|>= 11| F{Memory issue?}
    F -->|Yes| G["Increase heap: export KAFKA_HEAP_OPTS='-Xmx2G -Xms2G'"]
    F -->|No| H[Check logs for specific error]
```
Installation Verification
```bash
# Test installation
echo "test message" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
```
Basic Operations
Service Management
Start Kafka:
```bash
# Start Kafka (KRaft mode - no Zookeeper needed)
bin/kafka-server-start.sh config/kraft/server.properties

# Start in background
nohup bin/kafka-server-start.sh config/kraft/server.properties > kafka.log 2>&1 &
```
Stop and Monitor:
```bash
# Stop Kafka
bin/kafka-server-stop.sh

# Check status
ps aux | grep kafka
```
Topic Management
Create and Configure Topics:
```bash
# Create topic with custom configuration
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 3 --topic my-topic \
  --config retention.ms=86400000 --config compression.type=snappy

# List all topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic (shows partitions, replicas, configs)
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-topic
```
Modify and Delete Topics:
```bash
# Modify topic (increase partitions)
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 \
  --topic my-topic --partitions 6

# Delete topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my-topic

# Topic configuration
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --describe
```
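
The same operations are available from application code via the Java AdminClient; a minimal sketch (topic name and settings mirror the CLI example above and are illustrative):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Same as kafka-topics.sh --create: 3 partitions, replication factor 1
            NewTopic topic = new NewTopic("my-topic", 3, (short) 1)
                    .configs(Map.of("retention.ms", "86400000",
                                    "compression.type", "snappy"));
            admin.createTopics(List.of(topic)).all().get();

            // Same as kafka-topics.sh --list
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}
```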
Topic Partition Strategy
```mermaid
flowchart LR
    subgraph "Topic: orders"
        P0["Partition 0<br/>user-1, user-4"]
        P1["Partition 1<br/>user-2, user-5"]
        P2["Partition 2<br/>user-3, user-6"]
    end

    K1["Key: user-1"] --> P0
    K2["Key: user-2"] --> P1
    K3["Key: user-3"] --> P2
```
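
For keyed records the default partitioner is deterministic: it hashes the key with murmur2 and takes the result modulo the partition count, which is why every record for user-1 lands on the same partition. A small sketch of that calculation, using the same Utils helper the custom partitioner example later in this guide relies on (the key-to-partition mapping in the diagram above is illustrative):

```java
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class PartitionForKey {
    // Mirrors what the default partitioner does for records that have a key
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"user-1", "user-2", "user-3"}) {
            System.out.printf("%s -> partition %d%n", key, partitionFor(key, 3));
        }
    }
}
```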
Producer & Consumer
Basic Producer:
```bash
# Basic producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
```
Producer with Key-Value Pairs:
```bash
# Producer with key-value pairs
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --property "parse.key=true" --property "key.separator=:"
# Input: user1:Hello World
```
Producer with Headers:
```bash
# Producer with headers (defaults: headers separated by ',', header key and value by ':',
# and the headers block separated from the message body by a tab)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic \
  --property "parse.headers=true"
# Input: header1:value1,header2:value2<TAB>Message content
```
Consumer Examples:
```bash
# Consumer from beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning

# Consumer with group (load balancing)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --group my-group

# Consumer showing keys and headers
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --property print.key=true --property print.headers=true

# Consumer from specific offset
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --partition 0 --offset 100
```
Message Flow Visualization
```mermaid
sequenceDiagram
    participant P as Producer
    participant K as Kafka Broker
    participant C1 as Consumer 1
    participant C2 as Consumer 2

    P->>K: Send message (key: user1)
    K->>K: Route to partition based on key
    K->>C1: Deliver to consumer group member
    P->>K: Send message (key: user2)
    K->>C2: Deliver to different consumer
    Note over K: Messages persisted to disk
    Note over C1,C2: Consumers commit offsets
```
Quick Win: First Message
```bash
# Terminal 1: Start consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello-world

# Terminal 2: Send message
echo "Hello Kafka!" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello-world
```
Apache Kafka Ecosystem
Kafka is more than just a message broker - it's a complete streaming platform with a rich ecosystem of tools and frameworks:
```mermaid
flowchart LR
    %% Data Sources
    DB[("Databases<br/>(MySQL, PostgreSQL)")]
    API["REST APIs<br/>(Microservices)"]
    FILES["Log Files<br/>(Application Logs)"]
    IOT["IoT Sensors<br/>(Real-time Data)"]

    %% Kafka Connect
    SC["Source<br/>Connectors"]
    SINK["Sink<br/>Connectors"]

    %% Kafka Core - Central
    BROKER["KAFKA<br/>BROKERS<br/>(Core Platform)"]
    KRAFT["KRaft<br/>(Consensus)"]

    %% Stream Processing
    KS["Kafka Streams<br/>(Java Library)"]
    KSQL["ksqlDB<br/>(SQL Engine)"]
    FLINK["Apache Flink<br/>(Stream Processing)"]

    %% Schema & Management
    SR["Schema<br/>Registry"]
    UI["Kafka UI<br/>(Management)"]

    %% Target Systems
    DW[("Data Warehouse<br/>(Snowflake, BigQuery)")]
    ELASTIC["Elasticsearch<br/>(Search & Analytics)"]
    S3["Cloud Storage<br/>(S3, GCS, Azure)"]
    DASH["Dashboards<br/>(Grafana, Tableau)"]

    %% Connections - Left to Right Flow
    DB --> SC
    API --> SC
    FILES --> SC
    IOT --> SC
    SC --> BROKER
    KRAFT -.-> BROKER
    SR --> BROKER
    UI --> BROKER
    BROKER --> SINK
    BROKER --> KS
    BROKER --> KSQL
    BROKER --> FLINK
    SINK --> DW
    SINK --> ELASTIC
    SINK --> S3
    KS --> DASH
    KSQL --> DASH
    FLINK --> DASH

    %% Styling
    classDef coreKafka fill:#ff6b35,stroke:#333,stroke-width:4px,color:#fff,font-size:14px
    classDef ecosystem fill:#4a90e2,stroke:#333,stroke-width:3px,color:#fff,font-size:12px
    classDef external fill:#7ed321,stroke:#333,stroke-width:2px,color:#fff,font-size:12px

    class BROKER,KRAFT coreKafka
    class SC,SINK,KS,KSQL,FLINK,SR,UI ecosystem
    class DB,API,FILES,IOT,DW,ELASTIC,S3,DASH external
```
Ecosystem Components
Core Platform:
- Kafka Brokers: Message storage and distribution
- KRaft: Self-managing consensus (replaces Zookeeper)
Data Integration:
- Kafka Connect: Source and sink connectors for databases, files, and cloud services
Stream Processing:
- Kafka Streams, ksqlDB, and Apache Flink: Real-time transformations, aggregations, and analytics
Schema & Governance:
- Schema Registry: Centralized schema storage and compatibility enforcement
Monitoring & Operations:
- JMX Metrics: Built-in monitoring via Java Management Extensions (see the sketch after this list)
- Prometheus + Grafana: Modern monitoring stack
- Kafka UI: Web-based cluster management
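
As a quick illustration of the JMX route, the sketch below connects to a broker's JMX port and reads the broker-wide message rate from Kafka's standard BrokerTopicMetrics bean. It assumes JMX is enabled on port 9999, as in the Docker monitoring example later in this guide.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerMessageRate {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled on port 9999
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            ObjectName messagesIn = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            Object rate = mbeans.getAttribute(messagesIn, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1-min rate): " + rate);
        }
    }
}
```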
Core Concepts Deep Dive
Partitions & Replication
```bash
# Multi-partition topic (replication factor 3 requires a 3-broker cluster)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 3 --partitions 6 --topic orders

# Producer with key (ensures ordering per key)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders --property "parse.key=true" --property "key.separator=:"
```
Consumer Groups
```bash
# Check consumer group status
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --describe

# Reset consumer offset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic --reset-offsets --to-earliest --execute
```
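
A minimal Java consumer joining the same group (topic and group names follow the CLI examples above); running two copies of it splits the topic's partitions between them:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // manual commit, matching enable.auto.commit=false
            }
        }
    }
}
```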
Configuration Tuning
Producer Configuration
```properties
# producer.properties
acks=all
retries=2147483647
batch.size=16384
linger.ms=5
compression.type=snappy
max.in.flight.requests.per.connection=5
enable.idempotence=true
```
Consumer Configuration
```properties
# consumer.properties
auto.offset.reset=earliest
enable.auto.commit=false
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=3000
```
Broker Configuration
```properties
# server.properties
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
```
Monitoring & Observability
Modern Monitoring with Prometheus
```yaml
# Add JMX exporter to Kafka (docker-compose snippet)
kafka:
  image: apache/kafka:3.8.0
  environment:
    KAFKA_JMX_PORT: 9999
    KAFKA_OPTS: '-javaagent:/opt/jmx_prometheus_javaagent.jar=7071:/opt/kafka-jmx-config.yml'
  volumes:
    - ./jmx_prometheus_javaagent.jar:/opt/jmx_prometheus_javaagent.jar
    - ./kafka-jmx-config.yml:/opt/kafka-jmx-config.yml
```
Key Metrics to Monitor
- Throughput: MessagesInPerSec, BytesInPerSec
- Latency: ProduceRequestLatency, FetchRequestLatency
- Consumer Lag: records-lag-max
- Broker Health: UnderReplicatedPartitions, OfflinePartitionsCount
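
Consumer lag can also be computed programmatically with the Admin API: compare the group's committed offsets to the partitions' latest offsets. A sketch, assuming a group named my-group (all names are illustrative):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class ConsumerLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Committed offsets for the group
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("my-group")
                         .partitionsToOffsetAndMetadata().get();

            // Latest offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResultInfo> latest =
                    admin.listOffsets(request).all().get();

            committed.forEach((tp, offset) -> {
                if (offset == null) return; // no committed offset for this partition yet
                long lag = latest.get(tp).offset() - offset.offset();
                System.out.printf("%s lag=%d%n", tp, lag);
            });
        }
    }
}
```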
🟠 Advanced Level
Kafka Connect
Setup Distributed Mode
```properties
# connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
```
```bash
# Start Connect
bin/connect-distributed.sh config/connect-distributed.properties

# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "file-source",
    "config": {
      "connector.class": "FileStreamSource",
      "tasks.max": "1",
      "file": "/tmp/input.txt",
      "topic": "file-topic"
    }
  }'
```
Kafka Streams
Basic Stream Processing
```java
// StreamsExample.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.filter((key, value) -> value.length() > 5)
      .mapValues(value -> value.toUpperCase())
      .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```
Stateful Operations
```java
// Word count example
KStream<String, String> textLines = builder.stream("text-input");
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(Materialized.as("counts-store"));

wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
```
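
Because the count is materialized as "counts-store", it can also be queried interactively once the application is running. A sketch of that lookup, assuming the streams instance from the previous example has been started (the key "kafka" is illustrative):

```java
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Interactive query against the materialized store (after streams.start())
ReadOnlyKeyValueStore<String, Long> counts = streams.store(
        StoreQueryParameters.fromNameAndType("counts-store",
                QueryableStoreTypes.<String, Long>keyValueStore()));
Long kafkaCount = counts.get("kafka");
System.out.println("count(kafka) = " + kafkaCount);
```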
Schema Management
Schema Registry (Docker)
```yaml
# Add to docker-compose.yml
schema-registry:
  image: confluentinc/cp-schema-registry:7.5.0
  hostname: schema-registry
  depends_on:
    - kafka
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
```
```bash
# Register schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/user-value/versions
```
Modern Serialization (JSON Schema)
```java
// Preferred over Avro for simpler use cases
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, User> producer = new KafkaProducer<>(props);
```
Security Configuration
SSL Setup
```bash
# Generate keystore
keytool -keystore kafka.server.keystore.jks -alias localhost \
  -validity 365 -genkey -keyalg RSA

# Create truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
```
```properties
# server.properties
listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=password
```
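
Clients need matching settings; a minimal sketch of the client-side properties in Java (paths and passwords are placeholders):

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "password");
// Only needed if the broker requires client authentication (ssl.client.auth=required)
props.put("ssl.keystore.location", "/path/to/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "password");
props.put("ssl.key.password", "password");
```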
🔴 Expert Level
Production Architecture
Multi-Datacenter Replication
```properties
# mm2.properties (MirrorMaker 2.0)
clusters=primary,backup
primary.bootstrap.servers=primary-cluster:9092
backup.bootstrap.servers=backup-cluster:9092

primary->backup.enabled=true
primary->backup.topics=.*
backup->primary.enabled=false

replication.factor=3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
```
Performance Tuning
```properties
# High-throughput producer
acks=1
compression.type=lz4
batch.size=65536
linger.ms=10
buffer.memory=67108864

# High-throughput consumer
fetch.min.bytes=50000
fetch.max.wait.ms=500
max.partition.fetch.bytes=2097152
```
Custom Development
Custom Partitioner
```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key instanceof String) {
            String keyStr = (String) key;
            if (keyStr.startsWith("VIP")) {
                return 0; // VIP messages go to partition 0
            }
        }
        // Everything else falls back to the default hashing strategy
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
```
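
To activate it, register the class on the producer (assuming it is on the classpath; the constant lives in ProducerConfig):

```java
// Register the custom partitioner on an existing producer Properties object
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
```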
Custom Serializer
```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CustomSerializer implements Serializer<CustomObject> {
    // Jackson mapper used to turn the POJO into JSON bytes
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, CustomObject data) {
        if (data == null) return null;
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }
}
```
Kubernetes Deployment
Strimzi Operator (Current Best Practice)
```yaml
# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.8.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      # KRaft mode configuration
      process.roles: broker,controller
      controller.quorum.voters: 1@my-cluster-kafka-0.my-cluster-kafka-brokers:9090,2@my-cluster-kafka-1.my-cluster-kafka-brokers:9090,3@my-cluster-kafka-2.my-cluster-kafka-brokers:9090
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
  # Remove the zookeeper section for KRaft mode
  entityOperator:
    topicOperator: {}
    userOperator: {}
```
Cloud-Native Patterns
Event Sourcing with Kafka
```java
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class EventStore {
    private final KafkaTemplate<String, Event> kafkaTemplate;

    public EventStore(KafkaTemplate<String, Event> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<SendResult<String, Event>> saveEvent(String aggregateId, Event event) {
        ProducerRecord<String, Event> record =
                new ProducerRecord<>("events", aggregateId, event);
        return kafkaTemplate.send(record);
    }

    @KafkaListener(topics = "events")
    public void handleEvent(Event event, @Header("kafka_receivedMessageKey") String aggregateId) {
        // Process event: rebuild aggregate state or update projections
    }
}
```
CQRS with Spring Boot
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody CreateOrderCommand command) {
        OrderCreatedEvent event = new OrderCreatedEvent(command.getOrderId());
        kafkaTemplate.send("order-events", command.getOrderId(), event);
        return ResponseEntity.accepted().build();
    }
}
```
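
The controller above is only the command side; on the query side a separate listener maintains a read model from the same events. A minimal in-memory sketch (OrderCreatedEvent#getOrderId and the listener group name are illustrative assumptions; a real service would persist to a read database):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderProjection {
    // Read model kept in memory for the sketch
    private final Map<String, OrderCreatedEvent> readModel = new ConcurrentHashMap<>();

    @KafkaListener(topics = "order-events", groupId = "order-read-model")
    public void on(OrderCreatedEvent event) {
        readModel.put(event.getOrderId(), event);
    }

    public OrderCreatedEvent findOrder(String orderId) {
        return readModel.get(orderId);
    }
}
```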
Troubleshooting
```bash
# Check cluster metadata (KRaft)
bin/kafka-metadata-shell.sh --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log

# Performance testing
bin/kafka-producer-perf-test.sh --topic perf-test --num-records 1000000 \
  --record-size 1024 --throughput 10000 --producer-props bootstrap.servers=localhost:9092

# Consumer lag monitoring
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

# Log analysis
bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/my-topic-0/00000000000000000000.log --print-data-log
```
| Tool | Type | Features | Best For |
|---|---|---|---|
| AKHQ | Web UI | Topics, consumers, schema registry | Development |
| Kafdrop | Web UI | Lightweight, topic browsing | Quick debugging |
| Confluent Control Center | Enterprise | Full monitoring, alerting | Production |
| Kafka Tool | Desktop | GUI client, message inspection | Local development |
AKHQ Setup (Recommended)
```yaml
# Add to docker-compose.yml
akhq:
  image: tchiotludo/akhq:latest
  ports:
    - "8080:8080"
  environment:
    AKHQ_CONFIGURATION: |
      akhq:
        connections:
          docker-kafka-server:
            properties:
              bootstrap.servers: "kafka:29092"
```
CLI Shortcuts & Aliases
```bash
# Add to ~/.bashrc or ~/.zshrc
alias kt='kafka-topics.sh --bootstrap-server localhost:9092'
alias kp='kafka-console-producer.sh --bootstrap-server localhost:9092'
alias kc='kafka-console-consumer.sh --bootstrap-server localhost:9092'
alias kcg='kafka-consumer-groups.sh --bootstrap-server localhost:9092'

# Usage examples
kt --list
kp --topic test
kc --topic test --from-beginning
kcg --group my-group --describe
```
Monitoring Dashboard Templates
Grafana Dashboard JSON
```json
{
  "dashboard": {
    "title": "Kafka Monitoring",
    "panels": [
      {
        "title": "Messages per Second",
        "type": "graph",
        "targets": [{
          "expr": "rate(kafka_server_brokertopicmetrics_messagesinpersec_total[5m])"
        }]
      }
    ]
  }
}
```
Real-World Examples
E-commerce Order Processing Pipeline
```mermaid
flowchart LR
    UI[Web UI] --> API[Order API]
    API --> K1[orders-created]
    K1 --> INV[Inventory Service]
    K1 --> PAY[Payment Service]
    INV --> K2[inventory-updated]
    PAY --> K3[payment-processed]
    K2 --> FUL[Fulfillment]
    K3 --> FUL
    FUL --> K4[order-shipped]
    K4 --> NOT[Notification Service]
```
Implementation
Spring Boot Order Service:
```java
import java.time.Instant;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Autowired
    private OrderService orderService;

    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent(
                order.getId(),
                order.getCustomerId(),
                order.getItems(),
                Instant.now()
        );
        kafkaTemplate.send("orders-created", order.getId(), event);
        return ResponseEntity.accepted().body(order.getId());
    }

    // Listener placed inside the controller class so the snippet compiles;
    // in a real project this would live in a dedicated listener/service bean.
    @KafkaListener(topics = "payment-processed")
    public void handlePaymentProcessed(PaymentEvent event) {
        // Update order status
        orderService.markAsPaid(event.getOrderId());

        // Trigger fulfillment
        FulfillmentEvent fulfillmentEvent = new FulfillmentEvent(event.getOrderId());
        kafkaTemplate.send("fulfillment-requested", event.getOrderId(), fulfillmentEvent);
    }
}
```
IoT Sensor Data Pipeline
```mermaid
flowchart TD
    S1[Temperature Sensors] --> K[Kafka]
    S2[Humidity Sensors] --> K
    S3[Motion Sensors] --> K
    K --> KS[Kafka Streams]
    KS --> A1[Real-time Alerts]
    KS --> A2[Anomaly Detection]
    KS --> DB[(Time Series DB)]
    DB --> D[Dashboard]
```
Kafka Streams Processing
IoT Data Processing:
```java
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();

// Temperature anomaly detection
KStream<String, SensorReading> temperatureStream = builder
    .<String, SensorReading>stream("sensor-readings")
    .filter((key, reading) -> "temperature".equals(reading.getType()));

KTable<Windowed<String>, Double> avgTemperature = temperatureStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> new TemperatureAggregate(),
        (key, reading, aggregate) -> aggregate.add(reading.getValue()),
        Materialized.as("temperature-averages")
    )
    .mapValues(TemperatureAggregate::getAverage);

// Alert on high temperature
avgTemperature
    .toStream()
    .filter((window, avgTemp) -> avgTemp > 35.0)
    .mapValues(temp -> new Alert("HIGH_TEMPERATURE", temp))
    .to("temperature-alerts");
```
Microservices Event-Driven Architecture
```mermaid
C4Context
    title Event-Driven Microservices with Kafka

    Person(customer, "Customer")
    System(ui, "Web UI")

    System_Boundary(services, "Microservices") {
        System(order, "Order Service")
        System(inventory, "Inventory Service")
        System(payment, "Payment Service")
        System(notification, "Notification Service")
    }

    System(kafka, "Kafka Event Bus")

    Rel(customer, ui, "Places order")
    Rel(ui, order, "HTTP")
    Rel(order, kafka, "Publishes events")
    Rel(kafka, inventory, "Consumes events")
    Rel(kafka, payment, "Consumes events")
    Rel(kafka, notification, "Consumes events")
```
Resources
Books & Courses
Conference Talks & Videos
Next Steps & Modern Ecosystem
This comprehensive guide reflects current Kafka best practices as of 2024. KRaft mode is now production-ready and recommended for all new deployments.
Found this helpful? ⭐ Star the repository and share with your team!