Debezium Cheat Sheet

Overview

Debezium is an open-source distributed platform for change data capture (CDC). It monitors databases and emits an event for every row-level insert, update, and delete, so downstream systems can react to data changes in near real time. Built on Apache Kafka Connect, Debezium reads changes from the database's transaction log instead of polling tables, which gives low-latency, reliable, and complete change streams with little added load on the source database.

Debezium provides connectors for PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Cassandra, Db2, and more. Each change event carries the row state before and after the change (how much of the before state is available depends on the source database's configuration, for example PostgreSQL's REPLICA IDENTITY), the operation type, transaction metadata, and source information. This makes Debezium well suited to data replication, cache invalidation, search index updates, event-driven microservices, and real-time data pipelines. Debezium can run on a Kafka Connect cluster, as a standalone server (Debezium Server), or embedded in a custom application.

Installation

Kafka Connect Deployment

# Install Debezium connector plugins into an existing Kafka Connect worker.
# The directory below must be listed in the worker's plugin.path setting.
mkdir -p /kafka/connect/debezium
cd /kafka/connect/debezium

# Download connector plugins
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.0.Final/debezium-connector-postgres-2.6.0.Final-plugin.tar.gz
tar -xzf debezium-connector-postgres-2.6.0.Final-plugin.tar.gz

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.0.Final/debezium-connector-mysql-2.6.0.Final-plugin.tar.gz
tar -xzf debezium-connector-mysql-2.6.0.Final-plugin.tar.gz

Docker Compose Setup

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.6
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: debezium-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

  postgres:
    image: debezium/postgres:16
    ports: ["5432:5432"]
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: inventory

Debezium Server (Standalone)

# For non-Kafka sinks (Pulsar, Kinesis, Redis, HTTP, etc.)
wget https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.6.0.Final/debezium-server-dist-2.6.0.Final.tar.gz
tar -xzf debezium-server-dist-2.6.0.Final.tar.gz
cd debezium-server

Core Connector Configuration

PostgreSQL Connector

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true"
  }
}

MySQL Connector

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "184054",
    "topic.prefix": "mysql_server",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders,inventory.products",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "initial",
    "include.schema.changes": "true"
  }
}
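
Entries in table.include.list are regular expressions that are matched against the whole qualified table name, not a substring, and a pattern that does not match is a common reason for missing events. The sketch below approximates that check in Python so a list can be tested before the connector is registered. The function name is hypothetical, Python's regex dialect differs slightly from the Java one Debezium uses, and the sketch matches case-sensitively, which may not mirror the connector exactly.

```python
import re


def is_captured(table_include_list, qualified_name):
    """Return True if any comma-separated pattern matches the whole identifier.

    Illustrative helper, not part of Debezium: it only approximates how
    include-list patterns are anchored to the full schema.table name.
    """
    patterns = [p.strip() for p in table_include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, qualified_name) for p in patterns)


if __name__ == "__main__":
    include = "inventory.orders,inventory.products"
    for table in ("inventory.orders", "inventory.orders_archive", "inventory.customers"):
        print(table, is_captured(include, table))
```

Because the match is anchored, inventory.orders does not capture inventory.orders_archive; a pattern such as inventory\.orders.* would.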

REST API Commands

| Command | Description |
|---|---|
| GET /connectors | List all connectors |
| POST /connectors | Create a new connector |
| GET /connectors/{name} | Get connector details |
| GET /connectors/{name}/status | Get connector status |
| PUT /connectors/{name}/config | Update connector configuration |
| PUT /connectors/{name}/pause | Pause a connector |
| PUT /connectors/{name}/resume | Resume a connector |
| POST /connectors/{name}/restart | Restart a connector |
| DELETE /connectors/{name} | Delete a connector |
| GET /connectors/{name}/tasks | Get connector tasks |
| POST /connectors/{name}/tasks/{id}/restart | Restart a specific task |

CLI Examples

# Register a connector
curl -X POST http://localhost:8083/connectors \
    -H "Content-Type: application/json" \
    -d @postgres-connector.json

# Check connector status
curl http://localhost:8083/connectors/postgres-connector/status | jq

# List all connectors
curl http://localhost:8083/connectors | jq

# Pause connector
curl -X PUT http://localhost:8083/connectors/postgres-connector/pause

# Resume connector
curl -X PUT http://localhost:8083/connectors/postgres-connector/resume

# Delete connector
curl -X DELETE http://localhost:8083/connectors/postgres-connector

# Check available connector plugins
curl http://localhost:8083/connector-plugins | jq
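
The same REST calls can be scripted without curl. The following is a minimal sketch using only the Python standard library; the helper names (build_request, call, upsert_connector) are hypothetical, and the base URL assumes the Connect port published in the Compose file above. Building the request is kept separate from sending it so the request shape can be inspected without a running worker.

```python
import json
import urllib.request

# Assumption: Kafka Connect REST endpoint as exposed by the Compose setup.
CONNECT_URL = "http://localhost:8083"


def build_request(method, path, body=None, base_url=CONNECT_URL):
    """Build (but do not send) a request for the Kafka Connect REST API."""
    data = None
    headers = {"Accept": "application/json"}
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        base_url + path, data=data, headers=headers, method=method
    )


def call(method, path, body=None):
    """Send the request and decode the JSON response, if there is one."""
    with urllib.request.urlopen(build_request(method, path, body), timeout=10) as resp:
        raw = resp.read()
        return json.loads(raw) if raw else None


def upsert_connector(name, config):
    """PUT /connectors/{name}/config creates the connector or updates it.

    The body is the flat config map, without the name/config wrapper
    used by POST /connectors.
    """
    return call("PUT", f"/connectors/{name}/config", config)
```

For example, call("GET", "/connectors/postgres-connector/status") returns the parsed status document, and call("PUT", "/connectors/postgres-connector/pause") pauses the connector.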

Configuration

Snapshot Modes

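| Mode | Description |
|---|---|
| initial | Snapshot existing data, then stream changes |
| initial_only | Only snapshot, no streaming |
| never | Skip snapshot, only stream new changes |
| when_needed | Snapshot if offsets are missing or no longer available in the log |
| schema_only | Capture schema only, no data snapshot (named no_data in newer releases) |
| recovery | Rebuild a corrupted or lost schema history topic (named schema_only_recovery in older releases) |

The available modes differ by connector and version; check the documentation for the connector in use.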

Signal-Based Incremental Snapshots

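-- Create signal table in source database.
-- The connector must point at it via the signal.data.collection property
-- (for example public.debezium_signal) and must be able to capture it.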
CREATE TABLE debezium_signal (
    id VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048) NULL
);

-- Trigger incremental snapshot
INSERT INTO debezium_signal (id, type, data) VALUES (
    'snapshot-001',
    'execute-snapshot',
    '{"data-collections": ["public.orders"], "type": "incremental"}'
);
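
When signals are sent from application code, the row can be built programmatically rather than written by hand. This is a small sketch of that idea: build_snapshot_signal is a hypothetical helper, and the column sizes it checks are the ones from the CREATE TABLE statement above.

```python
import json
import uuid


def build_snapshot_signal(tables, signal_id=None):
    """Return an (id, type, data) tuple for an execute-snapshot signal row.

    Hypothetical helper: it only assembles values for the signal table
    and does not talk to the database.
    """
    if not tables:
        raise ValueError("at least one table is required")
    data = json.dumps({"data-collections": list(tables), "type": "incremental"})
    if len(data) > 2048:
        raise ValueError("data does not fit the VARCHAR(2048) column")
    # A random UUID is 36 characters, which fits the VARCHAR(42) id column.
    return (signal_id or str(uuid.uuid4()), "execute-snapshot", data)


if __name__ == "__main__":
    print(build_snapshot_signal(["public.orders"], signal_id="snapshot-001"))
```

The tuple can be passed as parameters to a parameterized INSERT through whichever database driver the application already uses.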

Advanced Usage

Single Message Transforms (SMTs)

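Transforms run in the order listed, so the filter comes first: its condition reads the envelope's after field, which no longer exists once unwrap has flattened the record. The safe-navigation operator keeps delete events, whose after field is null, from failing the condition. The Filter SMT also requires the debezium-scripting module and a Groovy JSR 223 engine on the plugin path.

{
  "transforms": "filter,unwrap,route",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.after?.status != 'deleted'",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$3"
}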
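
To make the effect of the route and unwrap transforms concrete, the sketch below emulates them in plain Python. It is an approximation for illustration, not Debezium code: route mirrors RegexRouter's whole-topic match with replacement $3, and unwrap mimics ExtractNewRecordState with delete.handling.mode=rewrite and add.fields=op,table,source.ts_ms, assuming the default "__" prefix for added fields.

```python
import re

# Same pattern as transforms.route.regex: prefix.schema.table
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic):
    """Return the table segment if the whole topic name matches, else the topic unchanged."""
    match = ROUTE_REGEX.fullmatch(topic)
    return match.group(3) if match else topic


def unwrap(payload):
    """Flatten a change-event payload roughly the way the unwrap SMT is configured here."""
    op = payload["op"]
    # In rewrite mode a delete keeps the old row and is marked with __deleted.
    row = payload["before"] if op == "d" else payload["after"]
    flat = dict(row or {})
    flat["__deleted"] = "true" if op == "d" else "false"
    flat["__op"] = op
    flat["__table"] = payload["source"]["table"]
    flat["__source_ts_ms"] = payload["source"]["ts_ms"]
    return flat


if __name__ == "__main__":
    print(route("dbserver1.public.orders"))
    print(route("dbserver1"))
```

Routing on the table name alone means tables with the same name in different schemas end up on the same topic, so this replacement suits single-schema setups best.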

Change Event Structure

{
  "schema": {},
  "payload": {
    "before": {
      "id": 1001,
      "name": "Original Name",
      "status": "active"
    },
    "after": {
      "id": 1001,
      "name": "Updated Name",
      "status": "active"
    },
    "source": {
      "version": "2.6.0.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1700000000000,
      "db": "inventory",
      "schema": "public",
      "table": "customers",
      "txId": 12345,
      "lsn": 98765432
    },
    "op": "u",
    "ts_ms": 1700000000123
  }
}
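
A consumer usually needs only a few things from this envelope: the operation, the table, which columns changed, and how far behind the stream is. The sketch below extracts those from a JSON-encoded event; summarize is a hypothetical helper, and it accepts events both with and without the schema/payload wrapper (the wrapper is present when value.converter.schemas.enable is true).

```python
import json

# Operation codes used in the op field of a change event.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read", "t": "truncate"}


def summarize(event_json):
    """Summarize one Debezium change event given as a JSON string."""
    doc = json.loads(event_json)
    payload = doc.get("payload", doc)  # no wrapper when schemas are disabled
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    source = payload["source"]
    # Map each changed column to its (old, new) pair.
    changed = {
        key: (before.get(key), after.get(key))
        for key in sorted(set(before) | set(after))
        if before.get(key) != after.get(key)
    }
    return {
        "op": OPS.get(payload["op"], payload["op"]),
        "table": f'{source["schema"]}.{source["table"]}',
        "changed": changed,
        # Time between the change in the database and Debezium processing it.
        "lag_ms": payload["ts_ms"] - source["ts_ms"],
    }
```

For the event shown above, the summary reports an update on public.customers in which only name changed, with a lag of 123 ms.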

Troubleshooting

| Issue | Solution |
|---|---|
| Connector fails to start | Check database credentials and network connectivity. Verify plugin.name matches DB config |
| Replication slot not created | Ensure DB user has REPLICATION privilege. For PostgreSQL, set wal_level=logical |
| Snapshot taking too long | Use snapshot.mode=schema_only, then backfill with incremental snapshots. Increase snapshot.fetch.size |
| Missing change events | Check replication slot health. Verify table.include.list patterns match |
| Kafka topic not created | Check the broker's auto.create.topics.enable setting, or create topics manually before starting |
| Connector status FAILED | Check Connect logs. Restart with POST /connectors/{name}/restart |
| High replication lag | The PostgreSQL and MySQL connectors run a single task, so raising tasks.max does not help; tune max.batch.size, max.queue.size, and poll.interval.ms instead. Monitor WAL/binlog size and disk space |
| Schema evolution errors | For MySQL, check that the schema history topic (schema.history.internal.kafka.topic) exists and is retained indefinitely. Use Avro with Schema Registry for compatibility |
| OOM in Connect worker | Increase Connect heap size. Reduce max.batch.size and max.queue.size |
| Duplicate events on restart | This is expected with at-least-once delivery. Use idempotent consumers |
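
For the FAILED case, the status document returned by GET /connectors/{name}/status shows whether the connector itself or one of its tasks has failed, and each needs a different restart endpoint. The sketch below derives the restart paths from such a document; restart_paths is a hypothetical helper and only computes the paths, leaving the POST calls to the caller.

```python
def restart_paths(status):
    """List the REST paths to POST to for every failed part of a connector.

    `status` is the parsed JSON from GET /connectors/{name}/status.
    """
    name = status["name"]
    paths = []
    if status.get("connector", {}).get("state") == "FAILED":
        paths.append(f"/connectors/{name}/restart")
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            paths.append(f"/connectors/{name}/tasks/{task['id']}/restart")
    return paths


if __name__ == "__main__":
    example = {
        "name": "postgres-connector",
        "connector": {"state": "RUNNING"},
        "tasks": [{"id": 0, "state": "FAILED"}],
    }
    print(restart_paths(example))
```

Reading the trace field of a failed task before restarting it is usually worthwhile, since a restart alone will not fix a configuration or permission error.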