DataFlow叢集可以通過資料湖構建(DLF)的統一中繼資料服務,訪問DataLake叢集或自訂叢集中的Hudi表資料。本文為您介紹DataFlow叢集如何串連DLF並讀取Hudi全量資料。
前提條件
使用限制
DataFlow叢集版本需為EMR-3.38.3及以上,且不超過EMR-3.50.x或EMR-5.16.x。
操作流程
步驟一:環境準備
拷貝DataLake叢集中${HIVE_CONF_DIR}下的hive-site.xml到DataFlow叢集。
例如,${HIVE_CONF_DIR}為/etc/taihao-apps/hive-conf/。
mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/步驟二:啟動Flink SQL
重要
務必將DLF的依賴包放置在Hive依賴包的前面,其中DLF依賴包中嵌入了Hudi的依賴。
無需關注DataLake叢集中的Hive版本,Hive依賴均使用2.3.6版本的。
執行以下命令,啟動Flink YARN會話。
yarn-session.sh -d -qu default執行以下命令,啟動Flink SQL。
sql-client.sh \ -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \ -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar說明請根據實際情況替換上述JAR包中的版本號碼。
測試時需設定以下配置。
-- 啟用詳細日誌輸出。 set sql-client.verbose=true; -- 設定結果顯示模式為表格形式。 set sql-client.execution.result-mode=tableau; -- 設定checkpoint時間間隔為1秒,確保資料在checkpoint觸發後才可見。主要用於步驟四中Source資料的產生。 set execution.checkpointing.interval=1000;
步驟三:建立Catalog
進入Flink SQL後,執行以下命令,建立DLF Catalog,用於讀取Hudi表。
CREATE CATALOG dlf_catalog WITH (
'type' = 'dlf',
'access.key.id' = '<yourAccessKeyId>', --您阿里雲帳號的AccessKey ID。
'access.key.secret' = '<yourAccessKeySecret>', --您阿里雲帳號的AccessKey Secret。
'warehouse' = 'oss://<bucket>/<object>', -- bucket:表示您建立的OSS Bucket名稱。object:表示您存放資料的路徑。您可以在OSS管理主控台上查看。
'oss.endpoint' = '<oss.endpoint>', --從${HADOOP_CONF_DIR}/core-site.xml中擷取fs.oss.endpoint的值。
'dlf.endpoint' = '<dlf.endpoint>', --從/etc/taihao-apps/hive-conf/hive-site.xml中擷取dlf.catalog.endpoint的值。
'dlf.region-id' = '<dlf.region-id>' --從/etc/taihao-apps/hive-conf/hive-site.xml中擷取dlf.catalog.region的值。
);Catalog建立成功後,會返回以下資訊。
[INFO] Execute statement succeed.步驟四:Flink SQL寫入Hudi
使用Datagen Connector隨機產生上遊Source資料,入湖Hudi表。
-- 構建上遊Source資料
CREATE TABLE datagen_source (
uuid int,
age int,
ts bigint
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- 建立Hudi庫表
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
id int NOT NULL,
age int,
ts bigint
)
WITH(
'connector'='hudi',
'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> 為建立dlf_catalog時的warehouse,testdb為建立的資料庫名,hudi_tbl1為表名。
'table.type'='COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field'='id',
'hive_sync.enable'='true',
'hive_sync.table'='hudi_tbl1', -- required, Hive建立的表名。
'hive_sync.db'='testdb', -- required, Hive建立的資料庫名。
'hive_sync.mode' = 'hms' -- required, 將hive sync mode設定為hms, 預設jdbc。
);
--入湖
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;
-- 查詢驗證
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;步驟五:DataLake叢集查詢Hudi
登入DataLake叢集查詢Hudi資料。登入叢集詳情請參見登入叢集。
Spark查詢
Spark查詢詳情,請參見Hudi與Spark SQL整合。
執行以下命令,啟動spark-sql。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'如果您叢集的Spark是Spark3,且Hudi為0.11及以上版本,則需額外添加以下配置。
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'執行以下命令,驗證表資訊。
SELECT * FROM testdb.hudi_tbl1;
Hudi查詢
執行以下命令,啟動Hive CLI。
hive執行以下命令,驗證表資訊。
SELECT * FROM testdb.hudi_tbl1;