すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Hudi コネクタ (廃止予定)

最終更新日:Mar 08, 2025

このトピックでは、Hudi コネクタの使用方法について説明します。

重要

市場のエコシステムの発展とサービス戦略の調整により、公式の組み込み Hudi コネクタは、Realtime Compute for Apache Flink の今後の Ververica Runtime (VVR) バージョンではサポートされなくなります。カスタムコネクタを使用して、Realtime Compute for Apache Flink を Apache Hudi に接続できます。また、データ移行を実行し、Paimon コネクタを使用して、最適化された機能とパフォーマンスを得ることをお勧めします。

背景情報

Apache Hudi は、データレイクのテーブルデータを管理するオープンソースのフレームワークです。Apache Hudi は、Alibaba Cloud Object Storage Service (OSS) または Hadoop 分散ファイルシステム (HDFS) に基づいてファイルレイアウトを編成し、原子性、一貫性、独立性、耐久性 (ACID) を保証し、効率的な行レベルのデータ更新と削除をサポートします。これにより、ETL (抽出、変換、書き出し) の開発が簡素化されます。Apache Hudi は、指定されたファイルサイズを維持するために、小さなファイルの自動管理とマージもサポートしています。これにより、データの挿入と更新中に生成される過剰な小さなファイルによるクエリパフォーマンスの低下を防ぎ、小さなファイルを手動で監視およびマージする O&M ワークロードを排除できます。

項目

説明

テーブルタイプ

ソーステーブルと結果テーブル

実行モード

ストリーミングモードとバッチモード

データ形式

該当なし

メトリック

  • ソーステーブルのメトリック

    • numRecordsIn

    • numRecordsInPerSecond

  • 結果テーブルのメトリック:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

DataStream API と SQL API

シンクテーブルのデータ更新または削除

サポートされています

機能

項目

説明

コア機能

  • ACID セマンティクスをサポートしています。デフォルトでは、スナップショット分離が提供されます。

  • UPSERT セマンティクスをサポートしています。UPSERT セマンティクスは、INSERT セマンティクスと UPDATE セマンティクスの組み合わせです。フルマネージド Flink は、UPSERT セマンティクスを使用して、次のルールに基づいてテーブルにデータを書き込みます。テーブルに書き込むデータレコードがテーブルに存在しない場合、フルマネージド Flink はデータレコードをテーブルに挿入します。データレコードがテーブルに既に存在する場合、フルマネージド Flink はデータレコードを更新します。INSERT INTO ステートメントを使用すると、開発コードを大幅に簡素化し、データ処理の効率を向上させることができます。

  • タイムトラベル機能に基づいて、特定の時点におけるデータバージョンの履歴詳細を提供します。これにより、効率的な方法でデータ O&M を実行でき、データ品質が向上します。

一般的なシナリオ

  • データベースからデータレイクへのデータインジェスチョンの高速化

    オフラインモードで大量のデータを読み込んでマージするために使用される従来の方法と比較して、Hudi を使用すると、ストリーミングデータをリアルタイムで非常に大きなデータセットに、より費用対効果の高い方法で更新および書き込むことができます。リアルタイム ETL プロセスで、変更データキャプチャ (CDC) データをデータレイクに直接書き込み、ダウンストリームビジネスに使用できます。たとえば、フルマネージド Flink の MySQL CDC コネクタを使用して、MySQL などのリレーショナルデータベース管理システム (RDBMS) のバイナリログデータを Hudi テーブルに書き込むことができます。

  • 増分 ETL

    ETL の増分抽出方法を使用して、Hudi から変更データストリームをリアルタイムで取得できます。この方法は、オフライン ETL スケジューリングよりも優れたリアルタイムパフォーマンスを提供し、より軽量です。たとえば、オンラインビジネスデータをオフラインストレージシステムに増分的に抽出できます。この一般的なシナリオでは、Flink エンジンは抽出されたデータを Hudi テーブルに書き込み、次に Apache Presto または Apache Spark エンジンを使用して、効率的なオンライン分析処理 (OLAP) を実行します。

  • メッセージキューイング

    少量のデータのみを処理する必要があるシナリオでは、Hudi をメッセージキューサービスとして使用して Kafka を置き換えることもできます。これにより、アプリケーション開発アーキテクチャが簡素化されます。

  • データバックフィル

    テーブルの特定の行と列の履歴の完全データを更新する場合、データレイクを使用できます。これにより、計算リソースの消費が大幅に削減され、エンドツーエンドのパフォーマンスが向上します。たとえば、この機能を使用して、Hive メタストアの Hudi テーブルから完全データと増分データを読み取り、2 つのテーブルを結合してワイドテーブルを生成できます。

