
Apache Kafka - Basic Usage Tutorial

Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. It is widely used for building real-time data pipelines, streaming applications, and event-driven architectures.

When to Use Kafka

Use Kafka when you need high-throughput, fault-tolerant, real-time data streaming between systems, when you are building event-driven microservices, or when you need to process data streams as they arrive.

1. What is Apache Kafka?

Apache Kafka is an open-source distributed event streaming platform that:

  • Publishes and subscribes to streams of records (like a message queue)
  • Stores streams of records durably and reliably
  • Processes streams of records as they occur

Core Concepts

Concept          Description
Producer         Applications that publish (write) data to Kafka topics
Consumer         Applications that subscribe to (read) data from topics
Topic            A category or feed name to which records are published
Partition        Topics are split into partitions for parallelism and scalability
Broker           Kafka server that stores data and serves clients
Cluster          A group of Kafka brokers working together
Offset           Unique identifier for each record within a partition
Consumer Group   Group of consumers that work together to consume a topic
ZooKeeper        Manages and coordinates Kafka brokers (being phased out in favor of KRaft)
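
To make these concepts concrete before diving into setup, here is a minimal produce-and-consume round trip using the kafka-python client (pip install kafka-python), the library used in the later examples. It is a sketch that assumes a broker reachable at localhost:9092 and a topic named user-events; adjust both to your environment.

hello_kafka.py
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer: write one record to the 'user-events' topic
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('user-events', {'user_id': 1, 'action': 'login'})
producer.flush()
producer.close()

# Consumer: read records back; each record carries its partition and offset
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,  # stop iterating if no new records arrive for 5s
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for record in consumer:
    print(f"partition={record.partition} offset={record.offset} value={record.value}")
consumer.close()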

Architecture Overview

┌──────────────────────────────────────────────┐
│                Kafka Cluster                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │ Broker 1 │  │ Broker 2 │  │ Broker 3 │    │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘    │
│       │             │             │          │
│  ┌────▼─────────────▼─────────────▼────┐     │
│  │         Topic: user-events          │     │
│  │ Partition 0 | Partition 1 | Part 2  │     │
│  └─────────────────────────────────────┘     │
└──────────────────────────────────────────────┘
      ▲                          │
      │                          ▼
┌─────┴─────┐            ┌───────┴───────┐
│ Producers │            │   Consumers   │
│  (Write)  │            │    (Read)     │
└───────────┘            └───────────────┘

2. Installation

The easiest way to get started with Kafka is using Docker:

Create a docker-compose.yml file:

docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-data:

Start Kafka:

docker-compose up -d

Verify it's running:

docker-compose ps
docker logs kafka

Using KRaft (ZooKeeper-less)

Kafka 3.x+ supports KRaft mode (no ZooKeeper needed):

docker-compose-kraft.yml
version: '3.8'

services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka-kraft
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka-kraft-data:/var/lib/kafka/data

volumes:
  kafka-kraft-data:

3. Basic Operations

Creating Topics

# Using Docker
docker exec -it kafka kafka-topics --create \
  --topic user-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

# List topics
docker exec -it kafka kafka-topics --list \
  --bootstrap-server localhost:9092

# Describe a topic
docker exec -it kafka kafka-topics --describe \
  --topic user-events \
  --bootstrap-server localhost:9092

Producing Messages

# Console producer
docker exec -it kafka kafka-console-producer \
  --topic user-events \
  --bootstrap-server localhost:9092

# Type messages (one per line)
# {"user_id": 1, "action": "login", "timestamp": "2024-01-01T10:00:00Z"}
# {"user_id": 2, "action": "purchase", "timestamp": "2024-01-01T10:05:00Z"}
# Press Ctrl+C to exit

Consuming Messages

# Console consumer (read from beginning)
docker exec -it kafka kafka-console-consumer \
  --topic user-events \
  --from-beginning \
  --bootstrap-server localhost:9092

# With key and timestamp
docker exec -it kafka kafka-console-consumer \
  --topic user-events \
  --from-beginning \
  --bootstrap-server localhost:9092 \
  --property print.key=true \
  --property print.timestamp=true

4. Consumer Groups

Consumer groups allow multiple consumers to work together to consume a topic:

consumer_group.py
from kafka import KafkaConsumer
import json
import sys

def create_consumer(group_id, consumer_id):
    """Create a consumer that's part of a consumer group"""
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=group_id,
        client_id=consumer_id,
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    return consumer

if __name__ == "__main__":
    consumer_id = sys.argv[1] if len(sys.argv) > 1 else "consumer-1"
    consumer = create_consumer('user-events-group', consumer_id)

    print(f"Consumer {consumer_id} started...")

    try:
        for message in consumer:
            print(f"[{consumer_id}] Partition {message.partition}, Offset {message.offset}: {message.value}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

Run multiple consumers in different terminals:

# Terminal 1
python consumer_group.py consumer-1

# Terminal 2
python consumer_group.py consumer-2

# Terminal 3
python consumer_group.py consumer-3

Each consumer in the group will handle different partitions, enabling parallel processing.
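
To watch the assignment in action, produce some keyed messages while the three consumers are running. This is a small illustrative sketch (the user IDs and message count are arbitrary); because records with the same key always hash to the same partition, each consumer should print a disjoint set of users.

group_demo_producer.py
from kafka import KafkaProducer
import json
from datetime import datetime, timezone

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Keys hash to partitions, so all events for one user stay in one partition
for i in range(30):
    user_id = i % 6
    event = {
        'user_id': user_id,
        'action': 'click',
        'timestamp': datetime.now(timezone.utc).isoformat()
    }
    producer.send('user-events', key=f"user-{user_id}", value=event)

producer.flush()
producer.close()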

5. Advanced Producer Configuration

advanced_producer.py
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],

    # Serialization (the key serializer tolerates records sent without a key)
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k is not None else None,

    # Delivery guarantees
    acks='all',                  # Wait for all in-sync replicas to acknowledge (strongest guarantee)
    # acks=1                     # Wait for the leader only
    # acks=0                     # Don't wait (fastest but least safe)

    # Retries and timeouts
    retries=3,
    max_in_flight_requests_per_connection=5,
    request_timeout_ms=30000,

    # Batching for performance
    batch_size=16384,            # 16KB
    linger_ms=10,                # Wait up to 10ms to batch messages

    # Compression
    compression_type='gzip',     # Options: gzip, snappy, lz4, zstd

    # Buffer settings
    buffer_memory=33554432,      # 32MB
    max_block_ms=60000,
)

# Send with callbacks
def on_send_success(record_metadata):
    print(f"Message sent to {record_metadata.topic} [{record_metadata.partition}] @ {record_metadata.offset}")

def on_send_error(exc):
    print(f"Error sending message: {exc}")

producer.send('user-events', {'event': 'test'}).add_callback(on_send_success).add_errback(on_send_error)

producer.flush()
producer.close()
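
Callbacks are fire-and-forget from the caller's point of view. When you need per-message confirmation, you can instead block on the future returned by send(). A minimal synchronous-send sketch; the topic and payload are illustrative.

sync_send.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all'
)

# Block on the future returned by send(); .get() raises on delivery failure
future = producer.send('user-events', {'event': 'sync-test'})
try:
    metadata = future.get(timeout=10)
    print(f"Stored in {metadata.topic} [{metadata.partition}] @ {metadata.offset}")
except KafkaError as exc:
    print(f"Send failed: {exc}")
finally:
    producer.close()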

6. Advanced Consumer Configuration

advanced_consumer.py
from kafka import KafkaConsumer, TopicPartition
import json

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],

    # Consumer group
    group_id='advanced-group',
    client_id='consumer-1',

    # Offset management
    auto_offset_reset='earliest',       # Options: earliest, latest, none
    enable_auto_commit=False,           # Manual commit for better control

    # Deserialization
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),

    # Performance
    fetch_min_bytes=1,
    fetch_max_wait_ms=500,
    max_partition_fetch_bytes=1048576,  # 1MB

    # Session management
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,
    max_poll_interval_ms=300000,
    max_poll_records=500,
)

# Subscribe to topic
consumer.subscribe(['user-events'])

# Or assign specific partitions instead (see the replay sketch below for seeking to an offset)
# consumer.assign([TopicPartition('user-events', 0), TopicPartition('user-events', 1)])

try:
    for message in consumer:
        print(f"Processing: {message.value}")

        # Process message...

        # Manual commit after processing
        consumer.commit()

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
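
Disabling auto-commit and assigning partitions manually also makes replay straightforward. The sketch below is illustrative: it assigns partition 0 of user-events and seeks to an arbitrary offset before reading; seek_to_beginning() works the same way if you want everything.

replay_from_offset.py
from kafka import KafkaConsumer, TopicPartition
import json

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=False,
    consumer_timeout_ms=5000,   # stop iterating once caught up
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Take manual control of one partition and rewind to a specific offset
tp = TopicPartition('user-events', 0)
consumer.assign([tp])
consumer.seek(tp, 10)           # illustrative offset; consumer.seek_to_beginning(tp) replays all

for message in consumer:
    print(f"Replayed offset {message.offset}: {message.value}")

consumer.close()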

7. Error Handling and Resilience

resilient_consumer.py
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_consumer():
    """Create consumer with retry logic"""
    max_retries = 5
    retry_delay = 5

    for attempt in range(max_retries):
        try:
            consumer = KafkaConsumer(
                'user-events',
                bootstrap_servers=['localhost:9092'],
                auto_offset_reset='earliest',
                enable_auto_commit=False,
                group_id='resilient-group',
                value_deserializer=lambda m: json.loads(m.decode('utf-8'))
            )
            logger.info("Consumer created successfully")
            return consumer
        except KafkaError as e:
            logger.error(f"Failed to create consumer (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
            else:
                raise

def process_message(message):
    """Process a message with error handling"""
    try:
        event = message.value
        logger.info(f"Processing: {event}")

        # Your business logic here
        # If processing fails, raise an exception

        return True
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        return False

def main():
    consumer = create_consumer()

    try:
        while True:
            try:
                records = consumer.poll(timeout_ms=1000)

                for topic_partition, messages in records.items():
                    for message in messages:
                        if process_message(message):
                            # Only commit if processing succeeded
                            consumer.commit()
                        else:
                            # Handle failed message (could send to a DLQ, retry, etc. -- see the sketch below)
                            logger.warning(f"Failed to process message at offset {message.offset}")

            except KafkaError as e:
                logger.error(f"Kafka error: {e}")
                time.sleep(5)  # Wait before retrying

    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        consumer.close()
        logger.info("Consumer closed")

if __name__ == "__main__":
    main()
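
The dead-letter queue mentioned in the comment above can be as simple as a second topic. Here is a minimal sketch, assuming a user-events-dlq topic (the name is a placeholder); the failed record is republished with enough context to investigate or replay it later.

dead_letter_queue.py
from kafka import KafkaProducer
import json
from datetime import datetime, timezone

dlq_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_to_dlq(message, error):
    """Republish a failed record to the DLQ with its origin and the error."""
    dlq_producer.send('user-events-dlq', {
        'original_topic': message.topic,
        'partition': message.partition,
        'offset': message.offset,
        'payload': message.value,
        'error': str(error),
        'failed_at': datetime.now(timezone.utc).isoformat()
    })
    dlq_producer.flush()

In the consumer loop above, the failure branch would call send_to_dlq(message, error) before moving on, so the group keeps progressing without silently dropping the record.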

8. Common Use Cases

Event Sourcing

event_sourcing.py
from kafka import KafkaProducer
import json
from datetime import datetime
from enum import Enum

class EventType(Enum):
    USER_CREATED = "USER_CREATED"
    USER_UPDATED = "USER_UPDATED"
    USER_DELETED = "USER_DELETED"

class EventStore:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def publish_event(self, aggregate_id, event_type, data):
        """Publish a domain event"""
        event = {
            'aggregate_id': aggregate_id,
            'event_type': event_type.value,
            'data': data,
            'timestamp': datetime.now().isoformat(),
            'version': 1
        }

        self.producer.send(
            'user-events',
            key=aggregate_id.encode('utf-8'),
            value=event
        )
        self.producer.flush()

    def close(self):
        self.producer.close()

# Usage
event_store = EventStore(['localhost:9092'])

# Publish events
event_store.publish_event(
    aggregate_id='user-123',
    event_type=EventType.USER_CREATED,
    data={'name': 'John Doe', 'email': 'john@example.com'}
)

event_store.publish_event(
    aggregate_id='user-123',
    event_type=EventType.USER_UPDATED,
    data={'name': 'John Smith'}
)

event_store.close()
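
The write side above is only half of event sourcing; the read side rebuilds current state by replaying the log. A hedged sketch (in-memory only, no snapshots) that folds user-events into a per-aggregate view:

event_replay.py
from kafka import KafkaConsumer
import json

def rebuild_state(bootstrap_servers):
    """Replay all events and fold them into the current state per aggregate."""
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,   # stop once we have caught up
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    state = {}
    for message in consumer:
        event = message.value
        aggregate_id = event['aggregate_id']
        if event['event_type'] == 'USER_CREATED':
            state[aggregate_id] = event['data']
        elif event['event_type'] == 'USER_UPDATED':
            state.setdefault(aggregate_id, {}).update(event['data'])
        elif event['event_type'] == 'USER_DELETED':
            state.pop(aggregate_id, None)

    consumer.close()
    return state

print(rebuild_state(['localhost:9092']))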

Stream Processing

stream_processing.py
from kafka import KafkaConsumer, KafkaProducer
import json
from collections import defaultdict
from datetime import datetime

class SimpleStreamProcessor:
    """Count events by user in 1-minute windows"""

    def __init__(self):
        self.consumer = KafkaConsumer(
            'user-events',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            group_id='stream-processor',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

        self.window_counts = defaultdict(int)

    def get_window_key(self, timestamp):
        """Get the 1-minute window key for a timestamp"""
        # fromisoformat() on Python < 3.11 does not accept a trailing 'Z'
        dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        window = dt.replace(second=0, microsecond=0)
        return window.isoformat()

    def process(self):
        """Process incoming events"""
        for message in self.consumer:
            event = message.value
            user_id = event['user_id']
            timestamp = event['timestamp']

            window_key = self.get_window_key(timestamp)
            count_key = f"{window_key}:{user_id}"

            self.window_counts[count_key] += 1

            # Publish aggregated result
            result = {
                'window': window_key,
                'user_id': user_id,
                'event_count': self.window_counts[count_key],
                'processed_at': datetime.now().isoformat()
            }

            self.producer.send('user-event-counts', value=result)

    def close(self):
        self.consumer.close()
        self.producer.close()

# Usage
processor = SimpleStreamProcessor()
try:
    processor.process()
except KeyboardInterrupt:
    processor.close()

CDC (Change Data Capture)

cdc_example.py
"""
Simulate CDC pattern: capture database changes and publish to Kafka
"""
from kafka import KafkaProducer
import json
from datetime import datetime

class CDCPublisher:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def publish_change(self, table, operation, before, after):
"""
Publish database change event

Args:
table: Database table name
operation: INSERT, UPDATE, DELETE
before: Record state before change (None for INSERT)
after: Record state after change (None for DELETE)
"""
change_event = {
'table': table,
'operation': operation,
'before': before,
'after': after,
'timestamp': datetime.now().isoformat()
}

topic = f'cdc.{table}'
key = str(after.get('id') or before.get('id')).encode('utf-8')

self.producer.send(topic, key=key, value=change_event)
self.producer.flush()

def close(self):
self.producer.close()

# Usage example
cdc = CDCPublisher(['localhost:9092'])

# INSERT
cdc.publish_change(
table='users',
operation='INSERT',
before=None,
after={'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
)

# UPDATE
cdc.publish_change(
table='users',
operation='UPDATE',
before={'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
after={'id': 1, 'name': 'Alice Smith', 'email': 'alice@example.com'}
)

# DELETE
cdc.publish_change(
table='users',
operation='DELETE',
before={'id': 1, 'name': 'Alice Smith', 'email': 'alice@example.com'},
after=None
)

cdc.close()

9. Monitoring and Management

Checking Consumer Lag

# Check consumer group status
docker exec -it kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group user-events-group

# Reset consumer group offset
docker exec -it kafka kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group user-events-group \
  --reset-offsets \
  --to-earliest \
  --topic user-events \
  --execute

Python Monitoring

monitor.py
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import json

def get_consumer_lag(bootstrap_servers, group_id, topic):
    """Calculate consumer lag for a consumer group"""
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id
    )

    # Get partitions for the topic
    partitions = consumer.partitions_for_topic(topic)

    if not partitions:
        print(f"Topic {topic} not found")
        return

    lag_info = {}

    for partition in partitions:
        tp = TopicPartition(topic, partition)

        # Get committed offset
        committed = consumer.committed(tp)
        committed_offset = committed if committed is not None else 0

        # Get end offset (latest)
        consumer.assign([tp])
        consumer.seek_to_end(tp)
        end_offset = consumer.position(tp)

        # Calculate lag
        lag = end_offset - committed_offset

        lag_info[partition] = {
            'committed_offset': committed_offset,
            'end_offset': end_offset,
            'lag': lag
        }

    consumer.close()
    return lag_info

# Usage
lag = get_consumer_lag(['localhost:9092'], 'user-events-group', 'user-events')
print(json.dumps(lag, indent=2))

10. Best Practices

1. Topic Design

  • Use meaningful topic names (e.g., user.events, order.created)
  • One topic per event type or bounded context
  • Consider retention policies based on use case
  • Plan partition count based on expected throughput (see the admin-client sketch below)
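
These decisions can also be applied in code rather than through the CLI. A sketch using kafka-python's admin client; the topic name, partition count, and 7-day retention value are illustrative.

create_topic.py
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# 3 partitions for parallelism, replication factor 1 (single-broker dev setup),
# and a 7-day retention policy set via topic-level config
topic = NewTopic(
    name='order.created',
    num_partitions=3,
    replication_factor=1,
    topic_configs={'retention.ms': str(7 * 24 * 60 * 60 * 1000)}
)

admin.create_topics([topic])
admin.close()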

2. Partitioning Strategy

# Good: partition by user_id for per-user ordering guarantees
# (assumes the producer was configured with a key_serializer; otherwise pass bytes)
producer.send('user-events', key=f"user-{user_id}", value=event)

# Bad: sending without a key spreads records round-robin and loses per-user ordering
producer.send('user-events', value=event)

3. Message Schema

# Good: Include versioning and metadata
import uuid
from datetime import datetime

message = {
    'schema_version': '1.0',
    'event_id': str(uuid.uuid4()),
    'timestamp': datetime.now().isoformat(),
    'event_type': 'user.created',
    'data': {
        'user_id': 123,
        'name': 'John Doe'
    }
}

# Consider using Schema Registry for production
# with Avro, Protobuf, or JSON Schema

4. Error Handling

  • Implement Dead Letter Queues (DLQ) for failed messages
  • Use idempotent producers, and make consumer-side processing idempotent, to avoid duplicates (see the dedup sketch after this list)
  • Enable exactly-once semantics when needed
  • Set appropriate timeouts and retries
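
Even with careful producer settings, consumers should assume at-least-once delivery, so deduplicate on a stable identifier. A minimal sketch, assuming events carry a unique event_id as in the schema example above; the in-memory set stands in for a durable store such as a database or Redis.

idempotent_consumer.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='dedup-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

seen_event_ids = set()   # stand-in for a durable store (database, Redis, ...)

try:
    for message in consumer:
        event = message.value
        event_id = event.get('event_id')

        if event_id in seen_event_ids:
            # Duplicate delivery: skip the side effects but still commit
            consumer.commit()
            continue

        # ... process the event exactly once here ...
        seen_event_ids.add(event_id)
        consumer.commit()
finally:
    consumer.close()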

5. Performance Tuning

# Producer optimization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    batch_size=32768,           # Larger batches
    linger_ms=20,               # Wait longer to fill batches
    compression_type='lz4',     # Use compression
    acks='all',                 # Ensure durability
    max_in_flight_requests_per_connection=5
)

# Consumer optimization
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    fetch_min_bytes=50000,              # Fetch larger batches
    max_poll_records=500,               # Process more records per poll
    max_partition_fetch_bytes=1048576   # 1MB per partition
)

11. Common Pitfalls and Solutions

Issue                     Cause                           Solution
Consumer lag increasing   Slow processing                 Scale consumers, optimize processing, increase partitions
Duplicate messages        At-least-once delivery          Implement idempotent processing, use exactly-once semantics
Lost messages             Producer not waiting for acks   Set acks='all', increase retries
Rebalancing storms        Consumers taking too long       Increase max.poll.interval.ms, optimize processing
Out-of-order messages     Multiple partitions             Use a single partition or partition by key
Memory issues             Large batches                   Tune fetch.max.bytes and max.partition.fetch.bytes

12. Production Checklist

Before deploying to production:

  • ✅ Set up proper replication (min 3 brokers)
  • ✅ Configure retention policies
  • ✅ Implement monitoring (Prometheus + Grafana)
  • ✅ Set up alerts for consumer lag
  • ✅ Use Schema Registry for schema evolution
  • ✅ Implement proper error handling and DLQ
  • ✅ Configure SSL/TLS and SASL authentication
  • ✅ Set up backup and disaster recovery
  • ✅ Test failover scenarios
  • ✅ Document topic naming conventions

13. Next Steps

  • Explore Kafka Streams for advanced stream processing
  • Learn Kafka Connect for integrating with databases and systems
  • Implement Schema Registry for schema management
  • Study KSQL for SQL-like stream processing
  • Investigate Kafka security (SSL, SASL, ACLs)
  • Set up monitoring with Prometheus and Grafana

Summary

Apache Kafka is a powerful platform for building real-time data pipelines and streaming applications. Start with simple producer-consumer patterns, understand partitioning and consumer groups, and gradually adopt advanced features like stream processing and exactly-once semantics as your needs grow.