コンテンツにスキップ

Kafka Streams Cheat Sheet

Overview

Kafka Streams is a client library for building applications and microservices that process and transform data stored in Apache Kafka. It combines the simplicity of writing standard Java/Scala applications with the benefits of Kafka’s server-side cluster technology, enabling real-time stream processing without requiring a separate processing cluster.

Kafka Streams provides a dual API: the high-level Streams DSL for common transformations and the lower-level Processor API for custom processing logic. Key features include exactly-once processing semantics, fault-tolerant local state stores, windowed operations, stream-table joins, and interactive queries, which let an application read its own state stores directly (for example, to serve them through a REST API that the application itself provides).

Installation

Maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.7.0</version>
</dependency>

Gradle

implementation 'org.apache.kafka:kafka-streams:3.7.0'

Python (Faust Alternative)

# There is no official Kafka Streams library for Python; Faust (faust-streaming) is a community alternative
pip install faust-streaming

Basic Application

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Properties;

public class StreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("input-topic");

        source.filter((key, value) -> value != null && !value.isEmpty())
              .mapValues(value -> value.toUpperCase())
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();
    }
}

Stream DSL Operations

Stateless Transformations

// Source stream shared by every example below; each example attaches its own
// downstream operations to it.
KStream<String, String> stream = builder.stream("input");

// Filter: keep only records whose value contains "important"
stream.filter((key, value) -> value.contains("important"))
      .to("filtered");

// Map key and value: the new key is the first three characters of the value
// (assumes every value has at least three characters)
stream.map((key, value) -> KeyValue.pair(value.substring(0, 3), value))
      .to("mapped");

// Map values only (the key is left untouched)
stream.mapValues(value -> value.toUpperCase())
      .to("uppercased");

// FlatMap (one to many): one output record per whitespace-separated word
stream.flatMapValues(value -> Arrays.asList(value.split("\\s+")))
      .to("words");

// Branch (split stream).
// The keys of the returned map are "<split name prefix>" + "<Branched name>".
// Naming the split operation (org.apache.kafka.streams.kstream.Named) makes
// the keys used for the lookups below explicit.
Map<String, KStream<String, String>> branches = stream.split(Named.as("split-"))
    .branch((key, value) -> value.startsWith("A"), Branched.as("a-branch"))
    .branch((key, value) -> value.startsWith("B"), Branched.as("b-branch"))
    .defaultBranch(Branched.as("other"));

branches.get("split-a-branch").to("a-topic");
branches.get("split-b-branch").to("b-topic");

// Merge streams (stream1 and stream2 stand for any two KStream<String, String>)
KStream<String, String> merged = stream1.merge(stream2);

// SelectKey: re-key by the first comma-separated field.
// The limit of 2 keeps trailing empty fields, so a value such as "," yields
// an empty key instead of an empty array and an ArrayIndexOutOfBoundsException.
stream.selectKey((key, value) -> value.split(",", 2)[0])
      .to("rekeyed");

// Peek (side effects, no modification)
stream.peek((key, value) -> System.out.println("Processing: " + key))
      .to("output");

Stateful Transformations

// Count: number of records seen per key, kept in the store "count-store"
KTable<String, Long> counts =
    stream.groupByKey()
          .count(Materialized.as("count-store"));

// Aggregate: running sum per key of the values parsed as doubles.
// The value serde is set explicitly because the aggregate type (Double)
// differs from the stream's String values.
KTable<String, Double> totals =
    stream.groupByKey()
          .aggregate(
              () -> 0.0,                                             // starting total for a new key
              (k, v, runningTotal) -> runningTotal + Double.parseDouble(v),
              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("totals-store")
                  .withValueSerde(Serdes.Double())
          );

// Reduce: keep only the most recent value per key
KTable<String, String> latest =
    stream.groupByKey()
          .reduce(
              (previous, current) -> current,
              Materialized.as("latest-store"));

KTable Operations

// Read a topic as a table (changelog semantics), backed by the store "users-store"
KTable<String, String> users = builder.table(
    "users-topic",
    Materialized.as("users-store"));

