全部產品
Search
文件中心

E-MapReduce:DataFlow叢集通過DLF讀寫Hudi表

更新時間:Mar 05, 2025

DataFlow叢集可以通過資料湖構建(DLF)的統一中繼資料服務,訪問DataLake叢集或自訂叢集中的Hudi表資料。本文為您介紹DataFlow叢集如何串連DLF並讀取Hudi全量資料。

前提條件

  • 已在E-MapReduce控制台上建立DataFlow叢集和DataLake叢集,且在同一VPC下,詳情請參見建立叢集

    重要

    建立DataLake叢集時,中繼資料需為DLF統一中繼資料

  • 已開通資料湖構建DLF,詳情請參見快速入門

使用限制

DataFlow叢集版本需為EMR-3.38.3及以上,且不超過EMR-3.50.x或EMR-5.16.x。

操作流程

  1. 步驟一:環境準備

  2. 步驟二:啟動Flink SQL

  3. 步驟三:建立Catalog

  4. 步驟四:Flink SQL寫入Hudi

  5. 步驟五:DataLake叢集查詢Hudi

步驟一:環境準備

拷貝DataLake叢集中${HIVE_CONF_DIR}下的hive-site.xml到DataFlow叢集。

例如,${HIVE_CONF_DIR}/etc/taihao-apps/hive-conf/

mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

步驟二:啟動Flink SQL

重要
  • 務必將DLF的依賴包放置在Hive依賴包的前面,其中DLF依賴包中嵌入了Hudi的依賴。

  • 無需關注DataLake叢集中的Hive版本,Hive依賴均使用2.3.6版本的。

  1. 執行以下命令,啟動Flink YARN會話。

    yarn-session.sh -d -qu default
  2. 執行以下命令,啟動Flink SQL。

    sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
    說明

    請根據實際情況替換上述JAR包中的版本號碼。

  3. 測試時需設定以下配置。

    -- 啟用詳細日誌輸出。
    set sql-client.verbose=true;
    -- 設定結果顯示模式為表格形式。
    set sql-client.execution.result-mode=tableau;
    -- 設定checkpoint時間間隔為1秒,確保資料在checkpoint觸發後才可見。主要用於步驟四中Source資料的產生。
    set execution.checkpointing.interval=1000;

步驟三:建立Catalog

進入Flink SQL後,執行以下命令,建立DLF Catalog,用於讀取Hudi表。

CREATE CATALOG dlf_catalog WITH (
     'type' = 'dlf',
     'access.key.id' = '<yourAccessKeyId>', --您阿里雲帳號的AccessKey ID。
     'access.key.secret' = '<yourAccessKeySecret>', --您阿里雲帳號的AccessKey Secret。
     'warehouse' = 'oss://<bucket>/<object>', -- bucket:表示您建立的OSS Bucket名稱。object:表示您存放資料的路徑。您可以在OSS管理主控台上查看。
     'oss.endpoint' = '<oss.endpoint>', --從${HADOOP_CONF_DIR}/core-site.xml中擷取fs.oss.endpoint的值。
     'dlf.endpoint' = '<dlf.endpoint>', --從/etc/taihao-apps/hive-conf/hive-site.xml中擷取dlf.catalog.endpoint的值。
     'dlf.region-id' = '<dlf.region-id>' --從/etc/taihao-apps/hive-conf/hive-site.xml中擷取dlf.catalog.region的值。
 );

Catalog建立成功後,會返回以下資訊。

[INFO] Execute statement succeed.

步驟四:Flink SQL寫入Hudi

使用Datagen Connector隨機產生上遊Source資料,入湖Hudi表。

-- 構建上遊Source資料
CREATE TABLE datagen_source (
  uuid int,
  age int,
  ts bigint
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- 建立Hudi庫表
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
  id int NOT NULL,
  age int,
  ts bigint
)
WITH(
  'connector'='hudi',
  'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1',   -- oss://<bucket>/<object> 為建立dlf_catalog時的warehouse,testdb為建立的資料庫名,hudi_tbl1為表名。
  'table.type'='COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field'='id',
  'hive_sync.enable'='true',
  'hive_sync.table'='hudi_tbl1',    -- required, Hive建立的表名。
  'hive_sync.db'='testdb',            -- required, Hive建立的資料庫名。
  'hive_sync.mode' = 'hms'          -- required, 將hive sync mode設定為hms, 預設jdbc。
);

--入湖
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;

-- 查詢驗證
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;

步驟五:DataLake叢集查詢Hudi

登入DataLake叢集查詢Hudi資料。登入叢集詳情請參見登入叢集

  • Spark查詢

    Spark查詢詳情,請參見Hudi與Spark SQL整合

    1. 執行以下命令,啟動spark-sql。

      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

      如果您叢集的Spark是Spark3,且Hudi為0.11及以上版本,則需額外添加以下配置。

      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. 執行以下命令,驗證表資訊。

      SELECT * FROM testdb.hudi_tbl1;
  • Hudi查詢

    1. 執行以下命令,啟動Hive CLI。

      hive
    2. 執行以下命令,驗證表資訊。

      SELECT * FROM testdb.hudi_tbl1;