利点

オープンソースの Hudi コミュニティと比較して、フルマネージド Flink に統合された Hudi は、より多くの利点を提供します。フルマネージド Flink に統合された Hudi は、次の利点を提供します。

  • プラットフォームとフルマネージド Flink の統合に基づくメンテナンスフリー

    フルマネージド Flink は、組み込みの Hudi コネクタを提供します。これにより、O&M の複雑さが軽減され、SLA (サービスレベル契約) が保証されます。

  • データ接続性の向上

    Hudi コネクタは、複数の Alibaba Cloud ビッグデータコンピューティングおよび分析エンジンに接続されています。これにより、データはコンピューティングエンジンから切り離され、Apache Flink、Apache Spark、Apache Presto、および Apache Hive 間でシームレスに移行できます。

  • データベースからデータレイクへのデータインジェスチョンの最適化

    Hudi コネクタは Flink CDC コネクタと連携して、データ開発を簡素化します。

  • エンタープライズクラスの機能

    Data Lake Formation (DLF) の統合メタデータビューや、自動で軽量なテーブルスキーマの変更など、エンタープライズクラスの機能がサポートされています。

  • Alibaba Cloud OSS を使用した費用対効果の高いストレージと高いスケーラビリティ

    データは、Alibaba Cloud OSS に Apache Parquet または Apache Avro 形式で保存されます。ストレージとコンピューティングは分離されており、リソースは柔軟にスケーリングできます。

制限事項

  • エンジンバージョンが vvr-4.0.11-flink-1.13 以降の Realtime Compute for Apache Flink のみが Hudi コネクタをサポートしています。

  • HDFS、Alibaba Cloud OSS、または OSS-HDFS のみがファイルシステムとして使用できます。

  • セッションクラスタではドラフトを公開できません。

  • Hudi コネクタを使用してテーブルのフィールドを変更することはできません。テーブルのフィールドを変更する場合は、DLF コンソールで Spark SQL ステートメントを使用します。

構文

CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<カスタムストレージディレクトリ>',
  ...
);

WITH 句のパラメータ

