Apache Kafka - Basic Usage Tutorial
Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. It is widely used for building real-time data pipelines, streaming applications, and event-driven architectures.
Use Kafka when you need to handle high-throughput, fault-tolerant, real-time data streaming between systems, build event-driven microservices, or process data streams in real-time.
1. What is Apache Kafka?
Apache Kafka is an open-source distributed event streaming platform that:
- Publishes and subscribes to streams of records (like a message queue)
- Stores streams of records durably and reliably
- Processes streams of records as they occur
Core Concepts
| Concept | Description |
|---|---|
| Producer | Applications that publish (write) data to Kafka topics |
| Consumer | Applications that subscribe to (read) data from topics |
| Topic | A category or feed name to which records are published |
| Partition | Topics are split into partitions for parallelism and scalability (see the example after this table) |
| Broker | Kafka server that stores data and serves clients |
| Cluster | A group of Kafka brokers working together |
| Offset | Unique identifier for each record within a partition |
| Consumer Group | Group of consumers that work together to consume a topic |
| ZooKeeper | Manages and coordinates Kafka brokers (being phased out with KRaft) |
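To make the Partition and Offset rows concrete, here is a minimal kafka-python sketch (assuming a local broker on localhost:9092 and the user-events topic created later in this tutorial): records that share a key always land in the same partition, and each record receives the next offset within that partition.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Three records with the same key: all go to the same partition,
# with strictly increasing offsets within that partition.
for i in range(3):
    metadata = producer.send(
        'user-events',
        key=b'user-42',
        value=f'event-{i}'.encode('utf-8')
    ).get(timeout=10)
    print(f"partition={metadata.partition} offset={metadata.offset}")

producer.close()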
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ ┌────▼─────────────▼─────────────▼────┐ │
│ │ Topic: user-events │ │
│ │ Partition 0 | Partition 1 | Part 2 │ │
│ └──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
▲ │
│ ▼
┌─────┴─────┐ ┌────────┴────────┐
│ Producers │ │ Consumers │
│ (Write) │ │ (Read) │
└───────────┘ └─────────────────┘
2. Installation
Using Docker (Recommended for Development)
The easiest way to get started with Kafka is using Docker:
Create a docker-compose.yml file:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- zookeeper-data:/var/lib/zookeeper/data
- zookeeper-logs:/var/lib/zookeeper/log
kafka:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- kafka-data:/var/lib/kafka/data
volumes:
zookeeper-data:
zookeeper-logs:
kafka-data:
Start Kafka:
docker-compose up -d
Verify it's running:
docker-compose ps
docker logs kafka
Manual Installation
On Linux/macOS:
# Download Kafka (older releases move to https://archive.apache.org/dist/kafka/, so adjust the URL if needed)
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
# Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# In another terminal, start Kafka
bin/kafka-server-start.sh config/server.properties
On Windows:
# Use Windows commands (.bat instead of .sh)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
Using KRaft (ZooKeeper-less)
Kafka 3.3+ supports production-ready KRaft mode, which removes the need for ZooKeeper:
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka
container_name: kafka-kraft
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
volumes:
- kafka-kraft-data:/var/lib/kafka/data
volumes:
kafka-kraft-data:
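Whichever compose file you use, a quick way to confirm the broker is reachable from your host is a small kafka-python check (a sketch; it assumes kafka-python is installed and that port 9092 is published as in the files above):

from kafka.admin import KafkaAdminClient
from kafka.errors import NoBrokersAvailable

try:
    admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'], client_id='health-check')
    print(f"Connected. Topics: {admin.list_topics()}")
    admin.close()
except NoBrokersAvailable:
    print("No brokers available - is the container running and port 9092 published?")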
3. Basic Operations
Creating Topics
You can create topics from the command line or programmatically with kafka-python; both approaches are shown below.
# Using Docker
docker exec -it kafka kafka-topics --create \
--topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# List topics
docker exec -it kafka kafka-topics --list \
--bootstrap-server localhost:9092
# Describe a topic
docker exec -it kafka kafka-topics --describe \
--topic user-events \
--bootstrap-server localhost:9092
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers=['localhost:9092'],
client_id='admin-client'
)
# Create topic
topic = NewTopic(
name='user-events',
num_partitions=3,
replication_factor=1
)
try:
admin_client.create_topics(new_topics=[topic], validate_only=False)
print("Topic created successfully")
except Exception as e:
print(f"Failed to create topic: {e}")
# List topics
topics = admin_client.list_topics()
print(f"Available topics: {topics}")
admin_client.close()
Producing Messages
The examples below show the console producer on the command line, a Python producer, and a Java producer.
# Console producer
docker exec -it kafka kafka-console-producer \
--topic user-events \
--bootstrap-server localhost:9092
# Type messages (one per line)
# {"user_id": 1, "action": "login", "timestamp": "2024-01-01T10:00:00Z"}
# {"user_id": 2, "action": "purchase", "timestamp": "2024-01-01T10:05:00Z"}
# Press Ctrl+C to exit
from kafka import KafkaProducer
import json
import time
from datetime import datetime
# Create producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
# Send messages
events = [
{"user_id": 1, "action": "login", "timestamp": datetime.now().isoformat()},
{"user_id": 2, "action": "purchase", "amount": 99.99, "timestamp": datetime.now().isoformat()},
{"user_id": 1, "action": "logout", "timestamp": datetime.now().isoformat()},
]
for event in events:
# key is optional but useful for partitioning
key = f"user-{event['user_id']}"
future = producer.send(
topic='user-events',
key=key,
value=event
)
# Wait for acknowledgment
try:
record_metadata = future.get(timeout=10)
print(f"Sent: {event}")
print(f" Topic: {record_metadata.topic}")
print(f" Partition: {record_metadata.partition}")
print(f" Offset: {record_metadata.offset}")
except Exception as e:
print(f"Failed to send message: {e}")
time.sleep(1)
# Ensure all messages are sent
producer.flush()
producer.close()
print("Producer closed")
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "user-" + i;
String value = String.format("{\"user_id\": %d, \"action\": \"event-%d\"}", i, i);
ProducerRecord<String, String> record =
new ProducerRecord<>("user-events", key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Sent record to partition %d with offset %d%n",
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
producer.flush();
producer.close();
}
}
Consuming Messages
The examples below show the console consumer on the command line, a Python consumer, and a Java consumer.
# Console consumer (read from beginning)
docker exec -it kafka kafka-console-consumer \
--topic user-events \
--from-beginning \
--bootstrap-server localhost:9092
# With key and timestamp
docker exec -it kafka kafka-console-consumer \
--topic user-events \
--from-beginning \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property print.timestamp=true
from kafka import KafkaConsumer
import json
# Create consumer
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest', # Start from the earliest offset when there is no committed offset
enable_auto_commit=True,
group_id='user-events-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None
)
print("Waiting for messages...")
try:
for message in consumer:
print(f"\n--- New Message ---")
print(f"Topic: {message.topic}")
print(f"Partition: {message.partition}")
print(f"Offset: {message.offset}")
print(f"Key: {message.key}")
print(f"Value: {message.value}")
print(f"Timestamp: {message.timestamp}")
# Process the message
event = message.value
print(f"Processing event: {event['action']} for user {event['user_id']}")
except KeyboardInterrupt:
print("\nShutting down consumer...")
finally:
consumer.close()
print("Consumer closed")
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-events-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-events"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close();
}
}
}
4. Consumer Groups
Consumer groups allow multiple consumers to work together to consume a topic:
from kafka import KafkaConsumer
import json
import sys
def create_consumer(group_id, consumer_id):
"""Create a consumer that's part of a consumer group"""
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id=group_id,
client_id=consumer_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
return consumer
if __name__ == "__main__":
consumer_id = sys.argv[1] if len(sys.argv) > 1 else "consumer-1"
consumer = create_consumer('user-events-group', consumer_id)
print(f"Consumer {consumer_id} started...")
try:
for message in consumer:
print(f"[{consumer_id}] Partition {message.partition}, Offset {message.offset}: {message.value}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
Run multiple consumers in different terminals:
# Terminal 1
python consumer_group.py consumer-1
# Terminal 2
python consumer_group.py consumer-2
# Terminal 3
python consumer_group.py consumer-3
Kafka assigns each partition to exactly one consumer in the group, so the consumers split the topic's partitions between them and process messages in parallel. You can check which partitions a consumer owns with the sketch below.
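A minimal sketch for inspecting partition assignment, assuming the same local broker and topic as above (assignment() is empty until the first poll triggers the group join):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='user-events-group'
)

# The first poll joins the group and waits for the rebalance to complete
consumer.poll(timeout_ms=5000)
print("Assigned partitions:", sorted(tp.partition for tp in consumer.assignment()))
consumer.close()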
5. Advanced Producer Configuration
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
# Serialization
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'),
# Delivery guarantees
acks='all', # Wait for all replicas to acknowledge (strongest guarantee)
# acks=1 # Wait for leader only
# acks=0 # Don't wait (fastest but least safe)
# Retries and timeouts
retries=3,
    max_in_flight_requests_per_connection=5, # with retries > 0, values > 1 can reorder records within a partition
request_timeout_ms=30000,
# Batching for performance
batch_size=16384, # 16KB
linger_ms=10, # Wait up to 10ms to batch messages
# Compression
compression_type='gzip', # Options: gzip, snappy, lz4, zstd
# Buffer settings
buffer_memory=33554432, # 32MB
max_block_ms=60000,
)
# Send with callback
def on_send_success(record_metadata):
print(f"Message sent to {record_metadata.topic} [{record_metadata.partition}] @ {record_metadata.offset}")
def on_send_error(exc):
print(f"Error sending message: {exc}")
producer.send('user-events', {'event': 'test'}).add_callback(on_send_success).add_errback(on_send_error)
producer.flush()
producer.close()
6. Advanced Consumer Configuration
from kafka import KafkaConsumer, TopicPartition
import json
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
# Consumer group
group_id='advanced-group',
client_id='consumer-1',
# Offset management
auto_offset_reset='earliest', # Options: earliest, latest, none
enable_auto_commit=False, # Manual commit for better control
# Deserialization
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
# Performance
fetch_min_bytes=1,
fetch_max_wait_ms=500,
max_partition_fetch_bytes=1048576, # 1MB
# Session management
session_timeout_ms=10000,
heartbeat_interval_ms=3000,
max_poll_interval_ms=300000,
max_poll_records=500,
)
# Subscribe to topic
consumer.subscribe(['user-events'])
# Or assign specific partitions
# consumer.assign([TopicPartition('user-events', 0), TopicPartition('user-events', 1)])
try:
for message in consumer:
print(f"Processing: {message.value}")
# Process message...
# Manual commit after processing
consumer.commit()
except KeyboardInterrupt:
pass
finally:
consumer.close()
7. Error Handling and Resilience
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_consumer():
"""Create consumer with retry logic"""
max_retries = 5
retry_delay = 5
for attempt in range(max_retries):
try:
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='resilient-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
logger.info("Consumer created successfully")
return consumer
except KafkaError as e:
logger.error(f"Failed to create consumer (attempt {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
raise
def process_message(message):
"""Process a message with error handling"""
try:
event = message.value
logger.info(f"Processing: {event}")
# Your business logic here
# If processing fails, raise an exception
return True
except Exception as e:
logger.error(f"Error processing message: {e}")
return False
def main():
consumer = create_consumer()
try:
while True:
try:
records = consumer.poll(timeout_ms=1000)
for topic_partition, messages in records.items():
for message in messages:
if process_message(message):
                            # Commit after successful processing. Note: commit() with no
                            # arguments commits the latest polled offsets for all assigned
                            # partitions, not just this single message.
consumer.commit()
else:
# Handle failed message (could send to DLQ, retry, etc.)
logger.warning(f"Failed to process message at offset {message.offset}")
except KafkaError as e:
logger.error(f"Kafka error: {e}")
time.sleep(5) # Wait before retrying
except KeyboardInterrupt:
logger.info("Shutting down...")
finally:
consumer.close()
logger.info("Consumer closed")
if __name__ == "__main__":
main()
8. Common Use Cases
Event Sourcing
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
from enum import Enum
class EventType(Enum):
USER_CREATED = "USER_CREATED"
USER_UPDATED = "USER_UPDATED"
USER_DELETED = "USER_DELETED"
class EventStore:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def publish_event(self, aggregate_id, event_type, data):
"""Publish a domain event"""
event = {
'aggregate_id': aggregate_id,
'event_type': event_type.value,
'data': data,
'timestamp': datetime.now().isoformat(),
'version': 1
}
self.producer.send(
'user-events',
key=aggregate_id.encode('utf-8'),
value=event
)
self.producer.flush()
def close(self):
self.producer.close()
# Usage
event_store = EventStore(['localhost:9092'])
# Publish events
event_store.publish_event(
aggregate_id='user-123',
event_type=EventType.USER_CREATED,
data={'name': 'John Doe', 'email': 'john@example.com'}
)
event_store.publish_event(
aggregate_id='user-123',
event_type=EventType.USER_UPDATED,
data={'name': 'John Smith'}
)
event_store.close()
Stream Processing
from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
from datetime import datetime, timedelta
class SimpleStreamProcessor:
"""Count events by user in 1-minute windows"""
def __init__(self):
self.consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='stream-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
        self.window_counts = defaultdict(int)
def get_window_key(self, timestamp):
"""Get 1-minute window key for timestamp"""
dt = datetime.fromisoformat(timestamp)
window = dt.replace(second=0, microsecond=0)
return window.isoformat()
def process(self):
"""Process incoming events"""
for message in self.consumer:
event = message.value
user_id = event['user_id']
timestamp = event['timestamp']
window_key = self.get_window_key(timestamp)
count_key = f"{window_key}:{user_id}"
self.window_counts[count_key] += 1
# Publish aggregated result
result = {
'window': window_key,
'user_id': user_id,
'event_count': self.window_counts[count_key],
'processed_at': datetime.now().isoformat()
}
self.producer.send('user-event-counts', value=result)
def close(self):
self.consumer.close()
self.producer.close()
# Usage
processor = SimpleStreamProcessor()
try:
processor.process()
except KeyboardInterrupt:
processor.close()
CDC (Change Data Capture)
"""
Simulate CDC pattern: capture database changes and publish to Kafka
"""
from kafka import KafkaProducer
import json
from datetime import datetime
class CDCPublisher:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def publish_change(self, table, operation, before, after):
"""
Publish database change event
Args:
table: Database table name
operation: INSERT, UPDATE, DELETE
before: Record state before change (None for INSERT)
after: Record state after change (None for DELETE)
"""
change_event = {
'table': table,
'operation': operation,
'before': before,
'after': after,
'timestamp': datetime.now().isoformat()
}
topic = f'cdc.{table}'
        # Use whichever state exists (after is None for DELETE, before is None for INSERT)
        key = str((after or before).get('id')).encode('utf-8')
self.producer.send(topic, key=key, value=change_event)
self.producer.flush()
def close(self):
self.producer.close()
# Usage example
cdc = CDCPublisher(['localhost:9092'])
# INSERT
cdc.publish_change(
table='users',
operation='INSERT',
before=None,
after={'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
)
# UPDATE
cdc.publish_change(
table='users',
operation='UPDATE',
before={'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
after={'id': 1, 'name': 'Alice Smith', 'email': 'alice@example.com'}
)
# DELETE
cdc.publish_change(
table='users',
operation='DELETE',
before={'id': 1, 'name': 'Alice Smith', 'email': 'alice@example.com'},
after=None
)
cdc.close()
9. Monitoring and Management
Checking Consumer Lag
# Check consumer group status
docker exec -it kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group user-events-group
# Reset consumer group offset
docker exec -it kafka kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group user-events-group \
--reset-offsets \
--to-earliest \
--topic user-events \
--execute
Python Monitoring
from kafka import KafkaConsumer, TopicPartition
import json
def get_consumer_lag(bootstrap_servers, group_id, topic):
"""Calculate consumer lag for a consumer group"""
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=group_id
)
# Get partitions for topic
partitions = consumer.partitions_for_topic(topic)
if not partitions:
print(f"Topic {topic} not found")
return
lag_info = {}
for partition in partitions:
tp = TopicPartition(topic, partition)
# Get committed offset
committed = consumer.committed(tp)
committed_offset = committed if committed is not None else 0
# Get end offset (latest)
consumer.assign([tp])
consumer.seek_to_end(tp)
end_offset = consumer.position(tp)
# Calculate lag
lag = end_offset - committed_offset
lag_info[partition] = {
'committed_offset': committed_offset,
'end_offset': end_offset,
'lag': lag
}
consumer.close()
return lag_info
# Usage
lag = get_consumer_lag(['localhost:9092'], 'user-events-group', 'user-events')
print(json.dumps(lag, indent=2))
10. Best Practices
1. Topic Design
- Use meaningful topic names (e.g., user.events, order.created)
- One topic per event type or bounded context
- Consider retention policies based on use case (see the sketch after this list)
- Plan partition count based on expected throughput
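A sketch of setting retention at topic creation time with kafka-python (the topic name and values here are only examples; retention.ms and cleanup.policy are standard Kafka topic-level configs):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Keep order events for 7 days, then delete old segments
order_topic = NewTopic(
    name='order.created',
    num_partitions=6,
    replication_factor=1,  # use at least 3 in production
    topic_configs={
        'retention.ms': str(7 * 24 * 60 * 60 * 1000),
        'cleanup.policy': 'delete',
    }
)
admin.create_topics([order_topic])
admin.close()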
2. Partitioning Strategy
# Good: Partition by user_id for ordering guarantees per user
producer.send('user-events', key=f"user-{user_id}", value=event)
# Bad: Random partitioning loses ordering
producer.send('user-events', value=event)
3. Message Schema
# Good: Include versioning and metadata
import uuid
from datetime import datetime

message = {
'schema_version': '1.0',
'event_id': str(uuid.uuid4()),
'timestamp': datetime.now().isoformat(),
'event_type': 'user.created',
'data': {
'user_id': 123,
'name': 'John Doe'
}
}
# Consider using Schema Registry for production
# with Avro, Protobuf, or JSON Schema
4. Error Handling
- Implement Dead Letter Queues (DLQ) for failed messages (see the sketch after this list)
- Use idempotent producers to avoid duplicates
- Enable exactly-once semantics when needed
- Set appropriate timeouts and retries
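A minimal DLQ sketch with kafka-python (the user-events.dlq topic name and the error fields are illustrative, not a standard): when processing fails, the raw message plus some context is published to a dead-letter topic so the partition is not blocked.

from kafka import KafkaConsumer, KafkaProducer
import json

def process(event):
    """Placeholder business logic; raises to simulate a failure."""
    if event.get('action') == 'bad':
        raise ValueError('cannot handle this event')

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='dlq-example-group',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
dlq_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
    try:
        process(message.value)
    except Exception as e:
        # Park the failed message with enough context to replay it later
        dlq_producer.send('user-events.dlq', value={
            'original_topic': message.topic,
            'partition': message.partition,
            'offset': message.offset,
            'payload': message.value,
            'error': str(e),
        })
    consumer.commit()  # commit either way so a bad record is not re-consumed forever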
5. Performance Tuning
# Producer optimization
producer = KafkaProducer(
batch_size=32768, # Larger batches
linger_ms=20, # Wait longer to fill batches
compression_type='lz4', # Use compression
acks='all', # Ensure durability
max_in_flight_requests_per_connection=5
)
# Consumer optimization
consumer = KafkaConsumer(
fetch_min_bytes=50000, # Fetch larger batches
max_poll_records=500, # Process more records per poll
max_partition_fetch_bytes=1048576 # 1MB per partition
)
11. Common Pitfalls and Solutions
| Issue | Cause | Solution |
|---|---|---|
| Consumer lag increasing | Slow processing | Scale consumers, optimize processing, increase partitions |
| Duplicate messages | At-least-once delivery | Implement idempotent processing (see sketch below), use exactly-once semantics |
| Lost messages | Producer not waiting for ack | Set acks='all', increase retries |
| Rebalancing storms | Consumers taking too long | Increase max.poll.interval.ms, optimize processing |
| Out of order messages | Multiple partitions | Use single partition or partition by key |
| Memory issues | Large batches | Tune fetch.max.bytes, max.partition.fetch.bytes |
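A simple form of the idempotent processing mentioned above: skip events whose unique ID has already been handled. This sketch assumes each event carries an event_id field (in production the seen-ID set would live in a database or cache, not in process memory):

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='idempotent-example-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

processed_ids = set()  # in-memory for illustration only

for message in consumer:
    event = message.value
    event_id = event.get('event_id')
    if event_id is not None and event_id in processed_ids:
        continue  # duplicate delivery: skip side effects
    # ... apply side effects here (write to a DB, call an API, etc.) ...
    if event_id is not None:
        processed_ids.add(event_id)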
12. Production Checklist
Before deploying to production:
- ✅ Set up proper replication (replication factor of at least 3, which requires at least 3 brokers)
- ✅ Configure retention policies
- ✅ Implement monitoring (Prometheus + Grafana)
- ✅ Set up alerts for consumer lag
- ✅ Use Schema Registry for schema evolution
- ✅ Implement proper error handling and DLQ
- ✅ Configure SSL/TLS and SASL authentication (see the sketch after this checklist)
- ✅ Set up backup and disaster recovery
- ✅ Test failover scenarios
- ✅ Document topic naming conventions
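For the SSL/SASL item, kafka-python takes security settings as constructor arguments. A sketch of a producer over SASL_SSL with the PLAIN mechanism (the broker address, credentials, and CA path are placeholders for your environment):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['broker.example.com:9093'],  # placeholder address
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',                         # or SCRAM-SHA-256 / SCRAM-SHA-512
    sasl_plain_username='my-service',               # placeholder credentials
    sasl_plain_password='change-me',
    ssl_cafile='/path/to/ca.pem',                   # placeholder CA certificate
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('user-events', {'event': 'secure-test'})
producer.flush()
producer.close()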
13. Next Steps
- Explore Kafka Streams for advanced stream processing
- Learn Kafka Connect for integrating with databases and systems
- Implement Schema Registry for schema management
- Study KSQL for SQL-like stream processing
- Investigate Kafka security (SSL, SASL, ACLs)
- Set up monitoring with Prometheus and Grafana
14. Additional Resources
- Official Kafka Documentation
- Confluent Documentation
- kafka-python Documentation
- Kafka: The Definitive Guide (Book)
Apache Kafka is a powerful platform for building real-time data pipelines and streaming applications. Start with simple producer-consumer patterns, understand partitioning and consumer groups, and gradually adopt advanced features like stream processing and exactly-once semantics as your needs grow.