E-MapReduce:Use a Dataflow cluster to read data from and write data to Hudi tables based on DLF

Last Updated: Mar 26, 2025

You can use an E-MapReduce (EMR) Dataflow cluster to access Hudi tables in a DataLake cluster or a custom cluster based on the unified metadata service of Data Lake Formation (DLF). This topic describes how to connect an EMR Dataflow cluster to DLF, write data to a Hudi table by using Flink SQL, and query the data from both the Dataflow cluster and the DataLake cluster.

Prerequisites

  • A Dataflow cluster and a DataLake cluster are created in the EMR console, and the clusters belong to the same virtual private cloud (VPC). For more information, see Create a cluster.

    Important

    You must select DLF Unified Metadata for the Metadata parameter when you create the DataLake cluster.

  • DLF is activated. For more information, see Getting started.

Limits

The Dataflow cluster must run EMR V3.38.3 or a later version, and the version cannot be later than EMR V3.50.x (for EMR 3.x) or EMR V5.16.x (for EMR 5.x).

Procedure

  1. Step 1: Make preparations

  2. Step 2: Start Flink SQL

  3. Step 3: Create a catalog

  4. Step 4: Write data to a Hudi table by using Flink SQL

  5. Step 5: Query data from the Hudi table in the DataLake cluster

Step 1: Make preparations

Copy the hive-site.xml configuration file from the directory specified by the ${HIVE_CONF_DIR} parameter of the DataLake cluster to the Dataflow cluster.

For example, if the directory specified by the ${HIVE_CONF_DIR} parameter is /etc/taihao-apps/hive-conf/, run the following commands on the Dataflow cluster:

mkdir /etc/taihao-apps/hive-conf
scp root@<Internal IP address of the master-1-1 node of the DataLake cluster>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/
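
Optionally, you can verify that the copied file contains the DLF connection properties that are referenced in Step 3. The following check is a minimal sketch that only greps for the property names read in Step 3.

# Confirm that the copied hive-site.xml contains the DLF endpoint and region properties.
grep -E 'dlf.catalog.endpoint|dlf.catalog.region' /etc/taihao-apps/hive-conf/hive-site.xml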

Step 2: Start Flink SQL

Important
  • Make sure that the DLF dependency is specified before the Hive dependency. The Hudi dependencies are included in the DLF dependency.

  • You do not need to match the Hive version that is installed in the DataLake cluster. All Hive dependencies use version 2.3.6.

  1. Run the following command to start a Flink YARN session:

    yarn-session.sh -d -qu default
  2. Run the following command to start Flink SQL:

    sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
    Note

    Replace the version numbers in the preceding JAR file names based on your actual environment. To find the exact file names, see the listing sketch after this procedure.

  3. Configure the following settings for the test:

    -- Enable detailed log output. 
    set sql-client.verbose=true;
    -- Set the result display mode to the table format. 
    set sql-client.execution.result-mode=tableau;
    -- Set the checkpoint interval to 1 second. Data written to the Hudi table becomes visible only after a checkpoint is triggered, so a short interval makes the data queryable quickly. This configuration is mainly used for the source data generated in Step 4.
    set execution.checkpointing.interval=1000;
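
To find the exact JAR file names to pass to sql-client.sh, you can list the catalog directories on the Dataflow cluster. The following commands are a minimal sketch that uses the default paths from the sql-client.sh command above; adjust the paths if your installation differs.

# List the DLF and Hive catalog connector JAR files that are installed on the Dataflow cluster.
ls /opt/apps/FLINK/flink-current/opt/catalogs/dlf/
ls /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/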

Step 3: Create a catalog

After you start Flink SQL, run the following statement to create a DLF catalog that is used to read data from and write data to Hudi tables:

CREATE CATALOG dlf_catalog WITH (
     'type' = 'dlf',
     'access.key.id' = '<yourAccessKeyId>', -- The AccessKey ID of your Alibaba Cloud account. 
     'access.key.secret' = '<yourAccessKeySecret>', -- The AccessKey secret of your Alibaba Cloud account. 
     'warehouse' = 'oss://<bucket>/<object>', -- bucket: the name of your Object Storage Service (OSS) bucket. object: the path in which your data is stored. You can view the information in the OSS console. 
     'oss.endpoint' = '<oss.endpoint>', -- Obtain the value of fs.oss.endpoint from ${HADOOP_CONF_DIR}/core-site.xml. 
     'dlf.endpoint' = '<dlf.endpoint>', -- Obtain the value of dlf.catalog.endpoint from /etc/taihao-apps/hive-conf/hive-site.xml. 
     'dlf.region-id' = '<dlf.region-id>' -- Obtain the value of dlf.catalog.region from /etc/taihao-apps/hive-conf/hive-site.xml. 
 );

After the catalog is created, the following information is returned:

[INFO] Execute statement succeed.
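
As an optional check, you can confirm that the catalog is registered before you continue. The following statements are standard Flink SQL; the database list may be empty if no databases have been created in DLF yet.

-- List the registered catalogs. dlf_catalog should appear in the output.
SHOW CATALOGS;
-- Switch to the DLF catalog and list its databases.
USE CATALOG dlf_catalog;
SHOW DATABASES;
-- Switch back to the default catalog. Step 4 creates the Datagen source table in default_catalog.
USE CATALOG default_catalog;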

Step 4: Write data to a Hudi table by using Flink SQL

Use the Datagen connector to generate random source data and write the data to a Hudi table.

-- Generate source data.
CREATE TABLE datagen_source (
  uuid int,
  age int,
  ts bigint
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Create a Hudi database and a Hudi table.
CREATE DATABASE dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
  id int NOT NULL,
  age int,
  ts bigint
)
WITH (
  'connector'='hudi',
  'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> is the warehouse specified when the DLF catalog is created, testdb is the name of the created database, and hudi_tbl1 is the name of the created table. 
  'table.type'='COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field'='id',
  'hive_sync.enable'='true',
  'hive_sync.table'='hudi_tbl1',    -- Required. The name of the Hive table. 
  'hive_sync.db'='testdb',            -- Required. The name of the Hive database. 
  'hive_sync.mode' = 'hms'          -- Required. Set the hive_sync.mode parameter to hms. The default value is jdbc. 
);

-- Write data into the data lake.
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;
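
-- The INSERT statement above starts a continuous streaming job because the Datagen source is unbounded.
-- Data written to the Hudi table becomes visible after each checkpoint (the interval configured in Step 2).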

-- Query the data.
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;

Step 5: Query data from the Hudi table in the DataLake cluster

Log on to the DataLake cluster and query data from the Hudi table. For more information about how to log on to a cluster, see Log on to a cluster.

  • Use Spark SQL

    For more information, see Integrate Hudi with Spark SQL.

    1. Run the following command to start Spark SQL:

      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

      If your cluster uses Spark 3 and the version of Hudi is 0.11 or later, you must add the following configuration:

      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. Run the following command to query data from the table:

      SELECT * FROM testdb.hudi_tbl1;
  • Use Hive CLI

    1. Run the following command to start Hive CLI:

      hive
    2. Run the following command to query data from the table:

      SELECT * FROM testdb.hudi_tbl1;