組み込みの Hudi コネクタは、Ververica Runtime (VVR) の将来のバージョンではサポートされなくなります。カスタムコネクタを使用して Realtime Compute for Apache Flink を Apache Hudi に接続するか、最適化された機能とパフォーマンスを得るために Paimon コネクタに移行してください。
Apache Hudi は、Object Storage Service (OSS) または Hadoop 分散ファイルシステム (HDFS) に保存されたテーブルデータを管理するオープンソースのデータレイクフレームワークです。ACID (原子性、一貫性、分離性、永続性) の保証、行レベルのアップサートと削除、小規模ファイルの自動管理、およびタイムトラベルクエリを提供します。
主な特徴
| 特徴 | 説明 |
|---|---|
| ACID セマンティクス | デフォルトでスナップショット分離を提供し、同時読み取り/書き込みにおけるデータ整合性を保証します。 |
| UPSERT セマンティクス | INSERT と UPDATE を組み合わせます。レコードが存在しない場合は挿入され、存在する場合は更新されます。これにより、ETL 開発コードが簡素化されます。 |
| タイムトラベル | 特定の時点の履歴データバージョンにアクセスでき、効率的なデータ監査と品質管理を可能にします。 |
代表的なシナリオ
| シナリオ | 説明 |
|---|---|
| データベース取り込みの高速化 | Change Data Capture (CDC) データ (例:MySQL CDC コネクタ経由の MySQL バイナリログ) を Hudi テーブルに直接書き込み、ダウンストリームのリアルタイム ETL を実現します。オフラインの一括ロードよりもコスト効率が高いです。 |
| 増分 ETL | Hudi から変更データストリームを増分抽出し、軽量なリアルタイム ETL を実現します。ダウンストリームの OLAP には Apache Presto または Apache Spark を使用します。 |
| メッセージキューイング | 小規模なシナリオで Hudi を軽量なメッセージキューの代替として使用し、アプリケーションアーキテクチャを簡素化します。 |
| データバックフィル | Hive メタストア内の Hudi テーブルから完全データと増分データを結合し、最小限のコンピュートオーバーヘッドでワイドテーブルを生成します。 |
オープンソース Hudi に対する利点
-
メンテナンスフリー:組み込みの Hudi コネクタにより、運用保守の複雑さが軽減され、SLA が保証されます。
-
データ接続性の向上:データをコンピュートエンジンから分離し、Apache Flink、Apache Spark、Apache Presto、Apache Hive 間でのシームレスな移行を可能にします。
-
データベースからレイクへの取り込みの簡素化:Flink CDC コネクタと連携し、データ開発を効率化します。
-
エンタープライズクラスの機能:Data Lake Formation (DLF) による統合メタデータ管理と、自動的な軽量スキーマ変更を提供します。
-
コスト効率の高いストレージ:データは Alibaba Cloud OSS に Apache Parquet または Apache Avro フォーマットで保存され、ストレージとコンピューティングの分離により柔軟なリソースのスケーリングが可能です。
サポートされる構成
| 項目 | 値 |
|---|---|
| テーブルタイプ | ソーステーブル、結果テーブル |
| 実行モード | ストリーミングモード、バッチモード |
| データフォーマット | N/A |
| API タイプ | DataStream API、SQL API |
| sink 内のデータの更新/削除 | サポート |
| 最小 VVR バージョン | vvr-4.0.11-flink-1.13 |
| サポートされるファイルシステム | OSS、HDFS、OSS-HDFS |
メトリクス
| テーブルタイプ | メトリクス |
|---|---|
| ソーステーブル | numRecordsIn、numRecordsInPerSecond |
| 結果テーブル | numRecordsOut、numRecordsOutPerSecond、currentSendTime |
メトリックの定義については、「メトリック」をご参照ください。
制限事項
-
最小エンジンバージョン:vvr-4.0.11-flink-1.13 以降。
-
サポートされるファイルシステム:OSS、HDFS、または OSS-HDFS のみ。
-
ドラフトジョブはセッションクラスターでは実行できません。
-
Hudi コネクタによるフィールドの変更はサポートされていません。フィールドを変更するには、Data Lake Formation (DLF) コンソールで Spark SQL ステートメントを使用してください。
構文
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
...
);
WITH 句のパラメーター
基本パラメーター
共通パラメーター
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
connector |
はい | — | hudi に設定します。 |
path |
はい | — | テーブルのストレージパス。サポートされるフォーマット:OSS (oss://<bucket>/<user-defined-dir>)、HDFS (hdfs://<user-defined-dir>)、OSS-HDFS (oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>)。OSS-HDFS パスには VVR 8.0.3 以降が必要です。OSS-HDFS エンドポイントは、OSS バケットの [概要] ページの [ポート] セクションで確認できます。 |
hoodie.datasource.write.recordkey.field |
いいえ | uuid |
プライマリキーフィールド。複数のフィールドはカンマで区切ります。または、DDL で PRIMARY KEY 構文を使用することもできます。 |
precombine.field |
いいえ | ts |
更新順序を決定するために使用されるバージョンフィールド。設定しない場合、更新はエンジンによって定義されたメッセージシーケンスに従います。 |
oss.endpoint |
いいえ | — | データを OSS または OSS-HDFS に保存する場合に必要です。OSS エンドポイントについては、「リージョンとエンドポイント」をご参照ください。OSS-HDFS エンドポイントについては、OSS バケットの [概要] ページの [ポート] セクションをご参照ください。 |
accessKeyId |
いいえ | — | AccessKey ID。OSS および OSS-HDFS で必要です。認証情報はハードコーディングするのではなく、変数として保存してください。詳細については、「変数の管理」をご参照ください。 |
accessKeySecret |
いいえ | — | AccessKey Secret。OSS および OSS-HDFS で必要です。 |
AccessKey ペアを保護するには、AccessKey ID と AccessKey Secret を変数として保存します。詳細については、「変数の管理」をご参照ください。
ソーステーブルのパラメーター
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
read.streaming.enabled |
いいえ | false |
ストリーミング読み取りを有効にするには true に設定します。デフォルトではスナップショット読み取りが使用され、最新の完全なスナップショットが返されます。 |
read.start-commit |
いいえ | (ブランク) | ストリーミング読み取りの開始オフセット。フォーマット:特定の時間の場合は yyyyMMddHHmmss、最初から読み取る場合は earliest。最新のコミットから読み取る場合はブランクのままにします。 |
シンクテーブルのパラメーター
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
write.operation |
いいえ | UPSERT |
書き込みモード。有効な値:insert (追加)、upsert (挿入または更新)、bulk_insert (バッチ追加)。 |
hive_sync.enable |
いいえ | false |
メタデータを Apache Hive に同期するには true に設定します。 |
hive_sync.mode |
いいえ | hms |
同期モード。hms は Hive メタストアまたは DLF に同期します。jdbc は Java Database Connectivity (JDBC) ドライバー経由で同期します。 |
hive_sync.db |
いいえ | default |
ターゲット Hive データベース名。 |
hive_sync.table |
いいえ | 現在のテーブル名 | ターゲット Hive テーブル名。ハイフン (-) を含めることはできません。 |
dlf.catalog.region |
いいえ | — | DLF が有効化されているリージョン。hive_sync.mode が hms の場合にのみ有効です。詳細については、「サポートされているリージョンとエンドポイント」をご参照ください。dlf.catalog.endpoint で指定されたリージョンと一致する必要があります。 |
dlf.catalog.endpoint |
いいえ | — | DLF エンドポイント。hive_sync.mode が hms の場合にのみ有効です。レイテンシを低減するために VPC エンドポイントを使用してください。たとえば、中国 (杭州) リージョンの場合は dlf-vpc.cn-hangzhou.aliyuncs.com です。詳細については、「サポートされているリージョンとエンドポイント」をご参照ください。VPC 間アクセスについては、「Realtime Compute for Apache Flink はどのようにして VPC をまたいでサービスにアクセスしますか? |
詳細パラメーター
並列処理パラメーター
| パラメーター | デフォルト | 説明 |
|---|---|---|
write.tasks |
4 |
書き込みタスクの並列度。各タスクは 1 つ以上のバケットに順次書き込みます。この値を増やしても小規模ファイル数は増加しません。 |
write.bucket_assign.tasks |
Deployment の並列度 | バケットアサイナー演算子の並列度。この値を増やすと小規模ファイル数が増加します。 |
write.index_bootstrap.tasks |
Deployment の並列度 | インデックスブートストラップ演算子の並列度。index.bootstrap.enabled が true の場合にのみ有効です。この値を増やすとブートストラップのスループットが向上しますが、ブートストラップ中にチェックポイント処理がブロックされる可能性があります。必要に応じてチェックポイント失敗の許容度を増やしてください。 |
read.tasks |
4 |
ストリーミングおよびバッチ読み取り演算子の並列度。 |
compaction.tasks |
4 |
オンラインコンパクション演算子の並列度。オンラインコンパクションはオフラインコンパクションよりも多くのリソースを消費します。本番ワークロードではオフラインコンパクションを推奨します。 |
オンラインコンパクションパラメーター
| パラメーター | デフォルト | 説明 |
|---|---|---|
compaction.schedule.enabled |
true |
スケジュールに基づいてコンパクション計画を生成するかどうか。非同期コンパクションが無効になっている場合でも、この値を true に保つことで、オフラインコンパクションがスケジュールされた計画を実行できます。 |
compaction.async.enabled |
true |
コンパクションを非同期で実行するかどうか。false に設定すると、計画生成をアクティブに保ちながらオンラインコンパクションを無効にします。 |
compaction.tasks |
4 |
コンパクションタスクの並列度。 |
compaction.trigger.strategy |
num_commits |
コンパクションをトリガーするために使用される戦略。有効な値:num_commits、time_elapsed、num_and_time、num_or_time。 |
compaction.delta_commits |
5 |
コンパクションをトリガーするために必要なコミット数。num_commits、num_and_time、または num_or_time と共に使用されます。 |
compaction.delta_seconds |
3600 |
コンパクショントリガー間の間隔 (秒)。time_elapsed、num_and_time、または num_or_time と共に使用されます。 |
compaction.max_memory |
100 MB |
コンパクションおよび重複排除中に使用されるハッシュマップの最大メモリ。リソースが許す場合は 1 GB に増やしてください。 |
compaction.target_io |
500 GB |
コンパクション計画ごとの最大 I/O スループット。 |
ファイルサイズパラメーター
これらのパラメーターは、Hudi が小規模ファイルの蓄積を防ぐためにファイルサイズを管理する方法を制御します。
| パラメーター | デフォルト | 説明 |
|---|---|---|
hoodie.parquet.max.file.size |
120 MB (120 × 1024 × 1024 バイト) | Parquet ファイルの最大サイズ。このしきい値を超えるデータは新しいファイルグループに書き込まれます。 |
hoodie.parquet.small.file.limit |
100 MB (104,857,600 バイト) | このしきい値より小さいファイルは小規模ファイルとして扱われます。書き込み中、Hudi は新しいファイルを作成する代わりに既存の小規模ファイルに追加します。 |
hoodie.copyonwrite.record.size.estimate |
1 KB (1,024 バイト) | 推定レコードサイズ。設定されていない場合、Hudi はコミットされたメタデータから動的にこれを計算します。 |
Hadoop 構成パラメーター
| パラメーター | デフォルト | 説明 |
|---|---|---|
hadoop.${option key} |
— | Hadoop 設定項目。hadoop. プレフィックスで指定します。Hudi 0.12.0 以降でサポートされています。DDL ステートメントを使用して、クラスター間シナリオのジョブごとの Hadoop 構成を指定します。複数の項目を同時に指定できます。 |
データ書き込みパラメーター
バッチ書き込み
バッチ書き込みを使用して、他のソースから Hudi テーブルに既存データをインポートします。
bulk_insertは Avro シリアル化、コンパクション、および重複排除をスキップします。このモードを使用する前に、ソースデータの一意性を保証してください。bulk_insertはバッチ実行モードでのみ有効です。
| パラメーター | デフォルト | 説明 |
|---|---|---|
write.operation |
upsert |
書き込みタイプ。バッチ書き込みの場合は bulk_insert に設定します。 |
write.tasks |
Deployment の並列度 | bulk_insert タスクの並列度。最終的な出力ファイル数はこの値以上になります (120 MB の Parquet 制限に達すると、データは新しいファイルにロールオーバーされます)。 |
write.bulk_insert.shuffle_input |
true |
書き込み前に入力データをパーティションフィールドでシャッフルするかどうか。Hudi 0.11.0 以降で利用可能です。小規模ファイル数を減らしますが、データスキューを引き起こす可能性があります。 |
write.bulk_insert.sort_input |
true |
書き込み前に入力データをパーティションフィールドでソートするかどうか。Hudi 0.11.0 以降で利用可能です。単一のタスクが複数のパーティションに書き込む場合に小規模ファイル数を減らします。 |
write.sort.memory |
128 |
ソート演算子で利用可能な管理メモリ (MB)。 |
変更ログモード
変更ログモードでは、Hudi はすべての変更イベント (INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE) を保持し、Flink のステートフル計算によるエンドツーエンドのニアリアルタイムデータウェアハウジングを可能にします。Merge On Read (MOR) テーブルはこのモードをサポートします。
非変更ログモードでは、バッチ内の中間変更はマージされます。スナップショット読み取りは最終的にマージされた結果のみを返します。書き込みパスに関係なく、中間状態は表示されません。
変更ログモードを有効にした後も、非同期コンパクションタスクは中間変更をマージします。compaction.delta_commits=5 および compaction.delta_seconds=3600 を設定して、ダウンストリームコンシューマーがレコードをコンパクションされる前に読み取るのに十分な時間を与えてください。
| パラメーター | デフォルト | 説明 |
|---|---|---|
changelog.enabled |
false |
すべての変更イベントを保持するには true に設定します。false の場合、最終的にマージされたレコードのみが保証されます。中間変更はマージされる可能性があります。 |
追加モード
Hudi 0.10.0 以降でサポートされています。
-
MOR テーブル:小規模ファイルポリシーが適用されます。データは追加モードで Apache Avro ログファイルに書き込まれます。
-
Copy On Write (COW) テーブル:小規模ファイルポリシーは適用されません。書き込みごとに新しい Apache Parquet ファイルが作成されます。
クラスタリングパラメーター
Hudi は、INSERT モードでの小規模ファイルの蓄積を解決するためにクラスタリングをサポートしています。
インラインクラスタリング (COW テーブルのみ)
| パラメーター | デフォルト | 説明 |
|---|---|---|
write.insert.cluster |
false |
書き込み中に小規模ファイルをマージするには true に設定します。各 INSERT 操作は既存の小規模ファイルをマージしますが、重複排除は実行されず、書き込みスループットは低下します。 |
非同期クラスタリング (Hudi 0.12.0 以降)
| パラメーター | デフォルト | 説明 |
|---|---|---|
clustering.schedule.enabled |
false |
定期的にクラスタリング計画をスケジュールするには true に設定します。 |
clustering.delta_commits |
4 |
クラスタリング計画を生成するために必要なコミット数。clustering.schedule.enabled が true の場合にのみ有効です。 |
clustering.async.enabled |
false |
クラスタリング計画を定期的に非同期で実行するには true に設定します。 |
clustering.tasks |
4 |
クラスタリングタスクの並列度。 |
clustering.plan.strategy.target.file.max.bytes |
1 GiB (1,073,741,824 バイト) | クラスタリング出力のターゲット最大ファイルサイズ。 |
clustering.plan.strategy.small.file.limit |
600 |
このしきい値 (バイト単位) より小さいファイルはクラスタリングの対象となります。 |
clustering.plan.strategy.sort.columns |
— | クラスタリング中にデータをソートするために使用される列。 |
クラスタリング計画戦略
| パラメーター | デフォルト | 説明 |
|---|---|---|
clustering.plan.partition.filter.mode |
NONE |
パーティションフィルターモード。有効な値:NONE (すべてのパーティション)、RECENT_DAYS (過去 N 日間のパーティション)、SELECTED_PARTITIONS (特定のパーティション)。 |
clustering.plan.strategy.daybased.lookback.partitions |
2 |
クラスタリングのためにパーティションを選択する最近の日数。filter.mode が RECENT_DAYS の場合にのみ有効です。 |
clustering.plan.strategy.cluster.begin.partition |
— | 範囲フィルタリングの開始パーティション。filter.mode が SELECTED_PARTITIONS の場合にのみ有効です。 |
clustering.plan.strategy.cluster.end.partition |
— | 範囲フィルタリングの終了パーティション。filter.mode が SELECTED_PARTITIONS の場合にのみ有効です。 |
clustering.plan.strategy.partition.regex.pattern |
— | パーティションを選択するための正規表現。 |
clustering.plan.strategy.partition.selected |
— | 選択されたパーティションのカンマ区切りリスト。 |
インデックスタイプの選択
Hudi は 2 つのインデックスタイプをサポートしています。この表を使用して、ワークロードに適したものを選択してください。
| ディメンション | FLINK_STATE | BUCKET |
|---|---|---|
| ストレージ/コンピュートオーバーヘッド | あり (状態バックエンド) | なし |
| パフォーマンス | 状態バックエンドに依存 | 優れている (状態オーバーヘッドなし) |
| ファイルグループの柔軟性 | ファイルサイズに基づいてレコードを動的に割り当てる | 固定バケット数 (初期構成後に増やすことはできない) |
| パーティション間変更 | サポート | サポートされていない (例外:Change Data Capture (CDC) ストリーミング入力) |
| 使用するタイミング | 5 億レコード未満のテーブル、またはパーティション間の更新が必要なワークロード | 5 億レコードを超え、状態オーバーヘッドがボトルネックとなるテーブル |
index.typeがBUCKETに設定されている場合、index.global.enabled=trueを設定しても効果はありません。バケットインデックスはパーティション間の重複排除をサポートしていません。
| パラメーター | デフォルト | 説明 |
|---|---|---|
index.type |
FLINK_STATE |
インデックスタイプ。有効な値:FLINK_STATE、BUCKET。 |
hoodie.bucket.index.hash.field |
プライマリキー | バケットインデックスのハッシュキーフィールド。プライマリキーのサブセットにすることができます。 |
hoodie.bucket.index.num.buckets |
4 |
パーティションごとのバケット数。テーブル作成後に変更することはできません。 |
バケットインデックスのパラメーターは Hudi 0.11.0 以降でサポートされています。
データ読み取りパラメーター
Hudi は同じパラメーターセットを使用して 3 つの読み取りパターンをサポートします。
| パターン | 構成 |
|---|---|
| ストリーミング読み取り | read.streaming.enabled=true を設定し、オプションで read.start-commit |
| 増分バッチ読み取り | read.start-commit と read.end-commit の両方を設定します。間隔は閉区間です (両端を含む)。 |
| タイムトラベル | read.end-commit のみを設定します。その特定のコミットのスナップショットを読み取ります。 |
ストリーミング読み取りパラメーター
デフォルトでは、Hudi テーブルの読み取りはスナップショット読み取りを使用します。最新の完全なスナップショットが一度に返されます。read.streaming.enabled=true を設定して、ストリーミング読み取りに切り替えます。
| パラメーター | デフォルト | 説明 |
|---|---|---|
read.streaming.enabled |
false |
ストリーミング読み取りを有効にするには true に設定します。 |
read.start-commit |
(ブランク) | 開始オフセット。フォーマット:特定の時間の場合は yyyyMMddHHmmss、最初から読み取る場合は earliest。最新のコミットから開始する場合はブランクのままにします。 |
clean.retain_commits |
30 |
クリーナーによって保持される履歴コミットの最大数。この制限を超えるコミットは削除されます。たとえば、5 分のチェックポイント間隔の場合、デフォルト値の 30 は少なくとも 150 分間の変更ログを保持します。 |
変更ログのストリーミング読み取りには Hudi 0.10.0 以降が必要です。コンパクションタスクは変更ログをマージし、中間レコードを削除する可能性があり、ダウンストリームの計算に影響を与える可能性があります。
増分読み取りパラメーター
| パラメーター | デフォルト | 説明 |
|---|---|---|
read.start-commit |
最新のコミット | 読み取り範囲の開始。yyyyMMddHHmmss フォーマット。 |
read.end-commit |
最新のコミット | 読み取り範囲の終了。yyyyMMddHHmmss フォーマット。範囲は閉区間です (両端を含む)。 |
例
ソーステーブル
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- ストリーミングモードで最新のコミットから読み取り、Blackhole に書き込みます。
INSERT INTO blackhole SELECT * FROM hudi_tbl;
シンクテーブル
CREATE TEMPORARY TABLE datagen (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<Custom storage directory>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * FROM datagen;
DataStream API
DataStream API を使用するには、Realtime Compute for Apache Flink 用の DataStream コネクタを構成する必要があります。詳細については、「DataStream コネクタの設定項目」をご参照ください。
Maven 依存関係
依存関係のバージョンをご利用の VVR バージョンに合わせてください。
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.4</flink.version>
<hudi.version>0.13.1</hudi.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>
<!-- OSS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
<!-- DLF -->
<dependency>
<groupId>com.aliyun.datalake</groupId>
<artifactId>metastore-client-hive2</artifactId>
<version>0.2.14</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.5.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
DLF の依存関係は、オープンソースの Apache Hive バージョン (hive-common、hive-exec) と競合します。ローカルで DLF をテストするには、カスタムの hive-common および hive-exec JAR パッケージをダウンロードし、IntelliJ IDEA に手動でインポートしてください。
Hudi へのデータ書き込み
次の例では、データを OSS の Hudi MOR テーブルに書き込み、オプションでメタデータを DLF に同期します。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import java.util.HashMap;
import java.util.Map;
public class FlinkHudiQuickStart {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String dbName = "test_db";
String tableName = "test_tbl";
String basePath = "oss://xxx";
Map<String, String> options = new HashMap<>();
// Hudi 構成
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
options.put(FlinkOptions.TABLE_NAME.key(), tableName);
// OSS 構成
// ローカルデバッグにはパブリックエンドポイントを使用します (例:oss-cn-hangzhou.aliyuncs.com)
// クラスターへのサブミットには内部エンドポイントを使用します (例:oss-cn-hangzhou-internal.aliyuncs.com)
options.put("hadoop.fs.oss.accessKeyId", "xxx");
options.put("hadoop.fs.oss.accessKeySecret", "xxx");
options.put("hadoop.fs.oss.endpoint", "xxx");
options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
// DLF 構成 (オプション — DLF に同期しない場合は削除)
// ローカルデバッグにはパブリックエンドポイントを使用します (例:dlf.cn-hangzhou.aliyuncs.com)
// クラスターへのサブミットには VPC エンドポイントを使用します (例:dlf-vpc.cn-hangzhou.aliyuncs.com)
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
options.put("hadoop.dlf.catalog.id", "xxx");
options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
options.put("hadoop.dlf.catalog.region", "xxx");
options.put("hadoop.dlf.catalog.endpoint", "xxx");
options.put("hadoop.hive.imetastoreclient.factory.class",
"com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
DataStream<RowData> dataStream = env.fromElements(
GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
StringData.fromString("1001"), StringData.fromString("p1")),
GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
StringData.fromString("1002"), StringData.fromString("p2"))
);
HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
.column("uuid string")
.column("name string")
.column("age int")
.column("ts string")
.column("`partition` string")
.pk("uuid")
.partition("partition")
.options(options);
// 2番目のパラメーター:入力ストリームが有界かどうか (true = バッチ、false = ストリーミング)
builder.sink(dataStream, false);
env.execute("Flink_Hudi_Quick_Start");
}
}