Overview
Debezium is an open-source distributed platform for change data capture (CDC). It monitors databases and produces an event stream for every row-level insert, update, and delete, so downstream systems can react to data changes in near real time. Built on Apache Kafka Connect, Debezium reads changes from the database transaction log instead of polling tables. This produces low-latency, complete change streams, including deletes, without the query load that polling places on the source database.
Debezium provides connectors for PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Cassandra, Db2, and more. Each change event carries the row state before and after the change, the operation type, transaction metadata, and source information. How complete the before state is depends on database settings such as PostgreSQL REPLICA IDENTITY or MySQL binlog_row_image. These properties make Debezium well suited to data replication, cache invalidation, search index updates, event-driven microservices, and real-time data pipelines. Debezium can run in three ways: as connectors in a Kafka Connect cluster, as a standalone Debezium Server, or embedded in custom Java applications through the Debezium Engine.
Installation
Kafka Connect Deployment
# Download the Debezium connector plugins into a directory that will be
# listed on the Kafka Connect worker's plugin.path
mkdir -p /kafka/connect/debezium
cd /kafka/connect/debezium
# Download and extract the connector plugin archives
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.0.Final/debezium-connector-postgres-2.6.0.Final-plugin.tar.gz
tar -xzf debezium-connector-postgres-2.6.0.Final-plugin.tar.gz
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.0.Final/debezium-connector-mysql-2.6.0.Final-plugin.tar.gz
tar -xzf debezium-connector-mysql-2.6.0.Final-plugin.tar.gz
# Add /kafka/connect/debezium to plugin.path in the worker configuration, then restart the worker
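If you script the installation for several connectors or versions, the archive URLs follow one Maven Central pattern. The following is a minimal Python sketch, assuming the artifact naming shown in the wget commands above; plugin_url and install_plugin are hypothetical helper names, not part of Debezium.

```python
import tarfile
import urllib.request
from pathlib import Path

MAVEN_BASE = "https://repo1.maven.org/maven2/io/debezium"


def plugin_url(connector: str, version: str) -> str:
    """Build the Maven Central URL of a Debezium connector plugin archive.

    `connector` is the short name used in the artifact id, e.g. "postgres" or "mysql".
    """
    artifact = f"debezium-connector-{connector}"
    return f"{MAVEN_BASE}/{artifact}/{version}/{artifact}-{version}-plugin.tar.gz"


def install_plugin(connector: str, version: str, plugin_dir: Path) -> Path:
    """Download one plugin archive and extract it under plugin_dir."""
    plugin_dir.mkdir(parents=True, exist_ok=True)
    archive = plugin_dir / f"debezium-connector-{connector}-{version}-plugin.tar.gz"
    urllib.request.urlretrieve(plugin_url(connector, version), archive)
    with tarfile.open(archive, "r:gz") as tar:
        # Each archive unpacks into its own debezium-connector-<name>/ directory.
        tar.extractall(plugin_dir)
    return plugin_dir
```

For example, `install_plugin("postgres", "2.6.0.Final", Path("/kafka/connect/debezium"))` mirrors the first wget/tar pair. The worker still has to be restarted to pick up new plugins.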
Docker Compose Setup
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  connect:
    image: debezium/connect:2.6
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
  postgres:
    image: debezium/postgres:16
    ports: ["5432:5432"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: inventory
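After `docker compose up`, the Connect worker can take some time before its REST API on port 8083 answers. The following is a minimal Python sketch, assuming the port mapping above, that polls the REST root endpoint (GET /, which returns worker information such as the Kafka version) before you register connectors. wait_for_connect is a hypothetical helper name.

```python
import json
import time
import urllib.request


def wait_for_connect(base_url: str = "http://localhost:8083",
                     timeout: float = 120.0, interval: float = 2.0) -> dict:
    """Poll the Kafka Connect REST root until it answers; return its JSON body.

    Raises TimeoutError if the worker does not answer within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(base_url + "/", timeout=5) as resp:
                return json.load(resp)
        except OSError:
            # Connection refused/reset, socket timeouts and HTTP errors
            # (URLError and HTTPError subclass OSError) all mean "not ready yet".
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Kafka Connect at {base_url} did not respond within {timeout}s"
                ) from None
            time.sleep(interval)
```

Polling the root endpoint only proves the worker is serving requests. Connector plugins are listed separately by GET /connector-plugins.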
Debezium Server (Standalone)
# For non-Kafka sinks (Pulsar, Kinesis, Redis, HTTP, etc.)
wget https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.6.0.Final/debezium-server-dist-2.6.0.Final.tar.gz
tar -xzf debezium-server-dist-2.6.0.Final.tar.gz
cd debezium-server
# Configure the source connector and the sink in conf/application.properties, then start the server
./run.sh
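Debezium Server takes the same connector settings as a Kafka Connect registration. It passes every property under the `debezium.source.` prefix to the connector and selects the sink with `debezium.sink.type`. The following is a minimal Python sketch that renders a connector config as application.properties lines. to_server_properties is a hypothetical helper, and the data/offsets.dat offset file path is an assumption.

```python
def to_server_properties(connector_config: dict, sink_type: str,
                         offsets_file: str = "data/offsets.dat") -> str:
    """Render a Kafka Connect connector config as Debezium Server properties."""
    lines = [f"debezium.sink.type={sink_type}"]
    source = {
        key: value
        for key, value in connector_config.items()
        # Converters are Kafka Connect settings; Debezium Server picks the
        # output format with debezium.format.key / debezium.format.value instead.
        if not key.startswith(("key.converter", "value.converter"))
    }
    # Outside Kafka Connect, offsets are stored in a local file (assumed path).
    source.setdefault("offset.storage.file.filename", offsets_file)
    lines += [f"debezium.source.{key}={source[key]}" for key in sorted(source)]
    return "\n".join(lines) + "\n"
```

Writing the returned string to conf/application.properties gives a starting point. Sink-specific settings, such as the target URL of an HTTP sink, still have to be added by hand.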
Core Connector Configuration
PostgreSQL Connector
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true"
  }
}
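The curl examples below register this definition from a file named postgres-connector.json. If you generate that file from code, note that Kafka Connect stores configuration as a map of strings. The following is a minimal Python sketch that writes the `{"name", "config"}` shape accepted by POST /connectors and renders booleans as `true`/`false` rather than Python's `True`/`False`. write_connector_file is a hypothetical helper name.

```python
import json
from pathlib import Path


def _as_config_string(value) -> str:
    """Connect configs are string maps; render booleans the way Java prints them."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def write_connector_file(name: str, config: dict, path: Path) -> Path:
    """Write {"name": ..., "config": {...}} as accepted by POST /connectors."""
    body = {"name": name,
            "config": {k: _as_config_string(v) for k, v in config.items()}}
    path.write_text(json.dumps(body, indent=2) + "\n", encoding="utf-8")
    return path
```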
MySQL Connector
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "184054",
    "topic.prefix": "mysql_server",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders,inventory.products",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "initial",
    "include.schema.changes": "true"
  }
}
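Data change topics are named `<topic.prefix>.<schema>.<table>` for PostgreSQL and `<topic.prefix>.<database>.<table>` for MySQL. With include.schema.changes enabled, the MySQL connector also writes schema change events to a topic named after topic.prefix itself. The schema-changes.inventory history topic is for the connector's internal use. The following is a minimal Python sketch that lists the expected topics so consumers can subscribe to them. It assumes include-list entries are plain, fully qualified table names, even though Debezium treats them as regular expressions. topic_names is a hypothetical helper name.

```python
def topic_names(config: dict) -> list[str]:
    """Derive the topics a connector writes change events to (simplified)."""
    prefix = config["topic.prefix"]
    # Entries are already schema/database-qualified, e.g. "inventory.orders".
    tables = [t.strip() for t in config.get("table.include.list", "").split(",") if t.strip()]
    topics = [f"{prefix}.{table}" for table in tables]
    if config.get("include.schema.changes") == "true":
        # Schema change events go to a topic named after the prefix.
        topics.append(prefix)
    return topics
```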
REST API Commands
| Command | Description |
|---|---|
| GET /connectors | List all connectors |
| POST /connectors | Create a new connector |
| GET /connectors/{name} | Get connector details |
| GET /connectors/{name}/status | Get connector and task status |
| PUT /connectors/{name}/config | Create or update a connector's configuration |
| PUT /connectors/{name}/pause | Pause a connector |
| PUT /connectors/{name}/resume | Resume a connector |
| POST /connectors/{name}/restart | Restart a connector |
| DELETE /connectors/{name} | Delete a connector |
| GET /connectors/{name}/tasks | Get connector tasks |
| POST /connectors/{name}/tasks/{id}/restart | Restart a specific task |
CLI Examples
# Register a connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-connector.json
# Check connector status
curl http://localhost:8083/connectors/postgres-connector/status | jq
# List all connectors
curl http://localhost:8083/connectors | jq
# Pause connector
curl -X PUT http://localhost:8083/connectors/postgres-connector/pause
# Resume connector
curl -X PUT http://localhost:8083/connectors/postgres-connector/resume
# Delete connector
curl -X DELETE http://localhost:8083/connectors/postgres-connector
# Check available connector plugins
curl http://localhost:8083/connector-plugins | jq
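The same endpoints can be scripted for routine checks. The following is a minimal Python sketch, assuming the worker at localhost:8083 used in the curl examples. It reads GET /connectors/{name}/status and restarts only the tasks reported as FAILED through POST /connectors/{name}/tasks/{id}/restart. The helper names are hypothetical. On Kafka 3.0 and later, POST /connectors/{name}/restart?includeTasks=true&onlyFailed=true achieves a similar result in a single call.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # worker address used in the curl examples


def _request(method: str, path: str, base_url: str = CONNECT_URL):
    """Send a request to the Connect REST API and decode the JSON body, if any."""
    req = urllib.request.Request(base_url + path, method=method)
    with urllib.request.urlopen(req, timeout=10) as resp:
        body = resp.read()
    return json.loads(body) if body else None


def failed_task_ids(status: dict) -> list[int]:
    """Ids of tasks reported as FAILED in a GET /connectors/{name}/status body."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_failed_tasks(name: str, base_url: str = CONNECT_URL) -> list[int]:
    """Restart only the failed tasks of connector `name` and return their ids."""
    quoted = urllib.parse.quote(name, safe="")
    ids = failed_task_ids(_request("GET", f"/connectors/{quoted}/status", base_url))
    for task_id in ids:
        _request("POST", f"/connectors/{quoted}/tasks/{task_id}/restart", base_url)
    return ids
```

Each failed task carries a `trace` field in the status response. Log it before restarting, because a task that fails for a configuration reason will fail again.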
Configuration
Snapshot Modes
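| Mode | Description |
|---|---|
| initial | Snapshot existing data on first start (no stored offsets), then stream changes |
| initial_only | Snapshot existing data, then stop without streaming |
| never | Skip the snapshot and stream changes only |
| when_needed | Snapshot on start if no offsets are stored or the stored offset is no longer available in the log, then stream changes |
| schema_only | Capture table schemas but no data, then stream changes (deprecated in favor of no_data in newer releases) |
| recovery | Rebuild a lost or corrupted schema history topic from the current table schemas |
Not every mode is available for every connector; check the connector's documentation.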
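The modes differ mainly in two things: whether table data is snapshotted at startup, and whether the connector then streams changes. The following is a simplified Python model of the table above, meant only to contrast modes such as initial and when_needed. It is not Debezium's implementation and ignores error cases, such as a stored log position that has expired under a mode other than when_needed. snapshot_on_start is a hypothetical name.

```python
def snapshot_on_start(mode: str, has_offsets: bool,
                      offset_available: bool = True) -> tuple[bool, bool]:
    """Return (snapshot_data, stream_changes) for a connector that is starting.

    has_offsets: the connector committed offsets during an earlier run.
    offset_available: the stored position still exists in the WAL/binlog.
    """
    if mode == "initial":
        return (not has_offsets, True)
    if mode == "initial_only":
        return (not has_offsets, False)
    if mode == "when_needed":
        return (not has_offsets or not offset_available, True)
    if mode in ("never", "schema_only", "recovery"):
        # No table data is snapshotted; schema_only and recovery read schemas only.
        return (False, True)
    raise ValueError(f"unknown snapshot mode: {mode}")
```

The model makes the key contrast visible. After a long outage that lets the log position expire, when_needed takes a fresh snapshot, whereas initial does not.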
Signal-Based Incremental Snapshots
-- Create the signal table in the source database and point the connector's
-- signal.data.collection property at it (for PostgreSQL: public.debezium_signal)
CREATE TABLE debezium_signal (
  id VARCHAR(42) PRIMARY KEY,
  type VARCHAR(32) NOT NULL,
  data VARCHAR(2048) NULL
);
-- Trigger an incremental snapshot of public.orders; the id only has to be unique
INSERT INTO debezium_signal (id, type, data) VALUES (
  'snapshot-001',
  'execute-snapshot',
  '{"data-collections": ["public.orders"], "type": "incremental"}'
);
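When signals are sent from application code, build the data column with a JSON serializer instead of string concatenation, and use a fresh id for each signal. The following is a minimal Python sketch, assuming the signal table schema above. It uses sqlite3 only as a stand-in so the example runs anywhere. Against PostgreSQL you would issue the same parameterized INSERT through a driver such as psycopg, which uses `%s` placeholders instead of `?`. send_incremental_snapshot is a hypothetical helper name.

```python
import json
import sqlite3
import uuid

SIGNAL_DDL = """
CREATE TABLE IF NOT EXISTS debezium_signal (
    id   VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048) NULL
)
"""


def send_incremental_snapshot(conn: sqlite3.Connection, tables: list[str]) -> str:
    """Insert an execute-snapshot signal for `tables`; return the signal id."""
    signal_id = str(uuid.uuid4())  # 36 characters, fits VARCHAR(42)
    data = json.dumps({"data-collections": tables, "type": "incremental"})
    if len(data) > 2048:
        raise ValueError("signal payload exceeds the data column size")
    conn.execute(
        "INSERT INTO debezium_signal (id, type, data) VALUES (?, ?, ?)",
        (signal_id, "execute-snapshot", data),
    )
    conn.commit()
    return signal_id
```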
Advanced Usage
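Single message transforms (SMTs) are chained in the connector config and run in the order listed in transforms. In this example, route renames each topic to the bare table name (dbserver1.public.orders becomes orders). Next, unwrap flattens the change event envelope to the new row state, and filter drops rows whose status is 'deleted'. Because filter runs after unwrap, its condition refers to the flattened value rather than value.after. The Filter SMT also requires the Debezium scripting module and a JSR 223 Groovy implementation on the plugin path.
{
  "transforms": "route,unwrap,filter",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$3",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.status != 'deleted'"
}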
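To check what this chain does to a record before deploying it, the three steps can be approximated in plain Python. This sketch mimics RegexRouter, which rewrites a topic only when the whole name matches the pattern. It also approximates ExtractNewRecordState with delete.handling.mode=rewrite and add.fields=op,table,source.ts_ms, which prefixes added fields with `__` and replaces dots with underscores. It is an illustration under those assumptions, not the SMTs' actual code, and route, unwrap, and keep are hypothetical names.

```python
import re

# Same pattern as transforms.route.regex, after JSON unescaping.
ROUTE_PATTERN = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic: str) -> str:
    """Mimic RegexRouter: replace with group 3 only if the whole topic matches."""
    match = ROUTE_PATTERN.fullmatch(topic)
    return match.expand(r"\3") if match else topic


def unwrap(envelope: dict) -> dict:
    """Approximate ExtractNewRecordState (rewrite mode plus added fields)."""
    deleted = envelope["op"] == "d"
    # Deletes keep the old row state and are marked with __deleted.
    row = dict(envelope["before"] if deleted else envelope["after"])
    row["__op"] = envelope["op"]
    row["__table"] = envelope["source"]["table"]
    row["__source_ts_ms"] = envelope["source"]["ts_ms"]
    row["__deleted"] = "true" if deleted else "false"
    return row


def keep(row: dict) -> bool:
    """Python equivalent of the Groovy condition value.status != 'deleted'."""
    return row.get("status") != "deleted"
```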
Change Event Structure
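The example below is an update event as serialized by JsonConverter with schemas enabled; the schema section is elided. The op field is c (create), u (update), d (delete), or r (read, emitted during snapshots). For PostgreSQL, before contains the full previous row only when the table uses REPLICA IDENTITY FULL.
{
  "schema": {},
  "payload": {
    "before": {
      "id": 1001,
      "name": "Original Name",
      "status": "active"
    },
    "after": {
      "id": 1001,
      "name": "Updated Name",
      "status": "active"
    },
    "source": {
      "version": "2.6.0.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1700000000000,
      "db": "inventory",
      "schema": "public",
      "table": "customers",
      "txId": 12345,
      "lsn": 98765432
    },
    "op": "u",
    "ts_ms": 1700000000123
  }
}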
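A consumer typically dispatches on op and, for updates, compares before with after to see which columns changed. The following is a minimal Python sketch that summarizes a message value shaped like the event above. It also accepts values produced with schemas disabled and returns None for tombstones, whose Kafka value is null. describe_change is a hypothetical helper name.

```python
import json

OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_change(message):
    """Summarize a Debezium change event value serialized by JsonConverter."""
    if message is None:
        return None  # tombstone that follows a delete
    envelope = json.loads(message)
    # With schemas.enable=false the value is the payload itself.
    payload = envelope.get("payload", envelope)
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    changed = {
        col: (before.get(col), after.get(col))
        for col in sorted(before.keys() | after.keys())
        if before.get(col) != after.get(col)
    }
    source = payload["source"]
    # PostgreSQL reports a schema; MySQL reports only the database name.
    namespace = source.get("schema") or source.get("db")
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f"{namespace}.{source['table']}",
        "changed": changed,
    }
```

With the example event above, the summary reports an update to public.customers in which only name changed.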
Troubleshooting
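| Issue | Solution |
|---|---|
| Connector fails to start | Check database credentials and network connectivity. Verify that plugin.name names a logical decoding plugin available on the server (pgoutput is built into PostgreSQL 10 and later) |
| Replication slot not created | Ensure the database user has the REPLICATION privilege. For PostgreSQL, set wal_level=logical and restart the server |
| Snapshot taking too long | Start with snapshot.mode=schema_only and backfill with incremental snapshots. Increase snapshot.fetch.size |
| Missing change events | Check replication slot health. Verify that table.include.list entries, which are regular expressions matched against fully qualified table names, match the intended tables |
| Kafka topic not created | Enable topic auto-creation on the broker or in Kafka Connect, or create the topics manually before starting the connector |
| Connector status FAILED | Check the Connect worker logs, then restart with POST /connectors/{name}/restart |
| High replication lag | Most Debezium connectors run a single task, so raising tasks.max usually has no effect. Tune max.batch.size and max.queue.size, check Kafka throughput, and monitor WAL/binlog size and disk space |
| Schema evolution errors | For connectors that keep a schema history topic (MySQL, SQL Server, Oracle, Db2), make sure the topic is configured with unlimited retention and is never deleted. Use Avro with Schema Registry for compatibility checks |
| OOM in Connect worker | Increase the Connect worker heap. Reduce max.batch.size and max.queue.size |
| Duplicate events on restart | Expected with at-least-once delivery. Make consumers idempotent |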
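Two rows of this table can be checked from the events themselves. The difference between payload.ts_ms (when Debezium processed the change) and source.ts_ms (when the change was made in the database) gives a per-event processing lag. For duplicates, applying events as upserts and deletes keyed by primary key makes redelivery harmless. The following is a minimal Python sketch of both, assuming an in-memory dict stands in for the downstream store and that rows are keyed by an id column. processing_lag_ms and apply_change are hypothetical names.

```python
def processing_lag_ms(payload: dict) -> int:
    """Milliseconds between the database change and Debezium processing it."""
    return payload["ts_ms"] - payload["source"]["ts_ms"]


def apply_change(state: dict, payload: dict, key: str = "id") -> None:
    """Idempotently apply one change event to `state`, keyed by primary key.

    Replaying the same events in order yields the same final state, so
    at-least-once redelivery after a restart does not corrupt the store.
    """
    op = payload["op"]
    if op == "d":
        state.pop(payload["before"][key], None)
    elif op in ("c", "u", "r"):
        row = payload["after"]
        state[row[key]] = row
```

Idempotence covers redelivered events, not reordered ones. Per-key ordering is preserved because Debezium keys each record by the row's primary key, so all changes to a row land in the same Kafka partition.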