Kafka Connect Cheat Sheet
Overview
Kafka Connect is a framework for streaming data between Apache Kafka and other systems. It provides a scalable and reliable way to move data in and out of Kafka using source connectors (ingest data into Kafka) and sink connectors (deliver data from Kafka to external systems) without writing custom integration code.
Kafka Connect runs in distributed or standalone mode and provides automatic offset management, pluggable converters (which allow schema evolution when paired with a schema registry), single message transforms (SMTs), dead letter queues for sink connector error handling, and a REST API for management. Confluent Hub provides hundreds of pre-built connectors for databases, cloud services, search engines, and file systems.
Installation
Part of Kafka Distribution
# Kafka Connect is included with Apache Kafka
# Older releases are served from archive.apache.org rather than downloads.apache.org
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0
Docker (Confluent Platform)
docker run -d --name kafka-connect \
-p 8083:8083 \
-e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID=connect-cluster \
-e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
-e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
-e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
-e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
-e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
-e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
-e CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components \
confluentinc/cp-kafka-connect:7.6.0
Install Connectors
# Using Confluent Hub client
confluent-hub install debezium/debezium-connector-mysql:latest
confluent-hub install confluentinc/kafka-connect-jdbc:latest
confluent-hub install confluentinc/kafka-connect-s3:latest
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
# Manual installation (copy JARs to plugin path)
mkdir -p /usr/share/confluent-hub-components/my-connector
cp my-connector.jar /usr/share/confluent-hub-components/my-connector/
Starting Connect
Distributed Mode
# Start distributed connect worker
bin/connect-distributed.sh config/connect-distributed.properties
Standalone Mode
# Start standalone (single worker, for dev/testing)
bin/connect-standalone.sh config/connect-standalone.properties connector.properties
REST API
Cluster and Plugin Management
# Get connect cluster info
curl http://localhost:8083/
# List installed plugins
curl http://localhost:8083/connector-plugins | jq .
# Validate connector config
curl -X PUT http://localhost:8083/connector-plugins/FileStreamSinkConnector/config/validate \
-H "Content-Type: application/json" \
-d '{"connector.class": "FileStreamSink", "tasks.max": "1", "topics": "test"}'
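The validate endpoint answers with a JSON document that carries an `error_count` and a `configs` list whose entries hold per-field `errors`. As a rough sketch of how that response could be reduced to readable messages (the helper name `validation_errors` and the sample response, including its error text, are illustrative and not taken from a real worker):

```python
from typing import Any


def validation_errors(response: dict[str, Any]) -> list[str]:
    """Return 'field: message' strings for every field that failed validation."""
    problems = []
    for entry in response.get("configs", []):
        value = entry.get("value", {})
        for message in value.get("errors", []):
            problems.append(f"{value.get('name')}: {message}")
    return problems


# Hand-written sample in the shape of a validate response (trimmed for brevity).
sample = {
    "name": "FileStreamSinkConnector",
    "error_count": 1,
    "configs": [
        {"value": {"name": "tasks.max", "value": "1", "errors": []}},
        {"value": {"name": "topics", "value": None,
                   "errors": ["Must configure one of topics or topics.regex"]}},
    ],
}

for line in validation_errors(sample):
    print(line)
```

An empty result means the config passed validation and can be submitted as-is.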
Connector Management
# List all connectors
curl http://localhost:8083/connectors | jq .
# Create a connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "user",
"connection.password": "pass",
"table.whitelist": "orders,customers",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db-",
"tasks.max": "1"
}
}'
# Get connector details
curl http://localhost:8083/connectors/jdbc-source | jq .
# Get connector config
curl http://localhost:8083/connectors/jdbc-source/config | jq .
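# Update connector config (PUT replaces the entire config, so send every key, not only the changed ones)
curl -X PUT http://localhost:8083/connectors/jdbc-source/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "user",
"connection.password": "pass",
"table.whitelist": "orders,customers,products",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db-",
"tasks.max": "2"
}'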
# Get connector status
curl http://localhost:8083/connectors/jdbc-source/status | jq .
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source/pause
# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source/resume
# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source/restart
# Restart a specific task
curl -X POST http://localhost:8083/connectors/jdbc-source/tasks/0/restart
# Delete connector
curl -X DELETE http://localhost:8083/connectors/jdbc-source
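The status endpoint reports one state for the connector and one per task, so a connector can show RUNNING while an individual task has FAILED. The sketch below walks a status document and lists the restart paths from the commands above that would apply. The function name `restart_paths` and the sample payload are assumptions made for illustration.

```python
from typing import Any


def restart_paths(status: dict[str, Any]) -> list[str]:
    """Return the REST paths to POST to for each FAILED connector or task."""
    name = status["name"]
    paths = []
    if status.get("connector", {}).get("state") == "FAILED":
        paths.append(f"/connectors/{name}/restart")
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            paths.append(f"/connectors/{name}/tasks/{task['id']}/restart")
    return paths


# Hand-written sample in the shape of GET /connectors/jdbc-source/status.
sample_status = {
    "name": "jdbc-source",
    "connector": {"state": "RUNNING", "worker_id": "connect-worker-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect-worker-1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect-worker-1:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException: ..."},
    ],
    "type": "source",
}

print(restart_paths(sample_status))
```

A failed task also carries a `trace` field with the stack trace, which is usually the quickest pointer to the root cause.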
Common Connector Configs
JDBC Source Connector
{
"name": "jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "user",
"connection.password": "pass",
"table.whitelist": "orders,products",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "db-",
"poll.interval.ms": "5000",
"tasks.max": "2"
}
}
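A definition in this `name` plus `config` shape is what `POST /connectors` expects, whereas `PUT /connectors/{name}/config` takes only the inner `config` map and creates the connector if it does not exist yet. That makes PUT convenient for repeatable deployments. A minimal sketch of building such a request with the standard library follows; `build_upsert_request` is a hypothetical helper, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_upsert_request(base_url: str, definition: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    name = definition["name"]
    body = json.dumps(definition["config"]).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Shortened definition; a real one would carry the full config shown above.
definition = {
    "name": "jdbc-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "2",
    },
}
request = build_upsert_request("http://localhost:8083", definition)
print(request.get_method(), request.full_url)
# To send it against a running worker: urllib.request.urlopen(request)
```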
S3 Sink Connector
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "orders,events",
"s3.bucket.name": "my-data-lake",
"s3.region": "us-east-1",
"flush.size": "1000",
"rotate.interval.ms": "600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"partition.duration.ms": "3600000",
"locale": "en-US",
"timezone": "UTC"
}
}
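With `TimeBasedPartitioner`, `partition.duration.ms` sets the bucket width and `path.format` turns the start of each bucket into a directory path. The sketch below approximates that mapping for the hourly, UTC settings in this config; it is not the connector's own code. Which timestamp is bucketed depends on the connector's `timestamp.extractor` setting, and the resulting path is placed under the connector's topics directory and the topic name.

```python
from datetime import datetime, timezone


def time_partition(epoch_ms: int, partition_duration_ms: int = 3_600_000) -> str:
    """Approximate the directory produced for a timestamp with hourly UTC buckets."""
    # Floor the timestamp to the start of its partition bucket.
    bucket_start_ms = epoch_ms - (epoch_ms % partition_duration_ms)
    ts = datetime.fromtimestamp(bucket_start_ms / 1000, tz=timezone.utc)
    # Mirrors path.format 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H")


sample_ms = int(datetime(2024, 3, 15, 13, 45, tzinfo=timezone.utc).timestamp() * 1000)
print(time_partition(sample_ms))
```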
Elasticsearch Sink Connector
{
"name": "es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "2",
"topics": "logs,events",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true",
"behavior.on.malformed.documents": "warn",
"batch.size": "2000",
"linger.ms": "1000"
}
}
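With `key.ignore` set to `true`, the connector derives each document ID from the record's coordinates (topic+partition+offset) instead of the record key. The sketch below imitates that scheme to show why replaying the same records overwrites documents rather than duplicating them; `document_id` is an illustrative helper, not connector code.

```python
def document_id(topic: str, partition: int, offset: int) -> str:
    """Build an ID in the topic+partition+offset form used when key.ignore=true."""
    return f"{topic}+{partition}+{offset}"


# A toy index keyed by document ID: re-delivering a record replaces the same entry.
index = {}
for topic, partition, offset, doc in [
    ("logs", 0, 41, {"msg": "first"}),
    ("logs", 0, 42, {"msg": "second"}),
    ("logs", 0, 42, {"msg": "second"}),  # same record delivered again
]:
    index[document_id(topic, partition, offset)] = doc

print(sorted(index))
```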
Configuration
Worker Configuration (connect-distributed.properties)
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Internal topics
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# REST interface (rest.port was removed in Kafka 3.0; use listeners instead)
listeners=http://0.0.0.0:8083
rest.advertised.host.name=connect-worker-1
# Plugin path
plugin.path=/usr/share/confluent-hub-components,/opt/connectors
# Converter options
# For Avro with Schema Registry:
# key.converter=io.confluent.connect.avro.AvroConverter
# key.converter.schema.registry.url=http://schema-registry:8081
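The `schemas.enable` flags decide what `JsonConverter` reads and writes. With `false`, messages are plain JSON. With `true`, every message is an envelope with a `schema` and a `payload` member. The sketch below builds such an envelope for a flat record so the two shapes can be compared. `with_schema` is a hypothetical helper that handles only non-null bool, int, float and str values.

```python
import json

# Python type -> Connect JSON schema type name (only a small subset is covered).
_CONNECT_TYPES = {bool: "boolean", int: "int64", float: "double", str: "string"}


def with_schema(record: dict) -> dict:
    """Wrap a flat record in the schema/payload envelope used when schemas.enable=true."""
    fields = [
        {"field": key, "type": _CONNECT_TYPES[type(value)], "optional": False}
        for key, value in record.items()
    ]
    return {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": record,
    }


record = {"id": 1, "name": "widget", "active": True}
print(json.dumps(record))               # shape with schemas.enable=false
print(json.dumps(with_schema(record)))  # shape with schemas.enable=true
```

Sink connectors that need a schema cannot work from the plain form, which is one common source of the schema errors listed under Troubleshooting.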
Single Message Transforms (SMTs)
{
"name": "my-connector",
"config": {
"connector.class": "...",
"transforms": "route,timestamp,mask",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)",
"transforms.route.replacement": "prefix-$1",
"transforms.timestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.timestamp.timestamp.field": "processed_at",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "ssn,credit_card"
}
}
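Transforms run in the order listed in `transforms`, so here `route` renames the topic before `timestamp` and `mask` touch the value. To preview what a `RegexRouter` pattern does to a topic name, the sketch below approximates its behavior in Python: the regex must match the whole topic name, otherwise the topic is left unchanged. Java and Python regex dialects differ in places, so treat this as an approximation. `regex_route` is an illustrative name.

```python
import re


def regex_route(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rewrite the topic only if the regex matches it entirely."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # Translate Java-style group references ($1) into Python's \g<1> form.
    python_template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(python_template)


print(regex_route("db-orders", "(.*)", "prefix-$1"))
print(regex_route("db-orders", "db-(.*)", "$1"))
print(regex_route("orders", "db-(.*)", "$1"))
```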
Common SMTs
| Transform | Description |
|---|---|
| InsertField | Add a field (static value or record metadata) to the record |
| ReplaceField | Include/exclude/rename fields |
| MaskField | Replace field values with a type-appropriate empty value (0, "", false) or a custom replacement |
| ExtractField | Extract single field from struct |
| TimestampConverter | Convert timestamp formats |
| RegexRouter | Modify topic name with regex |
| Flatten | Flatten nested structures |
| ValueToKey | Copy fields from value to key |
| Filter | Drop records that match a predicate |
| Cast | Change field data types |
Error Handling
{
"name": "my-connector",
"config": {
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-my-connector",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.retry.timeout": "60000",
"errors.retry.delay.max.ms": "30000",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
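The dead letter queue settings apply to sink connectors. When `errors.deadletterqueue.context.headers.enable` is on, each DLQ record carries headers prefixed with `__connect.errors.` that describe where the record came from and why it failed. The sketch below pulls those headers into a dictionary, assuming they arrive as `(key, bytes)` pairs, which is how common Python Kafka clients expose them. `dlq_context` and the sample values are illustrative.

```python
PREFIX = "__connect.errors."


def dlq_context(headers) -> dict:
    """Collect Connect error-context headers into {short_name: text}."""
    context = {}
    for key, value in headers or []:
        if key.startswith(PREFIX) and value is not None:
            context[key[len(PREFIX):]] = value.decode("utf-8", errors="replace")
    return context


# Hand-written sample headers; a consumer reading the DLQ topic would supply real ones.
sample_headers = [
    ("__connect.errors.topic", b"orders"),
    ("__connect.errors.partition", b"0"),
    ("__connect.errors.offset", b"1234"),
    ("__connect.errors.exception.class.name",
     b"org.apache.kafka.connect.errors.DataException"),
    ("traceparent", b"00-abc"),  # unrelated header, ignored
]

for name, text in dlq_context(sample_headers).items():
    print(f"{name} = {text}")
```

Grouping DLQ records by `exception.class.name` is a quick way to tell whether one bad producer or a systematic format mismatch is responsible.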
Troubleshooting
| Issue | Solution |
|---|---|
| Connector in FAILED state | Check status endpoint; review Connect worker logs |
| Tasks rebalancing constantly | Check for unhealthy workers; verify Kafka connectivity |
| No data flowing | Verify source has data; check topic creation and permissions |
| Schema errors | Ensure converter matches data format; check Schema Registry |
| Connector lag growing | Increase tasks.max (sink tasks beyond the topic partition count sit idle); add more Connect workers |
| Out of memory | Increase JVM heap; reduce batch sizes; check for large messages |
| Plugin not found | Verify plugin path in worker config; restart Connect worker |
| DLQ filling up | Review DLQ messages for root cause; fix source data or transforms |