Apache Flink
Apache Flink is a distributed stream and batch processing engine with real-time capabilities and powerful state management.
Installation
Download and Setup
# Download Flink
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -xzf flink-1.17.1-bin-scala_2.12.tgz
cd flink-1.17.1
# Start local cluster
./bin/start-cluster.sh
# Check running components (the shell scripts have no 'status' subcommand)
jps | grep -E 'TaskManagerRunner|StandaloneSessionClusterEntrypoint'
# Stop cluster
./bin/stop-cluster.sh
Docker
# Start Flink with Docker Compose
cat > docker-compose.yml << 'EOF'
version: '3'
services:
  jobmanager:
    image: flink:1.17.1
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    ports:
      - "8081:8081"
  taskmanager:
    image: flink:1.17.1
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - jobmanager
EOF
docker-compose up -d
# Web UI: http://localhost:8081
Job Submission
Submit Jobs
# Submit JAR job
./bin/flink run my-job.jar
# Submit with custom main class
./bin/flink run -c com.example.StreamingJob my-job.jar
# Submit with arguments
./bin/flink run my-job.jar --input /data/input.txt --output /data/output
# Submit with custom parallelism
./bin/flink run -p 4 my-job.jar
# Submit with custom configuration
./bin/flink run \
-c com.example.MyJob \
-D jobmanager.memory.process.size=1024m \
-D taskmanager.memory.process.size=2048m \
my-job.jar
Job Management
# List running jobs
./bin/flink list -r
# Cancel job
./bin/flink cancel <job-id>
# Stop job (graceful shutdown)
./bin/flink stop <job-id>
# Get job details
./bin/flink info <job-id>
# Rescale a job — 'flink modify' was removed in Flink 1.9.
# Stop with a savepoint, then resubmit with the new parallelism:
./bin/flink stop --savepointPath /path/to/savepoints <job-id>
./bin/flink run -s <savepoint-path> -p 8 my-job.jar
Savepoints and Checkpoints
Savepoint Operations
# Trigger savepoint
./bin/flink savepoint <job-id> /path/to/savepoints
# Stop job gracefully, taking a final savepoint
# (note: 'flink savepoint -d <path>' DISPOSES a savepoint — it does not stop the job)
./bin/flink stop --savepointPath /path/to/savepoints <job-id>
# Resume from savepoint
./bin/flink run \
-s /path/to/savepoint/savepoint-0000-0 \
my-job.jar
# List savepoints
ls -la /path/to/savepoints/
Checkpoint Configuration
# conf/flink-conf.yaml
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
state.checkpoints.dir: file:///data/checkpoints
state.savepoints.dir: file:///data/savepoints
state.backend: rocksdb
Configuration
flink-conf.yaml Settings
# Job and Task Manager
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
# Network (the old taskmanager.network.memory.* keys are deprecated since Flink 1.10)
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.max: 1gb
# State backend
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: file:///var/flink/checkpoints
# Savepoints
state.savepoints.dir: file:///var/flink/savepoints
# RocksDB settings
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.backend.rocksdb.block.cache-size: 64mb
state.backend.rocksdb.writebuffer.size: 64mb
# High availability
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.cluster-id: /flink
# Logging (log levels are configured in conf/log4j.properties, not flink-conf.yaml;
# 'logger.org.apache.flink' and 'jobmanager.logs.dir' are not valid Flink options)
env.log.dir: /var/log/flink
Environment Variables
# Set JVM options
export JVM_ARGS="-Xms1024m -Xmx2048m"
# Set Flink home
export FLINK_HOME=/opt/flink
# Task manager logs
export LOG_DIR=/var/log/flink
CLI Commands
Cluster Operations
# Start cluster
./bin/start-cluster.sh
# Start job manager
./bin/jobmanager.sh start
# Start task manager
./bin/taskmanager.sh start
# Stop all processes
./bin/stop-cluster.sh
# Check running components (jobmanager.sh/taskmanager.sh have no 'status' subcommand)
jps
Utilities
# Calculate SHA256 of JAR (Flink ships no flink-hash.sh script)
sha256sum my-job.jar
# Inspect a JAR's contents (Flink ships no analyze-jar.sh script)
jar tf my-job.jar
# Show the optimized execution plan of a job
./bin/flink info my-job.jar
Streaming Applications
Java Example
public class StreamingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text =
env.readTextFile("input.txt");
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String value,
Collector<Tuple2<String, Integer>> out) {
String[] words = value.split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
})
.keyBy(0)
.sum(1);
counts.print();
env.execute("Stream Counting");
}
}
Python Example
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction
from pyflink.common.typeinfo import Types
class Multiplier(MapFunction):
    def map(self, value):
        return value[0], value[1] * 2

