Apache Hudi Cheat Sheet

Overview

Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source data lake framework that provides record-level insert, update, and delete capabilities on data stored in cloud storage or HDFS. Originally developed at Uber to support their massive-scale data lake, Hudi manages the storage of large analytical datasets while providing streaming-style ingestion, incremental processing, and efficient query performance.

Hudi offers two table types. Copy-on-Write (CoW) tables store data in columnar Parquet files and rewrite the affected files on every update, which favors read performance. Merge-on-Read (MoR) tables log changes to row-based delta files and compact them asynchronously, which favors write performance. Hudi integrates with Spark, Flink, Presto, Trino, Hive, and other query engines, and provides built-in table services such as compaction, clustering, cleaning, and indexing.

Installation

Spark with Hudi

# PySpark with Hudi
pyspark \
    --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
    --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

# Maven dependency
# org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0

Flink with Hudi

# Download Hudi Flink bundle
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.18-bundle/0.15.0/hudi-flink1.18-bundle-0.15.0.jar

# Start Flink SQL client
sql-client.sh -j hudi-flink1.18-bundle-0.15.0.jar

Hudi CLI

# Download and run Hudi CLI
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-cli-bundle_2.12/0.15.0/hudi-cli-bundle_2.12-0.15.0.jar

java -jar hudi-cli-bundle_2.12-0.15.0.jar

Core Operations

Write Data (PySpark)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .getOrCreate()

# Hudi write options
hudi_options = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
}

table_path = "s3://bucket/hudi/orders"

# Insert / Upsert
df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(table_path)

# Overwrite
df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save(table_path)
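
The record key and precombine fields in hudi_options decide which row survives when a batch contains several rows for the same key. The following is a minimal pure-Python sketch of that rule, assuming the row with the greatest precombine value wins; the function name `precombine` and the first-seen tie-break are choices of this sketch, not Hudi APIs.

```python
from typing import Any, Dict, Iterable, List


def precombine(records: Iterable[Dict[str, Any]],
               record_key: str,
               precombine_field: str) -> List[Dict[str, Any]]:
    """Keep one record per key: the one with the greatest precombine value."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for rec in records:
        key = rec[record_key]
        current = latest.get(key)
        # On a tie the record seen first is kept (an assumption of this sketch).
        if current is None or rec[precombine_field] > current[precombine_field]:
            latest[key] = rec
    return list(latest.values())


batch = [
    {"order_id": 1, "status": "new", "updated_at": "2024-06-15 10:00:00"},
    {"order_id": 1, "status": "shipped", "updated_at": "2024-06-15 12:00:00"},
    {"order_id": 2, "status": "new", "updated_at": "2024-06-15 11:00:00"},
]
deduped = precombine(batch, "order_id", "updated_at")
for row in deduped:
    print(row["order_id"], row["status"])
```

Running a check like this on a sample of the input can show whether the chosen precombine field actually orders competing versions of a record the way you expect.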

Read Data

# Snapshot query (latest state)
df = spark.read.format("hudi").load(table_path)

# Point-in-time query
df = spark.read.format("hudi") \
    .option("as.of.instant", "20240615120000000") \
    .load(table_path)

# Incremental query (changes since a commit)
df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20240615000000000") \
    .option("hoodie.datasource.read.end.instanttime", "20240616000000000") \
    .load(table_path)

# Read optimized query (MoR tables only - skip delta logs)
df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(table_path)
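
The instant strings used in the queries above, such as "20240615120000000", are 17 digits in the layout yyyyMMddHHmmssSSS. A small helper can build them from a datetime instead of typing them by hand. This is a sketch with hypothetical helper names; time zone handling is left out, so it assumes the datetime is already in the zone the table's timeline uses.

```python
from datetime import datetime


def to_hudi_instant(dt: datetime) -> str:
    """Format a datetime as a 17-digit yyyyMMddHHmmssSSS instant string."""
    millis = dt.microsecond // 1000
    return dt.strftime("%Y%m%d%H%M%S") + f"{millis:03d}"


def from_hudi_instant(instant: str) -> datetime:
    """Parse a 17-digit instant string back into a naive datetime."""
    if len(instant) != 17 or not instant.isdigit():
        raise ValueError(f"expected 17 digits, got {instant!r}")
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    return base.replace(microsecond=int(instant[14:]) * 1000)


begin = to_hudi_instant(datetime(2024, 6, 15, 0, 0, 0))
end = to_hudi_instant(datetime(2024, 6, 16, 0, 0, 0))
print(begin, end)
```

The resulting strings can be passed to the begin and end instant options of an incremental query.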

SQL Operations

-- Create Hudi table
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    amount DECIMAL(10,2),
    status STRING,
    order_date STRING,
    updated_at TIMESTAMP
)
USING hudi
TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'order_id',
    preCombineField = 'updated_at'
)
PARTITIONED BY (order_date)
LOCATION 's3://bucket/hudi/orders';

-- Insert
INSERT INTO orders VALUES (1, 100, 99.99, 'new', '2024-06-15', TIMESTAMP '2024-06-15 10:00:00');

-- Update
UPDATE orders SET status = 'shipped', updated_at = current_timestamp() WHERE order_id = 1;

-- Delete
DELETE FROM orders WHERE status = 'cancelled';

-- Merge
MERGE INTO orders AS target
USING updates AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
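
With UPDATE SET * on a match and INSERT * otherwise, the MERGE statement above behaves like an upsert keyed on order_id. The toy model below illustrates that in plain Python; `merge_into` is a hypothetical name, and the sketch assumes the source holds at most one row per key.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def merge_into(target: List[Row], source: List[Row], key: str) -> List[Row]:
    """Toy model of MERGE with UPDATE SET * on match and INSERT * otherwise."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        # A matched key is overwritten with every source column;
        # an unmatched key is inserted as a new row.
        merged[row[key]] = dict(row)
    return list(merged.values())


orders = [{"order_id": 1, "status": "new"}, {"order_id": 2, "status": "new"}]
updates = [{"order_id": 2, "status": "shipped"}, {"order_id": 3, "status": "new"}]
result = merge_into(orders, updates, "order_id")
print(result)
```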

Table Types Comparison

| Feature | Copy-on-Write (CoW) | Merge-on-Read (MoR) |
|---|---|---|
| Write speed | Slower (rewrites files) | Faster (appends to log) |
| Read speed | Faster (only Parquet) | Slower (merges at read) |
| Storage | Higher (full rewrites) | Lower (delta logs) |
| Best for | Read-heavy workloads | Write-heavy / streaming |
| File format | Parquet only | Parquet + Avro log files |
| Compaction | Not needed | Required periodically |
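
The write and read trade-offs in the table can be made concrete with a toy model. The sketch below is not how Hudi stores data; it only counts rows written, assuming a single file group of 1,000 rows that receives five one-row updates. Both class names are hypothetical.

```python
from typing import Dict, List, Tuple


class CopyOnWriteFile:
    """Toy file group: every update rewrites the whole base file."""

    def __init__(self, rows: Dict[int, str]) -> None:
        self.base = dict(rows)
        self.rows_written = len(rows)

    def upsert(self, changes: Dict[int, str]) -> None:
        new_base = {**self.base, **changes}   # merge, then rewrite everything
        self.rows_written += len(new_base)
        self.base = new_base

    def read(self) -> Dict[int, str]:
        return dict(self.base)                # no merge work at read time


class MergeOnReadFile:
    """Toy file group: updates append to a log and are merged at read time."""

    def __init__(self, rows: Dict[int, str]) -> None:
        self.base = dict(rows)
        self.log: List[Tuple[int, str]] = []
        self.rows_written = len(rows)

    def upsert(self, changes: Dict[int, str]) -> None:
        self.log.extend(changes.items())      # append only the changed rows
        self.rows_written += len(changes)

    def read(self) -> Dict[int, str]:
        merged = dict(self.base)
        for key, value in self.log:           # replay the log over the base
            merged[key] = value
        return merged

    def compact(self) -> None:
        self.base = self.read()               # fold the log into a new base
        self.rows_written += len(self.base)
        self.log.clear()


initial = {i: "new" for i in range(1000)}
cow, mor = CopyOnWriteFile(initial), MergeOnReadFile(initial)
for order_id in range(5):
    cow.upsert({order_id: "shipped"})
    mor.upsert({order_id: "shipped"})

print(cow.rows_written, mor.rows_written)  # 6000 1005
print(cow.read() == mor.read())            # True
```

In this model the MoR table pays its rewrite cost once, when compact() runs, while every read before that has to replay the log.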

CLI Commands

| Command | Description |
|---|---|
| connect --path <table_path> | Connect to a Hudi table |
| desc | Describe the connected table |
| commits show | Show the commit timeline |
| commit rollback --commit <id> | Roll back the given commit |
| compactions show all | Show all compactions |
| compaction schedule | Schedule a compaction |
| cleans show | Show cleaning history |
| stats wa | Show write amplification stats |
| savepoints show | Show savepoints |
| savepoint create --commit <id> | Create a savepoint at the given commit |

Configuration

Key Write Configuration

hudi_options = {
    # Table identity
    "hoodie.table.name": "my_table",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "date",

    # Table type
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",  # or COPY_ON_WRITE

    # Operation type
    "hoodie.datasource.write.operation": "upsert",  # insert, bulk_insert, delete

    # Index type (for fast upserts)
    "hoodie.index.type": "BLOOM",  # BLOOM, SIMPLE, GLOBAL_BLOOM, BUCKET, RECORD_INDEX

    # Parallelism
    "hoodie.upsert.shuffle.parallelism": 200,
    "hoodie.insert.shuffle.parallelism": 200,
    "hoodie.bulkinsert.shuffle.parallelism": 200,

    # Compaction (MoR only)
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": 5,

    # Cleaning
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": 10,

    # Clustering
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": 4,
}
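
Because these options are plain string keys, a typo or a mismatched combination is easy to miss until the job runs. The sketch below is a small pre-flight check over an options dictionary. The rule set is this sketch's own choice, based only on the notes in this cheat sheet (compaction is MoR only, the record index relies on the metadata table); `lint_hudi_options` is a hypothetical helper, not part of Hudi.

```python
from typing import Dict, List

REQUIRED_KEYS = (
    "hoodie.table.name",
    "hoodie.datasource.write.recordkey.field",
)


def lint_hudi_options(options: Dict[str, object]) -> List[str]:
    """Return human-readable warnings for suspicious option combinations."""
    warnings = []
    for key in REQUIRED_KEYS:
        if not options.get(key):
            warnings.append(f"missing required option: {key}")

    # A table type that is not set is treated as COPY_ON_WRITE here.
    table_type = str(options.get("hoodie.datasource.write.table.type", "COPY_ON_WRITE"))
    inline_compaction = str(options.get("hoodie.compact.inline", "false")).lower() == "true"
    if inline_compaction and table_type != "MERGE_ON_READ":
        warnings.append("hoodie.compact.inline only applies to MERGE_ON_READ tables")

    # The metadata table is only flagged when it is explicitly disabled.
    if options.get("hoodie.index.type") == "RECORD_INDEX":
        if str(options.get("hoodie.metadata.enable", "true")).lower() != "true":
            warnings.append("RECORD_INDEX needs hoodie.metadata.enable=true")
    return warnings


suspicious = {
    "hoodie.table.name": "my_table",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.compact.inline": "true",
}
for message in lint_hudi_options(suspicious):
    print(message)
```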

Advanced Usage

Incremental ETL Pipeline

from pyspark.sql.functions import current_timestamp

# last_checkpoint is the commit instant where the previous run stopped,
# e.g. "20240615000000000"; target_hudi_options is a write-options dict
# shaped like hudi_options above.

# Read incremental changes from source table
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", last_checkpoint) \
    .load("s3://bucket/hudi/source_table")

# Transform
transformed_df = incremental_df \
    .filter("status != 'deleted'") \
    .withColumn("processed_at", current_timestamp())

# Write to target table
transformed_df.write.format("hudi") \
    .options(**target_hudi_options) \
    .mode("append") \
    .save("s3://bucket/hudi/target_table")
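
The pipeline above depends on a last_checkpoint value that has to survive between runs. One possible way to persist it is a small JSON file, sketched below. The file layout, the helper names, and the "000" starting value (used here to mean "read from the first commit") are assumptions of this sketch. In a real job the commit times would come from the _hoodie_commit_time column of the incremental batch.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable

EARLIEST = "000"  # assumed sentinel: start from the first commit


def load_checkpoint(path: Path) -> str:
    """Return the stored instant, or EARLIEST when no checkpoint exists yet."""
    if not path.exists():
        return EARLIEST
    return json.loads(path.read_text())["last_instant"]


def next_checkpoint(commit_times: Iterable[str], previous: str) -> str:
    """Pick the newest instant; equal-width digit strings sort chronologically."""
    return max(commit_times, default=previous)


def save_checkpoint(path: Path, instant: str) -> None:
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"last_instant": instant}))
    tmp.replace(path)  # rename so a reader never sees a half-written file


# Demo with a temporary directory standing in for durable storage.
with tempfile.TemporaryDirectory() as tmp_dir:
    ckpt = Path(tmp_dir) / "orders_checkpoint.json"
    first = load_checkpoint(ckpt)
    batch_commits = ["20240615120000000", "20240615130000000"]
    save_checkpoint(ckpt, next_checkpoint(batch_commits, first))
    second = load_checkpoint(ckpt)

print(first, second)
```

Saving the checkpoint only after the target write succeeds means a failed run is simply retried from the same instant.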

Flink Streaming Ingestion

-- Flink SQL: Create Hudi sink
CREATE TABLE hudi_orders (
    order_id BIGINT PRIMARY KEY NOT ENFORCED,
    customer_id BIGINT,
    amount DECIMAL(10,2),
    order_date STRING,
    ts TIMESTAMP(3)
)
PARTITIONED BY (order_date)
WITH (
    'connector' = 'hudi',
    'path' = 's3://bucket/hudi/orders',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'order_id',
    'write.precombine.field' = 'ts',
    'write.tasks' = '4',
    'compaction.tasks' = '2',
    'compaction.delta_commits' = '5'
);

-- Stream from Kafka to Hudi
INSERT INTO hudi_orders
SELECT order_id, customer_id, amount, order_date, event_time
FROM kafka_source;

Record-Level Indexing

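# Use record-level index for fast lookups
hudi_options.update({
    "hoodie.index.type": "RECORD_INDEX",
    "hoodie.metadata.enable": "true",
    # The record index is stored in the metadata table and must be enabled explicitly
    "hoodie.metadata.record.index.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
})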
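
Conceptually, a record-level index maps each record key to the file group that holds it, so an incoming batch can be split into updates and inserts with direct lookups. The following is a toy illustration of that idea only: the index contents, the file group names, and `tag_records` are hypothetical and do not reflect Hudi's internal structures.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical index contents: record key -> (partition path, file group id).
Location = Tuple[str, str]
RecordIndex = Dict[int, Location]


def tag_records(keys: Iterable[int],
                index: RecordIndex) -> Tuple[Dict[int, Location], List[int]]:
    """Split incoming keys into updates (known location) and inserts (new keys)."""
    updates: Dict[int, Location] = {}
    inserts: List[int] = []
    for key in keys:
        location = index.get(key)  # one lookup per key, no file scan
        if location is None:
            inserts.append(key)
        else:
            updates[key] = location
    return updates, inserts


index: RecordIndex = {
    1: ("2024-06-15", "fg-001"),
    2: ("2024-06-15", "fg-002"),
}
updates, inserts = tag_records([2, 3], index)
print(updates, inserts)
```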

Troubleshooting

| Issue | Solution |
|---|---|
| Slow upserts | Switch to BUCKET index for large tables. Increase shuffle parallelism. |
| MoR read performance degradation | Run compaction more frequently. Reduce hoodie.compact.inline.max.delta.commits. |
| Out of memory during compaction | Increase executor memory. Reduce compaction parallelism. |
| Duplicate records | Verify recordkey.field uniqueness. Check precombine.field ordering. |
| Partition path errors | Ensure partitionpath.field exists and has a consistent format. |
| Timeline server issues | Clear .hoodie metadata directory. Restart with fresh timeline. |
| S3 consistency issues | Enable hoodie.consistency.check.enabled. Use S3a committer. |
| Clustering failures | Reduce max.num.groups. Check for data skew in clustering columns. |
| Schema evolution errors | Enable hoodie.schema.on.read.enable. Use reconcileSchema option. |
| Savepoint restore fails | Ensure savepoint files exist. Use commit rollback as an alternative. |
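
For the partition path row, a quick check on a sample of the input can reveal values that are missing or use a different format before they reach the writer. This is a sketch assuming a date-valued partition field in YYYY-MM-DD form, like order_date in the earlier examples; `bad_partition_values` is a hypothetical helper.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def bad_partition_values(rows: Iterable[Dict[str, Any]],
                         field: str,
                         fmt: str = "%Y-%m-%d") -> List[Any]:
    """Return partition values that are missing or do not match fmt exactly."""
    bad = []
    for row in rows:
        value = row.get(field)
        try:
            # Round-trip so that non-padded values such as "2024-6-5" are rejected too.
            ok = (isinstance(value, str)
                  and datetime.strptime(value, fmt).strftime(fmt) == value)
        except ValueError:
            ok = False
        if not ok:
            bad.append(value)
    return bad


sample = [
    {"order_id": 1, "order_date": "2024-06-15"},
    {"order_id": 2, "order_date": "2024/06/15"},
    {"order_id": 3, "order_date": None},
    {"order_id": 4, "order_date": "2024-6-5"},
]
problems = bad_partition_values(sample, "order_date")
print(problems)
```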