
Confluent Schema Registry Cheat Sheet

Overview

Confluent Schema Registry is a serving layer for metadata that provides a RESTful interface for storing, retrieving, and managing Avro, Protobuf, and JSON schemas. It is tightly integrated with Apache Kafka and enables schema evolution while ensuring data compatibility between producers and consumers.

Schema Registry keeps a versioned history of every schema registered under each subject, enforces compatibility rules to prevent breaking changes, and backs the Schema Registry-aware serializers/deserializers used by Kafka clients. When producers use these serializers, every record written to Kafka is tied to a registered schema, which keeps data pipelines reliable and prevents schema drift in event-driven architectures.

Installation

Docker

# Assumes a Kafka broker reachable as kafka:9092 (for example, on the same Docker network)
docker run -d --name schema-registry \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 \
  -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
  confluentinc/cp-schema-registry:7.6.0

Docker Compose

version: '3'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
      SCHEMA_REGISTRY_AVRO_COMPATIBILITY_LEVEL: backward  # deprecated older name for the setting above
    depends_on:
      - kafka

Configuration Properties

# schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3
schema.compatibility.level=backward
host.name=schema-registry
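The Docker examples pass these same properties as environment variables. The Confluent images follow a naming convention: uppercase the property name, replace periods with underscores, and add the SCHEMA_REGISTRY_ prefix. A minimal sketch of that mapping, assuming property names made only of letters, digits, and periods (names containing dashes or underscores use extra escaping that this sketch does not handle); the helper names are hypothetical:

```python
def property_to_env(prop: str) -> str:
    """Convert a schema-registry.properties key to a cp-schema-registry env var name.

    Assumption: the key contains only letters, digits and periods.
    """
    return "SCHEMA_REGISTRY_" + prop.upper().replace(".", "_")


def properties_to_env(text: str) -> dict:
    """Parse simple key=value lines (skipping blanks and # comments) into env vars."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values such as URLs stay intact
        key, _, value = line.partition("=")
        env[property_to_env(key.strip())] = value.strip()
    return env
```

For example, host.name becomes SCHEMA_REGISTRY_HOST_NAME, matching the Compose file.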

REST API

Schema Subjects

# List all subjects
curl http://localhost:8081/subjects | jq .

# List versions for a subject
curl http://localhost:8081/subjects/orders-value/versions | jq .

# Get latest schema
curl http://localhost:8081/subjects/orders-value/versions/latest | jq .

# Get specific version
curl http://localhost:8081/subjects/orders-value/versions/1 | jq .

# Get schema by global ID
curl http://localhost:8081/schemas/ids/1 | jq .

# Delete a subject (soft delete)
curl -X DELETE http://localhost:8081/subjects/orders-value | jq .

# Delete a subject permanently (it must be soft-deleted first)
curl -X DELETE "http://localhost:8081/subjects/orders-value?permanent=true" | jq .

# Delete specific version
curl -X DELETE http://localhost:8081/subjects/orders-value/versions/1 | jq .
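When scripting these calls, two details are easy to miss: subject names should be URL-encoded, and a permanent delete only succeeds after a soft delete. A minimal standard-library sketch that builds the requests (BASE_URL and the helper names are assumptions for illustration); pass each Request to urllib.request.urlopen to actually send it:

```python
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8081"  # assumed registry address


def subject_url(subject: str, version=None, base: str = BASE_URL) -> str:
    """Build /subjects/{subject}[/versions/{version}] with the subject URL-encoded."""
    url = f"{base}/subjects/{quote(subject, safe='')}"
    if version is not None:
        url += f"/versions/{version}"
    return url


def hard_delete_requests(subject: str, base: str = BASE_URL) -> list:
    """Permanent deletion is two DELETE calls: a soft delete, then ?permanent=true."""
    soft = Request(subject_url(subject, base=base), method="DELETE")
    hard = Request(subject_url(subject, base=base) + "?permanent=true", method="DELETE")
    return [soft, hard]
```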

Register Schemas

# Register Avro schema
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "AVRO",
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"}]}"
  }'

# Register JSON Schema
curl -X POST http://localhost:8081/subjects/events-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "JSON",
    "schema": "{\"type\":\"object\",\"properties\":{\"eventType\":{\"type\":\"string\"},\"payload\":{\"type\":\"object\"},\"timestamp\":{\"type\":\"integer\"}},\"required\":[\"eventType\",\"timestamp\"]}"
  }'

# Register Protobuf schema
curl -X POST http://localhost:8081/subjects/users-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "PROTOBUF",
    "schema": "syntax = \"proto3\";\npackage com.example;\n\nmessage User {\n  string user_id = 1;\n  string name = 2;\n  string email = 3;\n  int64 created_at = 4;\n}"
  }'
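Hand-escaping the schema is error-prone, because the "schema" field must be a JSON string embedded inside the JSON request body. A minimal sketch that lets json.dumps do the escaping; ORDER_SCHEMA and registration_body are illustrative names, not part of any Confluent library:

```python
import json

# The Avro schema as a normal Python dict instead of a hand-escaped string
ORDER_SCHEMA = {
    "type": "record",
    "name": "Order",
    "namespace": "com.example",
    "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "customerId", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
}


def registration_body(schema, schema_type: str = "AVRO", references=None) -> str:
    """Return the JSON body for POST /subjects/{subject}/versions.

    Dict schemas (Avro, JSON Schema) are serialized to a string first;
    Protobuf schemas are passed through as plain .proto text.
    """
    schema_str = schema if isinstance(schema, str) else json.dumps(schema)
    body = {"schemaType": schema_type, "schema": schema_str}
    if references:
        body["references"] = references
    return json.dumps(body)
```

Write the returned body to a file and send it with curl -d @body.json, as in the compatibility example below.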

Compatibility Testing

# Test compatibility of new schema with latest
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "AVRO",
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"orderId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"status\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
  }'

# Test against specific version
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/1 \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d @new-schema.json
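Each call checks the candidate against a single version and answers with a body such as {"is_compatible": true}. To find which older versions a change would break, loop over the version list returned by GET /subjects/{subject}/versions. A sketch under that assumption; the post callable is injected (a hypothetical design choice) so the loop can be tested without a running registry, and http_post shows one standard-library implementation:

```python
import json
from typing import Callable, Iterable, List
from urllib.request import Request, urlopen

# post(url, body) -> parsed JSON response
PostFn = Callable[[str, str], dict]


def http_post(url: str, body: str) -> dict:
    """POST a JSON body to Schema Registry and return the parsed response."""
    req = Request(url, data=body.encode("utf-8"), method="POST",
                  headers={"Content-Type": "application/vnd.schemaregistry.v1+json"})
    with urlopen(req) as resp:
        return json.loads(resp.read())


def incompatible_versions(subject: str, versions: Iterable[int], body: str,
                          post: PostFn = http_post,
                          base: str = "http://localhost:8081") -> List[int]:
    """Return the versions that the candidate schema in `body` is NOT compatible with."""
    failed = []
    for v in versions:
        url = f"{base}/compatibility/subjects/{subject}/versions/{v}"
        if not post(url, body).get("is_compatible", False):
            failed.append(v)
    return failed
```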

Compatibility Modes

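| Mode | Description | Safe changes |
|---|---|---|
| BACKWARD (default) | New schema can read data written with the previous version | Delete fields, add optional fields |
| BACKWARD_TRANSITIVE | New schema can read data written with all previous versions | Same as BACKWARD, checked against all versions |
| FORWARD | Previous version can read data written with the new schema | Add fields, delete optional fields |
| FORWARD_TRANSITIVE | All previous versions can read data written with the new schema | Same as FORWARD, checked against all versions |
| FULL | Both backward and forward compatible with the previous version | Add or delete optional fields only |
| FULL_TRANSITIVE | Both backward and forward compatible with all previous versions | Same as FULL, checked against all versions (most restrictive) |
| NONE | No compatibility checking | Any change allowed |

For Avro, "optional" means the field declares a default value.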
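To compare the modes side by side, the toy checker below encodes these rules for flat Avro records, treating a field as optional when it declares a default. It is a deliberate simplification (any type change counts as breaking; unions, aliases, and type promotion are ignored), not the algorithm Schema Registry actually runs:

```python
def can_read(reader: dict, writer: dict) -> bool:
    """True if data written with `writer` can be decoded with `reader` (flat records only).

    Simplified rules: reader fields missing from the writer need a default;
    writer-only fields are skipped; shared fields must have identical types.
    """
    writer_fields = {f["name"]: f["type"] for f in writer["fields"]}
    for field in reader["fields"]:
        if field["name"] not in writer_fields:
            if "default" not in field:
                return False
        elif field["type"] != writer_fields[field["name"]]:
            return False
    return True


def backward(old: dict, new: dict) -> bool:
    return can_read(reader=new, writer=old)  # new schema reads old data


def forward(old: dict, new: dict) -> bool:
    return can_read(reader=old, writer=new)  # old schema reads new data


def full(old: dict, new: dict) -> bool:
    return backward(old, new) and forward(old, new)


def transitive(check, history: list, new: dict) -> bool:
    """The _TRANSITIVE variants: apply `check` against every earlier version."""
    return all(check(old, new) for old in history)
```

Dropping a field with no default passes backward(...) but fails forward(...), which is why such a change is rejected under FORWARD and FULL.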

Set Compatibility

# Set global compatibility level
curl -X PUT http://localhost:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Get global compatibility level
curl http://localhost:8081/config | jq .

# Set per-subject compatibility
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

# Get per-subject compatibility
curl http://localhost:8081/config/orders-value | jq .

# Delete per-subject override (revert to global)
curl -X DELETE http://localhost:8081/config/orders-value
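The same PUT calls from Python, as a minimal sketch that validates the level before building the request (BASE_URL, LEVELS, and set_compatibility_request are illustrative names; LEVELS lists the modes from the table above). Send the result with urllib.request.urlopen:

```python
import json
from urllib.request import Request

BASE_URL = "http://localhost:8081"  # assumed registry address
LEVELS = {"BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE",
          "FULL", "FULL_TRANSITIVE", "NONE"}


def set_compatibility_request(level: str, subject=None, base: str = BASE_URL) -> Request:
    """Build PUT /config (global) or PUT /config/{subject} (per-subject override)."""
    level = level.upper()
    if level not in LEVELS:
        raise ValueError(f"unknown compatibility level: {level}")
    url = f"{base}/config" + (f"/{subject}" if subject else "")
    return Request(url, data=json.dumps({"compatibility": level}).encode("utf-8"),
                   method="PUT",
                   headers={"Content-Type": "application/vnd.schemaregistry.v1+json"})
```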

Producer/Consumer Integration

Java Producer with Avro

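import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.File;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("auto.register.schemas", true); // default is true; set false to require pre-registered schemas

// order.avsc holds the Order record schema (orderId, amount, customerId, timestamp)
Schema schema = new Schema.Parser().parse(new File("order.avsc")); // throws IOException
GenericRecord order = new GenericData.Record(schema);
order.put("orderId", "123");
order.put("amount", 99.99);
order.put("customerId", "cust-1");
order.put("timestamp", System.currentTimeMillis());

// try-with-resources closes the producer, which waits for pending sends to complete
try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("orders", order.get("orderId").toString(), order));
}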

Python Producer with Avro

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

schema_str = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customerId", "type": "string"}
  ]
}
"""

avro_serializer = AvroSerializer(schema_registry, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': lambda k, ctx: k.encode('utf-8'),
    'value.serializer': avro_serializer,
})

producer.produce(topic='orders', key='123',
                 value={'orderId': '123', 'amount': 99.99, 'customerId': 'cust-1'})
producer.flush()
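AvroSerializer also accepts an optional to_dict callable, called as to_dict(obj, ctx), so you can produce domain objects instead of plain dicts. A minimal sketch with a hypothetical Order dataclass; the Kafka wiring is left in comments so the block runs without a broker or registry:

```python
from dataclasses import asdict, dataclass


@dataclass
class Order:  # hypothetical domain object matching the Order schema above
    orderId: str
    amount: float
    customerId: str


def order_to_dict(order: Order, ctx) -> dict:
    """to_dict hook: the serializer passes (object, SerializationContext)."""
    return asdict(order)


# Wiring (requires confluent_kafka plus a running broker and registry):
# avro_serializer = AvroSerializer(schema_registry, schema_str, to_dict=order_to_dict)
# producer.produce(topic='orders', key='123', value=Order('123', 99.99, 'cust-1'))
```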

JSON Schema Consumer

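from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.json_schema import JSONDeserializer

# JSON Schema for events-value (the Avro schema_str from the producer is not valid here)
json_schema_str = """{
  "type": "object",
  "properties": {"eventType": {"type": "string"}, "payload": {"type": "object"},
                 "timestamp": {"type": "integer"}},
  "required": ["eventType", "timestamp"]
}"""

json_deserializer = JSONDeserializer(json_schema_str)

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # Keys can be null, so only decode when one is present
    'key.deserializer': lambda k, ctx: k.decode('utf-8') if k is not None else None,
    'value.deserializer': json_deserializer,
})

consumer.subscribe(['events'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        print(f"Key: {msg.key()}, Value: {msg.value()}")
finally:
    consumer.close()  # leave the consumer group cleanly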

Schema Evolution Examples

Backward Compatible (Add Optional Field)

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customerId", "type": "string"},
    {"name": "status", "type": ["null", "string"], "default": null}
  ]
}

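Forward Compatible (Remove Optional Field)

Starting from the schema above, dropping status (which has a default) is forward compatible: consumers still on the previous version fill status with its default. Dropping customerId would not be, because it has no default for old readers to fall back on.

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customerId", "type": "string"}
  ]
}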
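These evolution rules follow from how an Avro reader resolves data written with a different schema: fields the reader does not know are skipped, and fields the writer did not send are filled from the reader's defaults, or decoding fails if there is no default. A toy sketch of that behavior on already-decoded dicts (real resolution happens while decoding binary data inside the Avro library; resolve is a hypothetical helper):

```python
_MISSING = object()  # sentinel so a legitimate None value is not mistaken for "absent"


def resolve(record: dict, reader: dict) -> dict:
    """Project a record decoded with the writer schema onto the reader's fields.

    Simplified Avro resolution for flat records: writer-only fields are
    dropped; reader-only fields take the reader's default or fail.
    """
    result = {}
    for field in reader["fields"]:
        name = field["name"]
        value = record.get(name, _MISSING)
        if value is _MISSING:
            if "default" not in field:
                raise ValueError(f"field {name!r} missing and has no default")
            value = field["default"]
        result[name] = value
    return result
```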

Advanced Usage

Schema References (JSON Schema example; Avro and Protobuf also support references)

# Register a referenced schema first
curl -X POST http://localhost:8081/subjects/address-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "JSON",
    "schema": "{\"type\":\"object\",\"properties\":{\"street\":{\"type\":\"string\"},\"city\":{\"type\":\"string\"}}}"
  }'

# Register schema with reference
curl -X POST http://localhost:8081/subjects/customer-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "JSON",
    "schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"address\":{\"$ref\":\"address.json\"}}}",
    "references": [
      {"name": "address.json", "subject": "address-value", "version": 1}
    ]
  }'
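For JSON Schema, each reference's name must match the string used in "$ref", otherwise the registry cannot resolve it. A hypothetical pre-flight check, sketched with the standard library, that lists any external "$ref" without a matching references entry before you register:

```python
import json

customer_schema = {"type": "object",
                   "properties": {"name": {"type": "string"},
                                  "address": {"$ref": "address.json"}}}

# Each entry maps a name used in "$ref" to a registered subject/version
references = [{"name": "address.json", "subject": "address-value", "version": 1}]


def collect_refs(node) -> set:
    """Recursively gather every "$ref" string in a JSON Schema document."""
    found = set()
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "$ref" and isinstance(value, str):
                found.add(value)
            else:
                found |= collect_refs(value)
    elif isinstance(node, list):
        for item in node:
            found |= collect_refs(item)
    return found


def unresolved_refs(schema: dict, refs: list) -> set:
    """External $ref targets with no references entry (local '#/...' refs are skipped)."""
    declared = {r["name"] for r in refs}
    return {r for r in collect_refs(schema) if not r.startswith("#") and r not in declared}


body = json.dumps({"schemaType": "JSON", "schema": json.dumps(customer_schema),
                   "references": references})
```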

Mode Management

# Set mode (READWRITE, READONLY, IMPORT)
curl -X PUT http://localhost:8081/mode \
  -H "Content-Type: application/json" \
  -d '{"mode": "READWRITE"}'

# Get current mode
curl http://localhost:8081/mode | jq .

Troubleshooting

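| Issue | Solution |
|---|---|
| Schema registration fails | Check the subject's compatibility mode; test the schema with the compatibility endpoint first |
| Serialization error in producer | Verify schema.registry.url; check that the record matches the schema |
| Deserialization error in consumer | Ensure the consumer can reach Schema Registry; check that the schema ID in the message exists (GET /schemas/ids/{id}) |
| Compatibility check fails | Review the allowed changes for the current mode; for a genuinely breaking change, prefer a new subject or topic over switching to NONE |
| Schema Registry won't start | Verify Kafka connectivity; the _schemas topic is created automatically, but if it already exists it must use cleanup.policy=compact |
| High latency | Clients cache schemas and IDs after the first lookup; reuse serializer instances instead of creating one per message |
| Subject not found | Check the naming convention (topic-value / topic-key under the default TopicNameStrategy); list all subjects |
| Registry OOM | Increase the JVM heap (for example via SCHEMA_REGISTRY_HEAP_OPTS); hard-delete unused subjects and versions |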
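To check the schema ID of a problem message, note that values written by the Confluent serializers start with a magic byte 0 followed by the 4-byte big-endian schema ID, then the encoded payload (Protobuf adds message-index bytes before the payload). A minimal sketch for inspecting raw bytes, for example ones read with a plain consumer; split_confluent_frame is a hypothetical helper:

```python
import struct

MAGIC_BYTE = 0


def split_confluent_frame(data: bytes):
    """Return (schema_id, payload) from a Confluent-framed message value.

    Layout: 1 magic byte (0) + 4-byte big-endian schema ID + payload.
    """
    if len(data) < 5:
        raise ValueError("message too short to contain a schema ID")
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not written by a Schema Registry serializer?")
    return schema_id, data[5:]
```

Then look the ID up with curl http://localhost:8081/schemas/ids/<id>.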