env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection(
    [(1, 2), (2, 3), (3, 4)],
    type_info=Types.TUPLE([Types.INT(), Types.INT()])
)
mapped_stream = data_stream.map(Multiplier())
mapped_stream.print()
env.execute("PyFlink Stream Job")
Monitoring and Debugging
Metrics
# Query metrics via REST API
curl http://localhost:8081/metrics
# Get job manager metrics
curl http://localhost:8081/jobmanager/metrics
# Get task manager metrics
curl http://localhost:8081/taskmanagers/metrics
Logging
# View job manager logs
tail -f $FLINK_HOME/log/flink-*-jobmanager-*.log
# View task manager logs
tail -f $FLINK_HOME/log/flink-*-taskmanager-*.log
# Enable debug logging
# In conf/log4j.properties:
# log4j.rootLogger=DEBUG, file, console
REST API
# Get overview
curl http://localhost:8081/v1/overview
# Get job details
curl http://localhost:8081/v1/jobs/<job-id>
# Get task manager info
curl http://localhost:8081/v1/taskmanagers
# Get jobs overview (includes each job's status; the /jobs endpoint has no status filter)
curl http://localhost:8081/v1/jobs/overview
High Availability Setup
ZooKeeper Configuration
# Flink HA with ZooKeeper
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /prod-cluster
# Storage for HA metadata
high-availability.storageDir: hdfs:///flink/ha
Docker Compose HA Setup
version: '3'
services:
  zookeeper:
    image: zookeeper:latest
    environment:
      ZOO_CFG_EXTRA: "server.1=zk:2888:3888"
    ports:
      - "2181:2181"
  jobmanager:
    image: flink:1.17.1
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - |
        FLINK_PROPERTIES=
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zookeeper:2181
    ports:
      - "8081:8081"
  taskmanager:
    image: flink:1.17.1
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Performance Tuning
Memory Configuration
# Adjust memory settings (JVM overhead keys are hyphenated: jvm-overhead)
jobmanager.memory.process.size: 2048m
jobmanager.memory.jvm-overhead.fraction: 0.1
taskmanager.memory.process.size: 4096m
taskmanager.memory.task.heap.size: 2560m
taskmanager.memory.managed.size: 1024m
taskmanager.memory.jvm-overhead.fraction: 0.1
Network Optimization
# Network buffer tuning (keys are taskmanager.memory.network.* since Flink 1.10)
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1
# Netty send/receive socket buffer size in bytes (0 = system default);
# 'taskmanager.network.tcp.*' keys do not exist in Flink
taskmanager.network.netty.sendReceiveBufferSize: 2097152
Best Practices
- Set appropriate memory limits for JobManager and TaskManager
- Enable checkpointing for fault tolerance
- Use savepoints for planned updates and version upgrades
- Monitor task latency and throughput metrics
- Set proper parallelism based on available resources
- Use exactly-once semantics for critical data
- Implement state cleanup for memory efficiency
- Test job scaling before production deployment
- Monitor RocksDB memory usage with managed state
- Regular backup of savepoint directories
Resources
Last updated: 2025-03-30