Ahmed Sayed
20 min readFeb 3, 2024

Apache Kafka is a distributed streaming platform that has gained significant popularity for its ability to handle high-throughput, fault-tolerant messaging among applications and systems. At its core, Kafka is designed to provide durable storage and stream processing capabilities, making it an ideal choice for building real-time streaming data pipelines and applications. This article will delve into the architecture of Kafka, its key components, and how to interact with Kafka using Python.

Kafka Architecture Overview
Topics and Partitions
Producers and Consumers
Brokers
ZooKeeper
Kafka’s Distributed Nature
Kafka Streams and Connect
Handling Changes and State
Understanding Offsets
Practical Implications
Interacting with Kafka using Python
Creating a Topic
Listing Topics
Deleting a Topic
Producing Messages to Kafka
Consuming Messages from Kafka
· Using kafka-python
Installation
Producer Example
Consumer Example
Key Points
Order Maintenance in Kafka
Dead Letter Queues
Purpose of Dead Letter Queues
How Dead Letter Queues Work
Use in Various Technologies
Best Practices
Caching Strategies with Kafka
1.Client-Side Caching
2.Kafka Streams State Stores
3.Interactive Queries in Kafka Streams
4.External Caching Systems
Considerations for Caching with Kafka
· Example: Using Kafka Streams State Store for Caching
· Prerequisites
· Example Overview
· Redis as a State Store
· Consuming Messages and Caching Results
· Considerations
Partitioning in Apache Kafka
Partitioning in Databases
Partitioning in Distributed File Systems
Challenges and Considerations
How Kafka Supports CDC
Implementing CDC with Kafka
CDC Workflow with Kafka
Considerations
CDC Data from Kafka with Python
Pre-requisites
Consuming CDC Data from Kafka with Python
Using confluent-kafka-python
Using kafka-python
Notes

Kafka Architecture Overview

The architecture of Apache Kafka is built around a few core concepts: producers, consumers, brokers, topics, partitions, and the ZooKeeper coordination system. Understanding these components is crucial to leveraging Kafka effectively.

Topics and Partitions

  • Topics: The basic way Kafka manages data is through topics. A topic is a category or feed name to which records are published by producers.
  • Partitions: Each topic can be split into multiple partitions. Partitions allow the data for a topic to be parallelized, as each partition can be placed on a different server, and multiple partitions can be consumed in parallel.

Producers and Consumers

  • Producers: Producers are applications that publish (write) messages to Kafka topics. They can choose which topic (and optionally, which partition within a topic) to send messages to.
  • Consumers: Consumers read messages from topics. They can subscribe to one or more topics and consume messages in a sequence from the partitions.

Brokers

  • Brokers: A Kafka cluster consists of one or more servers known as brokers. Brokers are responsible for maintaining published data. Each broker can handle terabytes of messages without impacting performance.

ZooKeeper

  • ZooKeeper: Kafka uses ZooKeeper to manage and coordinate the Kafka brokers. ZooKeeper is used to elect leaders among the brokers, manage service discovery for brokers to find one another, and perform other cluster management activities.

Kafka’s Distributed Nature

Kafka’s architecture is inherently distributed. This design allows Kafka to be highly available and scalable. You can add more brokers to a Kafka cluster to increase its capacity and fault tolerance. Data is replicated across multiple brokers to prevent data loss in the case of a broker failure.

Kafka Streams and Connect

  • Kafka Streams: A client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It lets you process and analyze data stored in Kafka with ease.
  • Kafka Connect: A tool for streaming data between Kafka and other systems in a scalable and reliable way. It provides reusable producers and consumers for connecting Kafka to external systems such as databases, key-value stores, search indexes, and file systems.

Handling Changes and State

  • Offsets for Tracking Progress: Kafka uses offsets to track the position of consumers in a partition. Consumers commit their offset as they process messages, which Kafka uses to know which messages have been successfully processed. This allows consumers to restart from the last committed offset, ensuring no messages are missed or duplicated under normal operation, thus handling changes in the consumer state gracefully.
  • Log Compaction for State Changes: Kafka offers a log compaction feature that ensures the retention of only the last known value for each key within a partition. This is particularly useful for maintaining the latest state of an entity. When log compaction is enabled, Kafka periodically compacts the log by removing older records with the same key, leaving only the most recent update for each key. This feature is essential for use cases where only the latest state is relevant, such as in event sourcing or maintaining materialized views.
  • Replication for Fault Tolerance: Kafka replicates partitions across multiple brokers to ensure fault tolerance. Each partition has one leader and zero or more follower replicas. All writes and reads go through the leader replica to ensure consistency. If the leader fails, one of the followers can be promoted to be the new leader, ensuring high availability. Replication ensures that message order and changes are preserved even in the event of broker failures.
  • Transactions for Atomic Changes: Kafka supports transactions, allowing producers to write multiple messages across partitions atomically. This means either all messages in the transaction are visible to consumers or none are. Transactions are crucial for ensuring consistency when a single logical change involves multiple messages across different partitions.

