すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:DLF に基づいて Dataflow クラスタを使用して Hudi テーブルのデータを読み書きする

最終更新日:Mar 27, 2025

E-MapReduce (EMR) Dataflow クラスタを使用して、Data Lake Formation (DLF) の統合メタデータサービスに基づいて、DataLake クラスタまたはカスタムクラスタ内の Hudi テーブルにアクセスできます。 このトピックでは、EMR Dataflow クラスタを DLF に接続し、Hudi テーブルから完全データを読み取る方法について説明します。

前提条件

  • EMR コンソールで Dataflow クラスタと DataLake クラスタが作成されており、これらのクラスタは同じ VPC に属しています。 詳細については、「クラスタを作成する」をご参照ください。

    重要

    DataLake クラスタを作成する際に、メタデータパラメータに [DLF 統合メタデータ] を選択する必要があります。

  • DLF がアクティブ化されています。 詳細については、「はじめに」をご参照ください。

制限事項

Dataflow クラスタのバージョンは EMR V3.38.3 以降であり、EMR V3.50.x または EMR V5.16.x を超えることはできません。

手順

  1. 手順 1: 準備を行う

  2. 手順 2: Flink SQL を起動する

  3. 手順 3: カタログを作成する

  4. 手順 4: Flink SQL を使用して Hudi テーブルにデータを書き込む

  5. 手順 5: DataLake クラスタの Hudi テーブルからデータをクエリする

手順 1: 準備を行う

hive-site.xml 構成ファイルを、DataLake クラスターの ${HIVE_CONF_DIR} パラメーターで指定されたディレクトリから Dataflow クラスターにコピーします。

たとえば、${HIVE_CONF_DIR} パラメータで指定されたディレクトリは /etc/taihao-apps/hive-conf/ です。

mkdir /etc/taihao-apps/hive-conf
scp root@<Internal IP address of the master-1-1 node>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/  // master-1-1 ノードの内部 IP アドレス

手順 2: Flink SQL を起動する

重要
  • DLF の依存関係が Hive の依存関係の前に配置されていることを確認してください。 Hudi の依存関係は DLF の依存関係に含まれています。

  • DataLake クラスタにインストールされている Hive のバージョンを気にする必要はありません。 すべての Hive の依存関係はバージョン 2.3.6 を使用します。

  1. 次のコマンドを実行して、Flink YARN セッションを開始します。

    yarn-session.sh -d -qu default
  2. 次のコマンドを実行して、Flink SQL を起動します。

    sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
    説明

    上記の JAR パッケージのバージョン番号は、実際の状況に合わせて置き換えてください。

  3. テスト中に次の構成を行います。

    -- 詳細なログ出力を有効にします。
    set sql-client.verbose=true;
    -- 結果の表示モードをテーブル形式に設定します。
    set sql-client.execution.result-mode=tableau;
    -- チェックポイント間隔を 1 秒に設定して、チェックポイントがトリガーされた後にのみデータが表示されるようにします。 この構成は、主に手順 4 でソースデータを生成するために使用されます。
    set execution.checkpointing.interval=1000;

手順 3: カタログを作成する

Flink SQL を起動した後、次のコマンドを実行して、Hudi テーブルからデータを読み取るための DLF カタログを作成します。

CREATE CATALOG dlf_catalog WITH (
     'type' = 'dlf',
     'access.key.id' = '<yourAccessKeyId>', -- Alibaba Cloud アカウントの AccessKey ID。
     'access.key.secret' = '<yourAccessKeySecret>', -- Alibaba Cloud アカウントの AccessKey シークレット。
     'warehouse' = 'oss://<bucket>/<object>', -- bucket: Object Storage Service (OSS) バケットの名前。 object: データが格納されているパス。 OSS コンソールで情報を表示できます。
     'oss.endpoint' = '<oss.endpoint>', -- ${HADOOP_CONF_DIR}/core-site.xml から fs.oss.endpoint の値を取得します。
     'dlf.endpoint' = '<dlf.endpoint>', -- /etc/taihao-apps/hive-conf/hive-site.xml から dlf.catalog.endpoint の値を取得します。
     'dlf.region-id' = '<dlf.region-id>' -- /etc/taihao-apps/hive-conf/hive-site.xml から dlf.catalog.region の値を取得します。
 );

カタログが作成されると、次の情報が返されます。

[INFO] Execute statement succeed.

手順 4: Flink SQL を使用して Hudi テーブルにデータを書き込む

Datagen コネクタを使用してソースデータをランダムに生成し、そのデータを Hudi テーブルに書き込みます。

-- ソースデータを生成します。
CREATE TABLE datagen_source (
  uuid int,
  age int,
  ts bigint
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Hudi データベースと Hudi テーブルを作成します。
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
  id int NOT NULL,
  age int,
  ts bigint
)
WITH(
  'connector'='hudi',
  'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> は DLF カタログの作成時に指定されたウェアハウス、testdb は作成されたデータベースの名前、hudi_tbl1 は作成されたテーブルの名前です。
  'table.type'='COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field'='id',
  'hive_sync.enable'='true',
  'hive_sync.table'='hudi_tbl1',    -- 必須。 Hive テーブルの名前。
  'hive_sync.db'='testdb',            -- 必須。 Hive データベースの名前。
  'hive_sync.mode' = 'hms'          -- 必須。 hive_sync.mode パラメータを hms に設定します。 デフォルト値は jdbc です。
);

-- データレイクにデータを書き込みます。
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;

-- データをクエリします。
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;

手順 5: DataLake クラスタの Hudi テーブルからデータをクエリする

DataLake クラスタにログインし、Hudi テーブルからデータをクエリします。 クラスタへのログイン方法の詳細については、「クラスタにログインする」をご参照ください。

  • Spark SQL を使用する

    詳細については、「Hudi と Spark SQL を統合する」をご参照ください。

    1. 次のコマンドを実行して、Spark SQL を起動します。

      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

      クラスタで Spark 3 を使用し、Hudi のバージョンが 0.11 以降の場合は、次の構成を追加する必要があります。

      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. 次のコマンドを実行して、テーブルからデータをクエリします。

      SELECT * FROM testdb.hudi_tbl1;
  • Hive CLI を使用する

    1. 次のコマンドを実行して、Hive CLI を起動します。

      hive
    2. 次のコマンドを実行して、テーブルからデータをクエリします。

      SELECT * FROM testdb.hudi_tbl1;