Overview
Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source data lake framework that provides record-level insert, update, and delete capabilities on data stored in cloud storage or HDFS. Originally developed at Uber to support their massive-scale data lake, Hudi manages the storage of large analytical datasets while providing streaming-style ingestion, incremental processing, and efficient query performance.
Hudi supports two table types. Copy-on-Write (CoW) tables store data in columnar Parquet files and rewrite the affected files on every update, which favors read performance. Merge-on-Read (MoR) tables append changes to row-based (Avro) delta log files and periodically compact them into new Parquet base files, either inline or asynchronously, which favors write performance. Hudi integrates with Spark, Flink, Presto, Trino, Hive, and other query engines, and provides built-in table services such as compaction, clustering, cleaning, and indexing.
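To make the CoW/MoR trade-off concrete, here is a minimal pure-Python model of a single file group. The classes `CopyOnWriteFileGroup` and `MergeOnReadFileGroup` are hypothetical illustrations, not Hudi APIs. They only count how many records each design writes. Real Hudi rewrites only the file groups that contain updated keys and stores base files as Parquet and logs as Avro blocks, none of which is modelled here.

```python
"""Toy model of one Hudi file group under CoW and MoR (illustrative, not Hudi's API)."""
from typing import Dict, List

Records = Dict[int, str]


class CopyOnWriteFileGroup:
    """Every upsert rewrites the base file with the changes applied."""

    def __init__(self, records: Records) -> None:
        self.base: Records = dict(records)
        self.records_written = len(self.base)

    def upsert(self, updates: Records) -> None:
        new_base = dict(self.base)      # copy the current base file...
        new_base.update(updates)        # ...apply the changes...
        self.base = new_base            # ...and write a new file version
        self.records_written += len(new_base)

    def read(self) -> Records:
        return dict(self.base)


class MergeOnReadFileGroup:
    """Upserts append to a delta log; reads merge base + log; compaction folds the log in."""

    def __init__(self, records: Records) -> None:
        self.base: Records = dict(records)
        self.log: List[Records] = []
        self.records_written = len(self.base)

    def upsert(self, updates: Records) -> None:
        self.log.append(dict(updates))  # only the changed records are written
        self.records_written += len(updates)

    def read(self) -> Records:
        """Snapshot read: base file merged with log blocks in commit order."""
        merged = dict(self.base)
        for block in self.log:
            merged.update(block)
        return merged

    def read_optimized(self) -> Records:
        """Read-optimized read: base file only, ignoring uncompacted log blocks."""
        return dict(self.base)

    def compact(self) -> None:
        self.base = self.read()         # rewrite the base file once...
        self.log = []                   # ...and drop the merged log blocks
        self.records_written += len(self.base)


initial = {i: f"v0-{i}" for i in range(1000)}
cow = CopyOnWriteFileGroup(initial)
mor = MergeOnReadFileGroup(initial)

for commit in range(1, 4):
    updates = {commit: f"v{commit}"}    # one record changes per commit
    cow.upsert(updates)
    mor.upsert(updates)

assert cow.read() == mor.read()         # both designs serve the same snapshot
stale = mor.read_optimized()[1]         # "v0-1": the log has not been compacted yet
mor.compact()
print(cow.records_written, mor.records_written)  # 4000 2003
```

In this toy run, CoW writes 4000 records to apply three single-record updates, while MoR writes 1003 records plus 1000 more for one compaction. How often compaction runs determines how much of that saving survives.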
Installation
Spark with Hudi
# PySpark with Hudi
pyspark \
--packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
# Maven dependency
# org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0
Flink with Hudi
# Download Hudi Flink bundle
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.18-bundle/0.15.0/hudi-flink1.18-bundle-0.15.0.jar
# Start Flink SQL client
sql-client.sh -j hudi-flink1.18-bundle-0.15.0.jar
Hudi CLI
# Download and run Hudi CLI
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-cli-bundle_2.12/0.15.0/hudi-cli-bundle_2.12-0.15.0.jar
java -jar hudi-cli-bundle_2.12-0.15.0.jar
Core Operations
Write Data (PySpark)
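from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.getOrCreate()
# Sample input: order_id is the record key, order_date the partition, updated_at the precombine field
df = spark.createDataFrame(
[(1, 100, 99.99, "new", "2024-06-15", "2024-06-15 10:00:00")],
["order_id", "customer_id", "amount", "status", "order_date", "updated_at"],
)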
# Hudi write options
hudi_options = {
"hoodie.table.name": "orders",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "updated_at",
"hoodie.datasource.write.partitionpath.field": "order_date",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
}
table_path = "s3://bucket/hudi/orders"
# Insert / Upsert
df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save(table_path)
# Overwrite (mode "overwrite" drops and recreates the whole table if it already exists)
df.write.format("hudi") \
.options(**hudi_options) \
.mode("overwrite") \
.save(table_path)
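The roles of `recordkey.field` and `precombine.field` in an upsert can be sketched in plain Python. `precombine` and `upsert` are hypothetical helpers, not Hudi functions. Incoming records that share a key are reduced to the one with the largest ordering value, and the result is merged into the table so that existing keys are updated and new keys are inserted. How an incoming record is merged with one already in storage depends on the configured payload class or record merger. This sketch assumes the incoming record always wins.

```python
"""Toy model of upsert semantics driven by recordkey.field and precombine.field."""
from typing import Dict, Iterable

Record = Dict[str, object]


def precombine(batch: Iterable[Record], key: str, ordering: str) -> Dict[object, Record]:
    """Keep one record per key: the one with the largest ordering value (ties: last seen wins)."""
    latest: Dict[object, Record] = {}
    for rec in batch:
        k = rec[key]
        if k not in latest or rec[ordering] >= latest[k][ordering]:
            latest[k] = rec
    return latest


def upsert(table: Dict[object, Record], batch: Iterable[Record], key: str, ordering: str) -> Dict[object, Record]:
    """Update existing keys and insert new ones; assumes the incoming record always wins."""
    result = dict(table)
    result.update(precombine(batch, key, ordering))
    return result


table = {1: {"order_id": 1, "status": "new", "updated_at": "2024-06-15 10:00:00"}}
batch = [
    {"order_id": 1, "status": "shipped", "updated_at": "2024-06-15 12:00:00"},
    {"order_id": 1, "status": "packed", "updated_at": "2024-06-15 11:00:00"},  # older, dropped
    {"order_id": 2, "status": "new", "updated_at": "2024-06-15 09:00:00"},
]
table = upsert(table, batch, key="order_id", ordering="updated_at")
print({k: v["status"] for k, v in table.items()})  # {1: 'shipped', 2: 'new'}
```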
Read Data
# Snapshot query (latest state)
df = spark.read.format("hudi").load(table_path)
# Point-in-time query
df = spark.read.format("hudi") \
.option("as.of.instant", "20240615120000000") \
.load(table_path)
# Incremental query (records committed after the begin instant, up to and including the end instant)
df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240615000000000") \
.option("hoodie.datasource.read.end.instanttime", "20240616000000000") \
.load(table_path)
# Read-optimized query (on MoR tables, reads only compacted base files and skips delta logs)
df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "read_optimized") \
.load(table_path)
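Instant times such as `20240615120000000` are 17-digit `yyyyMMddHHmmssSSS` timestamps, so they sort chronologically as plain strings. The hypothetical helpers below build an instant from a `datetime` and model the incremental-query window. Based on our reading of the Hudi configuration docs, that window covers commits strictly after the begin instant and up to and including the end instant. The helpers assume the datetime is already in the table's timeline timezone.

```python
"""Helpers for Hudi-style instant times (illustrative only)."""
from datetime import datetime
from typing import List, Optional


def to_instant(ts: datetime) -> str:
    """Format as yyyyMMddHHmmssSSS, e.g. 2024-06-15 12:00 -> '20240615120000000'."""
    return ts.strftime("%Y%m%d%H%M%S") + f"{ts.microsecond // 1000:03d}"


def commits_in_window(commits: List[str], begin: str, end: Optional[str] = None) -> List[str]:
    """Instants an incremental query would cover: begin < instant <= end (no end = open)."""
    return sorted(c for c in commits if c > begin and (end is None or c <= end))


begin = to_instant(datetime(2024, 6, 15))   # '20240615000000000'
end = to_instant(datetime(2024, 6, 16))     # '20240616000000000'
timeline = [
    "20240614230000000",
    "20240615000000000",   # equal to begin: excluded
    "20240615120000000",
    "20240616000000000",   # equal to end: included
    "20240616010000000",
]
print(commits_in_window(timeline, begin, end))  # ['20240615120000000', '20240616000000000']
```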
SQL Operations
-- Create Hudi table (partition column listed last, matching Spark's column order for partitioned tables)
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP,
order_date STRING
)
USING hudi
TBLPROPERTIES (
type = 'cow',
primaryKey = 'order_id',
preCombineField = 'updated_at'
)
PARTITIONED BY (order_date)
LOCATION 's3://bucket/hudi/orders';
-- Insert (values follow the table's column order)
INSERT INTO orders VALUES (1, 100, 99.99, 'new', TIMESTAMP '2024-06-15 10:00:00', '2024-06-15');
-- Update
UPDATE orders SET status = 'shipped', updated_at = current_timestamp() WHERE order_id = 1;
-- Delete
DELETE FROM orders WHERE status = 'cancelled';
-- Merge
MERGE INTO orders AS target
USING updates AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Table Types Comparison
| Feature | Copy-on-Write (CoW) | Merge-on-Read (MoR) |
|---|---|---|
| Write speed | Slower (rewrites affected files) | Faster (appends to log) |
| Read speed | Faster (reads base files only) | Slower (merges logs at read time) |
| Write amplification | Higher (full file rewrites) | Lower (depends on compaction frequency) |
| Best for | Read-heavy workloads | Write-heavy / streaming |
| File format | Parquet base files | Parquet base files + Avro log files |
| Compaction | Not needed | Required periodically |
CLI Commands
| Command | Description |
|---|---|
| connect --path <table_path> | Connect to a Hudi table |
| desc | Describe the connected table |
| commits show | Show commit timeline |
| commit rollback --commit <id> | Roll back the specified commit |
| compactions show all | Show compaction plans |
| compaction schedule | Schedule a compaction (MoR tables) |
| cleans show | Show cleaning history |
| stats wa | Show write amplification stats |
| savepoints show | Show savepoints |
| savepoint create --commit <id> | Create a savepoint |
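`stats wa` reports, for each commit, how many records were written compared with how many were upserted. The sketch below computes that ratio. It assumes the factor is records written divided by records updated, with a "Total" row over all commits. This is our reading of the CLI output, not a documented formula, and the commit times and counts are made up for illustration.

```python
from typing import Dict, List, Tuple

# (commit_time, total_records_written, total_records_upserted); hypothetical numbers
Commit = Tuple[str, int, int]


def write_amplification(commits: List[Commit]) -> Dict[str, float]:
    """Per-commit and overall ratio of records written to records upserted.

    Commits with no upserted records are skipped because the ratio is undefined."""
    factors: Dict[str, float] = {}
    total_written = total_upserted = 0
    for commit_time, written, upserted in commits:
        total_written += written
        total_upserted += upserted
        if upserted > 0:
            factors[commit_time] = written / upserted
    if total_upserted > 0:
        factors["Total"] = total_written / total_upserted
    return factors


commits = [
    ("20240615120000000", 1_000_000, 10_000),  # CoW-style: many untouched records rewritten
    ("20240615130000000", 20_000, 20_000),     # every written record was an update
    ("20240615140000000", 50_000, 0),          # insert-only commit: no ratio
]
print(write_amplification(commits))
```

A factor near 1 means almost every written record was an actual update. Large factors on a CoW table usually mean small updates are scattered across many large files.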
Configuration
Key Write Configuration
hudi_options = {
# Table identity
"hoodie.table.name": "my_table",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "date",
# Table type
"hoodie.datasource.write.table.type": "MERGE_ON_READ", # or COPY_ON_WRITE
# Operation type
"hoodie.datasource.write.operation": "upsert", # insert, bulk_insert, delete
# Index type (for fast upserts)
"hoodie.index.type": "BLOOM", # BLOOM, SIMPLE, GLOBAL_BLOOM, BUCKET, RECORD_INDEX
# Parallelism
"hoodie.upsert.shuffle.parallelism": 200,
"hoodie.insert.shuffle.parallelism": 200,
"hoodie.bulkinsert.shuffle.parallelism": 200,
# Compaction (MoR only)
"hoodie.compact.inline": "true",
"hoodie.compact.inline.max.delta.commits": 5,
# Cleaning
"hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
"hoodie.cleaner.commits.retained": 10,
# Clustering
"hoodie.clustering.inline": "true",
"hoodie.clustering.inline.max.commits": 4,
}
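When `hoodie.compact.inline.max.delta.commits` is set to 5, inline compaction is attempted once five delta commits have accumulated since the last compaction. `hoodie.clustering.inline.max.commits` works the same way for clustering. The function below is a hypothetical simplification of that count-based trigger. It ignores time-based trigger strategies and the case where a scheduled plan fails.

```python
from typing import List


def trigger_points(num_commits: int, max_commits: int) -> List[int]:
    """Commit numbers after which a count-based inline table service would run.

    Simplified model: the counter resets every time the service runs."""
    if max_commits < 1:
        raise ValueError("max_commits must be >= 1")
    runs: List[int] = []
    since_last = 0
    for commit in range(1, num_commits + 1):
        since_last += 1
        if since_last >= max_commits:
            runs.append(commit)
            since_last = 0
    return runs


print(trigger_points(12, 5))  # compaction, max.delta.commits=5 -> [5, 10]
print(trigger_points(12, 4))  # clustering, max.commits=4 -> [4, 8, 12]
```

Lowering the threshold keeps MoR log chains short, which speeds up snapshot reads but costs more compaction work per write.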
Advanced Usage
Incremental ETL Pipeline
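from pyspark.sql.functions import current_timestamp
# Instant time of the last processed commit (persisted by the previous run)
last_checkpoint = "20240615000000000"
# Hypothetical write options for the target table, derived from hudi_options above
target_hudi_options = {**hudi_options, "hoodie.table.name": "target_table"}
# Read incremental changes from source table
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", last_checkpoint) \
.load("s3://bucket/hudi/source_table")
# Transform
transformed_df = incremental_df \
.filter("status != 'deleted'") \
.withColumn("processed_at", current_timestamp())
# Write to target table
transformed_df.write.format("hudi") \
.options(**target_hudi_options) \
.mode("append") \
.save("s3://bucket/hudi/target_table")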
Streaming Ingestion with Flink
-- Flink SQL: Create Hudi sink
CREATE TABLE hudi_orders (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
customer_id BIGINT,
amount DECIMAL(10,2),
order_date STRING,
ts TIMESTAMP(3)
)
PARTITIONED BY (order_date)
WITH (
'connector' = 'hudi',
'path' = 's3://bucket/hudi/orders',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'order_id',
'write.precombine.field' = 'ts',
'write.tasks' = '4',
'compaction.tasks' = '2',
'compaction.delta_commits' = '5'
);
-- Stream from Kafka to Hudi
INSERT INTO hudi_orders
SELECT order_id, customer_id, amount, order_date, event_time
FROM kafka_source;
Record-Level Indexing
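# Use the record-level index for fast key lookups (stored in the metadata table)
hudi_options.update({
"hoodie.index.type": "RECORD_INDEX",
"hoodie.metadata.enable": "true",
"hoodie.metadata.record.index.enable": "true",
# Optional: column statistics for query-time data skipping (not part of the record index)
"hoodie.metadata.index.column.stats.enable": "true",
})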
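The speedup from a record-level index can be shown with a toy comparison. With a BLOOM index, Hudi first prunes candidate files by their min/max record-key range and then probes each candidate's bloom filter. A record-level index instead maps each key directly to its file group. In this hypothetical sketch, a Python `frozenset` stands in for each bloom filter, so it has none of a real filter's false positives. `FileGroup` is not a Hudi class.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Tuple


@dataclass(frozen=True)
class FileGroup:
    file_id: str
    keys: FrozenSet[str]   # stand-in for the file's bloom filter

    @property
    def key_range(self) -> Tuple[str, str]:
        return min(self.keys), max(self.keys)


def bloom_style_lookup(files: List[FileGroup], key: str) -> Tuple[int, List[str]]:
    """Return (number of files probed, file ids containing key)."""
    candidates = [f for f in files if f.key_range[0] <= key <= f.key_range[1]]  # range pruning
    return len(candidates), [f.file_id for f in candidates if key in f.keys]    # filter probe


def build_record_index(files: List[FileGroup]) -> Dict[str, str]:
    """Record-level index: record key -> file id, one entry per key."""
    return {key: f.file_id for f in files for key in f.keys}


files = [
    FileGroup("fg-1", frozenset({"a1", "a5", "c2"})),
    FileGroup("fg-2", frozenset({"a3", "b7", "d1"})),
    FileGroup("fg-3", frozenset({"e4", "f9"})),
]
print(bloom_style_lookup(files, "b7"))   # (2, ['fg-2']): fg-1 and fg-2 both span "b7"
record_index = build_record_index(files)
print(record_index["b7"])                # fg-2, with no per-file probing
```

With random (for example UUID) keys, the key ranges of most files overlap, so range pruning removes little. A direct key-to-file mapping avoids that cost.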
Troubleshooting
| Issue | Solution |
|---|---|
| Slow upserts | Switch to BUCKET index for large tables. Increase shuffle parallelism |
| MoR read performance degradation | Run compaction more frequently. Reduce compact.inline.max.delta.commits |
| Out of memory during compaction | Increase executor memory. Reduce compaction parallelism |
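| Duplicate records | Verify recordkey.field uniqueness. Check precombine.field ordering. If partition values can change, use a global index (e.g. GLOBAL_BLOOM): a non-global index does not find the old copy in another partition |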
| Partition path errors | Ensure partitionpath.field exists and has consistent format |
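| Timeline server issues | Disable the embedded timeline server (hoodie.embed.timeline.server=false). Never delete the .hoodie directory, which holds the table timeline. If only the metadata table is corrupt, remove it with the CLI command metadata delete and let it rebuild |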
| S3 consistency issues | S3 has been strongly read-after-write consistent since December 2020. Enable hoodie.consistency.check.enabled only for eventually consistent stores |
| Clustering failures | Reduce hoodie.clustering.plan.strategy.max.num.groups. Check for data skew in clustering columns |
| Schema evolution errors | Set hoodie.schema.on.read.enable=true, or enable hoodie.datasource.write.reconcile.schema |
| Savepoint restore fails | Ensure the savepointed files still exist. Use commit rollback as an alternative |
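For duplicate records, a useful first diagnostic is to find record keys that appear more than once and the partitions that hold them. Keys spread across partitions point to a partition value that changed under a non-global index. The sketch below runs on plain dicts that carry Hudi's `_hoodie_record_key` and `_hoodie_partition_path` meta columns. The helper name and sample rows are hypothetical. On a real table, `spark.read.format("hudi").load(table_path).groupBy("_hoodie_record_key").count().filter("count > 1")` performs the same check in Spark.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Mapping


def find_duplicate_keys(rows: Iterable[Mapping[str, str]]) -> Dict[str, List[str]]:
    """Map each record key seen more than once to the partition paths holding it."""
    partitions: Dict[str, List[str]] = defaultdict(list)
    for row in rows:
        partitions[row["_hoodie_record_key"]].append(row["_hoodie_partition_path"])
    return {key: paths for key, paths in partitions.items() if len(paths) > 1}


rows = [
    {"_hoodie_record_key": "1", "_hoodie_partition_path": "2024-06-15"},
    {"_hoodie_record_key": "2", "_hoodie_partition_path": "2024-06-15"},
    {"_hoodie_record_key": "1", "_hoodie_partition_path": "2024-06-16"},  # order 1 moved partitions
]
print(find_duplicate_keys(rows))  # {'1': ['2024-06-15', '2024-06-16']}
```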