Understanding Offsets

Here are some key points about offsets in Kafka:

  • Sequential and Immutable: Offsets are sequential. When a new message is produced to a partition, it gets the next available offset. This sequence ensures that the order of messages is maintained within a partition. Messages are immutable once written, and their offsets do not change.
  • Partition Scope: Offsets are scoped to a partition; this means that each partition has its own sequence of offsets starting from zero. Therefore, a message in Kafka can be uniquely identified by a combination of its topic name, partition number, and offset within that partition.
  • Consumer Tracking: Kafka consumers use offsets to track their position (i.e., progress) within a partition. When a consumer reads a message, it advances its offset to point to the next message it expects to read. This mechanism allows consumers to resume reading from where they left off, even after restarts or failures.
  • Committing Offsets: Consumers periodically commit their offsets back to Kafka (or to an external store). This committed offset represents the consumer’s current position within the partition and ensures that the consumer does not reprocess messages it has already consumed, providing at least once processing semantics.
  • Offset Retention: Kafka retains messages for a configurable period, not based on the number of messages or their offsets. Once a message expires (based on retention policies), it is eligible for deletion, and its storage space can be reclaimed. However, the offset sequence continues from the last value; it does not reset or change because of message deletion.

Practical Implications

  • Message Ordering: Kafka guarantees the order of messages only within a partition, not across partitions. Consumers can rely on this order when processing messages.
  • Scalability and Parallelism: By dividing topics into partitions and using offsets within each partition, Kafka allows for scalable and parallel consumption. Different consumers (or consumer groups) can read from different partitions independently at their own pace.
  • Fault Tolerance: The use of offsets, combined with Kafka’s replication features, ensures that messages can be reliably processed even in the event of consumer failure. Consumers can pick up processing from the last committed offset.
  • Custom Processing Logic: Advanced users can manipulate offsets for custom processing needs, such as replaying messages, skipping messages, or implementing exactly-once processing semantics in conjunction with Kafka’s transactional features.

In summary, offsets are a fundamental concept in Kafka that enables efficient, ordered, and reliable message processing in distributed systems. They facilitate Kafka’s high-throughput capabilities while supporting consumer scalability and fault-tolerant design.

Interacting with Kafka using Python

Python developers can interact with Kafka through the confluent-kafka-python library or the kafka-python library. Both provide comprehensive tools to produce and consume messages from a Kafka cluster.

Installation

pip install confluent-kafka

Creating a Topic

To create a topic, you can use the AdminClient class from confluent-kafka. Here's how:

from confluent_kafka.admin import AdminClient, NewTopic

# Configuration: Replace 'localhost:9092' with your Kafka broker's address
config = {'bootstrap.servers': 'localhost:9092'}

# Create an AdminClient
admin_client = AdminClient(config)

# Define topic specifications
topic_name = "my_new_topic"
num_partitions = 3
replication_factor = 1 # Adjust based on your cluster setup

# Create a NewTopic object
new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

# Create the topic
fs = admin_client.create_topics([new_topic])

# Wait for each operation to finish
for topic, f in fs.items():
try:
f.result() # The result itself is None
print(f"Topic {topic} created")
except Exception as e:
print(f"Failed to create topic {topic}: {e}")

Listing Topics

You might want to list all topics to verify your new topic was created or to see what topics are available:

# Get the list of topics
metadata = admin_client.list_topics(timeout=10)

# Print the topic names
print("Topics in the cluster:")
for topic in metadata.topics:
print(topic)

Deleting a Topic

Similarly, you can delete a topic using the delete_topics method. Be cautious with this operation, as it will remove the topic and all its data:

# Define the topic name to delete
topic_to_delete = "my_old_topic"

