Confluent Schema Registry Cheat Sheet
Overview
Confluent Schema Registry is a serving layer for metadata that provides a RESTful interface for storing, retrieving, and managing Avro, Protobuf, and JSON schemas. It is tightly integrated with Apache Kafka and enables schema evolution while ensuring data compatibility between producers and consumers.
Schema Registry stores a versioned history of all schemas, enforces compatibility rules to prevent breaking changes, and provides serializers/deserializers for Kafka clients. This ensures that data written to Kafka always conforms to a defined schema, enabling reliable data pipelines and preventing schema drift in event-driven architectures.
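The serializers do not embed the whole schema in each Kafka message. They prepend a small header that consumers use to fetch the writer schema from the registry. Below is a minimal sketch of that framing. It assumes the Confluent wire format: magic byte `0`, then a 4-byte big-endian schema ID, then the serialized payload. Protobuf messages also carry message indexes after the ID, which this sketch does not handle. `frame` and `unframe` are hypothetical helper names, and `b"<avro-bytes>"` is a placeholder, not real Avro output.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix payload with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> tuple:
    """Split a framed message into (schema_id, payload); raise if not framed."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not Confluent wire format (missing magic byte 0)")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


framed = frame(1, b"<avro-bytes>")
print(framed[:5].hex())  # 0000000001
print(unframe(framed))   # (1, b'<avro-bytes>')
```

This 5-byte prefix is also why a plain `StringDeserializer` shows a few garbage characters in front of registry-serialized values.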
Installation
Docker
docker run -d --name schema-registry \
-p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 \
-e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
confluentinc/cp-schema-registry:7.6.0
Docker Compose
version: '3'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
      SCHEMA_REGISTRY_AVRO_COMPATIBILITY_LEVEL: backward
    depends_on:
      - kafka
Configuration Properties
# schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3
schema.compatibility.level=backward
host.name=schema-registry
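The Docker examples set these same properties through environment variables. The sketch below translates between the two forms. It assumes the naming rule documented for Confluent Platform Docker images: add the `SCHEMA_REGISTRY_` prefix, upper-case the name, and map `.` to `_`, `-` to `__`, and `_` to `___`. `property_to_env` and `env_to_property` are hypothetical helpers, not part of any Confluent tool.

```python
import re

PREFIX = "SCHEMA_REGISTRY_"
_RUN_TO_CHAR = {1: ".", 2: "-", 3: "_"}  # underscore run length -> original character


def property_to_env(prop: str) -> str:
    """e.g. kafkastore.bootstrap.servers -> SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"""
    # Escape '_' and '-' first so that a single '_' can stand for '.'
    escaped = prop.replace("_", "___").replace("-", "__").replace(".", "_")
    return PREFIX + escaped.upper()


def _decode_run(match):
    run = len(match.group(0))
    if run not in _RUN_TO_CHAR:
        raise ValueError(f"ambiguous run of {run} underscores")
    return _RUN_TO_CHAR[run]


def env_to_property(env: str) -> str:
    """e.g. SCHEMA_REGISTRY_HOST_NAME -> host.name"""
    if not env.startswith(PREFIX):
        raise ValueError(f"{env} does not start with {PREFIX}")
    # Map each run of underscores back to '.', '-' or '_' by its length
    return re.sub(r"_+", _decode_run, env[len(PREFIX):].lower())


print(env_to_property("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"))  # kafkastore.bootstrap.servers
print(property_to_env("schema.compatibility.level"))  # SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL
```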
REST API
Schema Subjects
# List all subjects
curl http://localhost:8081/subjects | jq .
# List versions for a subject
curl http://localhost:8081/subjects/orders-value/versions | jq .
# Get latest schema
curl http://localhost:8081/subjects/orders-value/versions/latest | jq .
# Get specific version
curl http://localhost:8081/subjects/orders-value/versions/1 | jq .
# Get schema by global ID
curl http://localhost:8081/schemas/ids/1 | jq .
# Delete a subject (soft delete)
curl -X DELETE http://localhost:8081/subjects/orders-value | jq .
# Delete a subject permanently (only succeeds after the subject has been soft-deleted)
curl -X DELETE "http://localhost:8081/subjects/orders-value?permanent=true" | jq .
# Delete specific version
curl -X DELETE http://localhost:8081/subjects/orders-value/versions/1 | jq .
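A permanent delete is rejected unless the subject was soft-deleted first. The sketch below scripts both steps in order with the standard library. `hard_delete_subject` is a hypothetical helper. The `send` parameter is an assumption added so the logic can be exercised without a running registry; by default it issues a real HTTP `DELETE` and decodes the JSON list of deleted versions.

```python
import json
import urllib.parse
import urllib.request


def _http_delete(url: str):
    """Default transport: send a DELETE request and decode the JSON response."""
    req = urllib.request.Request(url, method="DELETE")
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


def hard_delete_subject(base_url: str, subject: str, send=_http_delete):
    """Soft-delete a subject, then delete it permanently.

    Returns the responses (lists of deleted versions) of both calls.
    """
    url = f"{base_url.rstrip('/')}/subjects/{urllib.parse.quote(subject, safe='')}"
    soft = send(url)                      # step 1: mark all versions as deleted
    hard = send(url + "?permanent=true")  # step 2: remove them for good
    return soft, hard
```

Against a live registry, `hard_delete_subject("http://localhost:8081", "orders-value")` performs the same two calls as the curl commands above.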
Register Schemas
# Register Avro schema
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}"
}'
# Register JSON Schema
curl -X POST http://localhost:8081/subjects/events-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "JSON",
"schema": "{\"type\":\"object\",\"properties\":{\"eventType\":{\"type\":\"string\"},\"payload\":{\"type\":\"object\"},\"timestamp\":{\"type\":\"integer\"}},\"required\":[\"eventType\",\"timestamp\"]}"
}'
# Register Protobuf schema
curl -X POST http://localhost:8081/subjects/users-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\";\npackage com.example;\n\nmessage User {\n string user_id = 1;\n string name = 2;\n string email = 3;\n int64 created_at = 4;\n}"
}'
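Escaping the nested `schema` string by hand, as in the curl bodies above, is error-prone. The sketch below builds the request body with `json.dumps` instead. The schema is serialized once into a string, and that string is then embedded in the outer JSON. `register_payload` is a hypothetical helper name. The printed body can be saved to a file and sent with `curl ... -d @file`.

```python
import json


def register_payload(schema, schema_type="AVRO", references=None) -> str:
    """Build the JSON body for POST /subjects/<subject>/versions.

    `schema` may be a dict (Avro or JSON Schema) or a string (e.g. Protobuf source).
    """
    schema_text = schema if isinstance(schema, str) else json.dumps(schema, separators=(",", ":"))
    body = {"schemaType": schema_type, "schema": schema_text}
    if references:
        body["references"] = references
    return json.dumps(body)


order_v1 = {
    "type": "record", "name": "Order", "namespace": "com.example",
    "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "customerId", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
}
print(register_payload(order_v1))
```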
Compatibility Testing
# Test compatibility of new schema with latest
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"status\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}'
# Test against specific version
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/1 \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d @new-schema.json
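The compatibility endpoint replies with a JSON object containing an `is_compatible` flag. This makes it easy to use as a pre-flight step, for example in CI. The sketch below is built on `urllib`. `check_compatibility` is a hypothetical helper, and the injectable `post` parameter is an assumption that lets the logic run offline. Note that HTTP errors (for example, an unknown subject) raise `urllib.error.HTTPError` instead of returning `False`.

```python
import json
import urllib.parse
import urllib.request

CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def _http_post(url: str, body: bytes) -> dict:
    """Default transport: POST the body and decode the JSON response."""
    req = urllib.request.Request(url, data=body, method="POST",
                                 headers={"Content-Type": CONTENT_TYPE})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


def check_compatibility(base_url, subject, schema: dict, version="latest",
                        schema_type="AVRO", post=_http_post) -> bool:
    """Return True if `schema` is compatible with `version` of `subject`."""
    url = (f"{base_url.rstrip('/')}/compatibility/subjects/"
           f"{urllib.parse.quote(subject, safe='')}/versions/{version}")
    body = json.dumps({"schemaType": schema_type, "schema": json.dumps(schema)})
    return bool(post(url, body.encode("utf-8")).get("is_compatible", False))
```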
Compatibility Modes
| Mode | Description | Safe Changes |
|---|---|---|
| BACKWARD (default) | New schema can read data written with the previous version | Add optional fields, remove fields |
| BACKWARD_TRANSITIVE | New schema can read data written with all previous versions | Same as BACKWARD, checked against all versions |
| FORWARD | Previous version can read data written with the new schema | Remove optional fields, add fields |
| FORWARD_TRANSITIVE | All previous versions can read data written with the new schema | Same as FORWARD, checked against all versions |
| FULL | Both backward and forward compatible with the previous version | Add/remove optional fields only |
| FULL_TRANSITIVE | Both backward and forward compatible with all previous versions | Add/remove optional fields only; most restrictive |
| NONE | No compatibility checking | Any change allowed |
Optional fields are fields that declare a default value.
Set Compatibility
# Set global compatibility level
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Get global compatibility level
curl http://localhost:8081/config | jq .
# Set per-subject compatibility
curl -X PUT http://localhost:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
# Get per-subject compatibility
curl http://localhost:8081/config/orders-value | jq .
# Delete per-subject override (revert to global)
curl -X DELETE http://localhost:8081/config/orders-value
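A typo such as `BACKWARDS` is easy to send by accident. The sketch below validates the level against the seven modes in the table above before building the `PUT /config` (global) or `PUT /config/<subject>` request. `config_request` is a hypothetical helper. It only constructs the request; passing the result to `urllib.request.urlopen` would apply it to a running registry.

```python
import json
import urllib.parse
import urllib.request

LEVELS = {"BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE",
          "FULL", "FULL_TRANSITIVE", "NONE"}


def config_request(base_url: str, level: str, subject=None) -> urllib.request.Request:
    """Build (but do not send) a PUT request that sets a compatibility level."""
    level = level.upper()
    if level not in LEVELS:
        raise ValueError(f"unknown compatibility level: {level}")
    url = base_url.rstrip("/") + "/config"
    if subject is not None:  # per-subject override instead of the global level
        url += "/" + urllib.parse.quote(subject, safe="")
    return urllib.request.Request(
        url,
        data=json.dumps({"compatibility": level}).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


req = config_request("http://localhost:8081", "full", subject="orders-value")
print(req.get_method(), req.full_url)  # PUT http://localhost:8081/config/orders-value
```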
Producer/Consumer Integration
Java Producer with Avro
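import java.io.File;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("auto.register.schemas", "true"); // register the schema on first send (the default)

// order.avsc must define the Order record used below; parse() throws IOException
Schema schema = new Schema.Parser().parse(new File("order.avsc"));
GenericRecord order = new GenericData.Record(schema);
order.put("orderId", "123");
order.put("amount", 99.99);
order.put("customerId", "cust-1");
order.put("timestamp", System.currentTimeMillis());

// try-with-resources closes the producer, which waits for pending sends to complete
try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("orders", order.get("orderId").toString(), order));
}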
Python Producer with Avro
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})
schema_str = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "customerId", "type": "string"}
]
}
"""
avro_serializer = AvroSerializer(schema_registry, schema_str)
producer = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'key.serializer': lambda k, ctx: k.encode('utf-8'),
'value.serializer': avro_serializer,
})
producer.produce(topic='orders', key='123',
value={'orderId': '123', 'amount': 99.99, 'customerId': 'cust-1'})
producer.flush()
JSON Schema Consumer
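from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
# JSON Schema matching the events-value subject registered above
schema_str = """{
"type": "object",
"properties": {"eventType": {"type": "string"}, "payload": {"type": "object"},
"timestamp": {"type": "integer"}},
"required": ["eventType", "timestamp"]
}"""
json_deserializer = JSONDeserializer(schema_str)
consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # keys may be null, so guard before decoding
    'key.deserializer': lambda k, ctx: k.decode('utf-8') if k is not None else None,
    'value.deserializer': json_deserializer,
})
consumer.subscribe(['events'])
try:
    while True:
        msg = consumer.poll(1.0)  # consume/deserialization errors surface as exceptions
        if msg is None:
            continue
        print(f"Key: {msg.key()}, Value: {msg.value()}")
finally:
    consumer.close()  # leave the consumer group cleanly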
Schema Evolution Examples
Backward Compatible (Add Optional Field)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "customerId", "type": "string"},
{"name": "status", "type": ["null", "string"], "default": null}
]
}
Forward Compatible (Remove Optional Field)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "customerId", "type": "string"}
]
}
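The sketch below is a deliberately simplified checker for flat Avro records. It shows why adding a field with a default passes in both directions, while removing a field without a default breaks FORWARD. It assumes only field-level rules:
- a reader field missing from the writer needs a default;
- extra writer fields are ignored;
- shared fields must have identical types.

This is not the registry's real algorithm, which also handles type promotion, unions, aliases, nested types, and the transitive modes. `can_read` and `compatible` are hypothetical names.

```python
def can_read(reader: dict, writer: dict) -> bool:
    """True if data written with `writer` can be decoded with `reader` (simplified)."""
    writer_fields = {f["name"]: f for f in writer["fields"]}
    for field in reader["fields"]:
        written = writer_fields.get(field["name"])
        if written is None:
            if "default" not in field:  # reader needs a value the writer never wrote
                return False
        elif written["type"] != field["type"]:  # no promotion/union handling here
            return False
    return True


def compatible(new: dict, old: dict, mode: str) -> bool:
    backward = can_read(reader=new, writer=old)  # new schema reads old data
    forward = can_read(reader=old, writer=new)   # old schema reads new data
    return {"BACKWARD": backward, "FORWARD": forward, "FULL": backward and forward}[mode]


base = {"type": "record", "name": "Order", "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customerId", "type": "string"}]}
with_status = {**base, "fields": base["fields"] + [
    {"name": "status", "type": ["null", "string"], "default": None}]}
without_customer = {**base, "fields": base["fields"][:2]}

print(compatible(with_status, base, "FULL"))           # True
print(compatible(without_customer, base, "BACKWARD"))  # True
print(compatible(without_customer, base, "FORWARD"))   # False
```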
Advanced Usage
Schema References (Avro, Protobuf, JSON Schema)
# Register a referenced schema first
curl -X POST http://localhost:8081/subjects/address-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "JSON",
"schema": "{\"type\":\"object\",\"properties\":{\"street\":{\"type\":\"string\"},\"city\":{\"type\":\"string\"}}}"
}'
# Register schema with reference
curl -X POST http://localhost:8081/subjects/customer-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "JSON",
"schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"address\":{\"$ref\":\"address.json\"}}}",
"references": [
{"name": "address.json", "subject": "address-value", "version": 1}
]
}'
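A registration with references fails when a `$ref` in the schema has no matching entry in `references`. The sketch below is a local pre-flight check for JSON Schema. It assumes references are matched by their exact `name`, as in the example above, and that the part of a `$ref` before `#` is the referenced name. Purely local refs such as `#/definitions/x` are ignored. `collect_refs` and `missing_references` are hypothetical helpers.

```python
import json


def collect_refs(node, found=None) -> set:
    """Recursively gather every "$ref" string in a JSON Schema document."""
    found = set() if found is None else found
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "$ref" and isinstance(value, str):
                found.add(value)
            else:
                collect_refs(value, found)
    elif isinstance(node, list):
        for item in node:
            collect_refs(item, found)
    return found


def missing_references(payload: dict) -> set:
    """Return external $ref names that have no entry in payload["references"]."""
    schema = json.loads(payload["schema"])
    declared = {ref["name"] for ref in payload.get("references", [])}
    # Keep the document part before '#'; local refs ("#/...") become "" and are dropped
    external = {ref.split("#", 1)[0] for ref in collect_refs(schema)} - {""}
    return external - declared


customer = {
    "schemaType": "JSON",
    "schema": json.dumps({"type": "object", "properties": {
        "name": {"type": "string"}, "address": {"$ref": "address.json"}}}),
    "references": [{"name": "address.json", "subject": "address-value", "version": 1}],
}
print(missing_references(customer))  # set()
```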
Mode Management
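# Set mode (READWRITE, READONLY, IMPORT); IMPORT allows registering schemas with explicit IDs, e.g. when migrating registries
curl -X PUT http://localhost:8081/mode \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"mode": "READWRITE"}'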
# Get current mode
curl http://localhost:8081/mode | jq .
Troubleshooting
| Issue | Solution |
|---|---|
| Schema registration fails | Check compatibility mode; test with compatibility endpoint first |
| Serialization error in producer | Verify schema registry URL; check schema matches data |
| Deserialization error in consumer | Ensure consumer has access to schema registry; check schema ID |
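| Compatibility check fails | Review allowed changes for the current mode; for intentional breaking changes, prefer a new subject or topic over switching to NONE |
| Schema Registry won’t start | Verify Kafka connectivity (kafkastore.bootstrap.servers); the _schemas topic must use cleanup.policy=compact |
| High latency | Reuse one serializer/client instance so its built-in schema cache is used; avoid creating clients per message |
| Subject not found | Check the subject naming strategy (default TopicNameStrategy: <topic>-key / <topic>-value); list all subjects |
| Registry OOM | Increase JVM heap (SCHEMA_REGISTRY_HEAP_OPTS); permanently delete unused subjects and versions |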
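For "Subject not found" errors, it helps to know which subject the serializer actually used. The sketch below mirrors the naming rules of the three built-in subject name strategies; TopicNameStrategy is the default. `subject_name` is a hypothetical helper, not a client API. In real clients, the strategy is chosen through serializer configuration.

```python
def subject_name(topic: str, record_fullname: str, is_key: bool = False,
                 strategy: str = "TopicNameStrategy") -> str:
    """Return the subject a serializer would register under or look up."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":        # default: <topic>-key / <topic>-value
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":       # fully qualified record name
        return record_fullname
    if strategy == "TopicRecordNameStrategy":  # <topic>-<fully qualified record name>
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")


for s in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
    print(s, "->", subject_name("orders", "com.example.Order", strategy=s))
```

If the printed subject does not appear in `curl http://localhost:8081/subjects`, the producer and consumer are probably configured with different strategies.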