基本パラメータ

  • 共通パラメータ

    パラメータ

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    STRING

    はい

    デフォルト値なし

    値を hudi に設定します。

    path

    テーブルのストレージパス。

    STRING

    はい

    デフォルト値なし

    テーブルは、OSS バケット、HDFS、または OSS-HDFS に保存できます。

    • OSS: パスは oss://<bucket>/<user-defined-dir> 形式です。

    • HDFS: パスは hdfs://<user-defined-dir> 形式です。

    • OSS-HDFS: パスは oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir> 形式です。

      説明

      VVR 8.0.3 以降を使用する Realtime Compute for Apache Flink でのみ、このパラメータを OSS-HDFS パスに設定できます。

    パスのパラメータ:

    • bucket: 作成した OSS バケットの名前。

    • user-defined-dir: データが保存されるパス。

    • oss-hdfs-endpoint: OSS-HDFS サービスのエンドポイント。

      OSS コンソールの OSS バケットの [概要] ページの [ポート] セクションで、[HDFS][エンドポイント] を表示できます。

    hoodie.datasource.write.recordkey.field

    プライマリキーフィールド。

    STRING

    いいえ

    uuid

    • PRIMARY KEY 構文を使用してプライマリキーフィールドを指定できます。

    • 複数のフィールドはコンマ (,) で区切ります。

    precombine.field

    バージョンフィールド。

    STRING

    いいえ

    ts

    このフィールドは、メッセージを更新する必要があるかどうかを判断するために使用されます。

    このパラメータを構成しない場合、システムはエンジンで定義されたメッセージシーケンスに基づいてデータを更新します。

    oss.endpoint

    Alibaba Cloud OSS または OSS-HDFS のエンドポイント。

    STRING

    いいえ

    デフォルト値なし

    テーブルを OSS または OSS-HDFS に保存する場合は、このパラメータを指定する必要があります。

    • テーブルを OSS に保存する場合は、このパラメーターを OSS のエンドポイントに設定します。 OSS のエンドポイントの詳細については、「リージョンとエンドポイント」をご参照ください。

    • テーブルを OSS-HDFS に保存する場合は、このパラメーターを OSS-HDFS のエンドポイントに設定します。 OSS コンソールの OSS バケットの [概要] ページの [ポート] セクションで、HDFS[エンドポイント] を表示できます。

    accessKeyId

    Alibaba Cloud アカウントの AccessKey ID。

    STRING

    いいえ

    デフォルト値なし

    テーブルを OSS または OSS-HDFS に保存する場合は、このパラメーターを指定する必要があります。

    詳細については、「コンソール操作」をご参照ください。

    重要

    AccessKey ペアを保護するために、変数を使用して AccessKey ID と AccessKey シークレットを設定することをお勧めします。詳細については、「変数の管理」をご参照ください。

    accessKeySecret

    Alibaba Cloud アカウントの AccessKey シークレット。

    STRING

    いいえ

    デフォルト値なし

  • ソーステーブル専用の パラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    read.streaming.enabled

    ストリーミング読み取りを有効にするかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true: ストリーミング読み取りが有効です。

    • false: ストリーミング読み取りは無効です。

    read.start-commit

    データ消費の開始 オフセット です。

    STRING

    いいえ

    Empty

    有効な値:

    • yyyyMMddHHmmss フォーマット の 時間 : 指定された 時間 からデータが消費されます。

    • earliest: 最も古い オフセット からデータが消費されます。

    • この パラメーター が Empty の場合、最新の 時間 からデータが消費されます。

  • シンクテーブル専用の パラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    write.operation

    書き込み 操作 が実行される モード。

    STRING

    いいえ

    UPSERT

    有効な値:

    • insert: データは追加モードでテーブルに書き込まれます。

    • upsert: データが更新されます。

    • bulk_insert: バッチデータは追加モードでテーブルに書き込まれます。

    hive_sync.enable

    Hiveへのメタデータ同期を有効にするかどうかを指定します。

    BOOLEAN

    いいえ

    false

    有効な値:

    • true: Hiveへのメタデータ同期が有効になります。

    • false: Hiveへのメタデータ同期が無効になります。

    hive_sync.mode

    Hiveデータが同期される モード。

    STRING

    いいえ

    hms

    有効な値:

    • hms: HiveメタストアまたはDLFにメタデータを同期する場合は、この パラメーター をhmsに設定します。

    • jdbc: Java Database Connectivity ( JDBC ) ドライバーにメタデータを同期する場合は、この パラメーター をjdbcに設定します。

    hive_sync.db

    データが同期されるHive データベース の名前。

    STRING

    いいえ

    default

    該当なし

    hive_sync.table

    データが同期されるHiveテーブルの名前。

    STRING

    いいえ

    現在のテーブルの名前

    Hudiからデータが同期されるHiveテーブルの名前には、ハイフン(-)を含めることはできません。

    dlf.catalog.region

    DLF サービス がアクティブ化されている リージョン の名前。

    STRING

    いいえ

    デフォルト値なし

    詳細については、「サポートされている リージョン と エンドポイント」をご参照ください。

    説明
    • dlf.catalog.region パラメーター は、hive_sync.mode パラメーター がhmsに設定されている場合にのみ有効になります。

    • この パラメーター の値が、dlf.catalog.endpoint パラメーター で指定された エンドポイント と一致していることを確認してください。

    dlf.catalog.endpoint

    DLFの エンドポイント。

    STRING

    いいえ

    デフォルト値なし

    詳細については、「サポートされている リージョン と エンドポイント」をご参照ください。

    説明
    • dlf.catalog.endpoint パラメーター は、hive_sync.mode パラメーター がhmsに設定されている場合にのみ有効になります。

    • dlf.catalog.endpoint パラメーター をDLFのVirtual Private Cloud ( VPC ) エンドポイント に設定することをお勧めします。たとえば、中国 (杭州) リージョン を選択した場合は、この パラメーター を dlf-vpc.cn-hangzhou.aliyuncs.com に設定します。

    • VPC間でDLFに アクセス する場合は、「Realtime Compute for Apache FlinkはVPC間で サービス にどのように アクセス しますか。」で説明されている手順に従ってください。

詳細パラメーター

Hudi は、さまざまな読み取りおよび書き込みシナリオをサポートしています。次の表は、さまざまなシナリオで設定できるパラメーターについて説明しています。

並列処理のパラメーター

パラメーター

説明

デフォルト値

備考

write.tasks

書き込みタスクの並列度。各書き込みタスクは、1 ~ N 個のバケットに順番にデータを書き込みます。

4

書き込みタスクの並列処理を増やしても、小さなファイルの数は変わりません

write.bucket_assign.tasks

バケットアサイナーオペレーターの並列度。

Realtime Compute for Apache Flink デプロイの並列度