# Delete the topic
fs = admin_client.delete_topics([topic_to_delete], operation_timeout=30)

# Wait for the operation to finish
for topic, f in fs.items():
try:
f.result() # The result itself is None
print(f"Topic {topic} deleted")
except Exception as e:
print(f"Failed to delete topic {topic}: {e}")

Producing Messages to Kafka

Here’s a simple example of how to produce messages to a Kafka topic using confluent-kafka-python:

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Asynchronous message production
p.produce('my_topic', 'hello, world!', callback=delivery_report)

# Wait for any outstanding messages to be delivered
p.flush()

Consuming Messages from Kafka

Consuming messages is just as straightforward. Here’s how you can do it using confluent-kafka-python:

from confluent_kafka import Consumer, KafkaError

c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
})

c.subscribe(['my_topic'])

try:
while True:
msg = c.poll(1.0) # timeout in seconds

if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
# Message is a normal message
print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
# Clean up on exit
c.close()

Using kafka-python

The kafka-python library is a pure-Python implementation of the Kafka protocol, providing an easier-to-use interface at the expense of some performance compared to confluent-kafka-python.

Installation

pip install kafka-python

Producer Example

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Produce a message
producer.send('test_topic', b'Hello, Kafka!')

# Wait for all messages to be sent
producer.flush()

Consumer Example

from kafka import KafkaConsumer

consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='my_group'
)

for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")

Key Points

  • Configuration: Replace ’localhost:9092' with the address of your Kafka broker(s). Adjust the topic names and other configurations as needed for your setup.
  • Performance: For high-throughput applications,confluent-kafka-python is recommended due to its efficient use of the native Kafka library.
  • Functionality: Both libraries offer extensive Kafka functionality, but their interfaces and performance characteristics differ. Choose based on your application’s requirements and your preference for implementation language (C extension vs. pure Python).

Order Maintenance in Kafka

  • Per-Partition Order: Kafka guarantees order only within a partition, not across partitions in a topic. When messages are produced to a partition, Kafka appends them sequentially, assigning each new message the next offset in the sequence. Consumers read messages from a partition in the order of these offsets, ensuring that the order in which messages are produced is maintained when they are consumed.
  • Partitioning Strategy: Producers can specify a partition or a partitioning key when sending a message. If a partition is specified directly, Kafka sends the message to that partition. If a partitioning key is provided, Kafka uses a hash of the key to consistently map messages with the same key to the same partition. This ensures that all messages with the same key (which often represents the same entity or aggregate root) are ordered correctly relative to each other.

Dead Letter Queues

A “dead letter queue” (DLQ), sometimes referred to as a “dead queue,” is a service used in message processing and queue management systems to handle messages that cannot be delivered or processed successfully. This concept is widely applicable in systems that use message queues for asynchronous communication between different parts of an application or between different applications.

Purpose of Dead Letter Queues

The primary purpose of a dead letter queue is to isolate messages that have failed processing for one reason or another, such as:

  • Message Format Issues: The message doesn’t conform to the expected format, making it unprocessable by the consumer.
  • Processing Failures: The message processing fails due to some business logic condition or system error.
  • Retry Limit Exceeded: The message has been retried a certain number of times without success, indicating either a persistent failure condition or the need for manual intervention.
  • Resource Constraints: Temporary issues like database unavailability or third-party service downtime that prevent message processing.

By segregating these messages, a system ensures that problematic messages do not block the processing of other valid messages in the queue. Additionally, it allows developers and administrators to investigate the cause of the failure, correct any issues, and decide whether to reprocess or discard the failed messages.

How Dead Letter Queues Work

In practice, when a message in the primary queue cannot be processed after several attempts (due to errors, exceeding the maximum retry limit, etc.), it is moved to a separate queue designated as the dead letter queue. The process is usually automatic, managed by the message queueing system or the application logic.

Once in the dead letter queue, the messages can be monitored and analyzed. This analysis can help in identifying common failure reasons, which can be invaluable for improving the robustness and reliability of the application. After addressing the issues, administrators can manually reprocess the messages by sending them back to the original queue or to a different service for handling.

Use in Various Technologies

