このトピックでは、Hudi コネクタの使用方法について説明します。
市場のエコシステムの発展とサービス戦略の調整により、公式の組み込み Hudi コネクタは、Realtime Compute for Apache Flink の今後の Ververica Runtime (VVR) バージョンではサポートされなくなります。カスタムコネクタを使用して、Realtime Compute for Apache Flink を Apache Hudi に接続できます。また、データ移行を実行し、Paimon コネクタを使用して、最適化された機能とパフォーマンスを得ることをお勧めします。
背景情報
Apache Hudi は、データレイクのテーブルデータを管理するオープンソースのフレームワークです。Apache Hudi は、Alibaba Cloud Object Storage Service (OSS) または Hadoop 分散ファイルシステム (HDFS) に基づいてファイルレイアウトを編成し、原子性、一貫性、独立性、耐久性 (ACID) を保証し、効率的な行レベルのデータ更新と削除をサポートします。これにより、ETL (抽出、変換、書き出し) の開発が簡素化されます。Apache Hudi は、指定されたファイルサイズを維持するために、小さなファイルの自動管理とマージもサポートしています。これにより、データの挿入と更新中に生成される過剰な小さなファイルによるクエリパフォーマンスの低下を防ぎ、小さなファイルを手動で監視およびマージする O&M ワークロードを排除できます。
項目 | 説明 |
テーブルタイプ | ソーステーブルと結果テーブル |
実行モード | ストリーミングモードとバッチモード |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
API タイプ | DataStream API と SQL API |
シンクテーブルのデータ更新または削除 | サポートされています |
機能
項目 | 説明 |
コア機能 |
|
一般的なシナリオ |
|
利点 | オープンソースの Hudi コミュニティと比較して、フルマネージド Flink に統合された Hudi は、より多くの利点を提供します。フルマネージド Flink に統合された Hudi は、次の利点を提供します。
|
制限事項
エンジンバージョンが vvr-4.0.11-flink-1.13 以降の Realtime Compute for Apache Flink のみが Hudi コネクタをサポートしています。
HDFS、Alibaba Cloud OSS、または OSS-HDFS のみがファイルシステムとして使用できます。
セッションクラスタではドラフトを公開できません。
Hudi コネクタを使用してテーブルのフィールドを変更することはできません。テーブルのフィールドを変更する場合は、DLF コンソールで Spark SQL ステートメントを使用します。
構文
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<カスタムストレージディレクトリ>',
...
);WITH 句のパラメータ
基本パラメータ
共通パラメータ
パラメータ
説明
データ型
必須
デフォルト値
備考
connector
テーブルのタイプ。
STRING
はい
デフォルト値なし
値を hudi に設定します。
path
テーブルのストレージパス。
STRING
はい
デフォルト値なし
テーブルは、OSS バケット、HDFS、または OSS-HDFS に保存できます。
OSS: パスは
oss://<bucket>/<user-defined-dir>形式です。HDFS: パスは
hdfs://<user-defined-dir>形式です。OSS-HDFS: パスは
oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>形式です。説明VVR 8.0.3 以降を使用する Realtime Compute for Apache Flink でのみ、このパラメータを OSS-HDFS パスに設定できます。
パスのパラメータ:
bucket: 作成した OSS バケットの名前。
user-defined-dir: データが保存されるパス。
oss-hdfs-endpoint: OSS-HDFS サービスのエンドポイント。
OSS コンソールの OSS バケットの [概要] ページの [ポート] セクションで、[HDFS] の [エンドポイント] を表示できます。
hoodie.datasource.write.recordkey.field
プライマリキーフィールド。
STRING
いいえ
uuid
PRIMARY KEY 構文を使用してプライマリキーフィールドを指定できます。
複数のフィールドはコンマ (,) で区切ります。
precombine.field
バージョンフィールド。
STRING
いいえ
ts
このフィールドは、メッセージを更新する必要があるかどうかを判断するために使用されます。
このパラメータを構成しない場合、システムはエンジンで定義されたメッセージシーケンスに基づいてデータを更新します。
oss.endpoint
Alibaba Cloud OSS または OSS-HDFS のエンドポイント。
STRING
いいえ
デフォルト値なし
テーブルを OSS または OSS-HDFS に保存する場合は、このパラメータを指定する必要があります。
テーブルを OSS に保存する場合は、このパラメーターを OSS のエンドポイントに設定します。 OSS のエンドポイントの詳細については、「リージョンとエンドポイント」をご参照ください。
テーブルを OSS-HDFS に保存する場合は、このパラメーターを OSS-HDFS のエンドポイントに設定します。 OSS コンソールの OSS バケットの [概要] ページの [ポート] セクションで、HDFS の [エンドポイント] を表示できます。
accessKeyId
Alibaba Cloud アカウントの AccessKey ID。
STRING
いいえ
デフォルト値なし
テーブルを OSS または OSS-HDFS に保存する場合は、このパラメーターを指定する必要があります。
詳細については、「コンソール操作」をご参照ください。
重要AccessKey ペアを保護するために、変数を使用して AccessKey ID と AccessKey シークレットを設定することをお勧めします。詳細については、「変数の管理」をご参照ください。
accessKeySecret
Alibaba Cloud アカウントの AccessKey シークレット。
STRING
いいえ
デフォルト値なし
ソーステーブル専用の パラメーター
パラメーター
説明
データ型
必須
デフォルト値
備考
read.streaming.enabled
ストリーミング読み取りを有効にするかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true: ストリーミング読み取りが有効です。
false: ストリーミング読み取りは無効です。
read.start-commit
データ消費の開始 オフセット です。
STRING
いいえ
Empty
有効な値:
yyyyMMddHHmmss フォーマット の 時間 : 指定された 時間 からデータが消費されます。
earliest: 最も古い オフセット からデータが消費されます。
この パラメーター が Empty の場合、最新の 時間 からデータが消費されます。
シンクテーブル専用の パラメーター
パラメーター
説明
データ型
必須
デフォルト値
備考
write.operation
書き込み 操作 が実行される モード。
STRING
いいえ
UPSERT
有効な値:
insert: データは追加モードでテーブルに書き込まれます。
upsert: データが更新されます。
bulk_insert: バッチデータは追加モードでテーブルに書き込まれます。
hive_sync.enable
Hiveへのメタデータ同期を有効にするかどうかを指定します。
BOOLEAN
いいえ
false
有効な値:
true: Hiveへのメタデータ同期が有効になります。
false: Hiveへのメタデータ同期が無効になります。
hive_sync.mode
Hiveデータが同期される モード。
STRING
いいえ
hms
有効な値:
hms: HiveメタストアまたはDLFにメタデータを同期する場合は、この パラメーター をhmsに設定します。
jdbc: Java Database Connectivity ( JDBC ) ドライバーにメタデータを同期する場合は、この パラメーター をjdbcに設定します。
hive_sync.db
データが同期されるHive データベース の名前。
STRING
いいえ
default
該当なし
hive_sync.table
データが同期されるHiveテーブルの名前。
STRING
いいえ
現在のテーブルの名前
Hudiからデータが同期されるHiveテーブルの名前には、ハイフン(-)を含めることはできません。
dlf.catalog.region
DLF サービス がアクティブ化されている リージョン の名前。
STRING
いいえ
デフォルト値なし
詳細については、「サポートされている リージョン と エンドポイント」をご参照ください。
説明dlf.catalog.region パラメーター は、hive_sync.mode パラメーター がhmsに設定されている場合にのみ有効になります。
この パラメーター の値が、dlf.catalog.endpoint パラメーター で指定された エンドポイント と一致していることを確認してください。
dlf.catalog.endpoint
DLFの エンドポイント。
STRING
いいえ
デフォルト値なし
詳細については、「サポートされている リージョン と エンドポイント」をご参照ください。
説明dlf.catalog.endpoint パラメーター は、hive_sync.mode パラメーター がhmsに設定されている場合にのみ有効になります。
dlf.catalog.endpoint パラメーター をDLFのVirtual Private Cloud ( VPC ) エンドポイント に設定することをお勧めします。たとえば、中国 (杭州) リージョン を選択した場合は、この パラメーター を dlf-vpc.cn-hangzhou.aliyuncs.com に設定します。
VPC間でDLFに アクセス する場合は、「Realtime Compute for Apache FlinkはVPC間で サービス にどのように アクセス しますか。」で説明されている手順に従ってください。
詳細パラメーター
Hudi は、さまざまな読み取りおよび書き込みシナリオをサポートしています。次の表は、さまざまなシナリオで設定できるパラメーターについて説明しています。
並列処理のパラメーター
パラメーター | 説明 | デフォルト値 | 備考 |
write.tasks | 書き込みタスクの並列度。各書き込みタスクは、1 ~ N 個のバケットに順番にデータを書き込みます。 | 4 | 書き込みタスクの並列処理を増やしても、小さなファイルの数は変わりません。 |
write.bucket_assign.tasks | バケットアサイナーオペレーターの並列度。 | Realtime Compute for Apache Flink デプロイの並列度 | 書き込みタスクの並列度とバケットアサイナーオペレーターの並列度を上げると、小さなファイルの数が増加します。 |
write.index_bootstrap.tasks | インデックスブートストラップオペレーターの並列度。 | Realtime Compute for Apache Flink デプロイの並列度 |
|
read.tasks | ストリーミングおよびバッチ読み取りオペレーターの並列度。 | 4 | 該当なし |
compaction.tasks | オンラインコンパクションオペレーターの並列度。 | 4 | オンラインコンパクションは、オフラインコンパクションよりも多くのリソースを消費します。オフラインコンパクションを実行することをお勧めします。 |
オンラインコンパクションのパラメーター
パラメーター | 説明 | デフォルト値 | 備考 |
compaction.schedule.enabled | コンパクションプランをスケジュールに従って生成するかどうかを指定します。 | true | 有効な値:
説明 compaction.async.enabled パラメーターが false に設定されている場合でも、このパラメーターを true に設定することをお勧めします。 |
compaction.async.enabled | 非同期コンパクションを有効にするかどうかを指定します。 | true | 有効な値:
説明 このパラメーターを false に設定して、オンラインコンパクションを無効にすることができます。ただし、compaction.schedule.enabled パラメーターを true に設定することをお勧めします。この場合、スケジュールに従って生成されたコンパクションプランを実行するために、オフライン非同期コンパクションを実行できます。 |
compaction.tasks | コンパクションタスクの並列度。 | 4 | 該当なし |
compaction.trigger.strategy | コンパクションをトリガーするために使用される戦略。 | num_commits | 有効な値:
|
compaction.delta_commits | コンパクションをトリガーするために必要なコミットの最大数。 | 5 | 該当なし |
compaction.delta_seconds | コンパクションがトリガーされる間隔。 | 3600 | 単位: 秒。 |
compaction.max_memory | コンパクションおよび重複排除に使用されるハッシュマップの最大メモリ。 | 100 MB | リソースが十分にある場合は、このパラメーターの値を 1 GB に変更することをお勧めします。 |
compaction.target_io | 各コンパクションプランの最大 I/O スループット。 | 500 GB | 該当なし |
ファイル関連のパラメーター
ファイル関連のパラメーターは、ファイルサイズを管理するために使用されます。次の表に、サポートされているパラメーターを示します。
パラメーター | 説明 | デフォルト値 | 備考 |
hoodie.parquet.max.file.size | データを書き込むことができる Parquet ファイルの最大サイズ。 Parquet ファイルに書き込まれるデータがこのパラメーターで指定されたサイズを超える場合、超過データは新しいファイルグループに書き込まれます。 | 120 * 1024 * 1024 バイト (120 MB) | 単位: バイト。 |
hoodie.parquet.small.file.limit | 小さなファイルのファイルサイズのしきい値。サイズがこのパラメーターの値未満のファイルは、小さなファイルと見なされます。 | 104857600 バイト (100 MB) |
|
hoodie.copyonwrite.record.size.estimate | レコードサイズの推定値。 | 1024 バイト (1 KB) |
|
Hadoop 関連のパラメーター
パラメーター | 説明 | デフォルト値 | 備考 |
hadoop.${you option key} | hadoop. プレフィックスを使用して指定された Hadoop 設定項目。 | デフォルト値なし | 複数の Hadoop 設定項目を同時に指定できます。 説明 このパラメーターは、Hudi 0.12.0 以降でサポートされています。クラスタ間のコミットと実行の要件を満たすために、DDL 文を使用してジョブごとの Hadoop 構成を指定できます。 |
データ書き込みのパラメーター
Hudi は、バッチ書き込みやストリーミング書き込みなど、さまざまな書き込み方法をサポートしています。Hudi は、変更ログデータやログデータなど、さまざまなデータ型をサポートしています。Hudi はまた、異なるインデックススキームもサポートしています。
バッチ書き込みのパラメーター
他のデータソースから Hudi に既存のデータをインポートする場合、バッチインポート機能を使用して既存のデータを Hudi テーブルにインポートできます。
パラメーター
説明
デフォルト値
備考
write.operation
書き込み操作のタイプ。
upsert
有効な値:
upsert: データが挿入および更新されます。
insert: データが挿入されます。
bulk_insert: データはバッチで書き込まれます。
説明このパラメーターを bulk_insert に設定すると、Avro ベースのデータシリアル化とデータコンパクションは実行されません。データのインポート後に重複排除は実行されません。データの一意性に関する要件が高い場合は、インポートされたデータが一意であることを確認してください。
write.operation パラメーターを bulk_insert に設定できるのは、バッチ実行モードの場合のみです。このモードでは、システムは入力データをパーティション名でソートし、デフォルトで Hudi テーブルにデータを書き込みます。このようにして、書き込み操作は異なるファイル間で頻繁に切り替えられず、システムパフォーマンスが低下しません。
write.tasks
bulk_insert 書き込みタスクの並列度。
Realtime Compute for Apache Flink デプロイの並列度
bulk_insert 書き込みタスクの並列度は、write.tasks パラメーターで指定され、小さなファイルの数に影響します。
理論的には、bulk_insert 書き込みタスクの並列度はバケットの数と同じです。各バケットのファイルに書き込まれるデータがファイルの最大サイズに達すると、データは新しいファイルハンドルにロールオーバーされます。したがって、書き込みファイルの最終的な数は、bulk_insert 書き込みタスクの並列度以上になります。Parquet ファイルの最大サイズは 120 MB です。
write.bulk_insert.shuffle_input
バッチ挿入タスクのパーティションフィールドに基づいて入力データをシャッフルするかどうかを指定します。
true
Hudi 0.11.0 以降では、このパラメーターを true に設定して、小さなファイルの数を減らすことができます。ただし、これによりデータスキューが発生する可能性があります。
write.bulk_insert.sort_input
バッチ挿入タスクのパーティションフィールドに基づいて入力データをソートするかどうかを指定します。
true
Hudi 0.11.0 以降では、複数のパーティションにデータを書き込む書き込みタスクを実行する場合、このパラメーターを true に設定して、小さなファイルの数を減らすことができます。.
write.sort.memory
ソートオペレーターの使用可能な管理対象メモリ。
128
単位: MB。
変更ログモードを有効にするパラメーター
変更ログモードでは、Hudi はメッセージのすべての変更 (INSERT、UPDATE_BEFORE、UPDATE_AFTER、および DELETE) を保持し、Flink エンジンのステートフルコンピューティングと連携して、エンドツーエンドのほぼリアルタイムのデータウェアハウスを実装します。Hudi の Merge on Read (MOR) テーブルは、行指向ストレージに基づくメッセージのすべての変更の保持をサポートしています。フルマネージド Flink は、ストリーミングモードで MOR テーブルを読み取って、すべての変更レコードを消費できます。
説明変更ログモード以外では、単一のストリーミング読み取りのバッチデータセットの中間変更をマージできます。すべての中間結果は、バッチ読み取り (スナップショット読み取り) でマージされます。中間状態は、書き込まれているかどうかに関係なく無視されます。
パラメーター
説明
デフォルト値
備考
changelog.enabled
すべての変更を消費するかどうかを指定します。
false
有効な値:
true: すべての変更を消費できます。
false: すべての変更を消費できるわけではありません。UPSERT セマンティクスが使用されます。すべてのメッセージの中で最後にマージされたメッセージのみが保証され、中間変更はマージされる可能性があります。
説明changelog.enabled パラメーターを true に設定した後も、非同期コンパクションタスクは中間変更を 1 つのデータレコードにマージします。ストリーミング読み取り中にデータが最も早い機会に消費されない場合、データのコンパクション後に最後のデータレコードのみを読み取ることができます。データコンパクションのバッファー時間を変更して、リーダーがデータを読み取って消費するための特定の時間を確保できます。たとえば、compaction.delta_commits パラメーターを 5 に、compaction.delta_seconds パラメーターを 3600 に設定できます。
追記モード (Hudi 0.10.0 以降でサポート)
追記モードでは:
小さなファイルポリシーは MOR テーブルに適用されます。データは追記モードで Avro ログファイルに書き込まれます。
小さなファイルポリシーは COW テーブルには適用されません。Copy on Write (COW) テーブルにデータが書き込まれるたびに、新しい Parquet ファイルが生成されます。
クラスタリングポリシーのパラメーター
Hudi は、INSERT モードでの小さなファイルの問題を解決するために、さまざまなクラスタリングポリシーをサポートしています。
インラインクラスタリングのパラメーター (COW テーブルでのみサポート)
パラメーター
説明
デフォルト値
備考
write.insert.cluster
データの書き込み中に小さなファイルをマージするかどうかを指定します。
false
有効な値:
true: データの書き込み中に小さなファイルがマージされます。
false: データの書き込み中に小さなファイルはマージされません。
説明デフォルトでは、COW テーブルで INSERT 操作が実行されると、小さなファイルはマージされません。このパラメーターを true に設定すると、INSERT 操作が実行されるたびに既存の小さなファイルがマージされます。ただし、重複排除は実行されません。その結果、書き込みスループットが低下します。
非同期クラスタリングのパラメーター (Hudi 0.12.0 以降でサポート)
パラメーター
説明
デフォルト値
備考
clustering.schedule.enabled
データの書き込み中にクラスタリングプランをスケジュールするかどうかを指定します。
false
このパラメーターを true に設定すると、クラスタリングプランは定期的にスケジュールされます。
clustering.delta_commits
クラスタリングプランを生成するために必要なコミットの数。
4
clustering.schedule.enabled パラメーターが true に設定されている場合、このパラメーターが有効になります。
clustering.async.enabled
クラスタリングプランを非同期で実行するかどうかを指定します。
false
このパラメーターを true に設定すると、クラスタリングプランは小さなファイルをマージするために定期的に非同期で実行されます。
clustering.tasks
クラスタリングタスクの並列度。
4
該当なし
clustering.plan.strategy.target.file.max.bytes
クラスタリングのファイルの最大サイズ。
1024 * 1024 * 1024
単位: バイト。
clustering.plan.strategy.small.file.limit
クラスタリングに使用される小さなファイルのファイルサイズのしきい値。
600
サイズがこのパラメーターの値未満のファイルのみをクラスタリングに使用できます。
clustering.plan.strategy.sort.columns
クラスタリングのためにデータをソートする基準となる列。
デフォルト値なし
特別なソートフィールドを指定できます。
クラスタリングプラン戦略のパラメーター
パラメーター
説明
デフォルト値
備考
clustering.plan.partition.filter.mode
クラスタリングプランの作成に使用されるパーティションフィルターモード。
NONE
有効な値:
NONE: パーティションはフィルタリングされません。すべてのパーティションがクラスタリングに選択されます。
RECENT_DAYS: データが日単位でパーティション分割されている場合、指定された日数の最近のパーティションがクラスタリングに選択されます。
SELECTED_PARTITIONS: 指定されたパーティションがクラスタリングに選択されます。
clustering.plan.strategy.daybased.lookback.partitions
clustering.plan.partition.filter.mode パラメーターが RECENT_DAYS に設定されている場合に、パーティションをクラスタリングに選択する基準となる日数。
2
このパラメーターは、clustering.plan.partition.filter.mode パラメーターが RECENT_DAYS に設定されている場合にのみ有効になります。
clustering.plan.strategy.cluster.begin.partition
パーティションをフィルタリングするために使用される開始パーティション。
デフォルト値なし
このパラメーターは、clustering.plan.partition.filter.mode パラメーターが SELECTED_PARTITIONS に設定されている場合にのみ有効になります。
clustering.plan.strategy.cluster.end.partition
パーティションをフィルタリングするために使用される終了パーティション。
デフォルト値なし
このパラメーターは、clustering.plan.partition.filter.mode パラメーターが SELECTED_PARTITIONS に設定されている場合にのみ有効になります。
clustering.plan.strategy.partition.regex.pattern
パーティションを指定するために使用される正規表現。
デフォルト値なし
該当なし
clustering.plan.strategy.partition.selected
選択されたパーティション。
デフォルト値なし
複数のパーティションはコンマ (,) で区切ります。
バケットインデックスタイプに関連するパラメーター
説明次の表のパラメーターは、Hudi 0.11.0 以降でサポートされています。
パラメーター
説明
デフォルト値
備考
index.type
インデックスのタイプ。
FLINK_STATE
有効な値:
FLINK_STATE: Flink 状態インデックスタイプが使用されます。
BUCKET: バケットインデックスタイプが使用されます。
index.type=bucketを構成する場合、バケットインデックスタイプはパーティション間の変更をサポートしていないため、index.global.enabledパラメーターを true に設定することは無効です。この場合、グローバルインデックス機能が有効になっていても、複数のパーティションで重複排除を実行することはできません。
テーブル内のデータレコードが 5 億件を超えるなど、データ量が大きい場合、Flink 状態のストレージオーバーヘッドがボトルネックになる可能性があります。バケットインデックスタイプは、固定ハッシュポリシーを使用して、同じキーを含むデータを同じファイルグループに割り当てます。これは、インデックスのストレージとクエリのオーバーヘッドを防ぐのに役立ちます。バケットインデックスタイプと Flink 状態インデックスタイプには、次の違いがあります。
Flink 状態インデックスタイプと比較して、バケットインデックスタイプにはストレージとコンピューティングのオーバーヘッドがありません。バケットインデックスタイプは、Flink 状態インデックスタイプよりも優れたパフォーマンスを提供します。
バケットインデックスタイプを使用する場合、バケットの数を増やすことはできません。Flink 状態インデックスタイプを使用する場合、ファイルサイズに基づいてファイルの数を動的に増やすことができます。
バケットインデックスタイプは、パーティション間の変更をサポートしていません。データがパーティションに書き込まれると、バケットインデックスは変更されません。たとえば、削除操作は現在のパーティションのデータにのみ影響し、他のパーティションの同じデータには影響しません。
説明入力データが CDC ストリーミングデータの場合、この制限は適用されません。
hoodie.bucket.index.hash.field
バケットインデックスタイプを使用する場合のハッシュキーフィールド。
プライマリキー
このパラメーターは、プライマリキーのサブセットに設定できます。
hoodie.bucket.index.num.buckets
バケットインデックスタイプを使用する場合のバケットの数。
4
デフォルトでは、このパラメーターの値は各パーティションのバケットの数です。構成後、このパラメーターの値を変更することはできません。
データ読み取りのパラメーター
Hudi は、バッチ読み取り、ストリーミング読み取り、増分データ読み取りなど、さまざまな読み取り方法をサポートしています。Hudi はまた、変更ログを消費および転送して、エンドツーエンドの増分 ETL を実装することもできます。
ストリーミング読み取りのパラメーター
デフォルトでは、テーブルにはスナップショット読み取りが使用されます。この場合、最新の完全スナップショットデータが一度に読み取られ、返されます。read.streaming.enabled パラメーターを true に設定して、ストリーミング読み取りを有効にすることができます。read.start-commit パラメーターを構成して、ストリーミング読み取りの開始オフセットを指定できます。read.start-commit パラメーターを earliest に設定して、最も早いオフセットからデータを消費できるようにすることができます。
パラメーター
説明
デフォルト値
備考
read.streaming.enabled
ストリーミング読み取りを有効にするかどうかを指定します。
false
有効な値:
true: ストリーミング読み取りが有効になります。
false: ストリーミング読み取りが無効になります。
read.start-commit
ストリーミング読み取りの開始オフセット。
空のまま
有効な値:
yyyyMMddHHmmss 形式の時間: 指定された時間からデータが消費されます。
earliest: データは最も早いオフセットから消費されます。
このパラメーターを空のままにすると、データは最新の時間から消費されます。
clean.retain_commits
クリーナーによって保持できる履歴コミットの最大数。
30
履歴コミットの数がこのパラメーターの値を超えると、超過した履歴コミットは削除されます。変更ログモードでは、このパラメーターを使用して、変更ログを保持する期間を制御できます。たとえば、チェックポイント期間が 5 分の場合、変更ログはデフォルトで少なくとも 150 分間保持されます。
重要変更ログのストリーミング読み取りは、Hudi 0.10.0 以降でのみサポートされています。変更ログモードでは、Hudi はダウンストリームコンシューマーが消費するために変更ログを一定期間保持します。
変更ログはコンパクションタスクでマージされる場合があります。この場合、中間レコードが削除され、計算結果に影響を与える可能性があります。
増分データ読み取りのパラメーター (Hudi 0.10.0 以降でサポート)
フルマネージド Flink は、DataStream コネクタを使用した増分消費、バッチ増分消費、およびタイムトラベル機能を使用した特定の時点でのデータのバッチ消費をサポートしています。
パラメーター
説明
デフォルト値
備考
read.start-commit
データ消費の開始オフセット。
最新のオフセットからのコミット
これらのパラメーターの値は yyyyMMddHHmmss 形式です。
間隔は閉区間であり、開始オフセットと終了オフセットが含まれます。
read.end-commit
データ消費の終了オフセット。
最新のオフセットからのコミット
サンプルコード
ソーステーブルのサンプルコード
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- ストリーミングモードで最新のコミットからデータを読み取り、Blackhole に書き込みます。
INSERT INTO blackhole SELECT * from hudi_tbl;シンクテーブルのサンプルコード
CREATE TEMPORARY TABLE datagen(
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen' ,
'rows-per-second'='100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * from datagen;DataStream API
DataStream API を呼び出してデータの読み取りまたは書き込みを行うには、関連タイプの DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。 DataStream コネクタの構成方法の詳細については、「DataStream コネクタの設定」をご参照ください。
maven pom
使用されている VVR バージョンに基づいて、Realtime Compute for Apache Flink と Hudi のバージョンを指定します。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.15.4</flink.version> <hudi.version>0.13.1</hudi.version> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- hudi --> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-flink1.15-bundle</artifactId> <version>${hudi.version}</version> <scope>provided</scope> </dependency> <!-- oss --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aliyun</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <!-- dlf --> <dependency> <groupId>com.aliyun.datalake</groupId> <artifactId>metastore-client-hive2</artifactId> <version>0.2.14</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.5.1</version> <scope>provided</scope> </dependency> </dependencies>重要DLF で使用される特定の依存関係は、
hive-commonやhive-execなどのオープンソースの Hive バージョンと競合します。オンプレミス環境で DLF をテストする場合は、hive-common および hive-exec JAR パッケージをダウンロードし、IntelliJ IDEA に手動でインポートできます。Hudi にデータを書き込む
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.HoodiePipeline; import java.util.HashMap; import java.util.Map; public class FlinkHudiQuickStart { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String dbName = "test_db"; String tableName = "test_tbl"; String basePath = "oss://xxx"; Map<String, String> options = new HashMap<>(); // hudi conf options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); options.put(FlinkOptions.DATABASE_NAME.key(), dbName); options.put(FlinkOptions.TABLE_NAME.key(), tableName); // oss conf options.put("hadoop.fs.oss.accessKeyId", "xxx"); options.put("hadoop.fs.oss.accessKeySecret", "xxx"); // ローカルデバッグの場合はパブリックエンドポイント (例: oss-cn-hangzhou.aliyuncs.com) を使用します。クラスタ送信の場合は内部エンドポイント (例: oss-cn-hangzhou-internal.aliyuncs.com) を使用します。 options.put("hadoop.fs.oss.endpoint", "xxx"); options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS"); options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); // dlf conf options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // Hudi データを DLF に同期するかどうかを決定できます。 options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms"); options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName); options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName); options.put("hadoop.dlf.catalog.id", "xxx"); options.put("hadoop.dlf.catalog.accessKeyId", "xxx"); options.put("hadoop.dlf.catalog.accessKeySecret", "xxx"); options.put("hadoop.dlf.catalog.region", "xxx"); // ローカルデバッグの場合はパブリックエンドポイント (例: dlf.cn-hangzhou.aliyuncs.com) を使用します。クラスタ送信の場合は内部エンドポイント (例: dlf-vpc.cn-hangzhou.aliyuncs.com) を使用します。 options.put("hadoop.dlf.catalog.endpoint", "xxx"); options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory"); DataStream<RowData> dataStream = env.fromElements( GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22, StringData.fromString("1001"), StringData.fromString("p1")), GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32, StringData.fromString("1002"), StringData.fromString("p2")) ); HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName) .column("uuid string") .column("name string") .column("age int") .column("ts string") .column("`partition` string") .pk("uuid") .partition("partition") .options(options); builder.sink(dataStream, false); // 2 番目のパラメーターは、入力データストリームが有界かどうかを示します env.execute("Flink_Hudi_Quick_Start"); } }