The built-in Hudi connector will no longer be supported in future versions of Ververica Runtime (VVR). Use custom connectors to connect Realtime Compute for Apache Flink to Apache Hudi, or migrate to the Paimon connector for optimized features and performance.
Apache Hudi is an open-source data lake framework that manages table data stored in Object Storage Service (OSS) or Hadoop Distributed File System (HDFS). It provides ACID (Atomicity, Consistency, Isolation, Durability) guarantees, row-level upserts and deletes, automatic small-file management, and time travel queries.
Core features
| Feature | Description |
|---|---|
| ACID semantics | Snapshot isolation by default, ensuring data consistency across concurrent reads and writes. |
| UPSERT semantics | Combines INSERT and UPDATE: if a record does not exist, it is inserted; if it already exists, it is updated. This simplifies ETL development. |
| Time travel | Access historical data versions at a specific point in time, enabling efficient data auditing and quality control. |
Typical scenarios
| Scenario | Description |
|---|---|
| Database ingestion acceleration | Write Change Data Capture (CDC) data (for example, MySQL binary logs via the MySQL CDC connector) directly to a Hudi table for downstream real-time ETL—more cost-effective than offline bulk loading. |
| Incremental ETL | Extract change data streams from Hudi incrementally for lightweight, real-time ETL. Use Apache Presto or Apache Spark for downstream OLAP. |
| Message queuing | Use Hudi as a lightweight message queue replacement for low-volume scenarios, simplifying the application architecture. |
| Data backfilling | Join full data and incremental data from Hudi tables in a Hive metastore to generate wide tables with minimal compute overhead. |
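As a rough sketch of the ingestion scenario, the job below pairs a MySQL CDC source with a Hudi sink. The table names, columns, host, and OSS path are placeholders, and the source options follow the open-source MySQL CDC connector; the connector name and options may differ depending on your VVR version.

```sql
-- Hypothetical CDC-to-Hudi ingestion job; all names, endpoints, and paths are placeholders.
CREATE TEMPORARY TABLE orders_cdc (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  updated_at TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourMySQLHost>',
  'port' = '3306',
  'username' = '${secret_values.mysql_user}',
  'password' = '${secret_values.mysql_pwd}',
  'database-name' = '<yourDatabase>',
  'table-name' = 'orders'
);

CREATE TEMPORARY TABLE orders_hudi (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  updated_at TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'path' = 'oss://<yourOSSBucket>/orders_hudi',
  'table.type' = 'MERGE_ON_READ',
  'precombine.field' = 'updated_at'
);

-- Upsert the change stream into the Hudi table for downstream real-time ETL.
INSERT INTO orders_hudi SELECT * FROM orders_cdc;
```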
Advantages over open-source Hudi
- Maintenance-free: The built-in Hudi connector reduces O&M complexity and provides SLA guarantees.
- Improved data connectivity: Decouples data from compute engines, enabling seamless migration across Apache Flink, Apache Spark, Apache Presto, and Apache Hive.
- Simplified database-to-lake ingestion: Works with the Flink CDC connector to streamline data development.
- Enterprise-class features: Unified metadata management via Data Lake Formation (DLF) and automatic lightweight schema changes.
- Cost-effective storage: Data is stored in Apache Parquet or Apache Avro format in Alibaba Cloud OSS, with storage and computing isolation for flexible resource scaling.
Supported configurations
| Item | Value |
|---|---|
| Table type | Source table, sink table |
| Running mode | Streaming mode, batch mode |
| Data format | N/A |
| API type | DataStream API, SQL API |
| Data update/deletion in sink | Supported |
| Minimum VVR version | vvr-4.0.11-flink-1.13 |
| Supported file systems | OSS, HDFS, OSS-HDFS |
Metrics
| Table type | Metrics |
|---|---|
| Source table | numRecordsIn, numRecordsInPerSecond |
| Sink table | numRecordsOut, numRecordsOutPerSecond, currentSendTime |
For metric definitions, see Metrics.
Limitations
- Minimum engine version: vvr-4.0.11-flink-1.13 or later.
- Supported file systems: OSS, HDFS, or OSS-HDFS only.
- Draft jobs cannot run on session clusters.
- Field modifications are not supported through the Hudi connector. To modify fields, use Spark SQL statements in the Data Lake Formation (DLF) console, as sketched below.
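For reference, a field change of this kind is typically expressed in Spark SQL roughly as follows. The database, table, and column names are placeholders, and the exact syntax depends on the Spark SQL version available in the DLF console.

```sql
-- Hypothetical example: add a nullable column to a Hudi table through Spark SQL in the DLF console.
ALTER TABLE test_db.test_tbl ADD COLUMNS (new_col STRING COMMENT 'added via Spark SQL');
```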
Syntax
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
...
);
Parameters in the WITH clause
Basic parameters
Common parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Set to hudi. |
| path | Yes | — | Storage path of the table. Supported formats: OSS (oss://<bucket>/<user-defined-dir>), HDFS (hdfs://<user-defined-dir>), OSS-HDFS (oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>). OSS-HDFS paths require VVR 8.0.3 or later. Find the OSS-HDFS endpoint in the Port section of the OSS bucket Overview page. |
| hoodie.datasource.write.recordkey.field | No | uuid | Primary key field. Separate multiple fields with commas. Alternatively, use the PRIMARY KEY syntax in the DDL. |
| precombine.field | No | ts | Version field used to determine update ordering. If not set, updates follow the message sequence defined by the engine. |
| oss.endpoint | No | — | Required when storing data in OSS or OSS-HDFS. For OSS endpoints, see Regions and endpoints. For OSS-HDFS endpoints, see the Port section of the OSS bucket Overview page. |
| accessKeyId | No | — | AccessKey ID. Required for OSS and OSS-HDFS. Store credentials as variables instead of hardcoding them. See Manage variables. |
| accessKeySecret | No | — | AccessKey secret. Required for OSS and OSS-HDFS. |
To protect your AccessKey pair, store the AccessKey ID and AccessKey secret as variables. See Manage variables.
Source table parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| read.streaming.enabled | No | false | Set to true to enable streaming read. By default, snapshot read is used, which returns the latest full snapshot. |
| read.start-commit | No | (blank) | Start offset for streaming read. Format: yyyyMMddHHmmss for a specific time, or earliest to read from the beginning. Leave blank to read from the latest commit. |
Sink table parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| write.operation | No | upsert | Write mode. Valid values: insert (append), upsert (insert or update), bulk_insert (batch append). |
| hive_sync.enable | No | false | Set to true to sync metadata to Apache Hive. |
| hive_sync.mode | No | hms | Sync mode. hms syncs to a Hive metastore or DLF. jdbc syncs via a Java Database Connectivity (JDBC) driver. |
| hive_sync.db | No | default | Target Hive database name. |
| hive_sync.table | No | Current table name | Target Hive table name. Must not contain hyphens (-). |
| dlf.catalog.region | No | — | Region where DLF is activated. Takes effect only when hive_sync.mode is hms. See Supported regions and endpoints. Must match the region specified by dlf.catalog.endpoint. |
| dlf.catalog.endpoint | No | — | DLF endpoint. Takes effect only when hive_sync.mode is hms. Use the VPC endpoint for lower latency, for example, dlf-vpc.cn-hangzhou.aliyuncs.com for the China (Hangzhou) region. See Supported regions and endpoints. For cross-VPC access, see How does Realtime Compute for Apache Flink access a service across VPCs? |
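The following sink DDL sketches how the metadata sync parameters above fit together. The region, endpoint, database, and table names are placeholders; adjust them to your environment.

```sql
CREATE TEMPORARY TABLE hudi_sink_with_sync (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  -- Sync table metadata to DLF through the Hive metastore interface.
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.db' = 'test_db',
  'hive_sync.table' = 'test_tbl',
  'dlf.catalog.region' = 'cn-hangzhou',
  'dlf.catalog.endpoint' = 'dlf-vpc.cn-hangzhou.aliyuncs.com'
);
```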
Advanced parameters
Parallelism parameters
| Parameter | Default | Description |
|---|---|---|
| write.tasks | 4 | Parallelism of write tasks. Each task writes to one or more buckets in sequence. Increasing this value does not increase the small-file count. |
| write.bucket_assign.tasks | Deployment parallelism | Parallelism of bucket assigner operators. Increasing this value increases the small-file count. |
| write.index_bootstrap.tasks | Deployment parallelism | Parallelism of index bootstrap operators. Takes effect only when index.bootstrap.enabled is true. Increasing this value improves bootstrap throughput, but checkpointing may be blocked during bootstrap; increase the checkpoint failure tolerance if needed. |
| read.tasks | 4 | Parallelism of streaming and batch read operators. |
| compaction.tasks | 4 | Parallelism of online compaction operators. Online compaction consumes more resources than offline compaction; prefer offline compaction for production workloads. |
Online compaction parameters
| Parameter | Default | Description |
|---|---|---|
| compaction.schedule.enabled | true | Whether to generate compaction plans on schedule. Keep this true even when asynchronous compaction is disabled, so that offline compaction can execute the scheduled plans. |
| compaction.async.enabled | true | Whether to run compaction asynchronously. Set to false to disable online compaction while keeping plan generation active. |
| compaction.tasks | 4 | Parallelism of compaction tasks. |
| compaction.trigger.strategy | num_commits | Strategy used to trigger compaction. Valid values: num_commits, time_elapsed, num_and_time, num_or_time. |
| compaction.delta_commits | 5 | Number of commits required to trigger compaction. Used with num_commits, num_and_time, or num_or_time. |
| compaction.delta_seconds | 3600 | Interval in seconds between compaction triggers. Used with time_elapsed, num_and_time, or num_or_time. |
| compaction.max_memory | 100 MB | Maximum memory for the hash map used during compaction and deduplication. Increase to 1 GB if resources allow. |
| compaction.target_io | 500 GB | Maximum I/O throughput per compaction plan. |
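As an illustration of these options, the sketch below keeps plan generation on but disables asynchronous execution, so a separate offline compaction job can pick up the plans. The threshold values are placeholders rather than recommendations.

```sql
CREATE TEMPORARY TABLE hudi_mor_sink (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  -- Keep generating compaction plans so that an offline compaction job can execute them later.
  'compaction.schedule.enabled' = 'true',
  'compaction.async.enabled' = 'false',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '10'
);
```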
File size parameters
These parameters control how Hudi manages file sizes to prevent small-file accumulation.
| Parameter | Default | Description |
|---|---|---|
| hoodie.parquet.max.file.size | 120 MB (120 × 1024 × 1024 bytes) | Maximum size of a Parquet file. Data exceeding this threshold is written to a new file group. |
| hoodie.parquet.small.file.limit | 100 MB (104,857,600 bytes) | Files smaller than this threshold are treated as small files. During writes, Hudi appends to existing small files instead of creating new ones. |
| hoodie.copyonwrite.record.size.estimate | 1 KB (1,024 bytes) | Estimated record size. If not set, Hudi computes this dynamically from committed metadata. |
Hadoop configuration parameters
| Parameter | Default | Description |
|---|---|---|
| hadoop.${option key} | — | Hadoop configuration items, specified with the hadoop. prefix. Supported in Hudi 0.12.0 and later. Use DDL statements to specify per-job Hadoop configurations for cross-cluster scenarios. Multiple items can be specified simultaneously. |
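For example, a per-job Hadoop setting can be passed directly in the WITH clause by prefixing the key with hadoop.; the keys shown below are taken from the DataStream example later in this topic, and the values are placeholders.

```sql
CREATE TEMPORARY TABLE hudi_tbl_with_hadoop_conf (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  -- Any Hadoop configuration item can be set per job by adding the hadoop. prefix.
  'hadoop.fs.oss.endpoint' = '<yourOSSEndpoint>',
  'hadoop.fs.oss.impl' = 'org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem'
);
```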
Data write parameters
Batch write
Use batch write to import existing data from other sources into a Hudi table.
bulk_insert skips Avro serialization, compaction, and deduplication, so guarantee source data uniqueness before using this mode. bulk_insert is valid only in batch execution mode.
| Parameter | Default | Description |
|---|---|---|
| write.operation | upsert | Write type. Set to bulk_insert for batch writes. |
| write.tasks | Deployment parallelism | Parallelism of bulk_insert tasks. The final number of output files is greater than or equal to this value (data rolls over to a new file when the 120 MB Parquet limit is reached). |
| write.bulk_insert.shuffle_input | true | Whether to shuffle input data by partition field before writing. Available in Hudi 0.11.0 and later. Reduces the small-file count but may cause data skew. |
| write.bulk_insert.sort_input | true | Whether to sort input data by partition field before writing. Available in Hudi 0.11.0 and later. Reduces the small-file count when a single task writes across multiple partitions. |
| write.sort.memory | 128 | Available managed memory for the sort operator, in MB. |
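A minimal batch import might look as follows. It assumes batch execution mode and a bounded source; the datagen source, row count, and sink path are placeholders.

```sql
-- Run in batch execution mode; bulk_insert is valid only there.
SET 'execution.runtime-mode' = 'batch';

CREATE TEMPORARY TABLE bounded_src (
  id INT NOT NULL,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '100000'  -- bounded source, so the job terminates
);

CREATE TEMPORARY TABLE hudi_bulk_sink (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'bulk_insert',
  'write.tasks' = '8'
);

INSERT INTO hudi_bulk_sink SELECT id, data, ts FROM bounded_src;
```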
Changelog mode
In changelog mode, Hudi retains all change events—INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE—enabling end-to-end near-real-time data warehousing with Flink's stateful computation. Merge On Read (MOR) tables support this mode.
In non-changelog mode, intermediate changes within a batch are merged. Snapshot read returns only the final merged result; intermediate states are not visible regardless of the write path.
After changelog mode is enabled, the asynchronous compaction task still merges intermediate changes. Increase compaction.delta_commits and compaction.delta_seconds so that downstream consumers have enough time to read the change records before they are compacted.
| Parameter | Default | Description |
|---|---|---|
| changelog.enabled | false | Set to true to retain all change events. When false, only the final merged record is guaranteed; intermediate changes may be merged. |
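A minimal sketch of enabling changelog mode on a MOR table follows; the compaction retention values are illustrative and should be tuned to how quickly downstream jobs consume the change stream.

```sql
CREATE TEMPORARY TABLE hudi_changelog_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  -- Retain INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE events for downstream consumers.
  'changelog.enabled' = 'true',
  -- Delay compaction so that downstream streaming reads see the changelog before it is merged.
  'compaction.delta_commits' = '20',
  'compaction.delta_seconds' = '7200'
);
```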
Append mode
Supported in Hudi 0.10.0 and later.
- MOR tables: The small-file policy applies. Data is written to Apache Avro log files in append mode.
- Copy On Write (COW) tables: The small-file policy does not apply. A new Apache Parquet file is created for each write.
Clustering parameters
Hudi supports clustering to resolve small-file accumulation in INSERT mode.
Inline clustering (COW tables only)
| Parameter | Default | Description |
|---|---|---|
| write.insert.cluster | false | Set to true to merge small files during writes. Each INSERT operation merges existing small files, but deduplication is not performed, and write throughput decreases. |
Async clustering (Hudi 0.12.0 and later)
| Parameter | Default | Description |
|---|---|---|
| clustering.schedule.enabled | false | Set to true to periodically schedule a clustering plan. |
| clustering.delta_commits | 4 | Number of commits required to generate a clustering plan. Takes effect only when clustering.schedule.enabled is true. |
| clustering.async.enabled | false | Set to true to run the clustering plan asynchronously at regular intervals. |
| clustering.tasks | 4 | Parallelism of clustering tasks. |
| clustering.plan.strategy.target.file.max.bytes | 1 GiB (1,073,741,824 bytes) | Target maximum file size for clustering output. |
| clustering.plan.strategy.small.file.limit | 600 | Files smaller than this threshold (in bytes) are eligible for clustering. |
| clustering.plan.strategy.sort.columns | — | Columns used to sort data during clustering. |
Clustering plan strategies
| Parameter | Default | Description |
|---|---|---|
| clustering.plan.partition.filter.mode | NONE | Partition filter mode. Valid values: NONE (all partitions), RECENT_DAYS (partitions from the last N days), SELECTED_PARTITIONS (specific partitions). |
| clustering.plan.strategy.daybased.lookback.partitions | 2 | Number of recent days used to select partitions for clustering. Takes effect only when the filter mode is RECENT_DAYS. |
| clustering.plan.strategy.cluster.begin.partition | — | Start partition for range filtering. Takes effect only when the filter mode is SELECTED_PARTITIONS. |
| clustering.plan.strategy.cluster.end.partition | — | End partition for range filtering. Takes effect only when the filter mode is SELECTED_PARTITIONS. |
| clustering.plan.strategy.partition.regex.pattern | — | Regular expression used to select partitions. |
| clustering.plan.strategy.partition.selected | — | Comma-separated list of selected partitions. |
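Putting the clustering options together, the sketch below enables asynchronous clustering for a partitioned table written in insert mode and restricts it to recent partitions. All thresholds are placeholders.

```sql
CREATE TEMPORARY TABLE hudi_append_tbl (
  id INT,
  data STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  -- Generate a clustering plan every 4 commits and execute it asynchronously.
  'clustering.schedule.enabled' = 'true',
  'clustering.async.enabled' = 'true',
  'clustering.delta_commits' = '4',
  'clustering.tasks' = '4',
  -- Only cluster partitions from the last 2 days.
  'clustering.plan.partition.filter.mode' = 'RECENT_DAYS',
  'clustering.plan.strategy.daybased.lookback.partitions' = '2'
);
```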
Choose an index type
Hudi supports two index types. Use this table to choose the one that fits your workload.
| Dimension | FLINK_STATE | BUCKET |
|---|---|---|
| Storage/compute overhead | Yes (state backend) | None |
| Performance | Depends on state backend | Better (no state overhead) |
| File group flexibility | Dynamically assigns records based on file size | Fixed number of buckets (cannot increase after initial configuration) |
| Cross-partition changes | Supported | Not supported (exception: Change Data Capture (CDC) streaming input) |
| When to use | Tables with fewer than 500 million records, or workloads requiring cross-partition updates | Tables with more than 500 million records where state overhead is a bottleneck |
When index.type is set to BUCKET, setting index.global.enabled=true has no effect because the bucket index does not support cross-partition deduplication.
| Parameter | Default | Description |
|---|---|---|
| index.type | FLINK_STATE | Index type. Valid values: FLINK_STATE, BUCKET. |
| hoodie.bucket.index.hash.field | Primary key | Hash key field for the bucket index. Can be a subset of the primary key. |
| hoodie.bucket.index.num.buckets | 4 | Number of buckets per partition. Cannot be changed after the table is created. |
The bucket index parameters are supported in Hudi 0.11.0 and later.
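For a large table, the bucket index can be configured as in the sketch below. The bucket count and hash field are placeholders; remember that the bucket count cannot be changed after the table is created.

```sql
CREATE TEMPORARY TABLE hudi_bucket_tbl (
  uuid STRING,
  data STRING,
  ts TIMESTAMP(3),
  PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  -- Use a bucket index instead of Flink state; the bucket count is fixed at table creation.
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '128',
  'hoodie.bucket.index.hash.field' = 'uuid'
);
```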
Data read parameters
Hudi supports three read patterns using the same set of parameters.
| Pattern | Configuration |
|---|---|
| Streaming read | Set read.streaming.enabled=true and optionally read.start-commit |
| Incremental batch read | Set both read.start-commit and read.end-commit; the interval is closed (inclusive on both ends) |
| Time travel | Set only read.end-commit; reads a snapshot at that specific commit |
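The three patterns differ only in which commit options are set, as the rough sketches below show; the commit timestamps and paths are placeholders.

```sql
-- Streaming read from a given commit onward.
CREATE TEMPORARY TABLE hudi_stream_src (
  id INT,
  data STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'read.streaming.enabled' = 'true',
  'read.start-commit' = '20240101000000'
);

-- Incremental batch read of the closed range [start-commit, end-commit].
CREATE TEMPORARY TABLE hudi_incr_src (
  id INT,
  data STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'read.start-commit' = '20240101000000',
  'read.end-commit' = '20240102000000'
);

-- Time travel: read the snapshot as of a specific commit.
CREATE TEMPORARY TABLE hudi_time_travel_src (
  id INT,
  data STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'read.end-commit' = '20240102000000'
);
```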
Streaming read parameters
By default, reading a Hudi table uses snapshot read—the latest full snapshot is returned at once. Set read.streaming.enabled=true to switch to streaming read.
| Parameter | Default | Description |
|---|---|---|
| read.streaming.enabled | false | Set to true to enable streaming read. |
| read.start-commit | (blank) | Start offset. Format: yyyyMMddHHmmss for a specific time, or earliest to read from the beginning. Leave blank to start from the latest commit. |
| clean.retain_commits | 30 | Maximum number of historical commits retained by the cleaner. Commits beyond this limit are deleted. For example, with a 5-minute checkpointing interval, the default value of 30 retains changelogs for at least 150 minutes. |
Streaming read of changelogs requires Hudi 0.10.0 or later. Compaction tasks may merge changelogs, removing intermediate records and potentially affecting downstream calculations.
Incremental read parameters
| Parameter | Default | Description |
|---|---|---|
| read.start-commit | Latest commit | Start of the read range, in yyyyMMddHHmmss format. |
| read.end-commit | Latest commit | End of the read range, in yyyyMMddHHmmss format. The range is closed (both endpoints inclusive). |
Examples
Source table
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- Read from the latest commit in streaming mode and write to Blackhole.
INSERT INTO blackhole SELECT * FROM hudi_tbl;
Sink table
CREATE TEMPORARY TABLE datagen (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * FROM datagen;
DataStream API
To use the DataStream API, configure a DataStream connector for Realtime Compute for Apache Flink. See Settings of DataStream connectors.
Maven dependencies
Match the dependency versions to your VVR version.
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.4</flink.version>
<hudi.version>0.13.1</hudi.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
<!-- OSS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<!-- DLF -->
<dependency>
<groupId>com.aliyun.datalake</groupId>
<artifactId>metastore-client-hive2</artifactId>
<version>0.2.14</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.5.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
DLF dependencies conflict with open-source Apache Hive versions (hive-common, hive-exec). For local DLF testing, download the custom hive-common and hive-exec JAR packages and import them manually in IntelliJ IDEA.
Write data to Hudi
The following example writes data to a Hudi MOR table in OSS and optionally syncs metadata to DLF.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import java.util.HashMap;
import java.util.Map;
public class FlinkHudiQuickStart {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String dbName = "test_db";
String tableName = "test_tbl";
String basePath = "oss://xxx";
Map<String, String> options = new HashMap<>();
// Hudi configuration
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
options.put(FlinkOptions.TABLE_NAME.key(), tableName);
// OSS configuration
// Use the public endpoint for local debugging (e.g., oss-cn-hangzhou.aliyuncs.com)
// Use the internal endpoint for cluster submission (e.g., oss-cn-hangzhou-internal.aliyuncs.com)
options.put("hadoop.fs.oss.accessKeyId", "xxx");
options.put("hadoop.fs.oss.accessKeySecret", "xxx");
options.put("hadoop.fs.oss.endpoint", "xxx");
options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
// DLF configuration (optional — remove if not syncing to DLF)
// Use the public endpoint for local debugging (e.g., dlf.cn-hangzhou.aliyuncs.com)
// Use the VPC endpoint for cluster submission (e.g., dlf-vpc.cn-hangzhou.aliyuncs.com)
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
options.put("hadoop.dlf.catalog.id", "xxx");
options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
options.put("hadoop.dlf.catalog.region", "xxx");
options.put("hadoop.dlf.catalog.endpoint", "xxx");
options.put("hadoop.hive.imetastoreclient.factory.class",
"com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
DataStream<RowData> dataStream = env.fromElements(
GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
StringData.fromString("1001"), StringData.fromString("p1")),
GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
StringData.fromString("1002"), StringData.fromString("p2"))
);
HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
.column("uuid string")
.column("name string")
.column("age int")
.column("ts string")
.column("`partition` string")
.pk("uuid")
.partition("partition")
.options(options);
// Second parameter: whether the input stream is bounded (true = batch, false = streaming)
builder.sink(dataStream, false);
env.execute("Flink_Hudi_Quick_Start");
}
}