The Various Methods of Change Data Capture (CDC) with Examples and Code Snippets

Ahmed Sayed
Aug 18, 2023

Change Data Capture (CDC) is a design pattern that allows you to track changes in your database so that you can respond to them in a programmatic way. It’s a crucial part of many data integration and data replication processes. In this article, we will discuss various methods of CDC and provide examples and code snippets to illustrate each one.

1. Timestamps

The simplest CDC method involves adding a timestamp column to each table that records when each row was last modified. You save the latest timestamp you have already processed and then query for rows with a later timestamp. Those rows are the new changes to process.

For example, in MySQL you might add a last_modified column that the database keeps up to date automatically:

ALTER TABLE orders
ADD COLUMN last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

Then, to find new changes, you would query:

SELECT * FROM orders
WHERE last_modified > '2023-08-18 00:00:00';

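This method is straightforward and works with virtually any database. Outside MySQL, though, the column is usually maintained by the application or a trigger. It also has blind spots:
- It only sees the current state of each row, so several updates between polls show up as a single change.
- It cannot detect rows that have been deleted.
- Rows whose timestamp equals the saved value can be skipped by the > comparison, for example because of coarse timestamp resolution.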
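To make the polling loop concrete, here is a minimal sketch using Python's built-in sqlite3 module and an in-memory database. After each poll it saves the newest last_modified value it saw (a "high-water mark"). SQLite has no ON UPDATE CURRENT_TIMESTAMP clause, so the sketch assumes the application sets last_modified itself. The fetch_changes_since helper and the sample rows are hypothetical.

```python
import sqlite3

def fetch_changes_since(conn, high_water_mark):
    """Return rows modified after high_water_mark, plus the new mark to store."""
    rows = conn.execute(
        "SELECT id, status, last_modified FROM orders "
        "WHERE last_modified > ? ORDER BY last_modified",
        (high_water_mark,),
    ).fetchall()
    # Advance the mark to the newest timestamp actually seen, not the local clock
    new_mark = rows[-1][2] if rows else high_water_mark
    return rows, new_mark

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, last_modified TEXT)")
# Assumption: the application sets last_modified on every write (SQLite has no ON UPDATE clause)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "new", "2023-08-17 23:00:00"), (2, "new", "2023-08-18 09:30:00")],
)

mark = "2023-08-18 00:00:00"
changes, mark = fetch_changes_since(conn, mark)
print(changes)  # only order 2 is newer than the mark

# Order 1 is updated later; the next poll picks it up
conn.execute(
    "UPDATE orders SET status = 'shipped', last_modified = '2023-08-18 10:00:00' WHERE id = 1"
)
changes, mark = fetch_changes_since(conn, mark)
print(changes, mark)
```

The mark comes from the data rather than from the poller's own clock, which avoids gaps caused by clock differences between client and database. Deleted rows, however, remain invisible to this approach.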

2. Log-Based CDC

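Log-based CDC captures changes by reading the database’s transaction log, which records every committed change. Examples are the MySQL binary log and the PostgreSQL write-ahead log. Because the log holds every insert, update, and delete in order, this method captures deletions and each intermediate change to a record. It also adds no polling queries or triggers to the source tables.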

Here’s an example using Debezium, a popular open-source CDC tool, with a MySQL database:

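{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}

This configuration is registered with Kafka Connect by POSTing it to the Connect REST API's /connectors endpoint. By default, Debezium first takes a snapshot of the existing data. It then reads the MySQL binary log and writes row-level change events for each table in the inventory database to a separate Kafka topic named <topic.prefix>.<database>.<table>, for example dbserver1.inventory.orders.

The schema-changes.inventory topic is used internally to record the history of schema (DDL) changes; it does not carry the data changes themselves. The property names above are those used by Debezium 2.x. Debezium 1.x used database.server.name, database.whitelist (later database.include.list), and database.history.kafka.* instead.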
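On the consuming side, each Debezium change event carries a payload with before and after row images and an op code: c for create, u for update, d for delete, and r for rows read during the initial snapshot. The sketch below applies such events, already decoded from JSON, to an in-memory dictionary standing in for a replica. It leaves out the Kafka client entirely. The apply_change helper, the id primary-key column, and the trimmed sample events (real events also carry source metadata and timestamps) are assumptions for illustration.

```python
import json

def apply_change(replica, event):
    """Apply one decoded Debezium change event to a dict keyed by primary key."""
    if event is None:
        return  # tombstone record that follows a delete; nothing to apply
    # With schemas enabled, the envelope is wrapped in {"schema": ..., "payload": ...}
    payload = event.get("payload", event)
    op = payload["op"]
    if op in ("c", "u", "r"):
        row = payload["after"]
        replica[row["id"]] = row  # assumption: "id" is the primary-key column
    elif op == "d":
        replica.pop(payload["before"]["id"], None)
    # Other operation types are ignored in this sketch

# Hypothetical, trimmed message values as they might arrive from Kafka
raw_messages = [
    '{"payload": {"op": "c", "before": null, "after": {"id": 1, "status": "new"}}}',
    '{"payload": {"op": "u", "before": {"id": 1, "status": "new"}, '
    '"after": {"id": 1, "status": "shipped"}}}',
    '{"payload": {"op": "c", "before": null, "after": {"id": 2, "status": "new"}}}',
    '{"payload": {"op": "d", "before": {"id": 2, "status": "new"}, "after": null}}',
    None,  # tombstone
]

replica = {}
for message in raw_messages:
    apply_change(replica, json.loads(message) if message is not None else None)
print(replica)
```

Log-based events include the before image of each row, so deletes propagate cleanly. The timestamp method cannot do this.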

3. Trigger-Based CDC

Trigger-based CDC uses database triggers to capture changes. When a row is inserted, updated, or deleted, the database automatically runs a piece of code (the trigger). The trigger writes a record of the change to an audit table as part of the same transaction.

Here’s an example in PostgreSQL:

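-- Create a table to hold changes
CREATE TABLE orders_audit (
    id        serial PRIMARY KEY,
    operation CHAR(1)   NOT NULL,  -- 'I', 'U' or 'D'
    stamp     TIMESTAMP NOT NULL,
    userid    TEXT      NOT NULL,
    old_data  TEXT,                -- row before the change (JSON), NULL for inserts
    new_data  TEXT                 -- row after the change (JSON), NULL for deletes
);

-- Create a trigger function that records each row change as JSON text
CREATE OR REPLACE FUNCTION audit_orders() RETURNS TRIGGER AS $orders_audit$
BEGIN
    IF (TG_OP = 'DELETE') THEN
        INSERT INTO orders_audit (operation, stamp, userid, old_data)
        VALUES ('D', now(), current_user, row_to_json(OLD)::text);
        RETURN OLD;
    ELSIF (TG_OP = 'UPDATE') THEN
        INSERT INTO orders_audit (operation, stamp, userid, old_data, new_data)
        VALUES ('U', now(), current_user, row_to_json(OLD)::text, row_to_json(NEW)::text);
        RETURN NEW;
    ELSIF (TG_OP = 'INSERT') THEN
        INSERT INTO orders_audit (operation, stamp, userid, new_data)
        VALUES ('I', now(), current_user, row_to_json(NEW)::text);
        RETURN NEW;
    END IF;
    RETURN NULL;  -- the return value of an AFTER trigger is ignored
END;
$orders_audit$ LANGUAGE plpgsql;

-- Attach the trigger to the orders table
-- (EXECUTE FUNCTION needs PostgreSQL 11+; older versions use EXECUTE PROCEDURE)
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();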

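This code sets up a trigger that logs every INSERT, UPDATE, and DELETE on the orders table into the orders_audit table, in the same transaction as the change itself. Downstream processes can then read the audit table instead of the source table. The trade-off is extra write work on every modification.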
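The same pattern can be tried without a PostgreSQL server. This minimal sketch uses SQLite (through Python's sqlite3 module) as a stand-in, since SQLite also supports row-level triggers with OLD and NEW. SQLite has no TG_OP variable, so there is one trigger per operation. The audit table is simplified to the order status. The hypothetical read_audit consumer tracks the last audit_id it processed, so each audit row is read once, in id order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT);

CREATE TABLE orders_audit (
    audit_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    operation  TEXT NOT NULL,          -- 'I', 'U' or 'D'
    order_id   INTEGER NOT NULL,
    old_status TEXT,                   -- NULL for inserts
    new_status TEXT                    -- NULL for deletes
);

CREATE TRIGGER orders_ai AFTER INSERT ON orders BEGIN
    INSERT INTO orders_audit (operation, order_id, new_status)
    VALUES ('I', NEW.id, NEW.status);
END;
CREATE TRIGGER orders_au AFTER UPDATE ON orders BEGIN
    INSERT INTO orders_audit (operation, order_id, old_status, new_status)
    VALUES ('U', NEW.id, OLD.status, NEW.status);
END;
CREATE TRIGGER orders_ad AFTER DELETE ON orders BEGIN
    INSERT INTO orders_audit (operation, order_id, old_status)
    VALUES ('D', OLD.id, OLD.status);
END;
""")

def read_audit(conn, last_audit_id):
    """Return audit rows written after last_audit_id, oldest first."""
    return conn.execute(
        "SELECT audit_id, operation, order_id, old_status, new_status "
        "FROM orders_audit WHERE audit_id > ? ORDER BY audit_id",
        (last_audit_id,),
    ).fetchall()

# Each statement fires the matching trigger
conn.execute("INSERT INTO orders VALUES (1, 'new')")
conn.execute("UPDATE orders SET status = 'shipped' WHERE id = 1")
conn.execute("DELETE FROM orders WHERE id = 1")

for audit_row in read_audit(conn, last_audit_id=0):
    print(audit_row)
```

A real consumer would persist the last audit_id it processed, so that a restart resumes where it left off.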

4. Query-Based CDC

Query-based CDC involves regularly querying the database and comparing the current state to a saved snapshot. It needs nothing more than read access, so it can work with any database. However, if a row changes several times between queries, only the final state is seen. Comparing full snapshots can also be expensive for large tables.

Here’s an example using Python and SQLAlchemy:

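import time

from sqlalchemy import create_engine, MetaData, Table, select, func
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://user:pass@localhost/dbname')
Session = sessionmaker(bind=engine)

# Reflect the orders table definition from the database
metadata = MetaData()
orders = Table('orders', metadata, autoload_with=engine)

# Start from the newest timestamp already stored (database clock, not the local clock)
with Session() as session:
    last_seen = session.execute(select(func.max(orders.c.last_modified))).scalar()

while True:
    with Session() as session:
        query = select(orders).order_by(orders.c.last_modified)
        if last_seen is not None:
            query = query.where(orders.c.last_modified > last_seen)
        for row in session.execute(query):
            print(f"New change: {row}")
            # Advance the high-water mark to the newest change actually seen
            last_seen = row.last_modified
    time.sleep(5)  # poll interval; avoids hammering the database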

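This Python script uses SQLAlchemy to poll the orders table repeatedly for rows whose last_modified value is newer than those already seen. Note that it relies on the last_modified column from the Timestamps method rather than comparing against a saved snapshot. A true snapshot comparison diffs the full table contents, which also reveals deleted rows.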
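The snapshot comparison described at the start of this section can be sketched as follows. The previous poll's rows are kept keyed by primary key and diffed against the current rows. The diff_snapshots helper below is hypothetical and works on plain dictionaries, so it could be fed from any query, for example through the SQLAlchemy session above. Unlike timestamp polling, it detects deletions, but it still collapses multiple changes between polls into one.

```python
def diff_snapshots(previous, current):
    """Compare two snapshots ({primary_key: row_tuple}) and list the changes."""
    changes = []
    for key, row in current.items():
        if key not in previous:
            changes.append(("insert", key, row))
        elif previous[key] != row:
            changes.append(("update", key, row))
    # Keys that disappeared since the last snapshot were deleted
    for key, row in previous.items():
        if key not in current:
            changes.append(("delete", key, row))
    return changes

# Two consecutive snapshots of a hypothetical orders table: (status, amount)
snapshot_1 = {1: ("new", 20.0), 2: ("new", 35.5), 3: ("paid", 12.0)}
snapshot_2 = {1: ("shipped", 20.0), 3: ("paid", 12.0), 4: ("new", 8.0)}

for change in diff_snapshots(snapshot_1, snapshot_2):
    print(change)
```

Holding full snapshots in memory is only practical for small tables. For larger ones, a common variation stores a hash per row and compares hashes instead of whole rows.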

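In conclusion, the CDC method you choose depends on your specific requirements and the capabilities of your database:
- Timestamp and query-based polling are the easiest to set up, but they collapse intermediate changes, and timestamp polling cannot see deletes.
- Triggers record every change, at the cost of extra work on each write.
- Log-based CDC captures every change with little load on the source tables, but it needs access to the transaction log plus supporting infrastructure such as Kafka Connect.

Regardless of the method, CDC is a powerful tool for keeping your data systems in sync and responding to changes in near real time. It’s an essential part of many modern data architectures, and understanding how it works will help you build more efficient and effective data pipelines.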

Ahmed Sayed

Data Engineer Lead | Azure Data Solutions | Data Architect