
Kafka Connect Cheat Sheet

Overview

Kafka Connect is a framework for streaming data between Apache Kafka and other systems. It provides a scalable and reliable way to move data in and out of Kafka using source connectors (ingest data into Kafka) and sink connectors (deliver data from Kafka to external systems) without writing custom integration code.

Kafka Connect supports distributed and standalone modes, automatic offset management, schema evolution, single message transforms (SMTs), dead letter queues for error handling, and a REST API for management. The Confluent Hub provides hundreds of pre-built connectors for databases, cloud services, search engines, and file systems.

Installation

Part of Kafka Distribution

# Kafka Connect is included with Apache Kafka
# (older releases move from downloads.apache.org to archive.apache.org)
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

Docker (Confluent Platform)

docker run -d --name kafka-connect \
  -p 8083:8083 \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID=connect-cluster \
  -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
  -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
  -e CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components \
  confluentinc/cp-kafka-connect:7.6.0
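The Confluent image turns each CONNECT_-prefixed environment variable into a worker property. The sketch below models the naming convention Confluent documents for its Docker images:

- a single underscore becomes a period,
- a double underscore becomes an underscore,
- a triple underscore becomes a dash.

Treat it as an illustration under that assumption, and check the docs for your image version. The function names are hypothetical.

```python
from __future__ import annotations

import re

# Assumed Confluent convention: "_" -> ".", "__" -> "_", "___" -> "-"
_UNDERSCORE_RUNS = {"_": ".", "__": "_", "___": "-"}


def env_to_property(env_name: str, prefix: str = "CONNECT_") -> str:
    """Translate e.g. CONNECT_REST_ADVERTISED_HOST_NAME to rest.advertised.host.name."""
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    body = env_name[len(prefix):]
    # Greedy match of 1-3 underscores, so "___" is consumed as a single run.
    translated = re.sub(r"_{1,3}", lambda m: _UNDERSCORE_RUNS[m.group(0)], body)
    return translated.lower()


def env_to_properties(env: dict[str, str]) -> dict[str, str]:
    """Keep only CONNECT_* variables and map them to worker property names."""
    return {env_to_property(k): v for k, v in env.items() if k.startswith("CONNECT_")}


if __name__ == "__main__":
    docker_env = {
        "CONNECT_BOOTSTRAP_SERVERS": "kafka:9092",
        "CONNECT_GROUP_ID": "connect-cluster",
        "CONNECT_PLUGIN_PATH": "/usr/share/confluent-hub-components",
        "HOSTNAME": "ignored",  # not a CONNECT_ variable, so it is dropped
    }
    for key, value in env_to_properties(docker_env).items():
        print(f"{key}={value}")
```

This makes it easier to compare the Docker flags above with the connect-distributed.properties keys shown under Configuration.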

Install Connectors

# Using Confluent Hub client
confluent-hub install debezium/debezium-connector-mysql:latest
confluent-hub install confluentinc/kafka-connect-jdbc:latest
confluent-hub install confluentinc/kafka-connect-s3:latest
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

# Manual installation: copy the connector JAR and its dependency JARs into
# their own directory under plugin.path, then restart the worker
mkdir -p /usr/share/confluent-hub-components/my-connector
cp my-connector.jar /usr/share/confluent-hub-components/my-connector/

Starting Connect

Distributed Mode

# Start distributed connect worker
bin/connect-distributed.sh config/connect-distributed.properties

Standalone Mode

# Start standalone (single worker, for dev/testing)
bin/connect-standalone.sh config/connect-standalone.properties connector.properties
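In standalone mode, each connector is passed to the worker as a properties file on the command line. The minimal sketch below renders a flat connector config as such a file.

- The sample values mirror config/connect-file-source.properties shipped with Kafka.
- The FileStream connectors are not on the default classpath since Kafka 3.2, so they must be added before this example runs.
- Escaping is deliberately simplified: it doubles backslashes and rejects keys that would need escaping.

`to_properties` is a hypothetical helper.

```python
from __future__ import annotations


def to_properties(config: dict[str, str]) -> str:
    """Render a flat connector config as Java .properties text (simplified escaping)."""
    lines = []
    for key, value in config.items():
        if any(ch in key for ch in "=: \t"):
            raise ValueError(f"key needs escaping, not supported here: {key!r}")
        # Backslash is the escape character in .properties files, so double it.
        escaped = str(value).replace("\\", "\\\\")
        lines.append(f"{key}={escaped}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    file_source = {
        "name": "local-file-source",
        "connector.class": "FileStreamSource",
        "tasks.max": "1",
        "file": "test.txt",
        "topic": "connect-test",
    }
    # Save this output as connector.properties, then run:
    # bin/connect-standalone.sh config/connect-standalone.properties connector.properties
    print(to_properties(file_source), end="")
```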

REST API

Cluster and Plugin Management

# Get connect cluster info
curl http://localhost:8083/

# List installed plugins
curl http://localhost:8083/connector-plugins | jq .

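# Validate connector config (since Kafka 3.2 the FileStream example connectors are
# not on the default classpath; add libs/connect-file-*.jar to plugin.path to use them)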
curl -X PUT http://localhost:8083/connector-plugins/FileStreamSinkConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{"connector.class": "FileStreamSink", "tasks.max": "1", "topics": "test"}'
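The validate endpoint replies with `error_count` and a `configs` array, and each entry's `value.errors` lists the problems for one key. Below is a minimal standard-library sketch that sends the same PUT and reduces the reply to `{key: [errors]}`. The function names are hypothetical, and the worker address is assumed to be http://localhost:8083.

```python
from __future__ import annotations

import json
import urllib.request


def validate(base_url: str, plugin: str, config: dict[str, str]) -> dict:
    """PUT config to /connector-plugins/<plugin>/config/validate and return the JSON reply."""
    req = urllib.request.Request(
        f"{base_url}/connector-plugins/{plugin}/config/validate",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def collect_errors(result: dict) -> dict[str, list[str]]:
    """Map each config key that failed validation to its error messages."""
    errors = {}
    for entry in result.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = value["errors"]
    return errors


# Usage (needs a running worker):
#   reply = validate("http://localhost:8083", "FileStreamSinkConnector",
#                    {"connector.class": "FileStreamSink", "tasks.max": "1", "topics": "test"})
#   print(reply["error_count"], collect_errors(reply))
```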

Connector Management

# List all connectors
curl http://localhost:8083/connectors | jq .

# Create a connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://db:5432/mydb",
      "connection.user": "user",
      "connection.password": "pass",
      "table.whitelist": "orders,customers",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "db-",
      "tasks.max": "1"
    }
  }'

# Get connector details
curl http://localhost:8083/connectors/jdbc-source | jq .

# Get connector config
curl http://localhost:8083/connectors/jdbc-source/config | jq .

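# Update connector config (PUT replaces the entire config, so resend every key, not just the changed ones)
curl -X PUT http://localhost:8083/connectors/jdbc-source/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/mydb",
    "connection.user": "user",
    "connection.password": "pass",
    "table.whitelist": "orders,customers,products",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db-",
    "tasks.max": "2"
  }'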

# Get connector status
curl http://localhost:8083/connectors/jdbc-source/status | jq .

# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source/pause

# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source/resume

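# Restart connector (by default only the Connector instance restarts, not its tasks)
curl -X POST http://localhost:8083/connectors/jdbc-source/restart

# Restart the connector and only its failed tasks (Kafka 3.0+, KIP-745)
curl -X POST "http://localhost:8083/connectors/jdbc-source/restart?includeTasks=true&onlyFailed=true"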

# Restart a specific task
curl -X POST http://localhost:8083/connectors/jdbc-source/tasks/0/restart

# Delete connector
curl -X DELETE http://localhost:8083/connectors/jdbc-source

Common Connector Configs

JDBC Source Connector

{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/mydb",
    "connection.user": "user",
    "connection.password": "pass",
    "table.whitelist": "orders,products",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "db-",
    "poll.interval.ms": "5000",
    "tasks.max": "2"
  }
}
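In timestamp+incrementing mode the connector tracks the last (timestamp, id) pair it emitted. Each poll reads rows whose pair sorts after that one, which picks up both new rows and updated rows. The in-memory sketch below illustrates only that ordering rule:

- It is not the connector's code.
- It ignores details such as timestamp.delay.interval.ms.
- The `Row` type and `poll` function are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Row:
    id: int
    updated_at: datetime
    payload: str


def poll(rows: list[Row], offset: tuple[datetime, int] | None):
    """Return rows strictly after `offset` in (updated_at, id) order, plus the new offset."""
    ordered = sorted(rows, key=lambda r: (r.updated_at, r.id))
    if offset is not None:
        # Tuple comparison: a later timestamp wins; equal timestamps fall back to id.
        ordered = [r for r in ordered if (r.updated_at, r.id) > offset]
    new_offset = (ordered[-1].updated_at, ordered[-1].id) if ordered else offset
    return ordered, new_offset


if __name__ == "__main__":
    t0 = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
    t1 = datetime(2024, 5, 1, 12, 5, tzinfo=timezone.utc)
    batch, offset = poll([Row(1, t0, "a"), Row(2, t0, "b")], None)
    # Row 1 is updated and row 3 is inserted with the same timestamp as before.
    batch, offset = poll([Row(1, t1, "a2"), Row(2, t0, "b"), Row(3, t0, "c")], offset)
    print([r.id for r in batch], offset)
```

The id tie-breaker matters because several rows can share one `updated_at` value. Without it, a poll that stopped mid-timestamp would skip or repeat rows.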

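S3 Sink Connector

ParquetFormat requires records that carry a schema (for example via AvroConverter or another Schema Registry converter). With the schemaless JsonConverter settings shown in the worker configuration below, use io.confluent.connect.s3.format.json.JsonFormat instead.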

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "orders,events",
    "s3.bucket.name": "my-data-lake",
    "s3.region": "us-east-1",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms": "3600000",
    "locale": "en-US",
    "timezone": "UTC"
  }
}
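TimeBasedPartitioner builds each object path in three steps:

1. It floors the record's timestamp to partition.duration.ms.
2. It formats the result with path.format.
3. It places the objects under topics.dir (default `topics`) and then the topic name.

The sketch below reproduces that mapping for the hourly UTC config above, using Python's strftime in place of the Joda pattern. It assumes the record timestamp is used (timestamp.extractor=Record), whereas the connector's default extractor is Wallclock. The constants and function name are illustrative.

```python
from datetime import datetime, timezone

PARTITION_DURATION_MS = 3_600_000                 # partition.duration.ms from the config above
TOPICS_DIR = "topics"                             # connector default for topics.dir
PATH_FORMAT = "year=%Y/month=%m/day=%d/hour=%H"   # strftime twin of the Joda path.format


def s3_prefix(topic: str, timestamp_ms: int) -> str:
    """Directory prefix an hourly UTC TimeBasedPartitioner would pick for one record."""
    bucket_start_ms = timestamp_ms - timestamp_ms % PARTITION_DURATION_MS
    moment = datetime.fromtimestamp(bucket_start_ms // 1000, tz=timezone.utc)
    return f"{TOPICS_DIR}/{topic}/{moment.strftime(PATH_FORMAT)}/"


if __name__ == "__main__":
    print(s3_prefix("orders", 1714568400000))  # a record from 2024-05-01 13:00 UTC
```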

Elasticsearch Sink Connector

{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "logs,events",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "batch.size": "2000",
    "linger.ms": "1000"
  }
}
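The `key.ignore` setting decides where the Elasticsearch document ID comes from:

- With `key.ignore` set to true, the connector builds the ID from the record's coordinates as topic+partition+offset. Replaying the same records therefore overwrites documents instead of duplicating them.
- With `key.ignore` set to false, the record key becomes the ID.

The sketch below shows that choice. The `SinkRecord` dataclass is a hypothetical stand-in, and rejecting a null key is this sketch's behaviour.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SinkRecord:  # hypothetical stand-in for a Kafka record
    topic: str
    partition: int
    offset: int
    key: Any
    value: dict


def document_id(record: SinkRecord, key_ignore: bool) -> str:
    """Pick the Elasticsearch _id the way key.ignore describes."""
    if key_ignore:
        return f"{record.topic}+{record.partition}+{record.offset}"
    if record.key is None:
        raise ValueError("key.ignore=false needs a non-null record key")
    return str(record.key)


if __name__ == "__main__":
    rec = SinkRecord("logs", 3, 42, None, {"msg": "hello"})
    print(document_id(rec, key_ignore=True))
```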

Configuration

Worker Configuration (connect-distributed.properties)

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Internal topics
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# REST interface (listeners replaces the deprecated rest.port / rest.host.name)
listeners=http://0.0.0.0:8083
rest.advertised.host.name=connect-worker-1

# Plugin path
plugin.path=/usr/share/confluent-hub-components,/opt/connectors

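# Converter options
# For Avro with Schema Registry (AvroConverter ships with Confluent Platform, not Apache Kafka):
# key.converter=io.confluent.connect.avro.AvroConverter
# key.converter.schema.registry.url=http://schema-registry:8081
# value.converter=io.confluent.connect.avro.AvroConverter
# value.converter.schema.registry.url=http://schema-registry:8081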
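Below is a pre-flight check for a distributed worker file. It parses the properties and reports which required settings are missing: group.id and the three internal storage topics, which distributed mode has no defaults for.

The parser is simplified. It handles `key=value`, `key: value`, and `#` or `!` comments, but not whitespace separators, line continuations, or escapes. The function names are hypothetical.

```python
from __future__ import annotations

REQUIRED_DISTRIBUTED = (
    "group.id",
    "config.storage.topic",
    "offset.storage.topic",
    "status.storage.topic",
)


def parse_properties(text: str) -> dict[str, str]:
    """Simplified .properties parser: no line continuations or escape handling."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        # Split on whichever of '=' or ':' appears first.
        positions = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not positions:
            props[line] = ""
            continue
        cut = min(positions)
        props[line[:cut].strip()] = line[cut + 1:].strip()
    return props


def missing_settings(props: dict[str, str]) -> list[str]:
    """Required distributed-mode keys that are absent or empty."""
    return [key for key in REQUIRED_DISTRIBUTED if not props.get(key)]


if __name__ == "__main__":
    with open("config/connect-distributed.properties", encoding="utf-8") as fh:
        print(missing_settings(parse_properties(fh.read())) or "all required settings present")
```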

Single Message Transforms (SMTs)

{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "route,timestamp,mask",

    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)",
    "transforms.route.replacement": "prefix-$1",

    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.timestamp.timestamp.field": "processed_at",

    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "ssn,credit_card"
  }
}
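Transforms run in the order listed in `transforms`. The plain-Python emulation below runs route, then timestamp, then mask on a schemaless record, to make the three aliases concrete. It is illustrative only, and the `Record` type is hypothetical:

- **RegexRouter** is modeled as a full match of the topic name, with Java's `$1` group syntax translated to Python's.
- **InsertField** writes the record's timestamp in epoch milliseconds. This is the record timestamp, not necessarily the processing time that the name `processed_at` suggests.
- **MaskField** substitutes a type-appropriate empty value.

```python
from __future__ import annotations

import re
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Record:  # hypothetical schemaless record
    topic: str
    timestamp: int  # epoch milliseconds
    value: dict


def regex_router(record: Record, regex: str, replacement: str) -> Record:
    """RegexRouter-style rename: only when the whole topic name matches."""
    match = re.fullmatch(regex, record.topic)
    if match is None:
        return record
    # Java writes group references as $1; Python's expand() wants \g<1>.
    python_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return replace(record, topic=match.expand(python_replacement))


def insert_timestamp(record: Record, field: str) -> Record:
    """InsertField$Value with timestamp.field: copy the record timestamp into the value."""
    return replace(record, value={**record.value, field: record.timestamp})


_EMPTY = {str: "", bool: False, int: 0, float: 0.0}


def mask_fields(record: Record, fields: list[str]) -> Record:
    """MaskField$Value: swap each listed field for an empty value of the same type."""
    masked = dict(record.value)
    for name in fields:
        if name in masked:
            masked[name] = _EMPTY.get(type(masked[name]))
    return replace(record, value=masked)


def apply_chain(record: Record) -> Record:
    # Same order as "transforms": "route,timestamp,mask".
    record = regex_router(record, r"(.*)", "prefix-$1")
    record = insert_timestamp(record, "processed_at")
    return mask_fields(record, ["ssn", "credit_card"])


if __name__ == "__main__":
    rec = Record("orders", 1714568400000, {"id": 7, "ssn": "123-45-6789"})
    print(apply_chain(rec))
```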

Common SMTs

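| Transform | Description |
|---|---|
| InsertField | Add a field (static value, topic, partition, offset, or timestamp) to the record |
| ReplaceField | Include, exclude, or rename fields |
| MaskField | Replace field values with a type-appropriate empty value (0, "", false) or a configured replacement |
| ExtractField | Extract a single field from a struct or map |
| TimestampConverter | Convert timestamps between formats (string, Unix epoch, Date, Time, Timestamp) |
| RegexRouter | Rewrite the topic name with a regex |
| Flatten | Flatten nested structures into a single level |
| ValueToKey | Copy fields from the value into the key |
| Filter | Drop records; combine with a predicate (e.g. TopicNameMatches, HasHeaderKey) to drop selectively |
| Cast | Change field data types |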
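Flatten collapses nested maps into a single level, joining field names with its `delimiter` setting (default `.`). The schemaless sketch below shows only that name-joining behaviour. Lists are copied unchanged, schemas are out of scope, and `flatten` is a hypothetical helper.

```python
from __future__ import annotations


def flatten(value: dict, delimiter: str = ".", prefix: str = "") -> dict:
    """Collapse nested dicts: {"a": {"b": 1}} becomes {"a.b": 1}."""
    flat = {}
    for key, item in value.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(item, dict):
            flat.update(flatten(item, delimiter, name))
        else:
            flat[name] = item  # scalars and lists are copied unchanged
    return flat


if __name__ == "__main__":
    print(flatten({"id": 1, "customer": {"name": "Ada", "address": {"city": "London"}}}))
```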

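Error Handling

The error-handling settings behave as follows:

- **Tolerance.** errors.tolerance=all skips records that fail in the converter or a transform instead of failing the task.
- **Retries.** errors.retry.timeout bounds how long (in ms) a failing operation is retried; the default 0 means no retries, and -1 means retry forever.
- **Dead letter queue.** The errors.deadletterqueue.* settings apply only to sink connectors.

{
  "name": "my-connector",
  "config": {
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-my-connector",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "30000",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}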
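The sketch below models how a failed record is routed under these settings, in plain Python:

- With errors.tolerance=none, the task fails.
- With errors.tolerance=all, the record is skipped. A sink connector also writes it to the dead letter queue, with context headers when errors.deadletterqueue.context.headers.enable is true.

Assumptions:

- `convert` stands in for the converter/SMT stage and is hypothetical.
- The headers use Connect's `__connect.errors.*` prefix, but only a subset is shown; check the exact set against your Kafka version.
- Exceptions thrown from the sink task's own write path are outside this flow unless the connector uses the errant record reporter (KIP-610).

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class DeadLetterQueue:
    """In-memory stand-in for the DLQ topic: (headers, original payload) pairs."""
    records: list = field(default_factory=list)


def process(raw_records: list[tuple[str, int, int, bytes]],
            convert: Callable[[bytes], dict],
            tolerance: str,
            dlq: DeadLetterQueue | None,
            context_headers: bool = True) -> list[dict]:
    """Convert (topic, partition, offset, payload) tuples, routing failures per errors.tolerance."""
    good = []
    for topic, partition, offset, payload in raw_records:
        try:
            good.append(convert(payload))
        except Exception as exc:
            if tolerance != "all":
                raise  # errors.tolerance=none: the task fails
            if dlq is not None:
                headers = {}
                if context_headers:
                    headers = {
                        "__connect.errors.topic": topic,
                        "__connect.errors.partition": str(partition),
                        "__connect.errors.offset": str(offset),
                        # Connect records the Java class name; Python's stands in here.
                        "__connect.errors.exception.class.name": type(exc).__name__,
                        "__connect.errors.exception.message": str(exc),
                    }
                dlq.records.append((headers, payload))
    return good


if __name__ == "__main__":
    queue = DeadLetterQueue()
    batch = [("orders", 0, 10, b'{"id": 1}'), ("orders", 0, 11, b"not json")]
    print(process(batch, json.loads, "all", queue))
    print(queue.records[0][0]["__connect.errors.offset"])
```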

Troubleshooting

| Issue | Solution |
|---|---|
| Connector in FAILED state | Check the status endpoint (failed tasks include a "trace" field); review Connect worker logs |
| Tasks rebalancing constantly | Check for unhealthy workers; verify Kafka connectivity |
| No data flowing | Verify the source has data; check topic creation and permissions |
| Schema errors | Ensure the converter matches the data format; check Schema Registry |
| Connector lag growing | Increase tasks.max (sink parallelism is capped by the partition count); add more Connect workers |
| Out of memory | Increase JVM heap (KAFKA_HEAP_OPTS); reduce batch sizes; check for large messages |
| Plugin not found | Verify plugin.path in the worker config; restart the Connect worker |
| DLQ filling up | Review DLQ messages for the root cause; fix source data or transforms |
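For a sink connector, lag is the gap between each partition's log-end offset and the offset committed by the connector's consumer group. By default that group is named `connect-<connector name>`.

The sketch below does the arithmetic with offsets supplied by hand. In practice they could come from `kafka-consumer-groups.sh --describe --group connect-s3-sink`. Treating a partition with no commit as lagging from offset 0 is a simplifying assumption, and the helper names are hypothetical.

```python
from __future__ import annotations

TopicPartition = tuple[str, int]


def consumer_group(connector_name: str) -> str:
    """Default consumer group that a sink connector's tasks join."""
    return f"connect-{connector_name}"


def partition_lag(end_offsets: dict[TopicPartition, int],
                  committed: dict[TopicPartition, int]) -> dict[TopicPartition, int]:
    """Lag per (topic, partition); no commit is treated as offset 0 (assumption)."""
    return {tp: max(end - committed.get(tp, 0), 0) for tp, end in end_offsets.items()}


def max_useful_tasks(end_offsets: dict[TopicPartition, int]) -> int:
    """Sink parallelism is capped by the number of partitions being consumed."""
    return len(end_offsets)


if __name__ == "__main__":
    ends = {("orders", 0): 1500, ("orders", 1): 900, ("events", 0): 50}
    done = {("orders", 0): 1000, ("orders", 1): 900}
    lag = partition_lag(ends, done)
    print(consumer_group("s3-sink"), sum(lag.values()), max_useful_tasks(ends))
```

If total lag keeps climbing while tasks.max already equals the partition count, adding tasks will not help. Look at per-task throughput or the destination system instead.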