// Filter the table: keep entries whose value is non-null and contains "active"
KTable<String, String> filtered = users.filter(
    (userKey, userValue) -> userValue != null && userValue.contains("active"));

// Map values: derive a table holding the length of each value
KTable<String, Integer> lengths = users.mapValues(userValue -> userValue.length());

// Convert the table to a stream
KStream<String, String> userStream = users.toStream();

// GlobalKTable (replicated on all instances), backed by the store "config-store"
GlobalKTable<String, String> config = builder.globalTable(
    "config-topic",
    Materialized.as("config-store"));

Windowing

import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;

// Window size shared by the tumbling, hopping and sliding examples
Duration fiveMinutes = Duration.ofMinutes(5);

// Tumbling window (fixed, non-overlapping): 5-minute windows, no grace period
TimeWindows tumblingSpec = TimeWindows.ofSizeWithNoGrace(fiveMinutes);
KTable<Windowed<String>, Long> tumblingCounts = stream
    .groupByKey()
    .windowedBy(tumblingSpec)
    .count(Materialized.as("tumbling-counts"));

// Hopping window (fixed, overlapping): 5-minute windows with a 1-minute grace
// period, advancing every minute
TimeWindows hoppingSpec = TimeWindows
    .ofSizeAndGrace(fiveMinutes, Duration.ofMinutes(1))
    .advanceBy(Duration.ofMinutes(1));
KTable<Windowed<String>, Long> hoppingCounts = stream
    .groupByKey()
    .windowedBy(hoppingSpec)
    .count(Materialized.as("hopping-counts"));

// Session window (activity-based): 30-minute inactivity gap, no grace period
SessionWindows sessionSpec =
    SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30));
KTable<Windowed<String>, Long> sessionCounts = stream
    .groupByKey()
    .windowedBy(sessionSpec)
    .count(Materialized.as("session-counts"));

// Sliding window: 5-minute maximum time difference, no grace period
SlidingWindows slidingSpec = SlidingWindows.ofTimeDifferenceWithNoGrace(fiveMinutes);
KTable<Windowed<String>, Long> slidingCounts = stream
    .groupByKey()
    .windowedBy(slidingSpec)
    .count(Materialized.as("sliding-counts"));

Joins

// Stream-Stream join (windowed): pair each order with payments for the same key
// whose timestamps are at most 5 minutes apart
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");

JoinWindows paymentWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));
StreamJoined<String, String, String> joinSerdes =
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());

KStream<String, String> joined = orders.join(
    payments,
    (order, payment) -> order + " | " + payment,
    paymentWindow,
    joinSerdes
);

// Stream-Table join (enrichment): look up the customer stored under the order's key
KTable<String, String> customers = builder.table("customers");

KStream<String, String> enriched = orders.join(
    customers,
    (order, customer) -> order + " customer=" + customer
);

// Stream-GlobalKTable join (broadcast join): the lookup key is derived from the
// record by the key mapper
GlobalKTable<String, String> products = builder.globalTable("products");

KStream<String, String> enrichedOrders = orders.join(
    products,
    (orderKey, order) -> extractProductId(order),  // key mapper
    (order, product) -> order + " product=" + product
);

// Left join (keep records without match): a missing customer is reported as "unknown"
KStream<String, String> leftJoined = orders.leftJoin(
    customers,
    (order, customer) ->
        order + " customer=" + (customer == null ? "unknown" : customer)
);

Configuration

// Commonly used Kafka Streams settings, grouped by purpose.
Properties props = new Properties();

// Required: the application id and the brokers used to bootstrap the connection
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

// Serialization: default serdes used when an operation does not specify its own
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

// Processing guarantee
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Performance tuning
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// 10 MiB of record cache. STATESTORE_CACHE_MAX_BYTES_CONFIG
// ("statestore.cache.max.bytes") replaces CACHE_MAX_BYTES_BUFFERING_CONFIG,
// which is deprecated as of Kafka 3.4.
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);

// State store: local directory for state store data
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");