書き込みタスクの並列度とバケットアサイナーオペレーターの並列度を上げると、小さなファイルの数が増加します。

write.index_bootstrap.tasks

インデックスブートストラップオペレーターの並列度。

Realtime Compute for Apache Flink デプロイの並列度

  • このパラメーターは、index.bootstrap.enabled パラメーターが true に設定されている場合にのみ有効になります。

  • 並列度を上げると、ブートストラップステージの効率が向上します。ブートストラップステージでは、チェックポイントがブロックされる可能性があります。この問題を解決するには、チェックポイントの障害許容回数を大きな値に設定します。

read.tasks

ストリーミングおよびバッチ読み取りオペレーターの並列度。

4

該当なし

compaction.tasks

オンラインコンパクションオペレーターの並列度。

4

オンラインコンパクションは、オフラインコンパクションよりも多くのリソースを消費します。オフラインコンパクションを実行することをお勧めします。

オンラインコンパクションのパラメーター

パラメーター

説明

デフォルト値

備考

compaction.schedule.enabled

コンパクションプランをスケジュールに従って生成するかどうかを指定します。

true

有効な値:

  • true: コンパクションプランはスケジュールに従って生成されます。

  • false: コンパクションプランはスケジュールに従って生成されません。

説明

compaction.async.enabled パラメーターが false に設定されている場合でも、このパラメーターを true に設定することをお勧めします。

compaction.async.enabled

非同期コンパクションを有効にするかどうかを指定します。

true

有効な値:

  • true: 非同期コンパクションが有効になります。

  • false: 非同期コンパクションが無効になります。

説明

このパラメーターを false に設定して、オンラインコンパクションを無効にすることができます。ただし、compaction.schedule.enabled パラメーターを true に設定することをお勧めします。この場合、スケジュールに従って生成されたコンパクションプランを実行するために、オフライン非同期コンパクションを実行できます。

compaction.tasks

コンパクションタスクの並列度。

4

該当なし

compaction.trigger.strategy

コンパクションをトリガーするために使用される戦略。

num_commits

有効な値:

  • num_commits: コミット数が指定された値に達すると、コンパクションがトリガーされます。

  • time_elapsed: コンパクションは指定された間隔でトリガーされます。

  • num_and_time: コミット数と間隔の両方が指定された値に達すると、コンパクションがトリガーされます。

  • num_or_time: コミット数または間隔が指定された値に達すると、コンパクションがトリガーされます。

compaction.delta_commits

コンパクションをトリガーするために必要なコミットの最大数。

5

該当なし

compaction.delta_seconds

コンパクションがトリガーされる間隔。

3600

単位: 秒。

compaction.max_memory

コンパクションおよび重複排除に使用されるハッシュマップの最大メモリ。

100 MB

リソースが十分にある場合は、このパラメーターの値を 1 GB に変更することをお勧めします。

compaction.target_io

各コンパクションプランの最大 I/O スループット。

500 GB

該当なし

ファイル関連のパラメーター

ファイル関連のパラメーターは、ファイルサイズを管理するために使用されます。次の表に、サポートされているパラメーターを示します。

パラメーター

説明

デフォルト値

備考

hoodie.parquet.max.file.size

データを書き込むことができる Parquet ファイルの最大サイズ。

Parquet ファイルに書き込まれるデータがこのパラメーターで指定されたサイズを超える場合、超過データは新しいファイルグループに書き込まれます。

120 * 1024 * 1024 バイト

(120 MB)

単位: バイト。

hoodie.parquet.small.file.limit

小さなファイルのファイルサイズのしきい値。サイズがこのパラメーターの値未満のファイルは、小さなファイルと見なされます。

104857600 バイト (100 MB)

  • 単位: バイト。

  • データの書き込み中に、Hudi は新しいファイルにデータを書き込む代わりに、既存の小さなファイルに追記モードでデータを書き込もうとします。

hoodie.copyonwrite.record.size.estimate

レコードサイズの推定値。

1024 バイト (1 KB)

  • 単位: バイト。

  • このパラメーターを指定しない場合、Hudi はコミットされたメタデータに基づいてレコードサイズを動的に計算します。

Hadoop 関連のパラメーター

パラメーター

説明

デフォルト値

備考

hadoop.${you option key}

hadoop. プレフィックスを使用して指定された Hadoop 設定項目。

デフォルト値なし

複数の Hadoop 設定項目を同時に指定できます。

説明

このパラメーターは、Hudi 0.12.0 以降でサポートされています。クラスタ間のコミットと実行の要件を満たすために、DDL 文を使用してジョブごとの Hadoop 構成を指定できます。

