
Realtime Compute for Apache Flink: Hudi (retiring)

Last Updated: Mar 26, 2026
Important

The built-in Hudi connector will no longer be supported in future versions of Ververica Runtime (VVR). Use custom connectors to connect Realtime Compute for Apache Flink to Apache Hudi, or migrate to the Paimon connector for improved features and performance.

Apache Hudi is an open-source data lake framework that manages table data stored in Object Storage Service (OSS) or Hadoop Distributed File System (HDFS). It provides ACID (Atomicity, Consistency, Isolation, Durability) guarantees, row-level upserts and deletes, automatic small-file management, and time travel queries.

Core features

Feature Description
ACID semantics Snapshot isolation by default, ensuring data consistency across concurrent reads and writes.
UPSERT semantics Combines INSERT and UPDATE: if a record does not exist, it is inserted; if it already exists, it is updated. This simplifies ETL development.
Time travel Access historical data versions at a specific point in time, enabling efficient data auditing and quality control.

Typical scenarios

Scenario Description
Database ingestion acceleration Write Change Data Capture (CDC) data (for example, MySQL binary logs via the MySQL CDC connector) directly to a Hudi table for downstream real-time ETL—more cost-effective than offline bulk loading.
Incremental ETL Extract change data streams from Hudi incrementally for lightweight, real-time ETL. Use Apache Presto or Apache Spark for downstream OLAP.
Message queuing Use Hudi as a lightweight message queue replacement for low-volume scenarios, simplifying the application architecture.
Data backfilling Join full data and incremental data from Hudi tables in a Hive metastore to generate wide tables with minimal compute overhead.

Advantages over open-source Hudi

  • Maintenance-free: Built-in Hudi connector reduces O&M complexity and provides SLA guarantees.

  • Improved data connectivity: Decouples data from compute engines, enabling seamless migration across Apache Flink, Apache Spark, Apache Presto, and Apache Hive.

  • Simplified database-to-lake ingestion: Works with the Flink CDC connector to streamline data development.

  • Enterprise-class features: Unified metadata management via Data Lake Formation (DLF) and automatic lightweight schema changes.

  • Cost-effective storage: Data stored in Apache Parquet or Apache Avro format in Alibaba Cloud OSS, with storage and computing isolation for flexible resource scaling.

Supported configurations

Item Value
Table type Source table, sink table
Running mode Streaming mode, batch mode
Data format N/A
API type DataStream API, SQL API
Data update/deletion in sink Supported
Minimum VVR version vvr-4.0.11-flink-1.13
Supported file systems OSS, HDFS, OSS-HDFS

Metrics

Table type Metrics
Source table numRecordsIn, numRecordsInPerSecond
Sink table numRecordsOut, numRecordsOutPerSecond, currentSendTime

For metric definitions, see Metrics.

Limitations

  • Minimum engine version: vvr-4.0.11-flink-1.13 or later.

  • Supported file systems: OSS, HDFS, or OSS-HDFS only.

  • Draft jobs cannot run on session clusters.

  • Field modifications are not supported through the Hudi connector. To modify fields, use Spark SQL statements in the Data Lake Formation (DLF) console.
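To illustrate, a column could be added with a Spark SQL statement like the following in the DLF console; the database, table, and column names here are placeholders:

```sql
-- Placeholder names; run in the DLF console, not through the Hudi connector.
ALTER TABLE test_db.hudi_tbl ADD COLUMNS (new_col STRING COMMENT 'added via Spark SQL');
```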

Syntax

CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  ...
);

Parameters in the WITH clause

Basic parameters

Common parameters

Parameter Required Default Description
connector Yes (none) Set to hudi.
path Yes (none) Storage path of the table. Supported formats: OSS (oss://<bucket>/<user-defined-dir>), HDFS (hdfs://<user-defined-dir>), OSS-HDFS (oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>). OSS-HDFS paths require VVR 8.0.3 or later. Find the OSS-HDFS endpoint in the Port section of the OSS bucket Overview page.
hoodie.datasource.write.recordkey.field No uuid Primary key field. Separate multiple fields with commas. Alternatively, use the PRIMARY KEY syntax in the DDL.
precombine.field No ts Version field used to determine update ordering. If not set, updates follow the message sequence defined by the engine.
oss.endpoint No (none) Endpoint of OSS or OSS-HDFS. Required when data is stored in OSS or OSS-HDFS. For OSS endpoints, see Regions and endpoints. For OSS-HDFS endpoints, see the Port section of the OSS bucket Overview page.
accessKeyId No (none) AccessKey ID. Required for OSS and OSS-HDFS.
accessKeySecret No (none) AccessKey secret. Required for OSS and OSS-HDFS.
Important

To protect your AccessKey pair, store the AccessKey ID and AccessKey secret as variables. See Manage variables.

Source table parameters

Parameter Required Default Description
read.streaming.enabled No false Set to true to enable streaming read. By default, snapshot read is used, which returns the latest full snapshot.
read.start-commit No (blank) Start offset for streaming read. Format: yyyyMMddHHmmss for a specific time, or earliest to read from the beginning. Leave blank to read from the latest commit.

Sink table parameters

Parameter Required Default Description
write.operation No upsert Write mode. Valid values: insert (append), upsert (insert or update), bulk_insert (batch append).
hive_sync.enable No false Set to true to sync metadata to Apache Hive.
hive_sync.mode No hms Sync mode. hms syncs to a Hive metastore or DLF. jdbc syncs via a Java Database Connectivity (JDBC) driver.
hive_sync.db No default Target Hive database name.
hive_sync.table No Current table name Target Hive table name. Must not contain hyphens (-).
dlf.catalog.region No Region where DLF is activated. Takes effect only when hive_sync.mode is hms. See Supported regions and endpoints. Must match the region specified by dlf.catalog.endpoint.
dlf.catalog.endpoint No DLF endpoint. Takes effect only when hive_sync.mode is hms. Use the VPC endpoint for lower latency—for example, dlf-vpc.cn-hangzhou.aliyuncs.com for the China (Hangzhou) region. See Supported regions and endpoints. For cross-VPC access, see How does Realtime Compute for Apache Flink access a service across VPCs?
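A sink DDL that syncs metadata to DLF in hms mode might look as follows; the database and table names, region, and endpoint are illustrative:

```sql
CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.db' = 'test_db',
  'hive_sync.table' = 'hudi_tbl',
  'dlf.catalog.region' = 'cn-hangzhou',
  'dlf.catalog.endpoint' = 'dlf-vpc.cn-hangzhou.aliyuncs.com'
);
```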

Advanced parameters

Parallelism parameters

Parameter Default Description
write.tasks 4 Parallelism of write tasks. Each task writes to 1 or more buckets in sequence. Increasing this value does not increase small-file count.
write.bucket_assign.tasks Deployment parallelism Parallelism of bucket assigner operators. Increasing this value increases small-file count.
write.index_bootstrap.tasks Deployment parallelism Parallelism of index bootstrap operators. Takes effect only when index.bootstrap.enabled is true. Increasing this value improves bootstrap throughput, but checkpointing may be blocked during bootstrap—increase checkpoint failure tolerance if needed.
read.tasks 4 Parallelism of streaming and batch read operators.
compaction.tasks 4 Parallelism of online compaction operators. Online compaction consumes more resources than offline compaction; prefer offline compaction for production workloads.

Online compaction parameters

Parameter Default Description
compaction.schedule.enabled true Whether to generate compaction plans on schedule. Keep this true even when asynchronous compaction is disabled—offline compaction can then execute the scheduled plans.
compaction.async.enabled true Whether to run compaction asynchronously. Set to false to disable online compaction while keeping plan generation active.
compaction.tasks 4 Parallelism of compaction tasks.
compaction.trigger.strategy num_commits Strategy used to trigger compaction. Valid values: num_commits, time_elapsed, num_and_time, num_or_time.
compaction.delta_commits 5 Number of commits required to trigger compaction. Used with num_commits, num_and_time, or num_or_time.
compaction.delta_seconds 3600 Interval in seconds between compaction triggers. Used with time_elapsed, num_and_time, or num_or_time.
compaction.max_memory 100 MB Maximum memory for the hash map used during compaction and deduplication. Increase to 1 GB if resources allow.
compaction.target_io 500 GB Maximum I/O throughput per compaction plan.
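For production workloads that prefer offline compaction, one possible combination (a sketch, not a complete DDL) keeps plan generation on while disabling asynchronous execution:

```sql
CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  -- Generate compaction plans on schedule, but leave execution to an offline compaction job.
  'compaction.schedule.enabled' = 'true',
  'compaction.async.enabled' = 'false',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5'
);
```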

File size parameters

These parameters control how Hudi manages file sizes to prevent small-file accumulation.

Parameter Default Description
hoodie.parquet.max.file.size 120 MB (120 × 1024 × 1024 bytes) Maximum size of a Parquet file. Data exceeding this threshold is written to a new file group.
hoodie.parquet.small.file.limit 100 MB (104,857,600 bytes) Files smaller than this threshold are treated as small files. During writes, Hudi appends to existing small files instead of creating new ones.
hoodie.copyonwrite.record.size.estimate 1 KB (1,024 bytes) Estimated record size. If not set, Hudi computes this dynamically from committed metadata.

Hadoop configuration parameters

Parameter Default Description
hadoop.${option key} Hadoop configuration items, specified with the hadoop. prefix. Supported in Hudi 0.12.0 and later. Use DDL statements to specify per-job Hadoop configurations for cross-cluster scenarios. Multiple items can be specified simultaneously.
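For example, per-job Hadoop configuration items can be passed in the WITH clause with the hadoop. prefix; the option values below are placeholders:

```sql
CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://<user-defined-dir>',
  -- Each hadoop.-prefixed key maps to the Hadoop configuration item after the prefix.
  'hadoop.dfs.nameservices' = 'emr-cluster',
  'hadoop.dfs.replication' = '2'
);
```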

Data write parameters

Batch write

Use batch write to import existing data from other sources into a Hudi table.

bulk_insert skips Avro serialization, compaction, and deduplication. Guarantee source data uniqueness before using this mode. bulk_insert is only valid in batch execution mode.
Parameter Default Description
write.operation upsert Write type. Set to bulk_insert for batch writes.
write.tasks Deployment parallelism Parallelism for bulk_insert tasks. The final number of output files is greater than or equal to this value (data rolls over to a new file when the 120 MB Parquet limit is reached).
write.bulk_insert.shuffle_input true Whether to shuffle input data by partition field before writing. Available in Hudi 0.11.0 and later. Reduces small-file count but may cause data skew.
write.bulk_insert.sort_input true Whether to sort input data by partition field before writing. Available in Hudi 0.11.0 and later. Reduces small-file count when a single task writes across multiple partitions.
write.sort.memory 128 Available managed memory for the sort operator, in MB.
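A minimal bulk_insert sketch, assuming a bounded source table named bounded_src already exists; bulk_insert takes effect only in batch execution mode:

```sql
SET 'execution.runtime-mode' = 'batch';

-- Dynamic table option hint; source uniqueness must be guaranteed upstream.
INSERT INTO hudi_tbl /*+ OPTIONS('write.operation' = 'bulk_insert') */
SELECT * FROM bounded_src;
```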

Changelog mode

In changelog mode, Hudi retains all change events—INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE—enabling end-to-end near-real-time data warehousing with Flink's stateful computation. Merge On Read (MOR) tables support this mode.

In non-changelog mode, intermediate changes within a batch are merged. Snapshot read returns only the final merged result; intermediate states are not visible regardless of the write path.

Even with changelog mode enabled, the asynchronous compaction task still merges intermediate changes into a single record. Keep compaction.delta_commits (default 5) and compaction.delta_seconds (default 3600) large enough that downstream consumers can read intermediate records before they are compacted.

Parameter Default Description
changelog.enabled false Set to true to retain all change events. When false, only the final merged record is guaranteed; intermediate changes may be merged.
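A sketch of a changelog-mode sink; compaction intervals are set explicitly so downstream consumers have time to read intermediate records:

```sql
CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',  -- changelog mode requires an MOR table
  'changelog.enabled' = 'true',
  'compaction.delta_commits' = '5',
  'compaction.delta_seconds' = '3600'
);
```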

Append mode

Supported in Hudi 0.10.0 and later.

  • MOR tables: The small-file policy applies. Data is written to Apache Avro log files in append mode.

  • Copy On Write (COW) tables: The small-file policy does not apply. A new Apache Parquet file is created for each write.

Clustering parameters

Hudi supports clustering to resolve small-file accumulation in INSERT mode.

Inline clustering (COW tables only)

Parameter Default Description
write.insert.cluster false Set to true to merge small files during writes. Each INSERT operation merges existing small files, but deduplication is not performed, and write throughput decreases.

Async clustering (Hudi 0.12.0 and later)

Parameter Default Description
clustering.schedule.enabled false Set to true to periodically schedule a clustering plan.
clustering.delta_commits 4 Number of commits required to generate a clustering plan. Takes effect only when clustering.schedule.enabled is true.
clustering.async.enabled false Set to true to run the clustering plan asynchronously at regular intervals.
clustering.tasks 4 Parallelism of clustering tasks.
clustering.plan.strategy.target.file.max.bytes 1 GiB (1,073,741,824 bytes) Target maximum file size for clustering output.
clustering.plan.strategy.small.file.limit 600 Files smaller than this threshold (in MB) are eligible for clustering.
clustering.plan.strategy.sort.columns Columns used to sort data during clustering.

Clustering plan strategies

Parameter Default Description
clustering.plan.partition.filter.mode NONE Partition filter mode. Valid values: NONE (all partitions), RECENT_DAYS (partitions from the last N days), SELECTED_PARTITIONS (specific partitions).
clustering.plan.strategy.daybased.lookback.partitions 2 Number of recent days to select partitions for clustering. Takes effect only when filter.mode is RECENT_DAYS.
clustering.plan.strategy.cluster.begin.partition Start partition for range filtering. Takes effect only when filter.mode is SELECTED_PARTITIONS.
clustering.plan.strategy.cluster.end.partition End partition for range filtering. Takes effect only when filter.mode is SELECTED_PARTITIONS.
clustering.plan.strategy.partition.regex.pattern Regular expression for selecting partitions.
clustering.plan.strategy.partition.selected Comma-separated list of selected partitions.

Choose an index type

Hudi supports two index types. Use this table to choose the one that fits your workload.

Dimension FLINK_STATE BUCKET
Storage/compute overhead Yes (state backend) None
Performance Depends on state backend Better (no state overhead)
File group flexibility Dynamically assigns records based on file size Fixed number of buckets (cannot increase after initial configuration)
Cross-partition changes Supported Not supported (exception: Change Data Capture (CDC) streaming input)
When to use Tables with fewer than 500 million records, or workloads requiring cross-partition updates Tables with more than 500 million records where state overhead is a bottleneck
When index.type is set to BUCKET, setting index.global.enabled=true has no effect—bucket index does not support cross-partition deduplication.
Parameter Default Description
index.type FLINK_STATE Index type. Valid values: FLINK_STATE, BUCKET.
hoodie.bucket.index.hash.field Primary key Hash key field for bucket index. Can be a subset of the primary key.
hoodie.bucket.index.num.buckets 4 Number of buckets per partition. Cannot be changed after the table is created.
The bucket index parameters are supported in Hudi 0.11.0 and later.
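A bucket index sketch for a large table; the bucket count here is illustrative and cannot be changed after the table is created:

```sql
CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.num.buckets' = '16'  -- fixed once the table is created
);
```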

Data read parameters

Hudi supports three read patterns using the same set of parameters.

Pattern Configuration
Streaming read Set read.streaming.enabled=true and optionally read.start-commit
Incremental batch read Set both read.start-commit and read.end-commit; the interval is closed (inclusive on both ends)
Time travel Set only read.end-commit; reads a snapshot at that specific commit

Streaming read parameters

By default, reading a Hudi table uses snapshot read—the latest full snapshot is returned at once. Set read.streaming.enabled=true to switch to streaming read.

Parameter Default Description
read.streaming.enabled false Set to true to enable streaming read.
read.start-commit (blank) Start offset. Format: yyyyMMddHHmmss for a specific time, or earliest to read from the beginning. Leave blank to start from the latest commit.
clean.retain_commits 30 Maximum number of historical commits retained by the cleaner. Commits beyond this limit are deleted. For example, with a 5-minute checkpointing interval, the default value of 30 retains changelogs for at least 150 minutes.
Important

Streaming read of changelogs requires Hudi 0.10.0 or later. Compaction tasks may merge changelogs, removing intermediate records and potentially affecting downstream calculations.

Incremental read parameters

Parameter Default Description
read.start-commit Latest commit Start of the read range, in yyyyMMddHHmmss format.
read.end-commit Latest commit End of the read range, in yyyyMMddHHmmss format. The range is closed (both endpoints inclusive).
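Incremental batch read and time travel can be expressed with dynamic table option hints; the commit timestamps below are placeholders:

```sql
-- Incremental batch read over a closed interval (both endpoints inclusive).
SELECT * FROM hudi_tbl /*+ OPTIONS(
  'read.start-commit' = '20240101000000',
  'read.end-commit'   = '20240102000000'
) */;

-- Time travel: snapshot of the table as of a specific commit.
SELECT * FROM hudi_tbl /*+ OPTIONS('read.end-commit' = '20240102000000') */;
```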

Examples

Source table

CREATE TEMPORARY TABLE blackhole (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- Read from the latest commit in streaming mode and write to Blackhole.
INSERT INTO blackhole SELECT * FROM hudi_tbl;

Sink table

CREATE TEMPORARY TABLE datagen (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'accessKeyId' = '${secret_values.ak_id}',
  'accessKeySecret' = '${secret_values.ak_secret}',
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_tbl SELECT * FROM datagen;

DataStream API

Important

To use the DataStream API, configure a DataStream connector for Realtime Compute for Apache Flink. See Settings of DataStream connectors.

Maven dependencies

Match the dependency versions to your VVR version.

<properties>
  <maven.compiler.source>8</maven.compiler.source>
  <maven.compiler.target>8</maven.compiler.target>
  <flink.version>1.15.4</flink.version>
  <hudi.version>0.13.1</hudi.version>
</properties>

<dependencies>
  <!-- Flink -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- Hudi -->
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.15-bundle</artifactId>
    <version>${hudi.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- OSS -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aliyun</artifactId>
    <version>3.3.2</version>
    <scope>provided</scope>
  </dependency>

  <!-- DLF -->
  <dependency>
    <groupId>com.aliyun.datalake</groupId>
    <artifactId>metastore-client-hive2</artifactId>
    <version>0.2.14</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.5.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
Important

DLF dependencies conflict with open-source Apache Hive versions (hive-common, hive-exec). For local DLF testing, download the custom hive-common and hive-exec JAR packages and import them manually in IntelliJ IDEA.

Write data to Hudi

The following example writes data to a Hudi MOR table in OSS and optionally syncs metadata to DLF.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkHudiQuickStart {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String dbName = "test_db";
    String tableName = "test_tbl";
    String basePath = "oss://xxx";

    Map<String, String> options = new HashMap<>();

    // Hudi configuration
    options.put(FlinkOptions.PATH.key(), basePath);
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
    options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
    options.put(FlinkOptions.TABLE_NAME.key(), tableName);

    // OSS configuration
    // Use the public endpoint for local debugging (e.g., oss-cn-hangzhou.aliyuncs.com)
    // Use the internal endpoint for cluster submission (e.g., oss-cn-hangzhou-internal.aliyuncs.com)
    options.put("hadoop.fs.oss.accessKeyId", "xxx");
    options.put("hadoop.fs.oss.accessKeySecret", "xxx");
    options.put("hadoop.fs.oss.endpoint", "xxx");
    options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
    options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");

    // DLF configuration (optional — remove if not syncing to DLF)
    // Use the public endpoint for local debugging (e.g., dlf.cn-hangzhou.aliyuncs.com)
    // Use the VPC endpoint for cluster submission (e.g., dlf-vpc.cn-hangzhou.aliyuncs.com)
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
    options.put("hadoop.dlf.catalog.id", "xxx");
    options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
    options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
    options.put("hadoop.dlf.catalog.region", "xxx");
    options.put("hadoop.dlf.catalog.endpoint", "xxx");
    options.put("hadoop.hive.imetastoreclient.factory.class",
        "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");

    DataStream<RowData> dataStream = env.fromElements(
        GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
            StringData.fromString("1001"), StringData.fromString("p1")),
        GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
            StringData.fromString("1002"), StringData.fromString("p2"))
    );

    HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
        .column("uuid string")
        .column("name string")
        .column("age int")
        .column("ts string")
        .column("`partition` string")
        .pk("uuid")
        .partition("partition")
        .options(options);

    // Second parameter: whether the input stream is bounded (true = batch, false = streaming)
    builder.sink(dataStream, false);
    env.execute("Flink_Hudi_Quick_Start");
  }
}

FAQ