// Replication for internal topics
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);

// Error handling: log and skip records that cannot be deserialized;
// use the default handler for production (send) errors
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DefaultProductionExceptionHandler.class);

Interactive Queries

// Access local state stores for serving queries.
// streams.store(...) throws InvalidStateStoreException when the KafkaStreams
// instance is not running or the store is not available on this instance.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(
        StoreQueryParameters.fromNameAndType("count-store", QueryableStoreTypes.keyValueStore())
    );

// Get a value (null when the key is not present in the local store)
Long count = store.get("myKey");

// Iterate all entries. Store iterators hold resources and must be closed,
// hence try-with-resources.
try (KeyValueIterator<String, Long> iter = store.all()) {
    while (iter.hasNext()) {
        KeyValue<String, Long> entry = iter.next();
        System.out.println(entry.key + " = " + entry.value);
    }
}

// Range query
try (KeyValueIterator<String, Long> range = store.range("a", "z")) {
    // process range
}

// Windowed store query
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store(
        StoreQueryParameters.fromNameAndType("windowed-store", QueryableStoreTypes.windowStore())
    );

// Capture "now" once so both bounds of the one-hour range derive from the same instant
Instant queryEnd = Instant.now();
Instant queryStart = queryEnd.minus(Duration.ofHours(1));

// The window iterator must be closed like the key-value iterators above.
// Each entry's key is the window start timestamp in epoch milliseconds.
try (WindowStoreIterator<Long> windowIter = windowStore.fetch("myKey", queryStart, queryEnd)) {
    while (windowIter.hasNext()) {
        KeyValue<Long, Long> window = windowIter.next();
        System.out.println("window@" + window.key + " = " + window.value);
    }
}

Advanced Usage

Custom State Store

// Builder for a persistent key-value store named "my-store" with String keys
// and values, with changelog logging (no extra topic config) and caching enabled.
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores
        .keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"),
            Serdes.String(),
            Serdes.String())
        .withLoggingEnabled(Collections.emptyMap())
        .withCachingEnabled();

// Register the store with the topology so processors can connect to it by name
builder.addStateStore(storeBuilder);

Processor API

// Custom processor attached to the "input" stream and connected to the state
// store "my-store" (the store must already be registered with the builder).
//
// The explicit <String, String> type witness is required: as the receiver of a
// method chain, builder.stream("input") has no target type and would be
// inferred as KStream<Object, Object>, which does not accept a
// Processor<String, String, ...> supplier.
builder.<String, String>stream("input")
    .process(() -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> store;

        /** Keeps the context for forwarding and looks up the connected store. */
        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            this.store = context.getStateStore("my-store");
        }

        /** Stores the record's value under its key, then forwards the record upper-cased. */
        @Override
        public void process(Record<String, String> record) {
            store.put(record.key(), record.value());
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }, "my-store");

Monitoring

// Set the metrics recording level to DEBUG (this configures which metrics are
// recorded; it does not register a metrics reporter)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

// Key JMX metrics:
// kafka.streams:type=stream-metrics — client-level metrics
// kafka.streams:type=stream-thread-metrics — thread-level metrics
// kafka.streams:type=stream-task-metrics — per-task metrics
// kafka.streams:type=stream-processor-node-metrics — per-node metrics
// kafka.streams:type=stream-state-metrics — state store metrics

Troubleshooting

| Issue | Solution |
|---|---|
| Rebalancing frequently | Increase session.timeout.ms; check for slow processing |
| State store corruption | Delete state directory and restart; data replays from changelog |
| High consumer lag | Increase num.stream.threads; scale out instances |
| Deserialization errors | Configure error handler; check serde matches data format |
| Out of memory | Reduce cache.max.bytes.buffering (statestore.cache.max.bytes in Kafka 3.4+); increase JVM heap |
| Exactly-once failures | Ensure Kafka brokers support transactions; use EXACTLY_ONCE_V2 |
| Slow joins | Ensure co-partitioning; check join window size |
| Topology errors | Verify all input topics exist; check stream/table source topics |