Zum Inhalt springen

Apache Flink

Apache Flink is a distributed stream and batch processing engine with real-time capabilities and powerful state management.

Installation

Download and Setup

# Download Flink
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -xzf flink-1.17.1-bin-scala_2.12.tgz
cd flink-1.17.1

# Start local cluster
./bin/start-cluster.sh

# Check running Flink processes (the daemon scripts have no "status" subcommand)
jps | grep -E 'TaskManagerRunner|StandaloneSessionClusterEntrypoint'

# Stop cluster
./bin/stop-cluster.sh

Docker

# Start Flink with Docker Compose
cat > docker-compose.yml << 'EOF'
version: '3'
services:
  jobmanager:
    image: flink:1.17.1
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    ports:
      - "8081:8081"

  taskmanager:
    image: flink:1.17.1
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - jobmanager
EOF

docker-compose up -d
# Web UI: http://localhost:8081

Job Submission

Submit Jobs

# Submit JAR job
./bin/flink run my-job.jar

# Submit with custom main class
./bin/flink run -c com.example.StreamingJob my-job.jar

# Submit with arguments
./bin/flink run my-job.jar --input /data/input.txt --output /data/output

# Submit with custom parallelism
./bin/flink run -p 4 my-job.jar

# Submit with custom configuration
./bin/flink run \
  -c com.example.MyJob \
  -D jobmanager.memory.process.size=1024m \
  -D taskmanager.memory.process.size=2048m \
  my-job.jar

Job Management

# List running jobs
./bin/flink list -r

# Cancel job
./bin/flink cancel <job-id>

# Stop job (graceful shutdown)
./bin/flink stop <job-id>

# Get job details
./bin/flink info <job-id>

# Rescale job ("flink modify" was removed in Flink 1.9):
# stop with a savepoint, then resubmit with the new parallelism
./bin/flink stop --savepointPath /path/to/savepoints <job-id>
./bin/flink run -p 8 -s <savepoint-path> my-job.jar

Savepoints and Checkpoints

Savepoint Operations

# Trigger savepoint
./bin/flink savepoint <job-id> /path/to/savepoints

# Stop job with a final savepoint (graceful shutdown)
./bin/flink stop --savepointPath /path/to/savepoints <job-id>

# Dispose of a savepoint that is no longer needed
./bin/flink savepoint -d /path/to/savepoint/savepoint-0000-0

# Resume from savepoint
./bin/flink run \
  -s /path/to/savepoint/savepoint-0000-0 \
  my-job.jar

# List savepoints
ls -la /path/to/savepoints/

Checkpoint Configuration

# conf/flink-conf.yaml
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
state.checkpoints.dir: file:///data/checkpoints
state.savepoints.dir: file:///data/savepoints
state.backend: rocksdb

Configuration

# Job and Task Manager
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4

# Network (taskmanager.network.memory.* was replaced by
# taskmanager.memory.network.* in Flink 1.10)
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.max: 1gb

# State backend
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: file:///var/flink/checkpoints

# Savepoints
state.savepoints.dir: file:///var/flink/savepoints

# RocksDB settings
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.backend.rocksdb.block.cache-size: 64mb
state.backend.rocksdb.write-buffer-size: 64mb

# High availability
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.cluster-id: /flink

# Logging: log levels are configured in conf/log4j.properties,
# not in flink-conf.yaml; the log directory is set via env.log.dir
env.log.dir: /var/log/flink

Environment Variables

# Set JVM options (Flink reads FLINK_ENV_JAVA_OPTS, not JVM_ARGS)
export FLINK_ENV_JAVA_OPTS="-Xms1024m -Xmx2048m"

# Set Flink home
export FLINK_HOME=/opt/flink

# Task manager logs
export LOG_DIR=/var/log/flink

CLI Commands

Cluster Operations

# Start cluster
./bin/start-cluster.sh

# Start job manager
./bin/jobmanager.sh start

# Start task manager
./bin/taskmanager.sh start

# Stop all processes
./bin/stop-cluster.sh

# Check component status (no "status" subcommand exists; list Flink JVMs)
jps

Utilities