データ書き込みのパラメーター

Hudi は、バッチ書き込みやストリーミング書き込みなど、さまざまな書き込み方法をサポートしています。Hudi は、変更ログデータやログデータなど、さまざまなデータ型をサポートしています。Hudi はまた、異なるインデックススキームもサポートしています。

  • バッチ書き込みのパラメーター

    他のデータソースから Hudi に既存のデータをインポートする場合、バッチインポート機能を使用して既存のデータを Hudi テーブルにインポートできます。

    パラメーター

    説明

    デフォルト値

    備考

    write.operation

    書き込み操作のタイプ。

    upsert

    有効な値:

    • upsert: データが挿入および更新されます。

    • insert: データが挿入されます。

    • bulk_insert: データはバッチで書き込まれます。

      説明
      • このパラメーターを bulk_insert に設定すると、Avro ベースのデータシリアル化とデータコンパクションは実行されません。データのインポート後に重複排除は実行されません。データの一意性に関する要件が高い場合は、インポートされたデータが一意であることを確認してください。

      • write.operation パラメーターを bulk_insert に設定できるのは、バッチ実行モードの場合のみです。このモードでは、システムは入力データをパーティション名でソートし、デフォルトで Hudi テーブルにデータを書き込みます。このようにして、書き込み操作は異なるファイル間で頻繁に切り替えられず、システムパフォーマンスが低下しません。

    write.tasks

    bulk_insert 書き込みタスクの並列度。

    Realtime Compute for Apache Flink デプロイの並列度

    bulk_insert 書き込みタスクの並列度は、write.tasks パラメーターで指定され、小さなファイルの数に影響します。

    理論的には、bulk_insert 書き込みタスクの並列度はバケットの数と同じです。各バケットのファイルに書き込まれるデータがファイルの最大サイズに達すると、データは新しいファイルハンドルにロールオーバーされます。したがって、書き込みファイルの最終的な数は、bulk_insert 書き込みタスクの並列度以上になります。Parquet ファイルの最大サイズは 120 MB です。

    write.bulk_insert.shuffle_input

    バッチ挿入タスクのパーティションフィールドに基づいて入力データをシャッフルするかどうかを指定します。

    true

    Hudi 0.11.0 以降では、このパラメーターを true に設定して、小さなファイルの数を減らすことができます。ただし、これによりデータスキューが発生する可能性があります。

    write.bulk_insert.sort_input

    バッチ挿入タスクのパーティションフィールドに基づいて入力データをソートするかどうかを指定します。

    true

    Hudi 0.11.0 以降では、複数のパーティションにデータを書き込む書き込みタスクを実行する場合、このパラメーターを true に設定して、小さなファイルの数を減らすことができます。.

    write.sort.memory

    ソートオペレーターの使用可能な管理対象メモリ。

    128

    単位: MB。

  • 変更ログモードを有効にするパラメーター

    変更ログモードでは、Hudi はメッセージのすべての変更 (INSERT、UPDATE_BEFORE、UPDATE_AFTER、および DELETE) を保持し、Flink エンジンのステートフルコンピューティングと連携して、エンドツーエンドのほぼリアルタイムのデータウェアハウスを実装します。Hudi の Merge on Read (MOR) テーブルは、行指向ストレージに基づくメッセージのすべての変更の保持をサポートしています。フルマネージド Flink は、ストリーミングモードで MOR テーブルを読み取って、すべての変更レコードを消費できます。

    説明

    変更ログモード以外では、単一のストリーミング読み取りのバッチデータセットの中間変更をマージできます。すべての中間結果は、バッチ読み取り (スナップショット読み取り) でマージされます。中間状態は、書き込まれているかどうかに関係なく無視されます。

    パラメーター

    説明

    デフォルト値

    備考

    changelog.enabled

    すべての変更を消費するかどうかを指定します。

    false

    有効な値:

    • true: すべての変更を消費できます。

    • false: すべての変更を消費できるわけではありません。UPSERT セマンティクスが使用されます。すべてのメッセージの中で最後にマージされたメッセージのみが保証され、中間変更はマージされる可能性があります。

    説明

    changelog.enabled パラメーターを true に設定した後も、非同期コンパクションタスクは中間変更を 1 つのデータレコードにマージします。ストリーミング読み取り中にデータが最も早い機会に消費されない場合、データのコンパクション後に最後のデータレコードのみを読み取ることができます。データコンパクションのバッファー時間を変更して、リーダーがデータを読み取って消費するための特定の時間を確保できます。たとえば、compaction.delta_commits パラメーターを 5 に、compaction.delta_seconds パラメーターを 3600 に設定できます。

  • 追記モード (Hudi 0.10.0 以降でサポート)

    追記モードでは:

    • 小さなファイルポリシーは MOR テーブルに適用されます。データは追記モードで Avro ログファイルに書き込まれます。

    • 小さなファイルポリシーは COW テーブルには適用されません。Copy on Write (COW) テーブルにデータが書き込まれるたびに、新しい Parquet ファイルが生成されます。