Many modern message brokers and cloud services provide built-in support for dead letter queues, including:

  • Amazon SQS: AWS Simple Queue Service (SQS) allows defining dead letter queues for both standard and FIFO queues, helping manage message processing failures gracefully.
  • RabbitMQ: Offers DLX (Dead Letter Exchanges), where messages from a queue can be rerouted to a DLX after failing to be processed.
  • Azure Service Bus: Supports configuring DLQs for queues and subscriptions, enabling efficient management of undeliverable messages.
  • Apache Kafka: While Kafka doesn’t have a built-in DLQ concept, similar functionality can be implemented using topic configurations and consumer group logic to handle failed messages.

Best Practices

When using dead letter queues, it’s important to:

  • Monitor and Alert: Regularly monitor the DLQ and set up alerts for when messages are added to the queue. This helps in quickly identifying and resolving issues.
  • Analyze Failure Causes: Investigate the reasons for message failures to improve system design and message handling.
  • Manage Retries Intelligently: Implement exponential backoff and jitter for retries to reduce the load on the system and increase the chance of successful processing without overwhelming resources.
  • Secure Sensitive Data: Ensure that messages in the DLQ, especially those containing sensitive information, are secured and access-controlled.

Dead letter queues are a critical component in the design of robust, fault-tolerant distributed systems, providing a mechanism to deal with message processing failures efficiently.

Caching Strategies with Kafka

When integrating caching with Kafka, there are several approaches and considerations to keep in mind:

1.Client-Side Caching

Applications consuming data from Kafka topics can implement client-side caching to store frequently accessed data in memory. This approach can significantly reduce the need to fetch the same data repeatedly from Kafka topics, especially for use cases like:

  • Aggregating data over a window of time.
  • Joining streams of data where certain datasets have high read access patterns.
  • Caching the results of expensive computations for quick access.

2.Kafka Streams State Stores

Kafka Streams, the stream processing library of Kafka, provides built-in state management capabilities through state stores, which can be used as caches. State stores can be either in-memory, persistent, or custom implemented. They are used for storing and querying data that is part of the stream processing computation, such as:

  • Aggregations
  • Windowed computations
  • Join operations

Using Kafka Streams state stores allows for efficient, scalable, and fault-tolerant stateful processing directly within your Kafka application.

3.Interactive Queries in Kafka Streams

Kafka Streams also supports interactive queries, which allow applications to query the state stored in state stores directly. This can be used to expose the current state of a stream processing application to external queries, essentially using the state store as a cache that can be queried in real-time.

4.External Caching Systems

For more complex caching needs or when caching data not directly processed by Kafka Streams, external caching systems like Redis, Memcached, or Apache Ignite can be used. These systems can cache data processed from Kafka topics and serve it with low latency to applications, reducing the load on Kafka consumers and other backend systems.

Considerations for Caching with Kafka

  • Consistency: Ensure that the cache is updated or invalidated appropriately to maintain data consistency across the system.
  • Scalability: Choose a caching solution that can scale out as your Kafka streams and consumer applications grow.
  • Fault Tolerance: Consider the durability of your cache, especially for stateful stream processing. Kafka Streams state stores, for example, are fault-tolerant when backed by Kafka changelog topics.
  • Latency: Evaluate the impact of caching on your application’s latency requirements. Caching can reduce latency for read-heavy operations but might introduce complexity in maintaining cache consistency.

Example: Using Kafka Streams State Store for Caching

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka topics. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology. While Kafka Streams is primarily a Java library, if you’re looking to implement similar functionality using Python, especially for stateful operations like caching, you would typically need to work around the lack of direct Kafka Streams support in Python.

However, you can achieve similar outcomes by using Kafka Python clients (confluent-kafka-python or kafka-python) for interaction with Kafka topics and integrating with an external caching or state store mechanism in Python. Here's a conceptual example that demonstrates how you might approach building a simple caching layer for Kafka message processing in Python, using an in-memory cache like Redis for storage.

Prerequisites

  • Kafka cluster running and accessible.
  • Python environment set up.
  • kafka-python and redis libraries installed. You can install them using pip:
pip install kafka-python redis
  • Redis server running and accessible.

Example Overview

This example involves two main components:

  1. Producer: Publishes messages to a Kafka topic.
  2. Consumer with Caching: Consumes messages from the Kafka topic, processes them, and stores the results in a Redis cache for quick access.

Redis as a State Store

First, set up a simple Redis client in Python:

import redis

# Connect to Redis server
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Function to get data from cache
def get_from_cache(key):
return redis_client.get(key)

# Function to set data in cache
def set_in_cache(key, value):
redis_client.set(key, value)

Consuming Messages and Caching Results