# Calculate SHA256 of JAR (Flink ships no hashing script; use coreutils)
sha256sum my-job.jar

# Inspect JAR contents
jar tf my-job.jar

# Explain job
./bin/flink info my-job.jar

Streaming Applications

Java Example

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text =
            env.readTextFile("input.txt");

        DataStream<Tuple2<String, Integer>> counts = text
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                public void flatMap(String value,
                    Collector<Tuple2<String, Integer>> out) {
                    String[] words = value.split("\\W+");
                    for (String word : words) {
                        if (word.length() > 0) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
            })
            .keyBy(0)
            .sum(1);

        counts.print();

        env.execute("Stream Counting");
    }
}

Python Example

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction
from pyflink.common.typeinfo import Types


class Multiplier(MapFunction):
    # Doubles the second field of each (int, int) record,
    # leaving the first field untouched.
    def map(self, value):
        key, amount = value
        return key, amount * 2


env = StreamExecutionEnvironment.get_execution_environment()

# Bounded source of three (int, int) tuples with explicit type info.
source = env.from_collection(
    [(1, 2), (2, 3), (3, 4)],
    type_info=Types.TUPLE([Types.INT(), Types.INT()]),
)

doubled = source.map(Multiplier())

doubled.print()

env.execute("PyFlink Stream Job")

Monitoring and Debugging

Metrics

# Query metrics via REST API
curl http://localhost:8081/metrics

# Get job manager metrics
curl http://localhost:8081/jobmanager/metrics

# Get task manager metrics
curl http://localhost:8081/taskmanagers/metrics

Logging

# View job manager logs
tail -f $FLINK_HOME/log/flink-*-jobmanager-*.log

# View task manager logs
tail -f $FLINK_HOME/log/flink-*-taskmanager-*.log

# Enable debug logging
# In conf/log4j.properties:
# log4j.rootLogger=DEBUG, file, console

REST API

# Get overview
curl http://localhost:8081/v1/overview

# Get job details
curl http://localhost:8081/v1/jobs/<job-id>

# Get task manager info
curl http://localhost:8081/v1/taskmanagers

# Get running jobs
curl http://localhost:8081/v1/jobs?status=RUNNING

High Availability Setup

ZooKeeper Configuration

# Flink HA with ZooKeeper
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /prod-cluster

# Storage for HA metadata
high-availability.storageDir: hdfs:///flink/ha

Docker Compose HA Setup

version: '3'
services:
  zookeeper:
    image: zookeeper:latest
    environment:
      ZOO_CFG_EXTRA: "server.1=zk:2888:3888"
    ports:
      - "2181:2181"

  jobmanager:
    image: flink:1.17.1
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      # FLINK_PROPERTIES must be newline-separated "key: value" pairs;
      # a plain continuation line would be folded into one line by YAML,
      # so use a literal block scalar.
      - |
        FLINK_PROPERTIES=
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zookeeper:2181
    ports:
      - "8081:8081"

  taskmanager:
    image: flink:1.17.1
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Performance Tuning

Memory Configuration

# Adjust memory settings (the key is "jvm-overhead", hyphenated)
jobmanager.memory.process.size: 2048m
jobmanager.memory.jvm-overhead.fraction: 0.1

taskmanager.memory.process.size: 4096m
taskmanager.memory.task.heap.size: 2560m
taskmanager.memory.managed.size: 1024m
taskmanager.memory.jvm-overhead.fraction: 0.1

Network Optimization

# Network buffer tuning (taskmanager.memory.network.* since Flink 1.10)
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1
# Netty socket buffers are configured in bytes (0 = OS default)
taskmanager.network.netty.sendReceiveBufferSize: 2097152

Best Practices

  • Set appropriate memory limits for JobManager and TaskManager
  • Enable checkpointing for fault tolerance
  • Use savepoints for planned updates and version upgrades
  • Monitor task latency and throughput metrics
  • Set proper parallelism based on available resources
  • Use exactly-once semantics for critical data
  • Implement state cleanup for memory efficiency
  • Test job scaling before production deployment
  • Monitor RocksDB memory usage with managed state
  • Regular backup of savepoint directories

Resources


Last updated: 2025-03-30