クラスタリングポリシーのパラメーター

Hudi は、INSERT モードでの小さなファイルの問題を解決するために、さまざまなクラスタリングポリシーをサポートしています。

  • インラインクラスタリングのパラメーター (COW テーブルでのみサポート)

    パラメーター

    説明

    デフォルト値

    備考

    write.insert.cluster

    データの書き込み中に小さなファイルをマージするかどうかを指定します。

    false

    有効な値:

    • true: データの書き込み中に小さなファイルがマージされます。

    • false: データの書き込み中に小さなファイルはマージされません。

    説明

    デフォルトでは、COW テーブルで INSERT 操作が実行されると、小さなファイルはマージされません。このパラメーターを true に設定すると、INSERT 操作が実行されるたびに既存の小さなファイルがマージされます。ただし、重複排除は実行されません。その結果、書き込みスループットが低下します。

  • 非同期クラスタリングのパラメーター (Hudi 0.12.0 以降でサポート)

    パラメーター

    説明

    デフォルト値

    備考

    clustering.schedule.enabled

    データの書き込み中にクラスタリングプランをスケジュールするかどうかを指定します。

    false

    このパラメーターを true に設定すると、クラスタリングプランは定期的にスケジュールされます。

    clustering.delta_commits

    クラスタリングプランを生成するために必要なコミットの数。

    4

    clustering.schedule.enabled パラメーターが true に設定されている場合、このパラメーターが有効になります。

    clustering.async.enabled

    クラスタリングプランを非同期で実行するかどうかを指定します。

    false

    このパラメーターを true に設定すると、クラスタリングプランは小さなファイルをマージするために定期的に非同期で実行されます。

    clustering.tasks

    クラスタリングタスクの並列度。

    4

    該当なし

    clustering.plan.strategy.target.file.max.bytes

    クラスタリングのファイルの最大サイズ。

    1024 * 1024 * 1024

    単位: バイト。

    clustering.plan.strategy.small.file.limit

    クラスタリングに使用される小さなファイルのファイルサイズのしきい値。

    600

    サイズがこのパラメーターの値未満のファイルのみをクラスタリングに使用できます。

    clustering.plan.strategy.sort.columns

    クラスタリングのためにデータをソートする基準となる列。

    デフォルト値なし

    特別なソートフィールドを指定できます。

  • クラスタリングプラン戦略のパラメーター

    パラメーター

    説明

    デフォルト値

    備考

    clustering.plan.partition.filter.mode

    クラスタリングプランの作成に使用されるパーティションフィルターモード。

    NONE

    有効な値:

    • NONE: パーティションはフィルタリングされません。すべてのパーティションがクラスタリングに選択されます。

    • RECENT_DAYS: データが日単位でパーティション分割されている場合、指定された日数の最近のパーティションがクラスタリングに選択されます。

    • SELECTED_PARTITIONS: 指定されたパーティションがクラスタリングに選択されます。

    clustering.plan.strategy.daybased.lookback.partitions

    clustering.plan.partition.filter.mode パラメーターが RECENT_DAYS に設定されている場合に、パーティションをクラスタリングに選択する基準となる日数。

    2

    このパラメーターは、clustering.plan.partition.filter.mode パラメーターが RECENT_DAYS に設定されている場合にのみ有効になります。

    clustering.plan.strategy.cluster.begin.partition

    パーティションをフィルタリングするために使用される開始パーティション。

    デフォルト値なし

    このパラメーターは、clustering.plan.partition.filter.mode パラメーターが SELECTED_PARTITIONS に設定されている場合にのみ有効になります。

    clustering.plan.strategy.cluster.end.partition

    パーティションをフィルタリングするために使用される終了パーティション。

    デフォルト値なし

    このパラメーターは、clustering.plan.partition.filter.mode パラメーターが SELECTED_PARTITIONS に設定されている場合にのみ有効になります。

    clustering.plan.strategy.partition.regex.pattern

    パーティションを指定するために使用される正規表現。

    デフォルト値なし

    該当なし

    clustering.plan.strategy.partition.selected

    選択されたパーティション。

    デフォルト値なし

    複数のパーティションはコンマ (,) で区切ります。

  • バケットインデックスタイプに関連するパラメーター

    説明

    次の表のパラメーターは、Hudi 0.11.0 以降でサポートされています。

    パラメーター

    説明

    デフォルト値

    備考

    index.type

    インデックスのタイプ。

    FLINK_STATE

    有効な値:

    • FLINK_STATE: Flink 状態インデックスタイプが使用されます。

    • BUCKET: バケットインデックスタイプが使用されます。

      index.type=bucket を構成する場合、バケットインデックスタイプはパーティション間の変更をサポートしていないため、index.global.enabled パラメーターを true に設定することは無効です。この場合、グローバルインデックス機能が有効になっていても、複数のパーティションで重複排除を実行することはできません。

    テーブル内のデータレコードが 5 億件を超えるなど、データ量が大きい場合、Flink 状態のストレージオーバーヘッドがボトルネックになる可能性があります。バケットインデックスタイプは、固定ハッシュポリシーを使用して、同じキーを含むデータを同じファイルグループに割り当てます。これは、インデックスのストレージとクエリのオーバーヘッドを防ぐのに役立ちます。バケットインデックスタイプと Flink 状態インデックスタイプには、次の違いがあります。

    • Flink 状態インデックスタイプと比較して、バケットインデックスタイプにはストレージとコンピューティングのオーバーヘッドがありません。バケットインデックスタイプは、Flink 状態インデックスタイプよりも優れたパフォーマンスを提供します。

    • バケットインデックスタイプを使用する場合、バケットの数を増やすことはできません。Flink 状態インデックスタイプを使用する場合、ファイルサイズに基づいてファイルの数を動的に増やすことができます。

    • バケットインデックスタイプは、パーティション間の変更をサポートしていません。データがパーティションに書き込まれると、バケットインデックスは変更されません。たとえば、削除操作は現在のパーティションのデータにのみ影響し、他のパーティションの同じデータには影響しません。

      説明

      入力データが CDC ストリーミングデータの場合、この制限は適用されません。

    hoodie.bucket.index.hash.field

    バケットインデックスタイプを使用する場合のハッシュキーフィールド。

    プライマリキー

    このパラメーターは、プライマリキーのサブセットに設定できます。

    hoodie.bucket.index.num.buckets

    バケットインデックスタイプを使用する場合のバケットの数。

    4

    デフォルトでは、このパラメーターの値は各パーティションのバケットの数です。構成後、このパラメーターの値を変更することはできません。

