Delta Lake Cheat Sheet
Overview
Delta Lake is an open-source storage layer that brings reliability and performance to data lakes. Originally created by Databricks, it extends Parquet files with a transaction log that enables ACID transactions, scalable metadata handling, schema enforcement, time travel, and unified batch and streaming processing. A Delta table is a directory of Parquet data files plus a _delta_log subdirectory that holds the transaction log as ordered JSON commit files and periodic Parquet checkpoints.
Delta Lake is the foundation of the Databricks Lakehouse Platform and is widely used in both open-source Spark environments and commercial deployments. It supports DML operations (UPDATE, DELETE, MERGE), schema evolution, change data feed for downstream CDC, optimistic concurrency control, and Z-order clustering for query optimization. Engines such as Spark, Flink, Trino, and Presto work with Delta tables through Delta connectors, and Delta UniForm lets clients of other table formats such as Apache Iceberg read Delta tables.
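To make the table layout concrete, the sketch below builds a toy `_delta_log` in a temporary directory and replays it to find the data files that make up a given version. It is a simplified illustration that assumes only `add` and `remove` actions matter. Real readers also handle Parquet checkpoints, metadata and protocol actions, and deletion vectors. The helper names `live_files` and `write_commit` and the file names are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def live_files(table_path, version=None):
    """Replay JSON commits in order and return the set of live data files.

    Simplified: ignores checkpoints, metadata, and protocol actions.
    """
    log_dir = Path(table_path) / "_delta_log"
    files = set()
    # Commit files are named with a zero-padded version, so sorting by name
    # also sorts by version (e.g. 00000000000000000000.json comes first).
    for commit in sorted(log_dir.glob("*.json")):
        if version is not None and int(commit.stem) > version:
            break
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


def write_commit(table_path, version, actions):
    """Write one toy commit file with one JSON action per line."""
    log_dir = Path(table_path) / "_delta_log"
    log_dir.mkdir(parents=True, exist_ok=True)
    body = "\n".join(json.dumps(a) for a in actions)
    (log_dir / f"{version:020d}.json").write_text(body)


table = tempfile.mkdtemp()
write_commit(table, 0, [{"add": {"path": "part-0.parquet"}}])
write_commit(table, 1, [{"add": {"path": "part-1.parquet"}}])
# A compaction-style commit: replace two small files with one larger file
write_commit(table, 2, [
    {"remove": {"path": "part-0.parquet"}},
    {"remove": {"path": "part-1.parquet"}},
    {"add": {"path": "part-2.parquet"}},
])

latest = live_files(table)
as_of_v1 = live_files(table, version=1)
print(sorted(latest), sorted(as_of_v1))
```

Time travel falls out of the same replay: stopping at an earlier commit yields the file set for that version, which is why an old version becomes unreadable once its data files have been physically deleted.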
Installation
Spark with Delta Lake
# PySpark with Delta Lake
pyspark \
  --packages io.delta:delta-spark_2.12:3.1.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
# pip install for Python projects
pip install delta-spark==3.1.0 pyspark==3.5.0
# Standalone Delta (without Spark)
pip install deltalake # delta-rs Python bindings
PySpark Session Setup
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0") \
    .getOrCreate()
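When delta-spark is installed with pip, the `configure_spark_with_delta_pip` helper from the `delta` package can add the matching Maven coordinates, so the version string is not hard-coded. The sketch below assumes pyspark and delta-spark are both installed. The function name `build_delta_session` and the `DELTA_CONF` dictionary are illustrative, not part of any library.

```python
# Delta settings from the session setup, kept in one place
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def build_delta_session(app_name="DeltaLake"):
    """Create a Delta-enabled SparkSession (requires pyspark and delta-spark)."""
    # Imported lazily so this module still loads where Spark is not installed
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    builder = SparkSession.builder.appName(app_name)
    for key, value in DELTA_CONF.items():
        builder = builder.config(key, value)
    # Adds the delta-spark package matching the pip-installed version
    return configure_spark_with_delta_pip(builder).getOrCreate()
```

Calling `build_delta_session()` would then replace the manual `spark.jars.packages` setting.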
delta-rs (Rust/Python Native)
# Native Python Delta Lake (no Spark required)
import deltalake
# Read a Delta table
dt = deltalake.DeltaTable("s3://bucket/delta-table")
df = dt.to_pandas()
# Or with PyArrow
table = dt.to_pyarrow_table()
Core Operations
Create and Write
# Create Delta table from DataFrame
df = spark.createDataFrame([
    (1, "Alice", 100.0, "2024-06-01"),
    (2, "Bob", 200.0, "2024-06-01"),
], ["id", "name", "amount", "date"])
# Write as Delta table
df.write.format("delta").mode("overwrite").save("/data/customers")
# Write with partitioning
df.write.format("delta") \
    .partitionBy("date") \
    .mode("append") \
    .save("/data/orders")
# Save as managed table
df.write.format("delta").saveAsTable("default.customers")
Read
# Read Delta table
df = spark.read.format("delta").load("/data/customers")
# Read specific version (time travel)
df = spark.read.format("delta").option("versionAsOf", 3).load("/data/customers")
# Read at specific timestamp
df = spark.read.format("delta") \
    .option("timestampAsOf", "2024-06-01T00:00:00") \
    .load("/data/customers")
SQL Operations
-- Create Delta table
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    amount DECIMAL(10,2),
    order_date DATE
)
USING DELTA
PARTITIONED BY (order_date)
LOCATION 's3://bucket/orders';
-- Insert
INSERT INTO orders VALUES (1, 100, 99.99, DATE '2024-06-15');
-- Update
UPDATE orders SET amount = 109.99 WHERE order_id = 1;
-- Delete
DELETE FROM orders WHERE order_date < '2023-01-01';
-- Merge (upsert)
MERGE INTO orders target
USING updates source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Time travel
SELECT * FROM orders VERSION AS OF 5;
SELECT * FROM orders TIMESTAMP AS OF '2024-06-01';
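The same upsert can be written with the `DeltaTable` Python API. The sketch below is one way to structure it, assuming `delta_table` is a `delta.tables.DeltaTable` and `updates_df` is a Spark DataFrame. The helper names `merge_condition` and `upsert` are hypothetical.

```python
def merge_condition(keys, target="target", source="source"):
    """Build the ON clause for a MERGE from a list of key columns."""
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


def upsert(delta_table, updates_df, keys):
    """Upsert updates_df into delta_table (a delta.tables.DeltaTable).

    Mirrors the SQL form:
      WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *
    """
    (
        delta_table.alias("target")
        .merge(updates_df.alias("source"), merge_condition(keys))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


condition = merge_condition(["order_id"])
print(condition)
```

With a live session this would be called as `upsert(DeltaTable.forName(spark, "orders"), updates_df, ["order_id"])`.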
Table Management Commands
| Command | Description |
|---|---|
| DESCRIBE HISTORY orders | Show table history and versions |
| DESCRIBE DETAIL orders | Show table metadata (files, size, partitions) |
| DESCRIBE TABLE orders | Show table schema |
| SHOW TBLPROPERTIES orders | Show table properties |
| ALTER TABLE orders ADD COLUMNS (col TYPE) | Add columns |
| ALTER TABLE orders SET TBLPROPERTIES (...) | Set properties |
| OPTIMIZE orders | Compact small files |
| OPTIMIZE orders ZORDER BY (col1, col2) | Z-order optimize for query patterns |
| VACUUM orders | Remove data files that the table no longer references and that are older than the retention threshold (7 days by default) |
| VACUUM orders RETAIN 168 HOURS | Vacuum with an explicit retention period |
| RESTORE orders TO VERSION AS OF 5 | Restore table to a specific version |
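OPTIMIZE also accepts a WHERE clause on partition columns, which keeps each maintenance run small. The sketch below assembles such a statement as a string. The helper `optimize_statement` is hypothetical, does no escaping, and should only be fed trusted values.

```python
def optimize_statement(table, where=None, zorder_by=()):
    """Build an OPTIMIZE statement with optional WHERE and ZORDER BY parts."""
    parts = [f"OPTIMIZE {table}"]
    if where:
        # The predicate should reference partition columns only
        parts.append(f"WHERE {where}")
    if zorder_by:
        parts.append(f"ZORDER BY ({', '.join(zorder_by)})")
    return " ".join(parts)


stmt = optimize_statement(
    "orders",
    where="order_date >= '2024-06-01'",
    zorder_by=["customer_id"],
)
print(stmt)
```

The resulting string can be passed to `spark.sql(stmt)`.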
Configuration
Table Properties
ALTER TABLE orders SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true',
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 7 days',
    'delta.dataSkippingNumIndexedCols' = 32,
    'delta.checkpoint.writeStatsAsJson' = 'true',
    'delta.checkpoint.writeStatsAsStruct' = 'true',
    'delta.enableChangeDataFeed' = 'true',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5'
);
Spark Configuration
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
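# Caution: disables the safety check that blocks VACUUM below the retention threshold (7 days by default); avoid in production
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")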
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.conf.set("spark.databricks.delta.merge.optimizeInsertOnlyMerge.enabled", "true")
Advanced Usage
Change Data Feed (CDF)
-- Enable change data feed
ALTER TABLE orders SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
# Read changes between versions
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .option("endingVersion", 10) \
    .table("orders")
# Changes include: _change_type (insert, update_preimage, update_postimage, delete)
changes.filter("_change_type = 'update_postimage'").show()
# Streaming change data feed
stream = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("orders")
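Downstream consumers usually need the latest state per key rather than every intermediate change. The sketch below shows that collapse in plain Python, using dicts as stand-ins for collected change rows with the `_change_type` and `_commit_version` metadata columns. The helper `latest_rows` and the sample data are hypothetical. It also assumes at most one surviving change per key within a commit.

```python
def latest_rows(changes, key):
    """Collapse change-feed rows to the latest state per key.

    `changes` is an iterable of dicts carrying the change feed metadata
    columns `_change_type` and `_commit_version` next to the table columns.
    """
    state = {}
    # Process in commit order so later changes win
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change = row["_change_type"]
        if change == "update_preimage":
            continue  # old value; the postimage carries the new one
        if change == "delete":
            state.pop(row[key], None)
        else:  # insert or update_postimage
            # Drop metadata columns (names starting with an underscore)
            state[row[key]] = {k: v for k, v in row.items() if not k.startswith("_")}
    return state


sample_changes = [
    {"order_id": 1, "amount": 99.99, "_change_type": "insert", "_commit_version": 5},
    {"order_id": 2, "amount": 20.0, "_change_type": "insert", "_commit_version": 5},
    {"order_id": 1, "amount": 99.99, "_change_type": "update_preimage", "_commit_version": 6},
    {"order_id": 1, "amount": 109.99, "_change_type": "update_postimage", "_commit_version": 6},
    {"order_id": 2, "amount": 20.0, "_change_type": "delete", "_commit_version": 7},
]

current = latest_rows(sample_changes, "order_id")
print(current)
```

At scale the same logic is usually expressed in Spark itself, for example by ranking rows per key on `_commit_version` and keeping the newest non-preimage row.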
Streaming Reads and Writes
# Streaming read from Delta
stream_df = spark.readStream.format("delta") \
    .option("maxFilesPerTrigger", 100) \
    .load("/data/orders")
# Streaming write to Delta
query = stream_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/orders") \
    .trigger(processingTime="10 seconds") \
    .start("/data/orders_processed")
# Streaming with foreachBatch for complex logic
def process_batch(batch_df, batch_id):
    # Runs once per micro-batch; batch_df is a regular (non-streaming) DataFrame
    batch_df.write.format("delta").mode("append").save("/data/output")
stream_df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/checkpoints/custom") \
    .start()
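A foreachBatch handler can run more than once for the same batch after a failure, which would duplicate an append. Delta's writer accepts the `txnAppId` and `txnVersion` options to make such writes idempotent. The sketch below shows one way to apply them. The application id `"orders-stream"` and the helper `idempotent_options` are hypothetical.

```python
def idempotent_options(app_id, batch_id):
    """Writer options that let Delta skip a batch it has already committed."""
    return {"txnAppId": app_id, "txnVersion": str(batch_id)}


def process_batch(batch_df, batch_id):
    # If the stream retries this batch, Delta sees the same
    # (txnAppId, txnVersion) pair and ignores the duplicate write.
    (
        batch_df.write.format("delta")
        .options(**idempotent_options("orders-stream", batch_id))
        .mode("append")
        .save("/data/output")
    )


opts = idempotent_options("orders-stream", 42)
print(opts)
```

The application id should stay stable across restarts of the same query and be unique per target table writer.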
Delta Lake with delta-rs (No Spark)
from deltalake import DeltaTable, write_deltalake
import pandas as pd
# Write pandas DataFrame to Delta
df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
write_deltalake("/data/table", df, mode="append")
# Read Delta table
dt = DeltaTable("/data/table")
print(dt.version())
print(dt.history())
# Compact files
dt.optimize.compact()
dt.optimize.z_order(["name"])
# Vacuum old files (dry_run=False deletes them; the default dry_run=True only lists them)
dt.vacuum(retention_hours=168, dry_run=False)
Liquid Clustering (Databricks)
-- Create table with liquid clustering
CREATE TABLE orders (order_id BIGINT, customer_id BIGINT, region STRING, order_date DATE)
USING DELTA
CLUSTER BY (order_date, region);
-- Trigger clustering
OPTIMIZE orders;
-- Change clustering columns (existing data is not rewritten; new writes and later OPTIMIZE runs use the new columns)
ALTER TABLE orders CLUSTER BY (customer_id, order_date);
Troubleshooting
| Issue | Solution |
|---|---|
| Small file problem | Run OPTIMIZE regularly. Enable autoOptimize.optimizeWrite and autoCompact |
| Slow queries | Use Z-ORDER on frequently filtered columns. Check partition pruning |
| VACUUM deleting needed files | Keep retention at or above the 7-day default and longer than your longest-running query or streaming lag. Do not set retentionDurationCheck.enabled=false in production |
| Schema mismatch on write | Set the mergeSchema write option to true for that write, or enable spark.databricks.delta.schema.autoMerge.enabled for the session |
| Concurrent write conflicts | Delta uses optimistic concurrency. Retry conflicting operations |
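| Transaction log growing large | Delta writes a checkpoint every 10 commits by default and removes expired log entries at checkpoint time according to delta.logRetentionDuration. VACUUM removes data files, not log files |
| Time travel version not found | Check delta.logRetentionDuration and delta.deletedFileRetentionDuration. The log entries may have expired, or VACUUM may have removed the data files |
| OOM during OPTIMIZE | Reduce spark.databricks.delta.optimize.maxFileSize. Process fewer partitions per run with OPTIMIZE ... WHERE on partition columns |
| Streaming checkpoint corruption | Restart the query with a new checkpoint directory and set startingVersion to a known-good version. Expect some data to be reprocessed |
| Slow MERGE operations | Add predicates on partition or clustering columns to the ON clause so fewer files are scanned. Use optimizeInsertOnlyMerge where applicable |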
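For the concurrent write conflicts row, a retry wrapper with exponential backoff is a common pattern. The sketch below is generic Python and takes the exception types as a parameter. With delta-spark you might pass classes from `delta.exceptions` such as `ConcurrentAppendException`, which is an assumption about your setup. The names `retry_on_conflict`, `DemoConflict`, and `flaky_commit` are hypothetical.

```python
import random
import time


def retry_on_conflict(operation, conflict_errors, max_attempts=5, base_delay=0.5):
    """Run `operation`, retrying with exponential backoff on conflict errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except conflict_errors:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter to avoid retrying in lockstep
            delay = base_delay * (2 ** (attempt - 1))
            time.sleep(delay * random.uniform(0.5, 1.5))


# Demo with a stand-in exception that simulates two conflicts, then success
class DemoConflict(Exception):
    pass


attempts = {"count": 0}


def flaky_commit():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise DemoConflict("simulated write conflict")
    return "committed"


result = retry_on_conflict(flaky_commit, (DemoConflict,), base_delay=0.0)
print(result, attempts["count"])
```

Retrying is only safe when the operation re-reads the table and recomputes its changes on each attempt, so the retried commit is based on the latest snapshot.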