Kafka Streams Cheat Sheet
Overview
Kafka Streams is a client library for building applications and microservices that process and transform data stored in Apache Kafka. It combines the simplicity of writing standard Java/Scala applications with the benefits of Kafka’s server-side cluster technology, enabling real-time stream processing without requiring a separate processing cluster.
Kafka Streams provides two APIs: the high-level Streams DSL for common transformations and the lower-level Processor API for custom processing logic. Key features include exactly-once processing semantics, fault-tolerant local state stores, windowed operations, stream-table joins, and interactive queries. Interactive queries let an application read its local state stores directly, for example to back a REST endpoint that the application itself exposes.
Installation
Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.7.0</version>
</dependency>
Gradle
implementation 'org.apache.kafka:kafka-streams:3.7.0'
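Python (Faust Alternative)
# Kafka Streams is a JVM library with no official Python port; faust-streaming, the community-maintained fork of Faust, offers a similar stream-processing model
pip install faust-streaming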
Basic Application
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
public class StreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.filter((key, value) -> value != null && !value.isEmpty())
              .mapValues(value -> value.toUpperCase())
              .to("output-topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
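If you keep the per-record logic in plain static methods, you can unit-test it without a broker. The sketch below reproduces the `filter` and `mapValues` steps of `StreamApp` on an in-memory list. `UppercaseLogic` is a hypothetical name. The sketch shows only the logic; it does not show how Kafka Streams executes a topology.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper holding the per-record logic used by StreamApp's topology.
final class UppercaseLogic {
    private UppercaseLogic() {}

    // Same predicate as the filter() step: drop null and empty values.
    static boolean keep(String value) {
        return value != null && !value.isEmpty();
    }

    // Same mapping as the mapValues() step.
    static String transform(String value) {
        return value.toUpperCase();
    }

    // Applies both steps to an in-memory batch, e.g. inside a unit test.
    static List<String> apply(List<String> values) {
        return values.stream()
                .filter(UppercaseLogic::keep)
                .map(UppercaseLogic::transform)
                .collect(Collectors.toList());
    }
}
```

Inside the topology, the same steps would read `.filter((key, value) -> UppercaseLogic.keep(value)).mapValues(UppercaseLogic::transform)`.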
Stream DSL Operations
Stateless Transformations
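import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Map;
KStream<String, String> stream = builder.stream("input");
// Filter (guards against null values)
stream.filter((key, value) -> value != null && value.contains("important"))
      .to("filtered");
// Map key and value (changes the key; assumes every value has at least 3 characters)
stream.map((key, value) -> KeyValue.pair(value.substring(0, 3), value))
      .to("mapped");
// Map values only (key unchanged)
stream.mapValues(value -> value.toUpperCase())
      .to("uppercased");
// FlatMap (one to many)
stream.flatMapValues(value -> Arrays.asList(value.split("\\s+")))
      .to("words");
// Branch (split stream; each record goes to the first branch whose predicate matches)
Map<String, KStream<String, String>> branches = stream.split()
      .branch((key, value) -> value.startsWith("A"), Branched.as("a-branch"))
      .branch((key, value) -> value.startsWith("B"), Branched.as("b-branch"))
      .defaultBranch(Branched.as("other"));
branches.get("a-branch").to("a-topic");
branches.get("b-branch").to("b-topic");
// Merge streams (both must have the same key and value types; topic names are examples)
KStream<String, String> stream1 = builder.stream("input-a");
KStream<String, String> stream2 = builder.stream("input-b");
KStream<String, String> merged = stream1.merge(stream2);
// SelectKey (a new key means the stream is repartitioned before the next stateful operation)
stream.selectKey((key, value) -> value.split(",")[0])
      .to("rekeyed");
// Peek (side effects, no modification)
stream.peek((key, value) -> System.out.println("Processing: " + key))
      .to("output");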
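With `split()`, each record is routed to the first branch whose predicate matches. `defaultBranch` catches everything else, and no record lands in two branches. The plain-Java sketch below models that routing rule with the same predicates as above. It tests only the value, whereas Kafka predicates receive `(key, value)`. `BranchRouter` is a hypothetical class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of split()/branch() routing: the first matching predicate wins.
final class BranchRouter {
    // LinkedHashMap preserves the order in which branches were declared.
    private final Map<String, Predicate<String>> branches = new LinkedHashMap<>();
    private final String defaultBranch;

    BranchRouter(String defaultBranch) {
        this.defaultBranch = defaultBranch;
    }

    BranchRouter branch(String name, Predicate<String> predicate) {
        branches.put(name, predicate);
        return this;
    }

    // Returns the single branch name this value would be routed to.
    String route(String value) {
        for (Map.Entry<String, Predicate<String>> e : branches.entrySet()) {
            if (e.getValue().test(value)) {
                return e.getKey();
            }
        }
        return defaultBranch;
    }
}
```

Order matters when predicates overlap. If a broader predicate is declared first, it captures records that a later, narrower branch was meant to handle.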
Stateful Transformations
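import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
// Count by key
KTable<String, Long> counts = stream
    .groupByKey()
    .count(Materialized.as("count-store"));
// Aggregate (running sum of numeric string values)
KTable<String, Double> totals = stream
    .groupByKey()
    .aggregate(
        () -> 0.0, // initializer
        (key, value, aggregate) -> aggregate + Double.parseDouble(value), // aggregator
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("totals-store")
            .withValueSerde(Serdes.Double()) // result type differs from the default String serde
    );
// Reduce (keep the latest value per key)
KTable<String, String> latest = stream
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue,
            Materialized.as("latest-store"));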
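Conceptually, `groupByKey().aggregate(...)` keeps one running value per key in its state store. For each input record it emits the updated aggregate downstream, although record caching may collapse some of these intermediate updates. The sketch below models the `totals-store` logic in plain Java, with a `HashMap` standing in for the state store. `RunningTotals` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for groupByKey().aggregate() summing numeric string values.
final class RunningTotals {
    private final Map<String, Double> store = new HashMap<>(); // plays the role of "totals-store"
    private final List<String> emitted = new ArrayList<>();    // updates the KTable would forward

    void process(String key, String value) {
        double current = store.getOrDefault(key, 0.0);        // initializer: () -> 0.0
        double updated = current + Double.parseDouble(value); // aggregator
        store.put(key, updated);
        emitted.add(key + "=" + updated);
    }

    Double get(String key) {
        return store.get(key);
    }

    List<String> emitted() {
        return emitted;
    }
}
```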
KTable Operations
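import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
// Read a topic as a table (changelog semantics: latest value per key, null value = delete)
KTable<String, String> users = builder.table("users-topic",
    Materialized.as("users-store"));
// Filter rows (substring check: "active" also matches "inactive")
KTable<String, String> filtered = users.filter((key, value) ->
    value != null && value.contains("active"));
// Map values
KTable<String, Integer> lengths = users.mapValues(value -> value.length());
// Convert table to stream (each table update becomes a record)
KStream<String, String> userStream = users.toStream();
// GlobalKTable (fully replicated to every instance)
GlobalKTable<String, String> config = builder.globalTable("config-topic",
    Materialized.as("config-store"));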
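A `KTable` treats its topic as a changelog. A record with a non-null value inserts or overwrites the value for its key, and a record with a null value (a tombstone) deletes the key. The plain-Java model below captures those semantics; `ChangelogTable` is a hypothetical name, and the example values are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how a KTable materializes a changelog topic.
final class ChangelogTable {
    private final Map<String, String> table = new HashMap<>();

    // Applies one topic record: an upsert, or a delete when the value is null (tombstone).
    void apply(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    String get(String key) {
        return table.get(key);
    }

    int size() {
        return table.size();
    }
}
```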
Windowing
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
// Tumbling window (fixed, non-overlapping): 5-minute windows, no grace period
KTable<Windowed<String>, Long> tumblingCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("tumbling-counts"));
// Hopping window (fixed, overlapping): 5-minute windows advancing every minute, 1-minute grace
KTable<Windowed<String>, Long> hoppingCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(5),
            Duration.ofMinutes(1))
        .advanceBy(Duration.ofMinutes(1)))
    .count(Materialized.as("hopping-counts"));
// Session window (activity-based): a session closes after 30 minutes without records for the key
KTable<Windowed<String>, Long> sessionCounts = stream
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count(Materialized.as("session-counts"));
// Sliding window: records whose timestamps differ by at most 5 minutes share a window
KTable<Windowed<String>, Long> slidingCounts = stream
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("sliding-counts"));
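Time windows are aligned to the epoch rather than to the first record. Because hopping windows overlap, one record can land in several windows: size / advance of them when the size is a multiple of the advance. The plain-Java sketch below applies that assignment rule. It assumes half-open windows `[start, start + size)` and ignores grace periods. `WindowMath` is a hypothetical name, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper computing epoch-aligned time-window assignment.
final class WindowMath {
    private WindowMath() {}

    // Start times of every window [start, start + sizeMs) that contains timestampMs.
    // Tumbling windows are the special case advanceMs == sizeMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (never below 0).
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With the hopping definition above (5-minute size, 1-minute advance), a record at 00:07:30 falls into the five windows that start at 00:03, 00:04, 00:05, 00:06 and 00:07. With the tumbling definition, it falls only into the window starting at 00:05.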
Joins
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import java.time.Duration;
// Stream-Stream join (windowed; both topics must be co-partitioned)
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");
KStream<String, String> joined = orders.join(
    payments,
    (orderValue, paymentValue) -> orderValue + " | " + paymentValue,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), // timestamps at most 5 minutes apart
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) // key, left value, right value serdes
);
// Stream-Table join (enrichment; order key must equal the customer key, topics co-partitioned)
KTable<String, String> customers = builder.table("customers");
KStream<String, String> enriched = orders.join(
    customers,
    (orderValue, customerValue) -> orderValue + " customer=" + customerValue
);
// Stream-GlobalKTable join (broadcast join; no co-partitioning required)
GlobalKTable<String, String> products = builder.globalTable("products");
KStream<String, String> enrichedOrders = orders.join(
    products,
    (key, value) -> extractProductId(value), // key mapper; extractProductId is your own helper
    (orderValue, productValue) -> orderValue + " product=" + productValue
);
// Left join (keep orders without a matching customer; customerValue is then null)
KStream<String, String> leftJoined = orders.leftJoin(
    customers,
    (orderValue, customerValue) ->
        orderValue + " customer=" + (customerValue != null ? customerValue : "unknown")
);
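`JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))` pairs an order with a payment for the same key when their timestamps are at most five minutes apart, in either direction. The check below is a plain-Java sketch of that time condition only. It assumes inclusive bounds and ignores grace periods, buffering and key matching. `JoinWindowCheck` is a hypothetical name.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical check for a symmetric join-window time difference.
final class JoinWindowCheck {
    private JoinWindowCheck() {}

    // True when the two record timestamps are at most maxDifference apart (bounds inclusive).
    static boolean withinWindow(Instant left, Instant right, Duration maxDifference) {
        Duration gap = Duration.between(left, right).abs();
        return gap.compareTo(maxDifference) <= 0;
    }
}
```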
Configuration
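import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import java.util.Properties;
Properties props = new Properties();
// Required (the application ID is also the consumer group ID and the internal-topic prefix)
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// Serialization
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// Processing guarantee (EXACTLY_ONCE_V2 requires brokers 2.5 or newer)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Performance tuning
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // default: 100 ms with exactly-once, 30000 ms otherwise
// Record cache shared by all threads; replaces the deprecated CACHE_MAX_BYTES_BUFFERING_CONFIG (Kafka 3.4+)
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
// State store
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
// Replication for internal topics
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// Error handling
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
        DefaultProductionExceptionHandler.class);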
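The `StreamsConfig` constants resolve to plain property names, so the same settings can also live in a `.properties` file. The sketch below builds an equivalent configuration with raw names, using only `java.util.Properties`. The cache limit is set under `statestore.cache.max.bytes`, the 3.4+ replacement for `cache.max.bytes.buffering`. The sketch also derives the cache share per stream thread. That calculation assumes the cache budget is split evenly across threads, as described in the Kafka Streams memory-management guide. `StreamsProps` is a hypothetical name.

```java
import java.util.Properties;

// Hypothetical helper: the configuration above expressed with raw property names.
final class StreamsProps {
    private StreamsProps() {}

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-app");
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.setProperty("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.setProperty("processing.guarantee", "exactly_once_v2");
        props.setProperty("num.stream.threads", "4");
        props.setProperty("commit.interval.ms", "1000");
        props.setProperty("statestore.cache.max.bytes", String.valueOf(10L * 1024 * 1024));
        props.setProperty("state.dir", "/var/kafka-streams");
        props.setProperty("replication.factor", "3");
        props.setProperty("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        props.setProperty("default.production.exception.handler",
                "org.apache.kafka.streams.errors.DefaultProductionExceptionHandler");
        return props;
    }

    // Cache bytes available to each stream thread, assuming an even split across threads.
    static long cacheBytesPerThread(Properties props) {
        long total = Long.parseLong(props.getProperty("statestore.cache.max.bytes"));
        int threads = Integer.parseInt(props.getProperty("num.stream.threads"));
        return total / threads;
    }
}
```

With 10 MiB and 4 threads, each thread gets 2,621,440 bytes (2.5 MiB). The resulting `Properties` object can be passed to `new KafkaStreams(topology, props)` just like the constant-based version.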
Interactive Queries
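import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Duration;
import java.time.Instant;
// Access local state stores for serving queries.
// Stores are queryable only while the instance is RUNNING (otherwise InvalidStateStoreException)
// and contain only the partitions assigned to this instance.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(
        StoreQueryParameters.fromNameAndType("count-store", QueryableStoreTypes.keyValueStore())
    );
// Get a value (null if the key is absent)
Long count = store.get("myKey");
// Iterate all entries (always close iterators)
try (KeyValueIterator<String, Long> iter = store.all()) {
    while (iter.hasNext()) {
        KeyValue<String, Long> entry = iter.next();
        System.out.println(entry.key + " = " + entry.value);
    }
}
// Range query (both bounds inclusive)
try (KeyValueIterator<String, Long> range = store.range("a", "z")) {
    range.forEachRemaining(entry -> System.out.println(entry.key + " = " + entry.value));
}
// Windowed store query (e.g. the "tumbling-counts" store from the Windowing section)
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store(
        StoreQueryParameters.fromNameAndType("tumbling-counts", QueryableStoreTypes.windowStore())
    );
Instant now = Instant.now();
try (WindowStoreIterator<Long> windowIter =
         windowStore.fetch("myKey", now.minus(Duration.ofHours(1)), now)) {
    while (windowIter.hasNext()) {
        KeyValue<Long, Long> window = windowIter.next(); // key = window start (epoch ms)
        System.out.println(window.key + " -> " + window.value);
    }
}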
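`range(from, to)` returns the keys between the two bounds, inclusive at both ends, in the store's key order. Persistent stores order keys by their serialized bytes. For the String serde with ASCII keys, that matches `String.compareTo`; treat it as an assumption for non-ASCII keys. As a result, `range("a", "z")` skips uppercase keys, and it also skips keys such as `"zebra"` that sort after `"z"`. The sketch below uses a `TreeMap` as a stand-in for the store; `RangeQuerySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical in-memory stand-in for ReadOnlyKeyValueStore.range(from, to).
final class RangeQuerySketch {
    private RangeQuerySketch() {}

    // Keys in [from, to], both ends inclusive, in ascending order.
    static List<String> rangeKeys(NavigableMap<String, Long> store, String from, String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).keySet());
    }
}
```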
Advanced Usage
Custom State Store
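import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.util.Collections;
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), // RocksDB-backed
        Serdes.String(),
        Serdes.String()
    ).withCachingEnabled()
     .withLoggingEnabled(Collections.emptyMap()); // changelog topic with default topic configs
builder.addStateStore(storeBuilder);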
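With `withCachingEnabled()`, writes pass through a record cache. The cache keeps only the latest value per key until the next commit or until it fills up, so downstream operators and the changelog see fewer, compacted updates. The sketch below is a deliberately simplified model of that de-duplication: it has no size-based eviction and no per-thread sizing. `DedupCache` is a hypothetical name, and its flush order is a property of the sketch rather than of Kafka's cache.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a caching layer that collapses repeated writes per key.
final class DedupCache {
    // Pending writes; a repeated key overwrites its value but keeps its first insertion position.
    private final Map<String, String> dirty = new LinkedHashMap<>();

    void put(String key, String value) {
        dirty.put(key, value);
    }

    // Emits pending writes as "key=value" and clears the cache, as a commit would.
    List<String> flush() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
        return out;
    }
}
```

Three writes to two keys produce only two flushed records. That collapsing is why a cached KTable can emit fewer updates than it receives.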
Processor API
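import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
// Explicit type witness: builder.stream("input") alone infers KStream<Object, Object> and fails to compile here
builder.<String, String>stream("input")
    .process(() -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> store;
        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            // "my-store" must be registered with builder.addStateStore(...) and named in process(...)
            this.store = context.getStateStore("my-store");
        }
        @Override
        public void process(Record<String, String> record) {
            store.put(record.key(), record.value());
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }, "my-store")
    .to("output"); // write the forwarded records to an output topic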
Monitoring
// Metrics recording level (INFO by default); many per-node and state-store metrics are recorded only at DEBUG
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
// Key JMX metric groups:
// kafka.streams:type=stream-metrics (client-level metrics)
// kafka.streams:type=stream-thread-metrics (thread-level metrics)
// kafka.streams:type=stream-task-metrics (per-task metrics)
// kafka.streams:type=stream-processor-node-metrics (per-node metrics)
// kafka.streams:type=stream-state-metrics (state store metrics)
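Kafka's JMX reporter registers these metrics as MBeans in the JVM's platform MBean server. An in-process health check can therefore look them up by `ObjectName` pattern, and remote tools such as JConsole use the same names. Below is a minimal sketch that relies only on `javax.management`; `StreamsMBeans` is a hypothetical name.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for locating Kafka Streams metric MBeans in the current JVM.
final class StreamsMBeans {
    private StreamsMBeans() {}

    // Builds a pattern such as "kafka.streams:type=stream-thread-metrics,*";
    // the trailing ",*" matches any additional key properties (client-id, thread-id, ...).
    static ObjectName pattern(String type) {
        try {
            return new ObjectName("kafka.streams:type=" + type + ",*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid metric type: " + type, e);
        }
    }

    // Matching MBean names; empty if no KafkaStreams instance is running in this JVM.
    static Set<ObjectName> find(String type) {
        return ManagementFactory.getPlatformMBeanServer().queryNames(pattern(type), null);
    }
}
```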
Troubleshooting
| Issue | Solution |
|---|---|
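| Rebalancing frequently | Increase session.timeout.ms; if processing is slow, raise max.poll.interval.ms or lower max.poll.records |
| State store corruption | Stop the instance, delete its state directory (or call KafkaStreams#cleanUp() before start()), and restart; state is restored from the changelog topics |
| High consumer lag | Increase num.stream.threads or scale out instances; parallelism is capped by the number of input partitions (one task per partition) |
| Deserialization errors | Configure a deserialization exception handler; check that the serde matches the data format |
| Out of memory | Reduce statestore.cache.max.bytes (cache.max.bytes.buffering before 3.4); increase JVM heap |
| Exactly-once failures | Use EXACTLY_ONCE_V2 with brokers 2.5 or newer; on small clusters, set transaction.state.log.replication.factor and transaction.state.log.min.isr to values the cluster can satisfy |
| Slow joins | Check join window size (larger windows keep more records in the join stores); avoid unnecessary repartitioning by keying data correctly upstream |
| Topology errors | Verify all input topics exist; topics joined directly must be co-partitioned (same partition count) |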
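The number of input partitions caps parallelism. Each sub-topology gets one task per partition of its input topics (the maximum across co-partitioned inputs), and a task runs on only one thread at a time. Threads beyond the task count therefore sit idle. The back-of-the-envelope sketch below assumes a single sub-topology; `ParallelismMath` is a hypothetical name.

```java
// Hypothetical estimate of busy stream threads for a single sub-topology.
final class ParallelismMath {
    private ParallelismMath() {}

    // Tasks for one sub-topology: the largest partition count among its input topics.
    static int tasks(int... inputPartitionCounts) {
        int max = 0;
        for (int p : inputPartitionCounts) {
            max = Math.max(max, p);
        }
        return max;
    }

    // Threads that can actually receive work across all instances.
    static int busyThreads(int instances, int threadsPerInstance, int tasks) {
        return Math.min(instances * threadsPerInstance, tasks);
    }
}
```

For example, with 6-partition inputs, three instances running 4 threads each keep only 6 of the 12 threads busy. Adding partitions, not threads, is the way to scale further.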