データ読み取りのパラメーター

  • Hudi は、バッチ読み取り、ストリーミング読み取り、増分データ読み取りなど、さまざまな読み取り方法をサポートしています。Hudi はまた、変更ログを消費および転送して、エンドツーエンドの増分 ETL を実装することもできます。

    • ストリーミング読み取りのパラメーター

      デフォルトでは、テーブルにはスナップショット読み取りが使用されます。この場合、最新の完全スナップショットデータが一度に読み取られ、返されます。read.streaming.enabled パラメーターを true に設定して、ストリーミング読み取りを有効にすることができます。read.start-commit パラメーターを構成して、ストリーミング読み取りの開始オフセットを指定できます。read.start-commit パラメーターを earliest に設定して、最も早いオフセットからデータを消費できるようにすることができます。

      パラメーター

      説明

      デフォルト値

      備考

      read.streaming.enabled

      ストリーミング読み取りを有効にするかどうかを指定します。

      false

      有効な値:

      • true: ストリーミング読み取りが有効になります。

      • false: ストリーミング読み取りが無効になります。

      read.start-commit

      ストリーミング読み取りの開始オフセット。

      空のまま

      有効な値:

      • yyyyMMddHHmmss 形式の時間: 指定された時間からデータが消費されます。

      • earliest: データは最も早いオフセットから消費されます。

      • このパラメーターを空のままにすると、データは最新の時間から消費されます。

      clean.retain_commits

      クリーナーによって保持できる履歴コミットの最大数。

      30

      履歴コミットの数がこのパラメーターの値を超えると、超過した履歴コミットは削除されます。変更ログモードでは、このパラメーターを使用して、変更ログを保持する期間を制御できます。たとえば、チェックポイント期間が 5 分の場合、変更ログはデフォルトで少なくとも 150 分間保持されます。

      重要
      • 変更ログのストリーミング読み取りは、Hudi 0.10.0 以降でのみサポートされています。変更ログモードでは、Hudi はダウンストリームコンシューマーが消費するために変更ログを一定期間保持します。

      • 変更ログはコンパクションタスクでマージされる場合があります。この場合、中間レコードが削除され、計算結果に影響を与える可能性があります。

    • 増分データ読み取りのパラメーター (Hudi 0.10.0 以降でサポート)

      フルマネージド Flink は、DataStream コネクタを使用した増分消費、バッチ増分消費、およびタイムトラベル機能を使用した特定の時点でのデータのバッチ消費をサポートしています。

      パラメーター

      説明

      デフォルト値

      備考

      read.start-commit

      データ消費の開始オフセット。

      最新のオフセットからのコミット

      これらのパラメーターの値は yyyyMMddHHmmss 形式です。

      間隔は閉区間であり、開始オフセットと終了オフセットが含まれます。

      read.end-commit

      データ消費の終了オフセット。

      最新のオフセットからのコミット

