E-MapReduce (EMR) Dataflow クラスタを使用して、Data Lake Formation (DLF) の統合メタデータサービスに基づいて、DataLake クラスタまたはカスタムクラスタ内の Hudi テーブルにアクセスできます。 このトピックでは、EMR Dataflow クラスタを DLF に接続し、Hudi テーブルから完全データを読み取る方法について説明します。
前提条件
制限事項
Dataflow クラスタのバージョンは EMR V3.38.3 以降であり、EMR V3.50.x または EMR V5.16.x を超えることはできません。
手順
手順 1: 準備を行う
hive-site.xml 構成ファイルを、DataLake クラスターの ${HIVE_CONF_DIR}
パラメーターで指定されたディレクトリから Dataflow クラスターにコピーします。
たとえば、${HIVE_CONF_DIR}
パラメータで指定されたディレクトリは /etc/taihao-apps/hive-conf/ です。
mkdir /etc/taihao-apps/hive-conf
scp root@<Internal IP address of the master-1-1 node>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/ // master-1-1 ノードの内部 IP アドレス
手順 2: Flink SQL を起動する
DLF の依存関係が Hive の依存関係の前に配置されていることを確認してください。 Hudi の依存関係は DLF の依存関係に含まれています。
DataLake クラスタにインストールされている Hive のバージョンを気にする必要はありません。 すべての Hive の依存関係はバージョン 2.3.6 を使用します。
次のコマンドを実行して、Flink YARN セッションを開始します。
yarn-session.sh -d -qu default
次のコマンドを実行して、Flink SQL を起動します。
sql-client.sh \ -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \ -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
説明上記の JAR パッケージのバージョン番号は、実際の状況に合わせて置き換えてください。
テスト中に次の構成を行います。
-- 詳細なログ出力を有効にします。 set sql-client.verbose=true; -- 結果の表示モードをテーブル形式に設定します。 set sql-client.execution.result-mode=tableau; -- チェックポイント間隔を 1 秒に設定して、チェックポイントがトリガーされた後にのみデータが表示されるようにします。 この構成は、主に手順 4 でソースデータを生成するために使用されます。 set execution.checkpointing.interval=1000;
手順 3: カタログを作成する
Flink SQL を起動した後、次のコマンドを実行して、Hudi テーブルからデータを読み取るための DLF カタログを作成します。
CREATE CATALOG dlf_catalog WITH (
'type' = 'dlf',
'access.key.id' = '<yourAccessKeyId>', -- Alibaba Cloud アカウントの AccessKey ID。
'access.key.secret' = '<yourAccessKeySecret>', -- Alibaba Cloud アカウントの AccessKey シークレット。
'warehouse' = 'oss://<bucket>/<object>', -- bucket: Object Storage Service (OSS) バケットの名前。 object: データが格納されているパス。 OSS コンソールで情報を表示できます。
'oss.endpoint' = '<oss.endpoint>', -- ${HADOOP_CONF_DIR}/core-site.xml から fs.oss.endpoint の値を取得します。
'dlf.endpoint' = '<dlf.endpoint>', -- /etc/taihao-apps/hive-conf/hive-site.xml から dlf.catalog.endpoint の値を取得します。
'dlf.region-id' = '<dlf.region-id>' -- /etc/taihao-apps/hive-conf/hive-site.xml から dlf.catalog.region の値を取得します。
);
カタログが作成されると、次の情報が返されます。
[INFO] Execute statement succeed.
手順 4: Flink SQL を使用して Hudi テーブルにデータを書き込む
Datagen コネクタを使用してソースデータをランダムに生成し、そのデータを Hudi テーブルに書き込みます。
-- ソースデータを生成します。
CREATE TABLE datagen_source (
uuid int,
age int,
ts bigint
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- Hudi データベースと Hudi テーブルを作成します。
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
id int NOT NULL,
age int,
ts bigint
)
WITH(
'connector'='hudi',
'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> は DLF カタログの作成時に指定されたウェアハウス、testdb は作成されたデータベースの名前、hudi_tbl1 は作成されたテーブルの名前です。
'table.type'='COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field'='id',
'hive_sync.enable'='true',
'hive_sync.table'='hudi_tbl1', -- 必須。 Hive テーブルの名前。
'hive_sync.db'='testdb', -- 必須。 Hive データベースの名前。
'hive_sync.mode' = 'hms' -- 必須。 hive_sync.mode パラメータを hms に設定します。 デフォルト値は jdbc です。
);
-- データレイクにデータを書き込みます。
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;
-- データをクエリします。
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;
手順 5: DataLake クラスタの Hudi テーブルからデータをクエリする
DataLake クラスタにログインし、Hudi テーブルからデータをクエリします。 クラスタへのログイン方法の詳細については、「クラスタにログインする」をご参照ください。
Spark SQL を使用する
詳細については、「Hudi と Spark SQL を統合する」をご参照ください。
次のコマンドを実行して、Spark SQL を起動します。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
クラスタで Spark 3 を使用し、Hudi のバージョンが 0.11 以降の場合は、次の構成を追加する必要があります。
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
次のコマンドを実行して、テーブルからデータをクエリします。
SELECT * FROM testdb.hudi_tbl1;
Hive CLI を使用する
次のコマンドを実行して、Hive CLI を起動します。
hive
次のコマンドを実行して、テーブルからデータをクエリします。
SELECT * FROM testdb.hudi_tbl1;