
Delta Lake Cheat Sheet

Overview

Delta Lake is an open-source storage layer that brings reliability and performance to data lakes. Originally created by Databricks, it extends Parquet files with a transaction log that enables ACID transactions, scalable metadata handling, schema enforcement, time travel, and unified batch and streaming processing. A Delta table is a directory of Parquet data files plus a _delta_log subdirectory that holds the transaction log: one JSON file per commit, with periodic Parquet checkpoint files.
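To make the log structure concrete, here is a toy replay of commit files in plain Python. It is a simplified model that assumes each commit is newline-delimited JSON containing only `add` and `remove` actions keyed by `path`. Real logs also carry `metaData`, `protocol` and `commitInfo` actions plus Parquet checkpoints, which this sketch ignores. The `COMMITS` contents are hypothetical.

```python
import json

# Hypothetical commit contents: version -> newline-delimited JSON actions,
# standing in for _delta_log/00000000000000000000.json, ...0001.json, and so on.
COMMITS = {
    0: '{"add": {"path": "part-000.parquet"}}\n{"add": {"path": "part-001.parquet"}}',
    1: '{"add": {"path": "part-002.parquet"}}',
    2: '{"remove": {"path": "part-000.parquet"}}\n{"add": {"path": "part-003.parquet"}}',
}


def live_files(commits, version):
    """Replay add/remove actions up to `version` and return the active file set."""
    active = set()
    for v in sorted(commits):
        if v > version:
            break
        for line in commits[v].splitlines():
            action = json.loads(line)
            if "add" in action:
                active.add(action["add"]["path"])
            elif "remove" in action:
                active.discard(action["remove"]["path"])
    return active


print(sorted(live_files(COMMITS, 1)))  # ['part-000.parquet', 'part-001.parquet', 'part-002.parquet']
print(sorted(live_files(COMMITS, 2)))  # ['part-001.parquet', 'part-002.parquet', 'part-003.parquet']
```

The same idea underlies time travel: reading an older version simply stops the replay earlier.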

Delta Lake is the foundation of the Databricks Lakehouse Platform and is widely used in both open-source Spark environments and commercial deployments. It supports DML operations (UPDATE, DELETE, MERGE), schema evolution, a change data feed for downstream CDC, optimistic concurrency control, and Z-order clustering for query optimization. Spark, Flink, Trino, Presto, and other engines access Delta tables through Delta connectors, and Delta UniForm additionally lets Apache Iceberg clients read Delta tables.

Installation

Spark with Delta Lake

# PySpark with Delta Lake
pyspark \
    --packages io.delta:delta-spark_2.12:3.1.0 \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

# pip install for Python projects
pip install delta-spark==3.1.0 pyspark==3.5.0

# Standalone Delta (without Spark)
pip install deltalake   # delta-rs Python bindings

PySpark Session Setup

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0") \
    .getOrCreate()

delta-rs (Rust/Python Native)

# Native Python Delta Lake (no Spark required)
import deltalake

# Read a Delta table
dt = deltalake.DeltaTable("s3://bucket/delta-table")
df = dt.to_pandas()

# Or with PyArrow
table = dt.to_pyarrow_table()

Core Operations

Create and Write

# Create Delta table from DataFrame
df = spark.createDataFrame([
    (1, "Alice", 100.0, "2024-06-01"),
    (2, "Bob", 200.0, "2024-06-01"),
], ["id", "name", "amount", "date"])

# Write as Delta table
df.write.format("delta").mode("overwrite").save("/data/customers")

# Write with partitioning
df.write.format("delta") \
    .partitionBy("date") \
    .mode("append") \
    .save("/data/orders")

# Save as managed table
df.write.format("delta").saveAsTable("default.customers")
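With `partitionBy("date")`, Delta lays data out in Hive-style `column=value` subdirectories and records each file's partition values in the log (the `partitionValues` of its `add` action). The sketch below only models that directory naming in plain Python. The `part-00000.parquet` file name and the third sample row are hypothetical placeholders (Spark generates longer, UUID-based names), and escaping of special characters in values is ignored.

```python
from collections import defaultdict

rows = [
    {"id": 1, "name": "Alice", "amount": 100.0, "date": "2024-06-01"},
    {"id": 2, "name": "Bob", "amount": 200.0, "date": "2024-06-01"},
    {"id": 3, "name": "Carol", "amount": 50.0, "date": "2024-06-02"},  # hypothetical extra row
]


def partition_layout(rows, partition_col):
    """Group rows into Hive-style `col=value` directories (toy model)."""
    layout = defaultdict(list)
    for row in rows:
        directory = f"{partition_col}={row[partition_col]}"
        # The partition column is dropped from the stored row; its value lives in the path and log.
        layout[directory].append({k: v for k, v in row.items() if k != partition_col})
    return dict(layout)


for directory, part_rows in sorted(partition_layout(rows, "date").items()):
    print(f"/data/orders/{directory}/part-00000.parquet", len(part_rows), "rows")
```

This is why filters on the partition column can skip whole directories without opening any Parquet file.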

Read

# Read Delta table
df = spark.read.format("delta").load("/data/customers")

# Read specific version (time travel)
df = spark.read.format("delta").option("versionAsOf", 3).load("/data/customers")

# Read at specific timestamp
df = spark.read.format("delta") \
    .option("timestampAsOf", "2024-06-01T00:00:00") \
    .load("/data/customers")
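`timestampAsOf` is resolved to a table version: Delta uses the latest commit made at or before the requested time, and a timestamp earlier than the first commit is rejected. A toy sketch of that lookup with `bisect`, assuming a hypothetical in-memory list of (version, commit time) pairs instead of the real log. It ignores the other edge-case rules Delta applies to out-of-range timestamps.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical commit history: (version, commit timestamp), ordered by version.
HISTORY = [
    (0, datetime(2024, 5, 30, 9, 0)),
    (1, datetime(2024, 5, 31, 12, 0)),
    (2, datetime(2024, 6, 1, 0, 0)),
    (3, datetime(2024, 6, 2, 8, 30)),
]


def version_as_of(history, ts):
    """Return the latest version committed at or before `ts` (toy model)."""
    times = [t for _, t in history]
    idx = bisect_right(times, ts)  # number of commits with time <= ts
    if idx == 0:
        raise ValueError("timestamp is before the first commit")
    return history[idx - 1][0]


print(version_as_of(HISTORY, datetime(2024, 6, 1, 0, 0)))   # 2 (exact match)
print(version_as_of(HISTORY, datetime(2024, 6, 1, 18, 0)))  # 2
```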

SQL Operations

-- Create Delta table
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    amount DECIMAL(10,2),
    order_date DATE
)
USING DELTA
PARTITIONED BY (order_date)
LOCATION 's3://bucket/orders';

-- Insert
INSERT INTO orders VALUES (1, 100, 99.99, DATE '2024-06-15');

-- Update
UPDATE orders SET amount = 109.99 WHERE order_id = 1;

-- Delete
DELETE FROM orders WHERE order_date < '2023-01-01';

-- Merge (upsert)
MERGE INTO orders target
USING updates source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- Time travel
SELECT * FROM orders VERSION AS OF 5;
SELECT * FROM orders TIMESTAMP AS OF '2024-06-01';
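The `MERGE` above is an upsert: matched target rows are overwritten with the source row (`UPDATE SET *`), unmatched source rows are inserted (`INSERT *`), and Delta fails the statement if more than one source row matches the same target row. A toy, in-memory model of those semantics in Python, using lists of dicts keyed by a column name. It is a simplification: the real command rewrites every Parquet file that contains a matched row.

```python
from collections import Counter


def merge_upsert(target, source, key):
    """Toy MERGE ... WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT *."""
    counts = Counter(row[key] for row in source)
    existing = {row[key] for row in target}
    # Delta rejects a MERGE where several source rows match one target row.
    dupes = [k for k, n in counts.items() if n > 1 and k in existing]
    if dupes:
        raise ValueError(f"multiple source rows matched target keys {dupes}")
    result = {row[key]: dict(row) for row in target}
    inserted = []
    for row in source:
        if row[key] in result:
            result[row[key]] = dict(row)  # WHEN MATCHED: UPDATE SET *
        else:
            inserted.append(dict(row))    # WHEN NOT MATCHED: INSERT *
    return list(result.values()) + inserted


orders = [{"order_id": 1, "amount": 99.99}, {"order_id": 2, "amount": 50.00}]
updates = [{"order_id": 1, "amount": 109.99}, {"order_id": 3, "amount": 20.00}]
merged = merge_upsert(orders, updates, "order_id")
print(sorted((r["order_id"], r["amount"]) for r in merged))  # [(1, 109.99), (2, 50.0), (3, 20.0)]
```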

Table Management Commands

Command | Description
DESCRIBE HISTORY orders | Show table history (versions, operations, timestamps)
DESCRIBE DETAIL orders | Show table metadata (location, file count, size, partition columns)
DESCRIBE TABLE orders | Show table schema
SHOW TBLPROPERTIES orders | Show table properties
ALTER TABLE orders ADD COLUMNS (col TYPE) | Add columns
ALTER TABLE orders SET TBLPROPERTIES (...) | Set table properties
OPTIMIZE orders | Compact small files into larger ones
OPTIMIZE orders ZORDER BY (col1, col2) | Compact and co-locate rows by the given columns to improve data skipping
VACUUM orders | Delete data files no longer referenced by the table and older than the retention period (default 7 days)
VACUUM orders RETAIN 168 HOURS | Vacuum with an explicit retention period (168 hours = 7 days)
RESTORE orders TO VERSION AS OF 5 | Restore the table to an earlier version
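`VACUUM` only deletes files that the current table version no longer references and whose removal is older than the retention threshold (`delta.deletedFileRetentionDuration`, 7 days by default). A toy model of that rule, assuming a hypothetical map of removed files to their removal times; the real command also deletes untracked files in the directory based on modification time, which this sketch ignores.

```python
from datetime import datetime, timedelta

# Hypothetical tombstones: file path -> time it was removed from the table.
REMOVED = {
    "part-000.parquet": datetime(2024, 6, 1),
    "part-004.parquet": datetime(2024, 6, 10),
}


def vacuum_candidates(removed, now, retain_hours=168):
    """Return removed files whose removal is older than the retention window."""
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(path for path, removed_at in removed.items() if removed_at < cutoff)


print(vacuum_candidates(REMOVED, now=datetime(2024, 6, 12)))  # ['part-000.parquet']
```

This also explains why time travel to a version older than the retention window can fail after `VACUUM`: the data files it needs are gone.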

Configuration

Table Properties

ALTER TABLE orders SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true',
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 7 days',
    'delta.dataSkippingNumIndexedCols' = 32,
    'delta.checkpoint.writeStatsAsJson' = 'true',
    'delta.checkpoint.writeStatsAsStruct' = 'true',
    'delta.enableChangeDataFeed' = 'true',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5'
);
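`delta.dataSkippingNumIndexedCols` sets how many leading columns get per-file statistics (min, max, null count) in the log, 32 by default; readers use those statistics to skip files whose value range cannot satisfy a filter. A toy sketch of min/max pruning for an equality predicate, with hypothetical per-file stats:

```python
# Hypothetical per-file min/max stats for `order_id`, as recorded in `add` actions.
FILE_STATS = {
    "part-000.parquet": {"min": 1, "max": 1000},
    "part-001.parquet": {"min": 1001, "max": 2000},
    "part-002.parquet": {"min": 1500, "max": 3000},
}


def files_to_scan(stats, value):
    """Keep only files whose [min, max] range could contain `value`."""
    return sorted(f for f, s in stats.items() if s["min"] <= value <= s["max"])


print(files_to_scan(FILE_STATS, 1700))  # ['part-001.parquet', 'part-002.parquet']
print(files_to_scan(FILE_STATS, 42))    # ['part-000.parquet']
```

Overlapping ranges (files 001 and 002 here) weaken skipping, which is what Z-ordering and clustering try to reduce.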

Spark Configuration

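# Optimized writes and auto compaction for writes in this session
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Disables the check that blocks VACUUM retention below 7 days; avoid in production
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# Let appends and MERGE add new source columns to the target schema
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Faster execution path for MERGE statements that only insert
spark.conf.set("spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled", "true")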
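Compaction (`OPTIMIZE`, or auto compaction) rewrites many small files into fewer larger ones, targeting `spark.databricks.delta.optimize.maxFileSize` (1 GB by default). The toy bin-packing sketch below groups size-sorted files into rewrite bins; it is only loosely modeled on that idea, and its details (sorting by size, skipping single-file bins) are assumptions rather than Delta's exact algorithm.

```python
def pack_into_bins(file_sizes, max_bytes):
    """Group file sizes (bytes) into bins whose total stays within max_bytes."""
    bins, current, current_size = [], [], 0
    for size in sorted(file_sizes):
        if current and current_size + size > max_bytes:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    # Assumption: a bin holding a single file gains nothing from a rewrite, so skip it.
    return [b for b in bins if len(b) > 1]


MB = 1024 * 1024
sizes = [8 * MB, 12 * MB, 30 * MB, 60 * MB, 90 * MB]
print([sum(b) // MB for b in pack_into_bins(sizes, max_bytes=100 * MB)])  # [50]
```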

Advanced Usage

Change Data Feed (CDC)

-- Enable change data feed
ALTER TABLE orders SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

# Read changes between versions (both bounds are inclusive)
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .option("endingVersion", 10) \
    .table("orders")

# Each row gains _change_type (insert, update_preimage, update_postimage, delete),
# _commit_version and _commit_timestamp columns
changes.filter("_change_type = 'update_postimage'").show()

# Streaming change data feed
stream = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("orders")
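Downstream consumers often collapse the change feed into the latest state per key. A toy sketch over hypothetical change rows shaped like change data feed output (`_change_type`, `_commit_version`), applying changes in commit order and skipping `update_preimage` rows, which only describe the old value:

```python
# Hypothetical change-feed rows for the `orders` table.
CHANGES = [
    {"order_id": 1, "amount": 99.99, "_change_type": "insert", "_commit_version": 5},
    {"order_id": 1, "amount": 99.99, "_change_type": "update_preimage", "_commit_version": 6},
    {"order_id": 1, "amount": 109.99, "_change_type": "update_postimage", "_commit_version": 6},
    {"order_id": 2, "amount": 20.00, "_change_type": "insert", "_commit_version": 7},
    {"order_id": 2, "amount": 20.00, "_change_type": "delete", "_commit_version": 8},
]


def apply_changes(changes, key):
    """Replay change rows in commit order into a {key: row} snapshot."""
    state = {}
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        kind = row["_change_type"]
        data = {k: v for k, v in row.items() if not k.startswith("_")}
        if kind in ("insert", "update_postimage"):
            state[row[key]] = data
        elif kind == "delete":
            state.pop(row[key], None)
        # update_preimage rows carry the old value and are ignored here.
    return state


print(apply_changes(CHANGES, "order_id"))  # {1: {'order_id': 1, 'amount': 109.99}}
```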

Streaming Reads and Writes

# Streaming read from Delta
stream_df = spark.readStream.format("delta") \
    .option("maxFilesPerTrigger", 100) \
    .load("/data/orders")

# Streaming write to Delta
query = stream_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/orders") \
    .trigger(processingTime="10 seconds") \
    .start("/data/orders_processed")

# Streaming with foreachBatch for complex logic
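def process_batch(batch_df, batch_id):
    # txnAppId/txnVersion let Delta skip a batch that Spark replays after a failure;
    # "custom_pipeline" is an arbitrary, hypothetical ID that must stay stable for this query
    batch_df.write.format("delta") \
        .option("txnAppId", "custom_pipeline") \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .save("/data/output")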

stream_df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/checkpoints/custom") \
    .start()
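`foreachBatch` is at-least-once: after a failure Spark may rerun a `batch_id` that was already written. Delta's `txnAppId`/`txnVersion` writer options address this by ignoring writes whose version is not newer than the last one recorded for that application ID. The toy model below shows only that bookkeeping idea in plain Python, with a hypothetical in-memory `ToyDeltaTable`; it is not Delta's implementation.

```python
class ToyDeltaTable:
    """Hypothetical table that ignores replayed (app_id, version) writes."""

    def __init__(self):
        self.rows = []
        self.last_txn = {}  # app_id -> highest committed version

    def append(self, rows, app_id, version):
        if version <= self.last_txn.get(app_id, -1):
            return False  # duplicate batch: skip the write
        # In Delta the data and the transaction marker land in one atomic commit.
        self.rows.extend(rows)
        self.last_txn[app_id] = version
        return True


table = ToyDeltaTable()
table.append([{"id": 1}], app_id="custom_pipeline", version=0)
table.append([{"id": 2}], app_id="custom_pipeline", version=1)
table.append([{"id": 2}], app_id="custom_pipeline", version=1)  # replay after a crash
print(len(table.rows))  # 2
```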

Delta Lake with delta-rs (No Spark)

from deltalake import DeltaTable, write_deltalake
import pandas as pd

# Write pandas DataFrame to Delta
df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
write_deltalake("/data/table", df, mode="append")

# Read Delta table
dt = DeltaTable("/data/table")
print(dt.version())
print(dt.history())

# Compact files
dt.optimize.compact()
dt.optimize.z_order(["name"])

# Vacuum files removed more than 168 hours ago (dry_run defaults to True; False actually deletes)
dt.vacuum(retention_hours=168, enforce_retention_duration=False, dry_run=False)
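Z-ordering (`OPTIMIZE ... ZORDER BY` in SQL, `dt.optimize.z_order` in delta-rs) sorts rows along a space-filling curve so that rows close in several columns end up in the same files. A minimal sketch of the classic Morton (Z-order) key for two small non-negative integer columns, built by interleaving their bits. Real implementations must first map arbitrary column values to integers, which this sketch skips.

```python
def morton_key(x, y, bits=8):
    """Interleave the bits of x and y (x in even bit positions, y in odd)."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key


points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: morton_key(*p))
print(ordered[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

Cutting that ordering into consecutive files keeps each file's min/max range narrow in both columns, which is what data skipping exploits.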

Liquid Clustering (Databricks)

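-- Create table with liquid clustering (used instead of PARTITIONED BY and ZORDER)
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    region STRING,
    amount DECIMAL(10,2),
    order_date DATE
)
USING DELTA
CLUSTER BY (order_date, region);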

-- Trigger clustering
OPTIMIZE orders;

-- Change clustering columns; existing data is not rewritten, later OPTIMIZE runs use the new keys
ALTER TABLE orders CLUSTER BY (customer_id, order_date);

Troubleshooting

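Issue | Solution
Small file problem | Run OPTIMIZE regularly; enable delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact
Slow queries | Use Z-ORDER on frequently filtered columns; check that filters prune partitions
VACUUM deleting needed files | Keep retention at or above the 7-day default and longer than your longest query or time-travel window; don't set retentionDurationCheck.enabled=false in production
Schema mismatch on write | Enable spark.databricks.delta.schema.autoMerge.enabled or write with the mergeSchema option set to true
Concurrent write conflicts | Delta uses optimistic concurrency; retry the failed operation, or have concurrent writers touch disjoint partitions with explicit partition predicates
Transaction log growing large | Delta writes a checkpoint every 10 commits by default (delta.checkpointInterval) and removes log entries older than delta.logRetentionDuration when it checkpoints; VACUUM removes data files, not log files
Time travel version not found | Check delta.logRetentionDuration and delta.deletedFileRetentionDuration; the version's log entries may have expired or its data files may have been vacuumed
OOM during OPTIMIZE | Reduce spark.databricks.delta.optimize.maxFileSize; optimize fewer partitions at a time with OPTIMIZE ... WHERE on partition columns
Streaming checkpoint corruption | Restart with a new checkpoint location and startingVersion set to the first table version not yet processed; check the output for duplicates or gaps
Slow MERGE operations | Add partition (or clustering) column predicates to the ON clause so Delta can prune files; use optimizeInsertOnlyMerge for insert-only merges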
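Delta commits are optimistic: a writer reads the latest version, prepares its changes, and tries to create the next numbered log file atomically (put-if-absent); if another writer created that version first, the commit fails, Delta checks whether the changes actually conflict, and retries at a higher version when they don't. A toy sketch that uses exclusive file creation in a temporary directory as the put-if-absent primitive. Conflict detection is reduced to "always retry", which is an assumption far simpler than Delta's real rules.

```python
import json
import os
import tempfile


def latest_version(log_dir):
    """Highest committed version in the toy log directory, or -1 if empty."""
    versions = [int(name.split(".")[0]) for name in os.listdir(log_dir) if name.endswith(".json")]
    return max(versions, default=-1)


def commit(log_dir, actions, max_attempts=5):
    """Try to write the next commit file; retry at a higher version on conflict."""
    for _ in range(max_attempts):
        version = latest_version(log_dir) + 1
        path = os.path.join(log_dir, f"{version:020d}.json")
        try:
            # O_EXCL makes creation fail if another writer already created this version.
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        except FileExistsError:
            continue  # lost the race: re-read the log and try the next version
        with os.fdopen(fd, "w") as f:
            f.write("\n".join(json.dumps(a) for a in actions))
        return version
    raise RuntimeError("too many concurrent commit conflicts")


log_dir = tempfile.mkdtemp()
print(commit(log_dir, [{"add": {"path": "part-000.parquet"}}]))  # 0
print(commit(log_dir, [{"add": {"path": "part-001.parquet"}}]))  # 1
```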