サンプルコード

  • ソーステーブルのサンプルコード

CREATE TEMPORARY TABLE blackhole (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'      
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- ストリーミングモードで最新のコミットからデータを読み取り、Blackhole に書き込みます。
INSERT INTO blackhole SELECT * from hudi_tbl;
  • シンクテーブルのサンプルコード

CREATE TEMPORARY TABLE datagen(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data  STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen' ,
  'rows-per-second'='100' 
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_tbl SELECT * from datagen;

DataStream API

重要

DataStream API を呼び出してデータの読み取りまたは書き込みを行うには、関連タイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。 DataStream コネクタの構成方法の詳細については、「DataStream コネクタの設定」をご参照ください。

  • maven pom

    使用されている VVR バージョンに基づいて、Realtime Compute for Apache Flink と Hudi のバージョンを指定します。

    <properties>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <flink.version>1.15.4</flink.version>
      <hudi.version>0.13.1</hudi.version>
    </properties>
    
    <dependencies>
      <!-- flink -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- hudi -->
      <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.15-bundle</artifactId>
        <version>${hudi.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- oss -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aliyun</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- dlf -->
      <dependency>
        <groupId>com.aliyun.datalake</groupId>
        <artifactId>metastore-client-hive2</artifactId>
        <version>0.2.14</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.5.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    重要

    DLF で使用される特定の依存関係は、hive-commonhive-exec などのオープンソースの Hive バージョンと競合します。オンプレミス環境で DLF をテストする場合は、hive-common および hive-exec JAR パッケージをダウンロードし、IntelliJ IDEA に手動でインポートできます。

  • Hudi にデータを書き込む

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class FlinkHudiQuickStart {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        String dbName = "test_db";
        String tableName = "test_tbl";
        String basePath = "oss://xxx";
    
        Map<String, String> options = new HashMap<>();
        // hudi conf
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
        options.put(FlinkOptions.TABLE_NAME.key(), tableName);
        // oss conf
        options.put("hadoop.fs.oss.accessKeyId", "xxx");
        options.put("hadoop.fs.oss.accessKeySecret", "xxx");
        // ローカルデバッグの場合はパブリックエンドポイント (例: oss-cn-hangzhou.aliyuncs.com) を使用します。クラスタ送信の場合は内部エンドポイント (例: oss-cn-hangzhou-internal.aliyuncs.com) を使用します。
        options.put("hadoop.fs.oss.endpoint", "xxx");
        options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
        options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        // dlf conf
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // Hudi データを DLF に同期するかどうかを決定できます。
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put("hadoop.dlf.catalog.id", "xxx");
        options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
        options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
        options.put("hadoop.dlf.catalog.region", "xxx");
        // ローカルデバッグの場合はパブリックエンドポイント (例: dlf.cn-hangzhou.aliyuncs.com) を使用します。クラスタ送信の場合は内部エンドポイント (例: dlf-vpc.cn-hangzhou.aliyuncs.com) を使用します。
        options.put("hadoop.dlf.catalog.endpoint", "xxx");
        options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
    
        DataStream<RowData> dataStream = env.fromElements(
            GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
                StringData.fromString("1001"), StringData.fromString("p1")),
            GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
                StringData.fromString("1002"), StringData.fromString("p2"))
        );
    
        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
            .column("uuid string")
            .column("name string")
            .column("age int")
            .column("ts string")
            .column("`partition` string")
            .pk("uuid")
            .partition("partition")
            .options(options);
    
        builder.sink(dataStream, false); // 2 番目のパラメーターは、入力データストリームが有界かどうかを示します
        env.execute("Flink_Hudi_Quick_Start");
      }
    }

よくある質問