Apache Beam Cheat Sheet

Overview

Apache Beam is an open-source unified programming model for defining both batch and streaming data-parallel processing pipelines. The key innovation is write-once portability: you define your pipeline using the Beam SDK (Python, Java, or Go), and then execute it on any supported distributed processing backend (a runner), including Google Cloud Dataflow, Apache Flink, Apache Spark, and the local DirectRunner.

Beam provides a rich set of transforms including ParDo for element-wise processing, GroupByKey for aggregation, windowing for streaming data, and composite transforms for reusable pipeline components. The model covers event-time processing, watermarks, and late data; processing guarantees such as exactly-once depend on the runner and the I/O connectors used. Beam is particularly useful for organizations that want to avoid vendor lock-in or need to run the same pipeline logic across different execution environments.

Installation

Python SDK

# Install Apache Beam Python SDK
pip install apache-beam

# Install with optional extras (quoted so shells such as zsh do not expand the brackets)
pip install 'apache-beam[gcp]'          # Google Cloud I/O and Dataflow runner
pip install 'apache-beam[aws]'          # AWS I/O support (e.g. S3)
pip install 'apache-beam[interactive]'  # Interactive/notebook support

# Install several extras at once
pip install 'apache-beam[gcp,aws,test,docs,interactive]'

# Verify installation
python -c "import apache_beam; print(apache_beam.__version__)"

Java SDK

<!-- Maven dependencies (pom.xml): core SDK plus the local direct runner -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.56.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.56.0</version>
</dependency>

Core Concepts

Basic Pipeline

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DirectRunner',
    '--project=my-project',
    '--temp_location=gs://my-bucket/temp',
])

with beam.Pipeline(options=options) as p:
    (
        p
        | 'ReadCSV' >> beam.io.ReadFromText('input.csv', skip_header_lines=1)
        | 'ParseLines' >> beam.Map(lambda line: line.split(','))
        | 'FilterValid' >> beam.Filter(lambda row: len(row) >= 3)
        | 'FormatOutput' >> beam.Map(lambda row: f"{row[0]},{row[1]},{row[2]}")
        | 'WriteOutput' >> beam.io.WriteToText('output', file_name_suffix='.csv')
    )
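
The lambdas in the pipeline above can also be written as named functions, which keeps the parsing rules testable without a runner. The sketch below is one way to do that and is not part of the original pipeline: `parse_line`, `is_valid`, and `format_row` are hypothetical names, and it swaps `line.split(',')` for the standard `csv` module on the assumption that fields may contain quoted commas.

```python
import csv
import io


def parse_line(line):
    """Split one CSV line into fields, honoring quoted commas."""
    return next(csv.reader([line]), [])


def is_valid(row):
    """Keep rows with at least three fields, as the FilterValid step does."""
    return len(row) >= 3


def format_row(row):
    """Serialize the first three fields back into one CSV line."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="").writerow(row[:3])
    return buf.getvalue()


# Sample lines (illustrative data only).
rows = [parse_line(line) for line in ['a,"b,c",d,e', "too,short"]]
print([format_row(r) for r in rows if is_valid(r)])
```

In the pipeline these would replace the lambdas, for example `beam.Map(parse_line)`, `beam.Filter(is_valid)`, and `beam.Map(format_row)`.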

ParDo and DoFn

class ParseEventFn(beam.DoFn):
    def setup(self):
        """Called once per DoFn instance before processing; build non-picklable resources here."""
        self.parser = EventParser()

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        """Process each element."""
        try:
            event = self.parser.parse(element)
            yield beam.pvalue.TaggedOutput('valid', event)
        except Exception as e:
            yield beam.pvalue.TaggedOutput('errors', {
                'raw': element,
                'error': str(e),
            })

    def teardown(self):
        """Cleanup resources."""
        self.parser.close()

# Use with multiple outputs
results = (
    p
    | 'Read' >> beam.io.ReadFromPubSub(topic='projects/proj/topics/events')
    | 'Parse' >> beam.ParDo(ParseEventFn()).with_outputs('valid', 'errors')
)

valid_events = results.valid
error_events = results.errors
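
The DoFn above relies on an `EventParser` class that this cheat sheet does not define. A hypothetical stand-in is sketched below, assuming events arrive as JSON objects (Pub/Sub delivers the payload as bytes) and that `id` and `event_time` are required fields; both the class body and the field names are assumptions made for illustration.

```python
import json


class EventParser:
    """Hypothetical parser used by ParseEventFn; not a Beam class."""

    # Assumed required fields; adjust to the real event schema.
    REQUIRED = ("id", "event_time")

    def parse(self, raw):
        """Decode one JSON event, raising ValueError on bad input."""
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        event = json.loads(raw)  # JSONDecodeError is a ValueError subclass
        if not isinstance(event, dict):
            raise ValueError("event must be a JSON object")
        missing = [key for key in self.REQUIRED if key not in event]
        if missing:
            raise ValueError(f"missing fields: {missing}")
        return event

    def close(self):
        """Nothing to release in this sketch."""


parser = EventParser()
event = parser.parse(b'{"id": 1, "event_time": 1700000000}')
```

Because `parse` raises on malformed input, the `except` branch in `process` routes such elements to the `errors` output instead of failing the bundle.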

GroupByKey and Aggregations

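# `records` is assumed to be a PCollection of dicts produced by an earlier read/parse step.

# GroupByKey
(
    records
    | 'CreatePairs' >> beam.Map(lambda x: (x['category'], x['amount']))
    | 'GroupByCategory' >> beam.GroupByKey()
    | 'SumAmounts' >> beam.MapTuple(lambda key, values: (key, sum(values)))
)

# CombinePerKey with a built-in combiner (usually preferable to GroupByKey + sum,
# because runners can pre-combine values before the shuffle)
(
    records
    | 'RegionSales' >> beam.Map(lambda x: (x['region'], x['sales']))
    | 'SumPerRegion' >> beam.CombinePerKey(sum)
)

# CombineGlobally
(
    records
    | 'Amounts' >> beam.Map(lambda x: x['amount'])
    | 'MeanAmount' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())
)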

# CoGroupByKey (join)
orders = p | 'ReadOrders' >> beam.Create([('c1', 100), ('c2', 200)])
customers = p | 'ReadCustomers' >> beam.Create([('c1', 'Alice'), ('c2', 'Bob')])

joined = (
    {'orders': orders, 'customers': customers}
    | beam.CoGroupByKey()
)
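
CoGroupByKey emits one element per key, shaped as `(key, {'orders': [...], 'customers': [...]})`. A key that is missing from one input gets an empty group, so the result behaves like a full outer join. The plain-Python sketch below mimics that shape on in-memory lists to show how an inner join can be derived from it; `cogroup` is a hypothetical helper rather than a Beam API, and the extra `c3` order is added for illustration.

```python
from collections import defaultdict


def cogroup(**tagged_inputs):
    """Group (key, value) pairs from several tagged inputs by key."""
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


orders = [('c1', 100), ('c2', 200), ('c3', 50)]
customers = [('c1', 'Alice'), ('c2', 'Bob')]
joined = cogroup(orders=orders, customers=customers)

# Inner join: a key with an empty group on either side yields no rows.
inner = sorted(
    (key, name, amount)
    for key, groups in joined.items()
    for name in groups['customers']
    for amount in groups['orders']
)
```

In a Beam pipeline the same flattening would typically be a `beam.FlatMap` applied to the CoGroupByKey output.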

I/O Connectors

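| Connector | Read | Write |
|-----------|------|-------|
| Text files | beam.io.ReadFromText(path) | beam.io.WriteToText(path) |
| Avro | beam.io.ReadFromAvro(path) | beam.io.WriteToAvro(path, schema) |
| Parquet | beam.io.ReadFromParquet(path) | beam.io.WriteToParquet(path, schema) |
| BigQuery | beam.io.ReadFromBigQuery(query=sql) | beam.io.WriteToBigQuery(table) |
| Pub/Sub | beam.io.ReadFromPubSub(topic=t) | beam.io.WriteToPubSub(topic=t) |
| Kafka | beam.io.kafka.ReadFromKafka(consumer_config, topics) | beam.io.kafka.WriteToKafka(producer_config, topic) |
| JDBC | beam.io.jdbc.ReadFromJdbc(table_name, driver_class_name, jdbc_url, ...) | beam.io.jdbc.WriteToJdbc(table_name, driver_class_name, jdbc_url, ...) |

In the Python SDK, the Kafka and JDBC connectors are cross-language transforms: import them from apache_beam.io.kafka and apache_beam.io.jdbc, and expect them to need a Java runtime for the expansion service.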

BigQuery Example

(
    p
    | 'ReadBQ' >> beam.io.ReadFromBigQuery(
        query='SELECT * FROM `project.dataset.table` WHERE date > "2024-01-01"',
        use_standard_sql=True,
    )
    | 'Transform' >> beam.Map(process_row)
    | 'WriteBQ' >> beam.io.WriteToBigQuery(
        'project:dataset.output_table',
        schema='name:STRING,amount:FLOAT,date:DATE',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
)
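
The `Transform` step above calls a `process_row` function that is not shown. ReadFromBigQuery yields each row as a dict, and WriteToBigQuery expects dicts whose keys match the output schema. The sketch below is a hypothetical `process_row`, assuming the source table has `name`, `amount`, and `date` columns and that dates may arrive either as strings or as `datetime.date` objects.

```python
import datetime


def process_row(row):
    """Map a source row (dict) to the name/amount/date output schema."""
    date = row["date"]
    if isinstance(date, datetime.date):  # also covers datetime.datetime
        date = date.isoformat()[:10]     # keep only YYYY-MM-DD
    return {
        "name": str(row["name"]).strip(),
        "amount": float(row["amount"]),
        "date": date,
    }


# Illustrative input row, not real data.
sample = {"name": " Ann ", "amount": "3.5", "date": datetime.date(2024, 1, 2)}
cleaned = process_row(sample)
```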

Configuration

Pipeline Options

from apache_beam.options.pipeline_options import (
    PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
)

options = PipelineOptions()

# Standard options
standard = options.view_as(StandardOptions)
standard.runner = 'DataflowRunner'
standard.streaming = True

# Google Cloud options
gcp = options.view_as(GoogleCloudOptions)
gcp.project = 'my-project'
gcp.region = 'us-central1'
gcp.temp_location = 'gs://my-bucket/temp'
gcp.staging_location = 'gs://my-bucket/staging'

# Worker options
worker = options.view_as(WorkerOptions)
worker.machine_type = 'n1-standard-4'
worker.max_num_workers = 20
worker.autoscaling_algorithm = 'THROUGHPUT_BASED'
worker.disk_size_gb = 100

Running on Different Runners

# Direct runner (local testing)
python pipeline.py --runner DirectRunner

# Google Cloud Dataflow
python pipeline.py \
    --runner DataflowRunner \
    --project my-project \
    --region us-central1 \
    --temp_location gs://bucket/temp \
    --staging_location gs://bucket/staging

# Apache Flink
python pipeline.py \
    --runner FlinkRunner \
    --flink_master localhost:8081

# Apache Spark
python pipeline.py \
    --runner SparkRunner \
    --spark_master_url spark://host:7077
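
The commands above only work if `pipeline.py` forwards its command-line flags to Beam. A common pattern is to parse the script's own flags with `argparse.parse_known_args` and hand everything else to `PipelineOptions`. The sketch below shows the argument-splitting half using only the standard library; the `--input` and `--output` flags are hypothetical examples of script-level options.

```python
import argparse


def parse_args(argv):
    """Split script-specific flags from the flags meant for Beam."""
    parser = argparse.ArgumentParser()
    # Hypothetical script-level flags; anything else is left for Beam.
    parser.add_argument("--input", default="input.csv")
    parser.add_argument("--output", default="output")
    known_args, beam_args = parser.parse_known_args(argv)
    return known_args, beam_args


known_args, beam_args = parse_args(
    ["--input", "data.csv", "--runner", "DirectRunner", "--streaming"])
# In pipeline.py, beam_args would then be passed on:
#   options = PipelineOptions(beam_args)
```

Keeping runner flags out of the script's own parser means the same file can be launched on any runner without code changes.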

Advanced Usage

Windowing for Streaming

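import json

import apache_beam as beam
from apache_beam import window

# `topic` is a full Pub/Sub topic path such as 'projects/proj/topics/events'.
(
    p
    | 'ReadStream' >> beam.io.ReadFromPubSub(topic=topic)
    | 'ParseJSON' >> beam.Map(json.loads)
    # event_time must be a Unix timestamp in seconds
    | 'AddTimestamp' >> beam.Map(
        lambda x: beam.window.TimestampedValue(x, x['event_time']))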
    | 'Window' >> beam.WindowInto(
        window.SlidingWindows(size=300, period=60),
        trigger=beam.trigger.AfterWatermark(
            early=beam.trigger.AfterProcessingTime(30),
            late=beam.trigger.AfterCount(1),
        ),
        accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING,
        allowed_lateness=3600,
    )
    | 'CountPerWindow' >> beam.CombineGlobally(beam.combiners.CountCombineFn())
        .without_defaults()
)
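
With `SlidingWindows(size=300, period=60)` every element belongs to size / period = 5 overlapping windows, so each downstream aggregation sees it five times. The plain-Python sketch below reproduces the window-assignment arithmetic for integer timestamps, assuming a zero offset by default; `sliding_windows` is a hypothetical helper for illustration, not a Beam API.

```python
def sliding_windows(timestamp, size=300, period=60, offset=0):
    """Return the [start, end) bounds of every sliding window containing timestamp."""
    # Start of the latest window that begins at or before the timestamp.
    last_start = timestamp - ((timestamp - offset) % period)
    # Walk backwards one period at a time while the window still covers the timestamp.
    return [
        (start, start + size)
        for start in range(last_start, timestamp - size, -period)
    ]


windows = sliding_windows(125)
for start, end in windows:
    print(f"[{start}, {end})")
```

A smaller `period` gives smoother results but multiplies the data volume after the window step, which is worth keeping in mind when sizing workers.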

Composite Transforms

class NormalizeAndValidate(beam.PTransform):
    def __init__(self, schema):
        super().__init__()
        self.schema = schema

    def expand(self, pcoll):
        return (
            pcoll
            | 'Normalize' >> beam.Map(self._normalize)
            | 'Validate' >> beam.Filter(self._validate)
        )

    def _normalize(self, element):
        return {k.lower(): v for k, v in element.items()}

    def _validate(self, element):
        return all(k in element for k in self.schema)

# Usage (`records` is assumed to be a PCollection of dicts)
records | 'Clean' >> NormalizeAndValidate(schema=['id', 'name', 'value'])

Side Inputs

# Use a PCollection as a side input
lookup_table = (
    p
    | 'ReadLookup' >> beam.io.ReadFromText('lookup.csv')
    | 'ParseLookup' >> beam.Map(lambda l: (l.split(',')[0], l.split(',')[1]))
)

(
    p
    | 'ReadMain' >> beam.io.ReadFromText('main.csv')
    | 'Enrich' >> beam.Map(
        lambda row, lookup: enrich(row, lookup),
        # AsDict hands the (key, value) pairs over as a dict,
        # instead of rebuilding one for every element
        lookup=beam.pvalue.AsDict(lookup_table),
    )
)
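
The `Enrich` step calls an `enrich` function that is not shown. A hypothetical version is sketched below, assuming each main row is a CSV line whose first field is the lookup key and that the lookup side input arrives as a dict; rows without a match get an empty trailing field.

```python
def enrich(row, lookup):
    """Append the looked-up value for the row's first CSV field."""
    key = row.split(",", 1)[0]
    return f"{row},{lookup.get(key, '')}"


# Illustrative lookup table and rows.
lookup = {"c1": "Alice", "c2": "Bob"}
enriched = [enrich(row, lookup) for row in ["c1,100", "c9,5"]]
```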

Troubleshooting

| Issue | Solution |
|-------|----------|
| Pipeline stuck with no output | Check for unbounded PCollections without windowing. Add beam.WindowInto() |
| Serialization errors | Ensure all DoFn attributes are picklable. Use setup() for non-serializable objects |
| OOM on workers | Increase machine type or use beam.Reshuffle() to redistribute data |
| Late data being dropped | Increase allowed_lateness in windowing config |
| Slow pipeline performance | Use beam.Reshuffle() after hot-key operations. Increase max_num_workers |
| BigQuery quota errors | Use STREAMING_INSERTS for low-latency or FILE_LOADS for batch. Add retry logic |
| Type hint errors | Add @beam.typehints.with_input_types() and @beam.typehints.with_output_types() |
| Dataflow job fails on startup | Check IAM permissions, network config, and staging bucket accessibility |
| Side input too large | Use AsDict for key-value lookups instead of AsList for large side inputs |
| Watermark not advancing | Check timestamp extraction. Ensure source is emitting watermark updates |