E-MapReduce: Use a Dataflow cluster to read data from and write data to Hudi tables based on DLF

Last Updated: Mar 26, 2026

Use an E-MapReduce (EMR) Dataflow cluster to access Apache Hudi tables stored in a DataLake cluster or custom cluster through the unified metadata service of Data Lake Formation (DLF). This topic walks you through connecting a Dataflow cluster to DLF, writing data to a Hudi table using Flink SQL, and querying that data from a DataLake cluster.

Prerequisites

Before you begin, ensure that you have:

  • A Dataflow cluster and a DataLake cluster created in the EMR console, both belonging to the same virtual private cloud (VPC). For more information, see Create a cluster.

    Important

    When creating the DataLake cluster, select DLF Unified Metadata for the Metadata parameter.

  • DLF activated. For more information, see Getting started.

Limitations

The Dataflow cluster must run EMR V3.38.3 or later, up to EMR V3.50.x in the 3.x series or EMR V5.16.x in the 5.x series.
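
The version window can be checked in a script before you continue. The following is a minimal sketch, not an official tool: `emr_version_supported` is a hypothetical helper name, and it assumes one reading of the limitation (3.x releases from 3.38.3 through 3.50.x, and 5.x releases through 5.16.x). The version is passed as an argument because this topic does not describe how to query it from a node.

```shell
# Hypothetical helper (not part of EMR): succeeds when an EMR version string
# such as "3.42.0" or "V5.12.1" falls inside the window read from Limitations.
emr_version_supported() {
  local version major minor patch rest part
  version="${1#[Vv]}"                 # drop an optional leading "V"
  major="${version%%.*}"              # text before the first dot
  rest="${version#"$major"}"; rest="${rest#.}"
  minor="${rest%%.*}"
  rest="${rest#"$minor"}"; rest="${rest#.}"
  patch="${rest:-0}"                  # treat a missing patch level as 0

  # Every component must be a plain non-negative integer.
  for part in "$major" "$minor" "$patch"; do
    case "$part" in
      ''|*[!0-9]*) return 1 ;;
    esac
  done

  if [ "$major" -eq 3 ]; then
    # Assumed window for the 3.x series: 3.38.3 through 3.50.x.
    if [ "$minor" -lt 38 ] || [ "$minor" -gt 50 ]; then return 1; fi
    if [ "$minor" -eq 38 ] && [ "$patch" -lt 3 ]; then return 1; fi
    return 0
  elif [ "$major" -eq 5 ]; then
    # Assumed window for the 5.x series: anything up to 5.16.x.
    if [ "$minor" -le 16 ]; then return 0; fi
    return 1
  fi
  # Other major versions are not named in the limitation; reject them here.
  return 1
}
```

For example, `emr_version_supported 3.38.3` succeeds, while `emr_version_supported 3.38.2` and `emr_version_supported 5.17.0` fail.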

Step 1: Copy the Hive configuration file

Copy hive-site.xml from the DataLake cluster to the Dataflow cluster. The file is in the directory specified by the ${HIVE_CONF_DIR} parameter (for example, /etc/taihao-apps/hive-conf/).

Run the following commands on the Dataflow cluster:

mkdir -p /etc/taihao-apps/hive-conf
scp root@<internal-ip-of-master-1-1>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

Replace <internal-ip-of-master-1-1> with the internal IP address of the master-1-1 node in the DataLake cluster.
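
Before moving on, it can help to confirm that the copied file is in place and carries the DLF settings that Step 3 reads from it. The following is a minimal sketch under stated assumptions: `check_hive_site` is a hypothetical helper name, and the check assumes each property name is written as `<name>...</name>` with no surrounding whitespace.

```shell
# Hypothetical helper: verify that hive-site.xml exists, is non-empty, and
# declares the DLF properties that Step 3 looks up.
check_hive_site() {
  local file="${1:-/etc/taihao-apps/hive-conf/hive-site.xml}"
  local prop missing=0

  if [ ! -s "$file" ]; then
    echo "missing or empty: $file" >&2
    return 1
  fi

  for prop in dlf.catalog.endpoint dlf.catalog.region; do
    # -F treats the dots in the property name literally.
    if ! grep -qF "<name>${prop}</name>" "$file"; then
      echo "property not found: $prop" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Run `check_hive_site` with no arguments on the Dataflow cluster to check the default path used in this topic.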

Step 2: Start Flink SQL

Important

Place the DLF JAR before the Hive JAR. The DLF JAR already includes Hudi dependencies, so no separate Hudi JAR is needed. All Hive dependencies use version 2.3.6 regardless of the Hive version installed in the DataLake cluster.

  1. Start a Flink YARN session:

    yarn-session.sh -d -qu default
  2. Start Flink SQL, loading the DLF and Hive JARs in that order:

    sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
    Note

    Replace the JAR version numbers with the versions installed on your cluster.

  3. Set the following configurations in the Flink SQL client:

    -- Enable detailed log output.
    SET sql-client.verbose=true;
    -- Display results in table format.
    SET sql-client.execution.result-mode=tableau;
    -- Set the checkpoint interval to 1 second (1000 ms). Written data becomes visible only after a checkpoint completes.
    SET execution.checkpointing.interval=1000;
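
The JAR paths in step 2 embed version numbers that differ between clusters. One way to avoid hardcoding them is to look the files up by name. The following is a sketch, not an EMR-provided tool: `find_connector_jar` is a hypothetical helper, and it assumes the JAR file names follow the `ververica-connector-...-jar-with-dependencies.jar` pattern shown above and that each directory holds exactly one matching file.

```shell
# Hypothetical helper: print the one JAR in directory $1 whose file name starts
# with prefix $2 and ends with "-jar-with-dependencies.jar".
find_connector_jar() {
  local dir="$1" prefix="$2" matches count
  matches="$(find "$dir" -maxdepth 1 -type f \
    -name "${prefix}*-jar-with-dependencies.jar" 2>/dev/null | sort)"

  if [ -z "$matches" ]; then
    echo "no JAR starting with ${prefix} in ${dir}" >&2
    return 1
  fi

  # Refuse to guess when several versions are installed side by side.
  count="$(printf '%s\n' "$matches" | wc -l | tr -d ' ')"
  if [ "$count" -ne 1 ]; then
    echo "expected exactly one JAR in ${dir}, found ${count}" >&2
    return 1
  fi
  printf '%s\n' "$matches"
}

# Usage on the Dataflow cluster, keeping the DLF JAR first:
#   DLF_JAR="$(find_connector_jar /opt/apps/FLINK/flink-current/opt/catalogs/dlf ververica-connector-dlf)"
#   HIVE_JAR="$(find_connector_jar /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6 ververica-connector-hive-2.3.6)"
#   sql-client.sh -j "$DLF_JAR" -j "$HIVE_JAR"
```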

Step 3: Create a DLF catalog

Run the following statement to create a DLF catalog:

CREATE CATALOG dlf_catalog WITH (
    'type'              = 'dlf',
    'access.key.id'     = '<yourAccessKeyId>',     -- Required. AccessKey ID of your Alibaba Cloud account.
    'access.key.secret' = '<yourAccessKeySecret>', -- Required. AccessKey secret of your Alibaba Cloud account.
    'warehouse'         = 'oss://<bucket>/<object>',    -- Required. OSS path where data is stored. Check the OSS console for the bucket and path.
    'oss.endpoint'      = '<oss-endpoint>',           -- Required. Get the value of fs.oss.endpoint from ${HADOOP_CONF_DIR}/core-site.xml.
    'dlf.endpoint'      = '<dlf-endpoint>',           -- Required. Get the value of dlf.catalog.endpoint from /etc/taihao-apps/hive-conf/hive-site.xml.
    'dlf.region-id'     = '<region-id>'               -- Required. Get the value of dlf.catalog.region from /etc/taihao-apps/hive-conf/hive-site.xml.
);

A successful run returns:

[INFO] Execute statement succeed.
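
The endpoint and region values in the statement above come from Hadoop-style XML files, so they can be read with a short shell function instead of opening each file by hand. The following is a minimal sketch: `get_hadoop_prop` is a hypothetical helper, and it assumes the conventional layout in which `<value>` sits on the same line as `<name>` or on the line directly after it. An empty value is reported as not found.

```shell
# Hypothetical helper: print the value of property $2 from the Hadoop-style
# XML configuration file $1, or fail if the property is absent or empty.
get_hadoop_prop() {
  local file="$1" name="$2" value
  # grep -A1 keeps the matching <name> line plus the line after it;
  # sed then extracts the text between <value> and </value>.
  value="$(grep -F -A1 "<name>${name}</name>" "$file" \
    | sed -n 's:.*<value>\(.*\)</value>.*:\1:p' | head -n 1)"

  if [ -z "$value" ]; then
    echo "property not found or empty: ${name} in ${file}" >&2
    return 1
  fi
  printf '%s\n' "$value"
}

# Lookups that correspond to the comments in the CREATE CATALOG statement:
#   get_hadoop_prop "${HADOOP_CONF_DIR}/core-site.xml" fs.oss.endpoint
#   get_hadoop_prop /etc/taihao-apps/hive-conf/hive-site.xml dlf.catalog.endpoint
#   get_hadoop_prop /etc/taihao-apps/hive-conf/hive-site.xml dlf.catalog.region
```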

Step 4: Write data to a Hudi table

Use a Datagen connector to generate sample data and write it to a Hudi table in the DLF catalog.

-- Create a source table that generates random data at 10 rows per second.
CREATE TABLE datagen_source (
  uuid INT,
  age  INT,
  ts   BIGINT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '10'
);

-- Create a Hudi database and table in the DLF catalog.
CREATE DATABASE dlf_catalog.testdb;

CREATE TABLE dlf_catalog.testdb.hudi_tbl1 (
  id  INT NOT NULL,
  age INT,
  ts  BIGINT
) WITH (
  'connector'                               = 'hudi',
  'path'                                    = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> is the warehouse specified when the DLF catalog is created.
  'table.type'                              = 'COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hive_sync.enable'                        = 'true',
  'hive_sync.table'                         = 'hudi_tbl1', -- Required. Hive table name.
  'hive_sync.db'                            = 'testdb',    -- Required. Hive database name.
  'hive_sync.mode'                          = 'hms'        -- Required. Must be set to hms; the default is jdbc.
);

-- Write the generated data into the Hudi table.
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;

-- Query data to verify the write.
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;
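
In this example, the `path` option of the Hudi table is the catalog warehouse followed by the database name and the table name. As a small sketch of that layout, the value can be derived rather than typed by hand. The helper name `hudi_table_path` and the bucket in the usage comment are hypothetical.

```shell
# Hypothetical helper: join warehouse, database, and table into the table
# path used in this example, tolerating one trailing slash on the warehouse.
hudi_table_path() {
  local warehouse="${1%/}" db="$2" table="$3"
  printf '%s/%s/%s\n' "$warehouse" "$db" "$table"
}

# Usage (the bucket and object names are placeholders):
#   hudi_table_path oss://my-bucket/warehouse testdb hudi_tbl1
```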

Step 5: Query data from the DataLake cluster

Log on to the DataLake cluster. For more information, see Log on to a cluster. Then query the Hudi table using either Spark SQL or Hive CLI.

Use Spark SQL

For more information on using Spark with Hudi, see Integrate Hudi with Spark SQL.

  1. Start Spark SQL:

    spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

    If the cluster runs Spark 3 with Hudi 0.11 or later, also add the following configuration:

    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
  2. Query the Hudi table:

    SELECT * FROM testdb.hudi_tbl1;
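
The conditional setting in step 1 can be folded into a small wrapper so that the same script works across clusters. The following is a sketch under stated assumptions: both function names are hypothetical, "Spark 3" is taken to mean a Spark major version of 3, and the Spark and Hudi versions are passed in as arguments because this topic does not describe how to detect them. The wrapper prints the command line instead of running it.

```shell
# Hypothetical helper: succeeds when the extra HoodieCatalog setting applies,
# that is, Spark major version 3 together with Hudi 0.11 or later.
needs_hoodie_catalog() {
  local spark_version="$1" hudi_version="$2"
  local spark_major hudi_major hudi_rest hudi_minor
  spark_major="${spark_version%%.*}"
  hudi_major="${hudi_version%%.*}"
  hudi_rest="${hudi_version#*.}"
  hudi_minor="${hudi_rest%%.*}"

  [ "$spark_major" -eq 3 ] || return 1
  if [ "$hudi_major" -gt 0 ]; then return 0; fi
  [ "$hudi_minor" -ge 11 ]
}

# Hypothetical helper: print the spark-sql command line for the given
# Spark version ($1) and Hudi version ($2).
spark_sql_command() {
  local cmd
  cmd="spark-sql"
  cmd="$cmd --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'"
  cmd="$cmd --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'"
  if needs_hoodie_catalog "$1" "$2"; then
    cmd="$cmd --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'"
  fi
  printf '%s\n' "$cmd"
}
```

For example, `spark_sql_command 3.3.1 0.12.0` prints a command line that includes the HoodieCatalog setting, while `spark_sql_command 3.2.0 0.10.1` omits it. The version numbers are illustrative only.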

Use Hive CLI

  1. Start Hive CLI:

    hive
  2. Query the Hudi table:

    SELECT * FROM testdb.hudi_tbl1;