Here’s how you might consume messages from Kafka, process them, and cache the results using Redis:

from kafka import KafkaConsumer
import json

# Initialize Kafka consumer
consumer = KafkaConsumer(
'your_topic_name',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
msg_value = message.value
# Assume msg_value contains an 'id' and 'data' that you're processing
processed_data = f"Processed {msg_value['data']}" # Placeholder for your processing logic

# Cache the processed data using the message's id as the key
set_in_cache(msg_value['id'], processed_data)

print(f"Cached processed data for ID {msg_value['id']}")

Considerations

  • Processing Logic: This example assumes a simple processing logic placeholder. Your actual processing could be anything from data transformation to complex analytics.
  • Serialization/Deserialization: The example uses JSON for simplicity. Depending on your use case, you might need more complex serialization/deserialization logic for both Kafka messages and Redis caching.
  • Error Handling: Ensure to implement error handling, especially for network errors, deserialization errors, and Redis command errors.
  • Scalability and Performance: Depending on the volume of data and the complexity of your processing logic, consider performance implications. Redis operations are generally fast, but network latency and processing overhead can accumulate.

Partitioning in Apache Kafka

In Apache Kafka, a distributed streaming platform, partitioning refers to dividing a topic into multiple partitions. Here’s how partitioning works and its benefits in Kafka:

  • Parallelism: Partitions allow multiple consumers to read from a topic in parallel, significantly increasing the system’s throughput.
  • Scalability: By distributing partitions across different brokers (servers) in a Kafka cluster, Kafka scales out to handle more data and more consumers.
  • Fault Tolerance: Replicating partitions across multiple brokers ensures that data is not lost if a broker fails, enhancing the system’s reliability.
  • Ordering Guarantees: Within a partition, messages are ordered by the time they are received. Kafka guarantees order within a partition but not across partitions in a topic.

Partitioning in Databases

Partitioning in databases involves dividing a table into multiple pieces, where each piece can be stored on different nodes of a database cluster. The primary goals of partitioning in databases are:

  • Performance Optimization: Queries can be executed on smaller subsets of data, potentially in parallel, reducing response times.
  • Improved Management: Large tables can be more easily managed when partitioned, as maintenance operations can be performed on smaller parts of the table.
  • Data Distribution: Partitioning allows data to be distributed across a cluster, improving load balancing and reducing the risk of bottlenecks.

Databases typically support different partitioning strategies, such as range partitioning, hash partitioning, and list partitioning, each suited for different use cases.

Partitioning in Distributed File Systems

Distributed file systems, like Hadoop’s HDFS, also use partitioning to store data across multiple nodes. In these systems, files are divided into blocks or chunks, which are then distributed across the cluster. This approach has several advantages:

  • Fault Tolerance: Replicating blocks across different nodes ensures data availability even if some nodes fail.
  • Scalability: The system can store and process large volumes of data by adding more nodes to the cluster.
  • Parallel Processing: Data processing frameworks, like MapReduce, can process different blocks of a file in parallel, significantly speeding up computations.

Challenges and Considerations

While partitioning offers numerous benefits, it also introduces challenges that need to be carefully managed:

  • Partitioning Strategy: Choosing the right partitioning strategy is crucial for achieving the desired performance and scalability benefits. The strategy should match the access patterns and data distribution of the application.
  • Data Skew: Poorly designed partitioning schemes can lead to uneven data distribution, known as data skew, which can cause hotspots and degrade performance.
  • Rebalancing: As data volumes grow or access patterns change, partitions may need to be rebalanced across the cluster, which can be a complex and resource-intensive process.

How Kafka Supports CDC

Kafka’s architecture and ecosystem provide a robust foundation for implementing CDC:

  • Distributed System: Kafka’s distributed nature allows it to handle high volumes of data, making it suitable for capturing and distributing large-scale change data streams.
  • Fault Tolerance: Kafka’s replication and partitioning mechanisms ensure that data is reliably stored and available even in the event of node failures, crucial for maintaining accurate and consistent data streams in a CDC pipeline.
  • Scalability: Kafka can scale out to accommodate growing data volumes by adding more brokers to a cluster. This scalability is essential for CDC, where the volume of change data can increase as the source systems grow.
  • Real-time Processing: Kafka enables real-time data processing and streaming analytics, allowing businesses to react quickly to changes captured from the source databases.

Implementing CDC with Kafka

Implementing CDC with Kafka typically involves using Kafka Connect, a tool for streaming data between Kafka and other systems like databases, in a scalable and reliable way. Kafka Connect comes with a number of connectors that can be used for CDC, including:

  • Debezium: An open-source CDC platform built on top of Kafka Connect. Debezium provides a suite of connectors for various databases (e.g., MySQL, PostgreSQL, MongoDB, SQL Server) to capture row-level changes as they happen and publish them to Kafka topics.
  • Confluent JDBC Connector: While primarily used for importing data from databases into Kafka and exporting data from Kafka to databases, it can be configured for basic CDC scenarios.

CDC Workflow with Kafka

  1. Capture Changes: A CDC connector (like Debezium) monitors the source database logs (binlog in MySQL, WAL in PostgreSQL, etc.) for changes. This approach does not require polling the database, thus reducing the impact on the source database’s performance.
  2. Publish to Kafka: Changes captured by the connector are published to Kafka topics, with each change event typically containing information such as the type of change (insert, update, delete), the timestamp of the change, and the data that was changed.
  3. Consume and Process: Downstream systems, analytics platforms, or other Kafka consumers can subscribe to these topics to receive the change events in real-time. These events can then be used to update data stores, trigger business processes, or feed real-time analytics systems.

Considerations

  • Event Order: Ensuring the correct order of change events is critical, especially for accurately reflecting updates and deletes. Kafka’s partitioning and ordering guarantees within a partition help address this challenge.
  • Schema Evolution: Managing schema changes in the source database and reflecting those changes in the downstream systems is essential for maintaining data integrity and consistency.
  • Filtering and Transformation: In some cases, you might need to filter or transform change data before it’s consumed by downstream systems. Kafka Streams and Kafka Connect transformations can be used for this purpose.

CDC Data from Kafka with Python

Implementing Change Data Capture (CDC) with Apache Kafka can be efficiently accomplished using the Kafka Python client to produce or consume change data records. However, the more common practice involves leveraging existing CDC tools like Debezium with Kafka Connect to capture changes from databases and then using Python to consume these changes from Kafka topics. Below, we will outline how to consume change data from a Kafka topic using Python, assuming Debezium and Kafka Connect are already set up to capture and stream database changes to Kafka.

Pre-requisites

  • Apache Kafka and Kafka Connect are running with Debezium installed.
  • A Debezium connector is configured to capture changes from a source database and publish them to a Kafka topic.

Consuming CDC Data from Kafka with Python

To consume change data from Kafka, you can use the confluent-kafka-pythonclient or the kafka-python package. Here’s how you can do it with both:

Using confluent-kafka-python

First, install the package if you haven’t already:

pip install confluent-kafka

Then, use the following Python code to consume messages:

from confluent_kafka import Consumer, KafkaError

# Kafka consumer configuration
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'cdc-group',
'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(config)

# Subscribe to the CDC topic
cdc_topic = 'dbserver1.inventory.customers' # Example topic name
consumer.subscribe([cdc_topic])

try:
while True:
msg = consumer.poll(1.0) # Poll for messages

if msg is None:
continue # No message available within the timeout
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
continue
else:
print(msg.error())
break

# Extract the message value (change data) as a string
value = msg.value().decode('utf-8')
print(f'Received message: {value}')
finally:
# Clean up on exit
consumer.close()

Using kafka-python

Install the package if needed:

pip install kafka-python

Consume messages with the following code:

from kafka import KafkaConsumer

# Kafka consumer configuration
consumer = KafkaConsumer(
'dbserver1.inventory.customers', # Example topic name
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='cdc-group'
)

try:
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
value = message.value.decode('utf-8')
print(f"Received message: {value}")
finally:
consumer.close()

Notes

  • Topic Names: The example topicdbserver1.inventory. customer is a placeholder. You should replace it with the actual topic name that Debezium uses to publish changes from your source database. Debezium topic names typically follow the pattern{connector-name}.{database-name}.{table-name}.
  • Message Processing: The code examples above simply print the received messages. In a real application, you would likely deserialize the message value (which is usually in JSON format for Debezium) and then process it according to your application’s logic.
  • Error Handling: Proper error handling and logging should be implemented for production use.
Ahmed Sayed
Ahmed Sayed

Written by Ahmed Sayed

Data Engineer Lead | Azure Data Solutions | Data Architect

Responses (1)