Use an E-MapReduce (EMR) Dataflow cluster to access Apache Hudi tables stored in a DataLake cluster or custom cluster through the unified metadata service of Data Lake Formation (DLF). This topic walks you through connecting a Dataflow cluster to DLF, writing data to a Hudi table using Flink SQL, and querying that data from a DataLake cluster.
Prerequisites
Before you begin, ensure that you have:
- A Dataflow cluster and a DataLake cluster, both created in the EMR console and deployed in the same virtual private cloud (VPC). For more information, see Create a cluster.
  Important: When you create the DataLake cluster, select DLF Unified Metadata for the Metadata parameter.
- Activated DLF. For more information, see Getting started.
Limitations
The Dataflow cluster must run a version from EMR V3.38.3 through EMR V3.50.x, or an EMR V5 version no later than EMR V5.16.x.
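The version rule can be expressed as a small shell check. This is a minimal sketch, assuming versions are written as major.minor.patch and that only the V3 and V5 release lines are in scope; `emr_version_supported` is a hypothetical helper, not an EMR tool.

```shell
# Hypothetical helper (not part of EMR): returns 0 when an EMR version string
# such as 3.45.0 falls inside the supported range, and 1 otherwise.
# Assumption: only the V3 and V5 release lines are considered.
emr_version_supported() {
  local major rest minor patch part
  case $1 in *.*) ;; *) return 1 ;; esac
  major=${1%%.*}
  rest=${1#*.}
  minor=${rest%%.*}
  patch=${rest#*.}
  # No third component (for example "5.16") is treated as patch 0.
  [ "$patch" = "$rest" ] && patch=0
  # Reject empty or non-numeric components.
  for part in "$major" "$minor" "$patch"; do
    case $part in ''|*[!0-9]*) return 1 ;; esac
  done
  case $major in
    3)
      # V3 range: V3.38.3 through V3.50.x.
      [ "$minor" -ge 38 ] && [ "$minor" -le 50 ] || return 1
      [ "$minor" -gt 38 ] || [ "$patch" -ge 3 ]
      ;;
    5)
      # V5 range: up to V5.16.x.
      [ "$minor" -le 16 ]
      ;;
    *)
      return 1
      ;;
  esac
}
```

For example, `emr_version_supported 3.45.1 && echo supported` prints `supported`, while `3.38.2` and `5.17.0` are rejected.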
Step 1: Copy the Hive configuration file
Copy hive-site.xml from the DataLake cluster to the Dataflow cluster. The file is located in the directory that the ${HIVE_CONF_DIR} environment variable points to, for example /etc/taihao-apps/hive-conf/.
Run the following commands on the Dataflow cluster:
mkdir -p /etc/taihao-apps/hive-conf
scp root@<internal-ip-of-master-1-1>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/
Replace <internal-ip-of-master-1-1> with the internal IP address of the master-1-1 node in the DataLake cluster.
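To confirm the copy worked before moving on, you can check that the file exists and defines the two DLF properties that Step 3 reads from it. This is a minimal sketch; `check_hive_conf` is a hypothetical helper, and it assumes each `<name>` element sits on a single line, as in typical Hadoop-style configuration files.

```shell
# Hypothetical check (not an EMR tool): verifies that hive-site.xml exists
# and defines dlf.catalog.endpoint and dlf.catalog.region.
# Assumes each <name>...</name> element is on one line.
check_hive_conf() {
  local conf key missing
  conf=${1:-/etc/taihao-apps/hive-conf/hive-site.xml}
  missing=0
  if [ ! -f "$conf" ]; then
    echo "missing file: $conf" >&2
    return 1
  fi
  for key in dlf.catalog.endpoint dlf.catalog.region; do
    # -F matches the key literally, so the dots are not regex wildcards.
    if ! grep -qF "<name>$key</name>" "$conf"; then
      echo "property not found: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Run `check_hive_conf` with no argument to check the default path used in this step.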
Step 2: Start Flink SQL
When you start Flink SQL, load the DLF JAR before the Hive JAR. The DLF JAR already includes the Hudi dependencies, so you do not need a separate Hudi JAR. Use the Hive 2.3.6 connector JAR regardless of the Hive version installed in the DataLake cluster.
- Start a Flink YARN session. The -d option runs the session in detached mode, and -qu specifies the YARN queue.
  yarn-session.sh -d -qu default
- Start Flink SQL, loading the DLF JAR first and the Hive JAR second:
  sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
  Note: Replace the JAR version numbers with the versions installed on your cluster.
- Set the following configurations in the Flink SQL client:
  -- Enable detailed log output.
  SET sql-client.verbose=true;
  -- Display results in table format.
  SET sql-client.execution.result-mode=tableau;
  -- Set the checkpoint interval to 1 second (1000 ms). Written data becomes visible only after a checkpoint completes.
  SET execution.checkpointing.interval=1000;
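Because the JAR file names contain version numbers that vary by cluster, you may prefer to locate the JARs instead of typing the paths. This is a sketch only: `find_catalog_jars` is a hypothetical helper, and it assumes each catalog directory shown above contains exactly one matching `*-jar-with-dependencies.jar` file and that the paths contain no spaces.

```shell
# Hypothetical helper (not an EMR tool): prints the -j arguments for
# sql-client.sh, DLF connector JAR first, Hive 2.3.6 connector JAR second.
# Assumes exactly one matching JAR per directory and no spaces in paths.
find_catalog_jars() {
  local base f dlf_jar hive_jar dlf_count hive_count
  base=${1:-/opt/apps/FLINK/flink-current/opt/catalogs}
  dlf_count=0
  hive_count=0
  # An unmatched glob stays literal, so each candidate is checked with -f.
  for f in "$base"/dlf/ververica-connector-dlf-*-jar-with-dependencies.jar; do
    if [ -f "$f" ]; then dlf_jar=$f; dlf_count=$((dlf_count + 1)); fi
  done
  for f in "$base"/hive-2.3.6/ververica-connector-hive-2.3.6-*-jar-with-dependencies.jar; do
    if [ -f "$f" ]; then hive_jar=$f; hive_count=$((hive_count + 1)); fi
  done
  if [ "$dlf_count" -ne 1 ] || [ "$hive_count" -ne 1 ]; then
    echo "expected one DLF and one Hive connector JAR under $base" >&2
    return 1
  fi
  # Order matters: the DLF JAR must precede the Hive JAR.
  printf '%s\n' "-j $dlf_jar -j $hive_jar"
}

# Usage (the unquoted expansion is intentionally split into arguments):
#   jar_args=$(find_catalog_jars) && sql-client.sh $jar_args
```

Requiring exactly one match per directory avoids silently picking an arbitrary JAR when several versions are installed side by side.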
Step 3: Create a DLF catalog
Run the following statement to create a DLF catalog:
CREATE CATALOG dlf_catalog WITH (
'type' = 'dlf',
'access.key.id' = '<yourAccessKeyId>', -- Required. AccessKey ID of your Alibaba Cloud account.
'access.key.secret' = '<yourAccessKeySecret>', -- Required. AccessKey secret of your Alibaba Cloud account.
'warehouse' = 'oss://<bucket>/<object>', -- Required. OSS path where data is stored. Check the OSS console for the bucket and path.
'oss.endpoint' = '<oss-endpoint>', -- Required. Get the value of fs.oss.endpoint from ${HADOOP_CONF_DIR}/core-site.xml.
'dlf.endpoint' = '<dlf-endpoint>', -- Required. Get the value of dlf.catalog.endpoint from /etc/taihao-apps/hive-conf/hive-site.xml.
'dlf.region-id' = '<region-id>' -- Required. Get the value of dlf.catalog.region from /etc/taihao-apps/hive-conf/hive-site.xml.
);
A successful run returns:
[INFO] Execute statement succeed.
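Because three of the catalog properties come from configuration files on the cluster, the statement can also be generated. This is a sketch only: `get_conf_value` and `dlf_catalog_ddl` are hypothetical helpers, `DLF_ACCESS_KEY_ID` and `DLF_ACCESS_KEY_SECRET` are assumed variable names, and the XML handling assumes the standard Hadoop `<name>`/`<value>` property layout rather than doing full XML parsing.

```shell
# Hypothetical helper: prints the <value> of the property whose <name> is
# exactly $2 in the Hadoop-style XML file $1. Not a full XML parser.
get_conf_value() {
  local file key content rest
  file=$1
  key=$2
  # Join all lines so <name> and <value> on separate lines still match.
  content=$(tr -d '\r\n' < "$file") || return 1
  rest=${content#*"<name>${key}</name>"}
  # If nothing was stripped, the property is not present.
  [ "$rest" != "$content" ] || return 1
  rest=${rest#*"<value>"}
  printf '%s\n' "${rest%%"</value>"*}"
}

# Hypothetical generator for the CREATE CATALOG statement.
# $1 = warehouse (oss://<bucket>/<object>), $2 = core-site.xml, $3 = hive-site.xml
# Credentials are read from the assumed variables DLF_ACCESS_KEY_ID and
# DLF_ACCESS_KEY_SECRET.
dlf_catalog_ddl() {
  local warehouse core_site hive_site oss_endpoint dlf_endpoint region
  warehouse=$1
  core_site=$2
  hive_site=$3
  if [ -z "${DLF_ACCESS_KEY_ID:-}" ] || [ -z "${DLF_ACCESS_KEY_SECRET:-}" ]; then
    echo "set DLF_ACCESS_KEY_ID and DLF_ACCESS_KEY_SECRET" >&2
    return 1
  fi
  oss_endpoint=$(get_conf_value "$core_site" fs.oss.endpoint) || return 1
  dlf_endpoint=$(get_conf_value "$hive_site" dlf.catalog.endpoint) || return 1
  region=$(get_conf_value "$hive_site" dlf.catalog.region) || return 1
  cat <<EOF
CREATE CATALOG dlf_catalog WITH (
  'type' = 'dlf',
  'access.key.id' = '${DLF_ACCESS_KEY_ID}',
  'access.key.secret' = '${DLF_ACCESS_KEY_SECRET}',
  'warehouse' = '${warehouse}',
  'oss.endpoint' = '${oss_endpoint}',
  'dlf.endpoint' = '${dlf_endpoint}',
  'dlf.region-id' = '${region}'
);
EOF
}
```

For example, `dlf_catalog_ddl oss://<bucket>/<object> "${HADOOP_CONF_DIR}/core-site.xml" /etc/taihao-apps/hive-conf/hive-site.xml` prints a statement that you can paste into the Flink SQL client.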
Step 4: Write data to a Hudi table
Use the Datagen connector to generate sample data and write it to a Hudi table in the DLF catalog.
-- Create a source table that generates random data at 10 rows per second.
CREATE TABLE datagen_source (
uuid INT,
age INT,
ts BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- Create a Hudi database and table in the DLF catalog.
CREATE DATABASE dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1 (
id INT NOT NULL,
age INT,
ts BIGINT
) WITH (
'connector' = 'hudi',
'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> is the warehouse value set in the CREATE CATALOG statement.
'table.type' = 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field' = 'id',
'hive_sync.enable' = 'true',
'hive_sync.table' = 'hudi_tbl1', -- Required. Hive table name.
'hive_sync.db' = 'testdb', -- Required. Hive database name.
'hive_sync.mode' = 'hms' -- Required. Must be set to hms; the default is jdbc.
);
-- Write the generated data into the Hudi table.
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;
-- Query data to verify the write.
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;
Step 5: Query data from the DataLake cluster
Log on to the DataLake cluster. For more information, see Log on to a cluster. Then query the Hudi table using either Spark SQL or Hive CLI.
Use Spark SQL
For more information on using Spark with Hudi, see Integrate Hudi with Spark SQL.
- Start Spark SQL:
  spark-sql \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
  If your cluster runs Spark 3 and Hudi 0.11 or later, you must also add the following configuration:
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
- Query the Hudi table:
  SELECT * FROM testdb.hudi_tbl1;
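The conditional HoodieCatalog setting can be captured in a small helper so that the same command works across clusters. This is a minimal sketch of the rule stated above; `hudi_spark_sql_args` is a hypothetical helper, and you pass it the Spark major version and the Hudi version yourself.

```shell
# Hypothetical helper (not an EMR tool): prints spark-sql --conf arguments
# for Hudi. $1 = Spark major version (for example 3), $2 = Hudi version
# (for example 0.12.1). Adds the HoodieCatalog setting only for Spark 3
# with Hudi 0.11 or later.
hudi_spark_sql_args() {
  local spark_major hudi_version h_major h_rest h_minor part args
  spark_major=$1
  hudi_version=$2
  h_major=${hudi_version%%.*}
  h_rest=${hudi_version#*.}
  h_minor=${h_rest%%.*}
  for part in "$h_major" "$h_minor"; do
    case $part in
      ''|*[!0-9]*)
        echo "unrecognized Hudi version: $hudi_version" >&2
        return 1
        ;;
    esac
  done
  args="--conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
  args="$args --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  # Spark 3 with Hudi 0.11 or later also needs the Hudi session catalog.
  if [ "$spark_major" = 3 ] && { [ "$h_major" -gt 0 ] || [ "$h_minor" -ge 11 ]; }; then
    args="$args --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"
  fi
  printf '%s\n' "$args"
}

# Usage (the values contain no spaces, so word splitting is safe here):
#   spark-sql $(hudi_spark_sql_args 3 0.12.1)
```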
Use Hive CLI
- Start the Hive CLI:
  hive
- Query the Hudi table:
  SELECT * FROM testdb.hudi_tbl1;
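Because the Flink job commits data only when a checkpoint completes, a query issued right after the INSERT starts may return no rows. A minimal polling sketch, assuming the Hive CLI's `-S` (silent) and `-e` (run a query string) options, is shown below. `wait_for_rows` and the `HIVE_CMD` override are hypothetical names; `HIVE_CMD` exists only so the loop can be exercised with a stand-in command.

```shell
# Hypothetical polling helper: runs "SELECT * ... LIMIT 1" through the Hive
# CLI until the table returns data or the attempts run out.
# $1 = table, $2 = attempts, $3 = seconds between attempts.
# HIVE_CMD is a hypothetical override for the client command (default: hive).
wait_for_rows() {
  local table attempts delay cmd i out
  table=${1:-testdb.hudi_tbl1}
  attempts=${2:-30}
  delay=${3:-2}
  cmd=${HIVE_CMD:-hive}
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -S keeps stdout limited to query results; errors are discarded, so a
    # failed attempt simply counts as "no rows yet".
    out=$("$cmd" -S -e "SELECT * FROM $table LIMIT 1;" 2>/dev/null)
    if [ -n "$out" ]; then
      echo "$table returned data on attempt $i"
      return 0
    fi
    i=$((i + 1))
    if [ "$i" -le "$attempts" ]; then sleep "$delay"; fi
  done
  echo "no rows in $table after $attempts attempts" >&2
  return 1
}
```

On the DataLake cluster, `wait_for_rows testdb.hudi_tbl1` polls every 2 seconds for up to 30 attempts.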