Apache Spark
Apache Spark is a unified analytics engine for large-scale data processing, with support for both batch and stream processing.
Installation
Download and Setup
# Download Spark
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar -xzf spark-3.3.2-bin-hadoop3.tgz
cd spark-3.3.2-bin-hadoop3
# Set environment
export SPARK_HOME=$(pwd)
export PATH=$PATH:$SPARK_HOME/bin
# Verify installation
spark-shell --version
Docker
# Run Spark with Docker
docker run -it --rm \
-p 4040:4040 \
-p 8080:8080 \
-p 7077:7077 \
apache/spark:3.3.2 \
/opt/spark/bin/spark-shell
Spark-Submit
Basic Job Submission
# Submit Spark application
spark-submit \
--class org.example.SparkApp \
--master local[4] \
my-spark-job.jar
# Run with input arguments
spark-submit \
--class org.example.WordCount \
--master local \
my-spark-job.jar \
input.txt output
# PySpark job
spark-submit \
--master local[4] \
my-spark-script.py
Configuration Options
# Submit with configuration
spark-submit \
--class org.example.SparkApp \
--master spark://master:7077 \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--total-executor-cores 40 \
--executor-cores 4 \
--conf spark.sql.shuffle.partitions=200 \
my-job.jar
# YARN deployment
spark-submit \
--class org.example.SparkApp \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 4g \
--num-executors 5 \
--executor-cores 8 \
my-job.jar
# Kubernetes deployment
spark-submit \
--class org.example.SparkApp \
--master k8s://https://kubernetes-master:443 \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 2g \
--num-executors 4 \
--executor-cores 2 \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image=apache/spark:3.3.2 \
my-job.jar
Spark-Shell
Interactive Mode
# Start Scala shell
spark-shell
# Start PySpark shell
pyspark
# With configuration
spark-shell \
--master local[8] \
--driver-memory 4g \
--executor-memory 2g
Spark-Shell Commands
// Load data
val df = spark.read.json("data.json")
// Show schema
df.printSchema()
// Show data
df.show()
// SQL query (register the DataFrame as a view first)
df.createOrReplaceTempView("my_table")
spark.sql("SELECT COUNT(*) FROM my_table").show()
// Save data
df.write.parquet("output/path")
PySpark API
DataFrame Operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count
# Create SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Load data
df = spark.read.json("input.json")
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("data.parquet")
# Filter and transformation
filtered = df.filter(col("age") > 30)
selected = df.select("name", "age")
renamed = df.withColumnRenamed("age", "years")
# Aggregation
result = df.groupBy("department") \
.agg(sum("salary").alias("total_salary"),
count("*").alias("employee_count"))
# Save data
df.write.mode("overwrite").json("output_dir")
df.write.mode("append").parquet("output.parquet")
df.coalesce(1).write.csv("output.csv", header=True)
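Joins use the same DataFrame API; a minimal sketch, assuming two hypothetical files employees.parquet and departments.parquet that share a dept_id column:
# Join two DataFrames on a shared key column (hypothetical data)
employees = spark.read.parquet("employees.parquet")
departments = spark.read.parquet("departments.parquet")
joined = employees.join(departments, on="dept_id", how="inner")
joined.show()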
RDD Operations
from pyspark import SparkContext
sc = SparkContext("local", "RDD Example")
# Create RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Transformation
mapped = rdd.map(lambda x: x * 2)
filtered = rdd.filter(lambda x: x > 2)
paired = rdd.map(lambda x: (x, x*x))
# Actions
result = mapped.collect()
count = rdd.count()
first = rdd.first()
take = rdd.take(3)
sc.stop()
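Pair RDDs such as the one built above also support key-based aggregations; a minimal, self-contained sketch using reduceByKey (the parity key is chosen only for illustration):
from pyspark import SparkContext
sc = SparkContext("local", "PairRDD Example")
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Key each element by parity, then sum the values per key
pairs = rdd.map(lambda x: (x % 2, x))
sums = pairs.reduceByKey(lambda a, b: a + b)
print(sums.collect())  # [(0, 6), (1, 9)] (order may vary)
sc.stop()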
SQL and Queries
Spark SQL
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
# Create temporary view
df = spark.read.json("users.json")
df.createOrReplaceTempView("users")
# SQL queries
result = spark.sql("SELECT * FROM users WHERE age > 30")
result = spark.sql("""
SELECT department, COUNT(*) as count
FROM users
GROUP BY department
ORDER BY count DESC
""")
# Save result
result.write.mode("overwrite").csv("results")
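spark.sql returns an ordinary DataFrame, so SQL and DataFrame operations can be mixed; a minimal sketch reusing the users view registered above:
from pyspark.sql.functions import col
# Continue transforming the SQL result with the DataFrame API
counts = spark.sql("SELECT department, COUNT(*) AS count FROM users GROUP BY department")
counts.filter(col("count") > 10).orderBy(col("count").desc()).show()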
Configuration
conf/spark-defaults.conf
spark.master spark://master:7077
spark.eventLog.enabled true
spark.eventLog.dir /var/log/spark
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 256mb
spark.shuffle.sort.bypassMergeThreshold 200
spark.sql.shuffle.partitions 200
spark.driver.maxResultSize 2g
spark.memory.fraction 0.6
spark.sql.adaptive.enabled true
spark.sql.adaptive.skewJoin.enabled true
Environment Variables
# Set Spark home
export SPARK_HOME=/opt/spark
# JVM memory
export SPARK_DRIVER_MEMORY=4g
export SPARK_EXECUTOR_MEMORY=2g
# Python binary
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
Cluster Management
Standalone Cluster
# Start master node
./sbin/start-master.sh
# Start worker nodes
./sbin/start-worker.sh spark://master:7077
# Start history server
./sbin/start-history-server.sh
# Stop cluster
./sbin/stop-all.sh
YARN Integration
# Submit to YARN
spark-submit \
--class org.example.SparkApp \
--master yarn \
--deploy-mode cluster \
--queue production \
my-job.jar
# Monitor on YARN
# http://yarn-master:8088
Performance Tuning
Memory Management
# Optimize memory settings
spark-submit \
--driver-memory 8g \
--executor-memory 4g \
--conf spark.executor.memoryOverhead=512m \
--conf spark.memory.fraction=0.6 \
my-job.jar
Partitioning
# Control partitioning
df = spark.read.csv("large-file.csv", header=True)
# Repartition data (full shuffle)
repartitioned = df.repartition(100)
# Coalesce partitions (no shuffle; only merges existing partitions)
coalesced = df.coalesce(50)
# Write with partitioning
df.write.partitionBy("year", "month").parquet("output")
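To check how the data is actually split, the partition count can be inspected; a minimal sketch, assuming the DataFrames from the block above:
# Inspect partition counts; repartition() shuffles fully,
# coalesce() only merges existing partitions (cheaper when reducing count)
print(df.rdd.getNumPartitions())
print(repartitioned.rdd.getNumPartitions())  # 100
print(coalesced.rdd.getNumPartitions())      # 50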
Caching
# Cache DataFrame
df = spark.read.json("data.json")
df.cache()
# Multiple actions benefit from caching
df.count()
df.groupBy("category").count()
# Remove cache
df.unpersist()
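When finer control over the storage level is needed, persist() can be used instead of cache(); a minimal sketch, assuming the df from the block above:
from pyspark import StorageLevel
# Keep partitions in memory and spill to disk if they do not fit
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()
df.unpersist()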
Streaming
Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
# Read from Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my-topic") \
.load()
# Process data
processed = df.groupBy(window(col("timestamp"), "10 minutes")) \
.count()
# Write output
query = processed.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start()
query.awaitTermination()
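With the console sink, the aggregation above needs complete (or update) output mode; to use append mode instead, a watermark on the event-time column is required. A minimal sketch under that assumption:
# Append mode needs a watermark so late data can eventually be finalized
windowed = df \
    .withWatermark("timestamp", "15 minutes") \
    .groupBy(window(col("timestamp"), "10 minutes")) \
    .count()
query = windowed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoint-append") \
    .start()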
Best Practices
- Set appropriate parallelism (executors, cores, partitions)
- Use DataFrame/Dataset API instead of RDD for better performance
- Enable SQL adaptive execution for query optimization
- Cache frequently accessed DataFrames
- Use proper partitioning for large datasets
- Monitor job execution through Spark UI
- Configure appropriate memory limits
- Use Kryo serialization for better performance (see the sketch after this list)
- Implement proper error handling and logging
- Test performance with representative data volumes
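Several of these settings can also be applied in code rather than on the command line; a minimal sketch enabling Kryo serialization and adaptive execution when the session is created, using the configuration keys listed in spark-defaults.conf above:
from pyspark.sql import SparkSession
# Set tuning options before the SparkSession is created
spark = SparkSession.builder \
    .appName("TunedApp") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()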
Last updated: 2025-03-30