Apache Beam Cheat Sheet
Overview
Apache Beam is an open-source unified programming model for defining both batch and streaming data-parallel processing pipelines. The key innovation is write-once portability: you define your pipeline using the Beam SDK (Python, Java, or Go), and then execute it on any supported distributed processing backend (runner) including Google Cloud Dataflow, Apache Flink, Apache Spark, and direct local runners.
Beam provides a rich set of transforms including ParDo for element-wise processing, GroupByKey for aggregation, windowing for streaming data, and composite transforms for reusable pipeline components. The model handles late data, event-time processing, watermarks, and exactly-once semantics. Beam is particularly powerful for organizations that want to avoid vendor lock-in or need to run the same pipeline logic across different execution environments.
Installation
Python SDK
# Install the Apache Beam Python SDK (core package only)
pip install apache-beam
# Install with optional extras for a specific platform or feature
# (some shells, e.g. zsh, need the argument quoted: 'apache-beam[gcp]')
pip install apache-beam[gcp] # Google Cloud Platform: Dataflow runner and GCP I/O
pip install apache-beam[aws] # AWS I/O support (e.g. S3); this is not a runner
pip install apache-beam[interactive] # Interactive/notebook support
# Install with several extras at once
pip install apache-beam[gcp,aws,test,docs,interactive]
# Verify installation by printing the installed SDK version
python -c "import apache_beam; print(apache_beam.__version__)"
Java SDK
# Maven dependencies: the Beam Java core SDK plus the direct (local) runner.
# The heredoc only prints the XML; paste it inside <dependencies> in pom.xml.
cat <<'EOF'
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.56.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.56.0</version>
</dependency>
EOF
Core Concepts
Basic Pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Runner selection and GCP settings, given as command-line style flags.
options = PipelineOptions([
    '--runner=DirectRunner',
    '--project=my-project',
    '--temp_location=gs://my-bucket/temp',
])

# Leaving the `with` block runs the pipeline and waits for it to finish.
with beam.Pipeline(options=options) as p:
    # Read the CSV as raw text lines, skipping the header row.
    lines = p | 'ReadCSV' >> beam.io.ReadFromText('input.csv', skip_header_lines=1)

    # Split every line into fields and drop rows with fewer than three.
    rows = lines | 'ParseLines' >> beam.Map(lambda line: line.split(','))
    valid_rows = rows | 'FilterValid' >> beam.Filter(lambda row: len(row) >= 3)

    # Keep the first three fields and write them back out as CSV text.
    formatted = valid_rows | 'FormatOutput' >> beam.Map(
        lambda row: f"{row[0]},{row[1]},{row[2]}")
    formatted | 'WriteOutput' >> beam.io.WriteToText('output', file_name_suffix='.csv')
ParDo and DoFn
class ParseEventFn(beam.DoFn):
    """DoFn that parses raw elements and routes them to tagged outputs.

    Parsed events are emitted on the ``'valid'`` output. Elements whose
    parsing raises are emitted on ``'errors'`` together with the error
    message, so a bad record is captured instead of raising.
    """

    def setup(self):
        """Create the parser once for this DoFn instance."""
        self.parser = EventParser()

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        """Parse one element and emit it on the 'valid' or 'errors' output.

        ``timestamp`` is requested through ``DoFn.TimestampParam`` but is
        not used in the body.
        """
        try:
            parsed = self.parser.parse(element)
            yield beam.pvalue.TaggedOutput('valid', parsed)
        except Exception as exc:
            # Dead-letter record: keep the raw input next to the reason.
            failure = {
                'raw': element,
                'error': str(exc),
            }
            yield beam.pvalue.TaggedOutput('errors', failure)

    def teardown(self):
        """Release the parser created in setup()."""
        self.parser.close()
# Use with multiple outputs: declare the tags when applying the ParDo.
messages = p | 'Read' >> beam.io.ReadFromPubSub(topic='projects/proj/topics/events')
results = messages | 'Parse' >> beam.ParDo(ParseEventFn()).with_outputs('valid', 'errors')

# Each declared tag can be read off the result, by attribute or by index.
valid_events = results['valid']
error_events = results['errors']
GroupByKey and Aggregations
# GroupByKey: gather every amount per category, then sum each group.
(
    p
    | 'CreatePairs' >> beam.Map(
        lambda record: (record['category'], record['amount']))
    | 'GroupByCategory' >> beam.GroupByKey()
    | 'SumAmounts' >> beam.MapTuple(
        lambda category, amounts: (category, sum(amounts)))
)

# CombinePerKey with built-in combiners: per-key reduction in one step.
(
    p
    | beam.Map(lambda record: (record['region'], record['sales']))
    | beam.CombinePerKey(sum)
)

# CombineGlobally: reduce the whole PCollection to a single mean.
(
    p
    | beam.Map(lambda record: record['amount'])
    | beam.CombineGlobally(beam.combiners.MeanCombineFn())
)

# CoGroupByKey (join): the dict keys tag each input in the joined result.
orders = p | 'ReadOrders' >> beam.Create([('c1', 100), ('c2', 200)])
customers = p | 'ReadCustomers' >> beam.Create([('c1', 'Alice'), ('c2', 'Bob')])
tagged_inputs = {'orders': orders, 'customers': customers}
joined = tagged_inputs | beam.CoGroupByKey()
I/O Connectors
| Connector | Read | Write |
|---|---|---|
| Text files | beam.io.ReadFromText(path) | beam.io.WriteToText(path) |
| Avro | beam.io.ReadFromAvro(path) | beam.io.WriteToAvro(path, schema) |
| Parquet | beam.io.ReadFromParquet(path) | beam.io.WriteToParquet(path, schema) |
| BigQuery | beam.io.ReadFromBigQuery(query=sql) | beam.io.WriteToBigQuery(table) |
| Pub/Sub | beam.io.ReadFromPubSub(topic=t) | beam.io.WriteToPubSub(topic=t) |
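| Kafka | apache_beam.io.kafka.ReadFromKafka(consumer_config, topics) | apache_beam.io.kafka.WriteToKafka(producer_config, topic) |
| JDBC | apache_beam.io.jdbc.ReadFromJdbc(table_name, driver_class_name, jdbc_url, ...) | apache_beam.io.jdbc.WriteToJdbc(table_name, driver_class_name, jdbc_url, ...) |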
BigQuery Example
# Standard-SQL query selecting the rows to process.
source_query = 'SELECT * FROM `project.dataset.table` WHERE date > "2024-01-01"'
dispositions = beam.io.BigQueryDisposition

(
    p
    | 'ReadBQ' >> beam.io.ReadFromBigQuery(query=source_query, use_standard_sql=True)
    | 'Transform' >> beam.Map(process_row)
    | 'WriteBQ' >> beam.io.WriteToBigQuery(
        'project:dataset.output_table',
        schema='name:STRING,amount:FLOAT,date:DATE',
        # Append to the table, creating it from the schema if it is missing.
        write_disposition=dispositions.WRITE_APPEND,
        create_disposition=dispositions.CREATE_IF_NEEDED,
    )
)
Configuration
Pipeline Options
from apache_beam.options.pipeline_options import (
PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
)
options = PipelineOptions()

# Each view_as() call exposes one typed group of settings on `options`.
standard = options.view_as(StandardOptions)
gcp = options.view_as(GoogleCloudOptions)
worker = options.view_as(WorkerOptions)

# Execution: Dataflow runner, streaming mode.
standard.streaming = True
standard.runner = 'DataflowRunner'

# Google Cloud: project, region, and the GCS paths used for temp/staging files.
gcp.region = 'us-central1'
gcp.project = 'my-project'
gcp.staging_location = 'gs://my-bucket/staging'
gcp.temp_location = 'gs://my-bucket/temp'

# Workers: machine shape, disk size, and autoscaling limits.
worker.disk_size_gb = 100
worker.machine_type = 'n1-standard-4'
worker.autoscaling_algorithm = 'THROUGHPUT_BASED'
worker.max_num_workers = 20
Running on Different Runners
# Direct runner (local testing)
python pipeline.py --runner DirectRunner
# Google Cloud Dataflow: also passes the GCP project, region, and the
# GCS temp/staging locations
python pipeline.py \
--runner DataflowRunner \
--project my-project \
--region us-central1 \
--temp_location gs://bucket/temp \
--staging_location gs://bucket/staging
# Apache Flink: --flink_master is the host:port of the Flink cluster to submit to
python pipeline.py \
--runner FlinkRunner \
--flink_master localhost:8081
# Apache Spark: --spark_master_url is the Spark master to submit to
python pipeline.py \
--runner SparkRunner \
--spark_master_url spark://host:7077
Advanced Usage
Windowing for Streaming
import json

from apache_beam import window

# Count elements per sliding event-time window of a Pub/Sub stream.
(
    p
    | 'ReadStream' >> beam.io.ReadFromPubSub(topic=topic)
    # json.loads accepts the raw bytes payload directly.
    | 'ParseJSON' >> beam.Map(json.loads)
    # assumes x['event_time'] is a Unix timestamp in seconds - TODO confirm
    | 'AddTimestamp' >> beam.Map(
        lambda x: window.TimestampedValue(x, x['event_time']))
    | 'Window' >> beam.WindowInto(
        # 300-second windows, with a new window starting every 60 seconds.
        window.SlidingWindows(size=300, period=60),
        # Speculative panes every 30 s of processing time before the
        # watermark, then one pane per late element.
        trigger=beam.trigger.AfterWatermark(
            early=beam.trigger.AfterProcessingTime(30),
            late=beam.trigger.AfterCount(1),
        ),
        # Each pane includes everything seen so far for the window.
        accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
        # Accept data up to 3600 seconds behind the watermark.
        allowed_lateness=3600,
    )
    # without_defaults() is needed for CombineGlobally under non-global
    # windowing: empty windows produce no output instead of a default.
    | 'CountPerWindow' >> beam.CombineGlobally(
        beam.combiners.CountCombineFn()).without_defaults()
)
Composite Transforms
class NormalizeAndValidate(beam.PTransform):
    """Composite transform: lower-case element keys, then drop incomplete elements.

    Args:
        schema: Iterable of key names every element must contain to be
            kept. Validation runs after keys are lower-cased, so the names
            listed here should be lower-case.
    """

    def __init__(self, schema):
        # Let PTransform initialise its own state before adding ours.
        super().__init__()
        self.schema = schema

    def expand(self, pcoll):
        """Apply normalisation followed by validation to ``pcoll``."""
        return (
            pcoll
            | 'Normalize' >> beam.Map(self._normalize)
            | 'Validate' >> beam.Filter(self._validate)
        )

    def _normalize(self, element):
        """Return a new dict with every key of ``element`` lower-cased."""
        return {k.lower(): v for k, v in element.items()}

    def _validate(self, element):
        """Return True when ``element`` contains every key in the schema."""
        return all(k in element for k in self.schema)


# Usage
p | 'Clean' >> NormalizeAndValidate(schema=['id', 'name', 'value'])
Side Inputs
# Use a PCollection as a side input
def _parse_lookup_line(line):
    """Split a CSV line once and return its first two fields as (key, value)."""
    fields = line.split(',')
    return fields[0], fields[1]


lookup_table = (
    p
    | 'ReadLookup' >> beam.io.ReadFromText('lookup.csv')
    | 'ParseLookup' >> beam.Map(_parse_lookup_line)
)
(
    p
    | 'ReadMain' >> beam.io.ReadFromText('main.csv')
    | 'Enrich' >> beam.Map(
        # AsDict hands the function a ready-made dict, so the lookup is
        # not rebuilt from an iterable for every element. The dict is
        # shared, so enrich() should treat it as read-only.
        lambda row, lookup: enrich(row, lookup),
        lookup=beam.pvalue.AsDict(lookup_table),
    )
)
Troubleshooting
| Issue | Solution |
|---|---|
| Pipeline stuck with no output | Check for unbounded PCollections without windowing. Add beam.WindowInto() |
| Serialization errors | Ensure all DoFn attributes are picklable. Use setup() for non-serializable objects |
| OOM on workers | Increase machine type or use beam.Reshuffle() to redistribute data |
| Late data being dropped | Increase allowed_lateness in windowing config |
| Slow pipeline performance | Use beam.Reshuffle() to redistribute work after high fan-out steps. For hot keys, use CombinePerKey(...).with_hot_key_fanout(n). Increase max_num_workers |
| BigQuery quota errors | Use STREAMING_INSERTS for low-latency or FILE_LOADS for batch. Add retry logic |
| Type hint errors | Add @beam.typehints.with_input_types() and @beam.typehints.with_output_types() |
| Dataflow job fails on startup | Check IAM permissions, network config, and staging bucket accessibility |
| Side input too large | Use AsDict for key-value lookups instead of AsList for large side inputs |
| Watermark not advancing | Check timestamp extraction. Ensure source is emitting watermark updates |