Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing with support for batch and stream processing.

Installation

Download and Setup

# Download Spark
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar -xzf spark-3.3.2-bin-hadoop3.tgz
cd spark-3.3.2-bin-hadoop3

# Set environment
export SPARK_HOME=$(pwd)
export PATH=$PATH:$SPARK_HOME/bin

# Verify installation
spark-shell --version

Docker

# Run Spark with Docker
docker run -it --rm \
  -p 4040:4040 \
  -p 8080:8080 \
  -p 7077:7077 \
  apache/spark:3.3.2 \
  /opt/spark/bin/spark-shell

Spark-Submit

Basic Job Submission

# Submit Spark application
spark-submit \
  --class org.example.SparkApp \
  --master local[4] \
  my-spark-job.jar

# Run with input arguments
spark-submit \
  --class org.example.WordCount \
  --master local \
  my-spark-job.jar \
  input.txt output

# PySpark job
spark-submit \
  --master local[4] \
  my-spark-script.py
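
For reference, a minimal sketch of what a submitted PySpark script such as my-spark-script.py might contain; the input path and the "category" column are illustrative assumptions, not part of the original example.

# my-spark-script.py (illustrative contents)
from pyspark.sql import SparkSession

# spark-submit launches the driver; the script only needs to build its SparkSession
spark = SparkSession.builder.appName("SubmittedJob").getOrCreate()

# Example workload: count rows per value of an assumed "category" column
df = spark.read.csv("input.csv", header=True, inferSchema=True)
df.groupBy("category").count().show()

spark.stop()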

Configuration Options

# Submit with configuration (standalone master; --num-executors applies only to YARN and Kubernetes)
spark-submit \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 4 \
  --total-executor-cores 40 \
  --conf spark.sql.shuffle.partitions=200 \
  my-job.jar

# YARN deployment
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 4g \
  --num-executors 5 \
  --executor-cores 8 \
  my-job.jar

# Kubernetes deployment (a container image must be specified)
spark-submit \
  --master k8s://https://kubernetes-master:443 \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 2g \
  --num-executors 4 \
  --executor-cores 2 \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.container.image=apache/spark:3.3.2 \
  my-job.jar

Spark-Shell

Interactive Mode

# Start Scala shell
spark-shell

# Start PySpark shell
pyspark

# With configuration
spark-shell \
  --master local[8] \
  --driver-memory 4g \
  --executor-memory 2g

Spark-Shell Commands

// Load data
val df = spark.read.json("data.json")

// Show schema
df.printSchema()

// Show data
df.show()

// SQL query (register the DataFrame as a view first)
df.createOrReplaceTempView("my_table")
spark.sql("SELECT COUNT(*) FROM my_table").show()

// Save data
df.write.parquet("output/path")

PySpark API

DataFrame Operations

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count

# Create SparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Load data
df = spark.read.json("input.json")
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("data.parquet")

# Filter and transform
filtered = df.filter(col("age") > 30)
selected = df.select("name", "age")
renamed = df.withColumnRenamed("age", "years")

# Aggregation
result = df.groupBy("department") \
    .agg(sum("salary").alias("total_salary"),
         count("*").alias("employee_count"))

# Save data
df.write.mode("overwrite").json("output_dir")
df.write.mode("append").parquet("output.parquet")
df.coalesce(1).write.csv("output.csv", header=True)

RDD Operations

from pyspark import SparkContext

sc = SparkContext("local", "RDD Example")

# Create RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Transformations
mapped = rdd.map(lambda x: x * 2)
filtered = rdd.filter(lambda x: x > 2)
paired = rdd.map(lambda x: (x, x*x))

# Actions
result = mapped.collect()
count = rdd.count()
first = rdd.first()
take = rdd.take(3)
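
# Key-value aggregation with reduceByKey (a minimal sketch; the sample
# sentences below are illustrative, not part of the original example)
words = sc.parallelize(["spark is fast", "spark is scalable"]) \
    .flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(word_counts.collect())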

sc.stop()

SQL and Queries

Spark SQL

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Create temporary view
df = spark.read.json("users.json")
df.createOrReplaceTempView("users")

# SQL queries
result = spark.sql("SELECT * FROM users WHERE age > 30")
result = spark.sql("""
    SELECT department, COUNT(*) as count
    FROM users
    GROUP BY department
    ORDER BY count DESC
""")

# Save result
result.write.mode("overwrite").csv("results")

Configuration

conf/spark-defaults.conf

spark.master                     spark://master:7077
spark.eventLog.enabled           true
spark.eventLog.dir               /var/log/spark
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  256mb
spark.shuffle.sort.bypassMergeThreshold    200
spark.sql.shuffle.partitions     200
spark.driver.maxResultSize       2g
spark.memory.fraction            0.6
spark.sql.adaptive.enabled       true
spark.sql.adaptive.skewJoin.enabled true
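
Most of these settings can also be inspected, and SQL-related ones overridden, at runtime from a session; a minimal sketch (the overridden value is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the effective value of a setting (defaults file, submit flags, or session config)
print(spark.conf.get("spark.sql.shuffle.partitions"))

# SQL settings can be changed per session at runtime
spark.conf.set("spark.sql.shuffle.partitions", "400")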

Environment Variables

# Set Spark home
export SPARK_HOME=/opt/spark

# JVM memory
export SPARK_DRIVER_MEMORY=4g
export SPARK_EXECUTOR_MEMORY=2g

# Python binary
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

Cluster Management

Standalone Cluster

# Start master node
./sbin/start-master.sh

# Start worker nodes
./sbin/start-worker.sh spark://master:7077

# Start history server
./sbin/start-history-server.sh

# Stop cluster
./sbin/stop-all.sh

YARN Integration

# Submit to YARN
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue production \
  my-job.jar

# Monitor on YARN
# http://yarn-master:8088

Performance Tuning

Memory Management

# Optimize memory settings
spark-submit \
  --driver-memory 8g \
  --executor-memory 4g \
  --conf spark.executor.memoryOverhead=512m \
  --conf spark.memory.fraction=0.6 \
  my-job.jar

Partitioning

# Control partitioning (spark.read.csv returns a DataFrame)
df = spark.read.csv("large-file.csv")

# Repartition data (full shuffle)
repartitioned = df.repartition(100)

# Coalesce to fewer partitions (avoids a full shuffle)
coalesced = df.coalesce(50)

# Write with partitioning
df.write.partitionBy("year", "month").parquet("output")
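
To see whether repartitioning is needed at all, the current partition count can be checked first; a quick sketch using the DataFrame loaded above:

# Number of partitions the DataFrame currently has
print(df.rdd.getNumPartitions())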

Caching

# Cache DataFrame
df = spark.read.json("data.json")
df.cache()

# Multiple actions benefit from caching
df.count()
df.groupBy("category").count()

# Remove cache
df.unpersist()
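
When a DataFrame is too large to keep fully in memory, persist() with an explicit storage level is a common alternative to cache(); a minimal sketch:

from pyspark import StorageLevel

# Keep partitions in memory and spill the rest to disk instead of recomputing them
df = spark.read.json("data.json")
df.persist(StorageLevel.MEMORY_AND_DISK)

df.count()
df.unpersist()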

Streaming

Structured Streaming

from pyspark.sql.functions import window, col

# Read from Kafka (requires the spark-sql-kafka-0-10 connector package)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my-topic") \
    .load()

# Process data
processed = df.groupBy(window(col("timestamp"), "10 minutes")) \
    .count()

# Write output (streaming aggregations require complete or update output mode)
query = processed.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()

query.awaitTermination()

Best Practices

  • Set appropriate parallelism (executors, cores, partitions)
  • Use DataFrame/Dataset API instead of RDD for better performance
  • Enable SQL adaptive execution for query optimization (see the configuration sketch after this list)
  • Cache frequently accessed DataFrames
  • Use proper partitioning for large datasets
  • Monitor job execution through Spark UI
  • Configure appropriate memory limits
  • Use Kryo serialization for better performance
  • Implement proper error handling and logging
  • Test performance with representative data volumes
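
The session builder below is a minimal sketch of how several of these recommendations can be combined; the application name and partition count are illustrative values, not recommendations for any particular cluster.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("TunedApp")  # illustrative name
    # Adaptive execution optimizes shuffle partitioning and skewed joins at runtime
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Kryo serialization is faster and more compact than Java serialization
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Match shuffle parallelism to total executor cores and data volume
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)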

Last updated: 2025-03-30