最適な結果を得るには、Paimon Catalog とともに Paimon コネクタを使用してください。本トピックでは、ストリーミングデータレイクハウス向けに Paimon コネクタを活用する方法について説明します。
背景情報
Apache Paimon は、ストリーミング処理とバッチ処理を統合したデータレイクストレージフォーマットです。高スループットでの書き込みと低遅延でのクエリをサポートしています。Alibaba Cloud のオープンソースビッグデータプラットフォーム E-MapReduce 上で動作する主要なコンピュートエンジン(Flink、Spark、Hive、Trino)は、Paimon と良好に連携します。また、HDFS やクラウドベースの Object Storage Service 上で、独自のデータレイクストレージサービスを迅速に構築できます。その後、対応するコンピュートエンジンを接続して、データレイクの分析を実行できます。詳細については、「Apache Paimon」をご参照ください。
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブル、ディメンションテーブル、および結果テーブル(データインジェスト先) |
実行モード | ストリーミングモードおよびバッチモード |
データフォーマット | 非対応 |
固有の監視メトリクス | なし |
API 種別 | SQL および YAML ベースのデータインジェストジョブ |
結果テーブルへのデータ更新および削除の対応 | はい |
主な特徴
Apache Paimon は以下のコア機能を提供します。
HDFS または Object Storage Service 上で、軽量かつ低コストのデータレイクストレージサービスを構築します。
ストリーミングモードおよびバッチモードの両方で、大規模なデータセットの読み書きを行います。
数分から数秒単位のデータ新鮮度(freshness)で、バッチおよび OLAP クエリを実行します。
増分データのインジェストおよび生成を実現し、従来のオフラインデータウェアハウスおよび現代的なストリーミングデータウェアハウスのすべてのレイヤーにおけるストレージとして機能します。
事前集約により、ストレージコストおよび下流の計算負荷を削減します。
履歴バージョンへのロールバックが可能です。
データを効率的にフィルター処理できます。
スキーマ進化をサポートします。
使用制限および推奨事項
Paimon コネクタは、Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。
以下の表は、Paimon コミュニティ版と Realtime Compute for Apache Flink エンジン(VVR)のバージョン間の対応関係を示しています。
Apache Paimon バージョン
VVR バージョン
1.3
11.4
1.2
11.2 および 11.3
1.1
11
1.0
8.0.11
0.9
8.0.7、8.0.8、8.0.9、および 8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
同時書き込み時のストレージ推奨事項
複数のジョブが同一の Paimon テーブルに対して同時に更新を行う場合、標準の OSS ストレージ(oss://)では、ファイル操作の原子性制限により、まれにコミット競合やジョブ失敗が発生する可能性があります。
安定的で継続的な書き込みを確保するには、強力な原子性を保証するメタデータまたはストレージサービスを使用してください。Data Lake Formation (DLF) の使用を推奨します。Data Lake Formation (DLF) は、Paimon メタデータとストレージ管理を統合します。または、OSS-HDFS または HDFS を使用してください。
パラメーター設定の有効化
Paimon テーブルの構成パラメーターを変更した後は、関連するジョブを再起動して、新しい設定を有効化してください。実行中のジョブは、そのような変更を動的に検出・読み込むことはできません。
パーティション削除後の物理クリーンアップ遅延
DROP PARTITIONを実行しても、基盤となる物理データファイルは即座に削除されません。
この操作は論理削除のみを実行します。Paimon は、最新のスナップショットから該当パーティションのメタデータを削除します。Paimon はタイムトラベルをサポートしているため、履歴スナップショットは引き続き該当パーティションのデータファイルを参照します。物理ファイルは、該当パーティションを参照するすべてのスナップショットが有効期限切れとなり、Paimon のスナップショット有効期限切れメカニズムによってクリーンアップされた後にのみ削除されます。
SQL
SQL ジョブにおいて、Paimon コネクタをソーステーブルまたは結果テーブルとして使用できます。
構文
Paimon Catalog 内で Paimon テーブルを作成する場合は、
connectorパラメーターを省略します。構文は以下のとおりです。CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );説明Paimon Catalog 内に既に Paimon テーブルを作成済みの場合は、再作成せずにそのままご利用ください。
他のカタログ内で Paimon の一時テーブルを作成する場合は、connector パラメーターおよび Paimon テーブルファイルのパスを指定します。構文は以下のとおりです。
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- パスに Paimon テーブルデータが存在しない場合、自動的にファイルを作成します。 ... );説明パスの例:
'path' = 'oss://<bucket>/test/order.db/orders'。.dbサフィックスを省略しないでください。 Paimon は、このサフィックスを使用してデータベースを識別します。同一テーブルへ書き込みを行う複数のジョブは、同じパスを共有する必要があります。
パスが異なる場合、たとえ物理的な場所が同一であっても、別のテーブルと見なされます。たとえば、
oss://b/testとoss://b/test/は末尾のスラッシュのみが異なりますが、異なるテーブルを指します。カタログ構成が不一致の場合、同時書き込み競合、コンパクション失敗、またはデータ損失が発生する可能性があります。
WITH パラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
connector | テーブル種別。 | String | いいえ | なし |
|
path | テーブルのストレージパス。 | String | いいえ | なし |
|
auto-create | Paimon の一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合、自動的にファイルを作成します。 | Boolean | いいえ | false | 有効な値:
|
file.format | テーブル内のデータファイルのストレージクラス。 | String | いいえ | parquet | 有効な値:
|
bucket | パーティションあたりのバケット数。 | Integer | いいえ | 1 | Paimon テーブルへの書き込みデータは、 説明 バケットサイズは 5 GB 未満を推奨します。 |
bucket-key | バケットキー列。 | String | いいえ | なし | データがバケット間でどのように分散されるかを決定する列を指定します。 列名は英語のカンマ(,)で区切ります。たとえば、 説明
|
changelog-producer | 増分データを生成するメカニズム。 | String | いいえ | none | Paimon は、任意の入力データストリームに対して、完全なチェンジログ(update_before および update_after レコードが一致)を生成できます。これにより、下流のコンシューマーが更新を正しく処理できるようになります。有効な値:
チェンジログプロデューサーの選択に関するガイドラインについては、「チェンジログプロデューサー」をご参照ください。 |
full-compaction.delta-commits | フルコンパクション間の最大間隔。 | Integer | いいえ | なし | このパラメーターは、フルコンパクションを実行するまでのスナップショットコミット数を設定します。 |
lookup.cache-max-memory-size | Paimon ディメンションテーブルのメモリキャッシュサイズ。 | String | いいえ | 256 MB | この設定は、ディメンションテーブルのキャッシュサイズおよび lookup changelog-producer のキャッシュサイズの両方に適用されます。 |
merge-engine | 同一のプライマリキーを持つ行をマージするメカニズム。 | String | いいえ | deduplicate | 有効な値:
マージエンジンの詳細については、「マージエンジン」をご参照ください。 |
partial-update.ignore-delete | delete(-D)メッセージを無視するかどうか。 | Boolean | いいえ | false | 有効な値:
説明
|
ignore-delete | delete(-D)メッセージを無視するかどうか。 | Boolean | いいえ | false | partial-update.ignore-delete と同じ有効な値です。 説明
|
partition.default-name | デフォルトのパーティション名。 | String | いいえ | __DEFAULT_PARTITION__ | パーティション列の値が null または空文字列の場合に使用されるパーティション名です。 |
partition.expiration-check-interval | 期限切れパーティションをチェックする頻度。 | String | いいえ | 1h | 詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。 |
partition.expiration-time | パーティションの保持期間。 | String | いいえ | なし | この期間経過後にパーティションが有効期限切れとなります。デフォルトでは、パーティションは有効期限切れになりません。 期間はパーティション値から算出されます。詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。 |
partition.timestamp-formatter | タイム文字列をフォーマット済みのタイムスタンプに変換します。 | String | いいえ | なし | パーティション値からパーティションの経過時間を抽出するために使用されるフォーマットを指定します。詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。 |
partition.timestamp-pattern | パーティション値をタイムスタンプ文字列に変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティションの経過時間を抽出するために使用されるフォーマットを指定します。詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。 |
scan.bounded.watermark | Paimon ソーステーブルの Watermark がこの値を超えると、読み取りを停止します。 | Long | いいえ | なし | なし。 |
scan.mode | Paimon ソーステーブルのコンシューマオフセット。 | String | いいえ | default | 詳細については、「Paimon ソーステーブルのコンシューマオフセットの設定方法」をご参照ください。 |
scan.snapshot-id | 読み取り開始対象のスナップショット ID。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマオフセットの設定方法」をご参照ください。 |
scan.timestamp-millis | 読み取り開始対象のタイムスタンプ。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマオフセットの設定方法」をご参照ください。 |
snapshot.num-retained.max | 保持する最新のスナップショットの最大数。 | Integer | いいえ | 2147483647 | スナップショットは、この制限または snapshot.time-retained のいずれかに該当し、かつ snapshot.num-retained.min を満たす場合に有効期限切れとなります。 |
snapshot.num-retained.min | 保持する最新のスナップショットの最小数。 | Integer | いいえ | 10 | なし。 |
snapshot.time-retained | スナップショットの有効期限までの時間。 | String | いいえ | 1h | スナップショットは、この制限または snapshot.num-retained.max のいずれかに該当し、かつ snapshot.num-retained.min を満たす場合に有効期限切れとなります。 |
write-mode | Paimon テーブルの書き込みモード。 | String | いいえ | change-log | 有効な値:
詳細については、「書き込みモード」をご参照ください。 |
scan.infer-parallelism | Paimon ソーステーブルの並列度を自動的に推論するかどうか。 | Boolean | いいえ | true | 有効な値:
|
scan.parallelism | Paimon ソーステーブルの並列度。 | Integer | いいえ | なし | 説明 このパラメーターは、 タブで リソース割り当てモード がエキスパートモードに設定されている場合、効果がありません。 |
sink.parallelism | Paimon 結果テーブルの並列度。 | Integer | いいえ | なし | 説明 このパラメーターは、 タブで リソース割り当てモード がエキスパートモードに設定されている場合、効果がありません。 |
sink.clustering.by-columns | Paimon 結果テーブルのクラスタリング列。 | String | いいえ | なし | Paimon の append-only テーブル(プライマリキーを持たないテーブル) の場合、バッチジョブでこのパラメーターを設定してクラスタリング書き込みを有効化できます。クラスタリングは、指定された列の値範囲に基づいてデータをグループ化することで、クエリ速度を向上させます。 列名はカンマ(,)で区切ります。例: 詳細については、「Apache Paimon ドキュメント」をご参照ください。 |
sink.delete-strategy | retract(-D/-U)メッセージを正しく処理するための検証戦略。 | Enum | いいえ | NONE | 有効な戦略および必要な sink 動作:
説明
|
その他の構成オプションについては、「Apache Paimon ドキュメント」をご参照ください。
機能の詳細
データ新鮮度および一貫性保証
Paimon 結果テーブルは、Flink ジョブの各チェックポイントで二相コミットプロトコルを使用してデータをコミットします。そのため、データ新鮮度は Flink ジョブのチェックポイント間隔に対応します。各コミットでは、最大で 2 つのスナップショットが生成されます。
2 つの Flink ジョブが同一の Paimon テーブルに同時に書き込む場合、データが異なるバケットに書き込まれていれば直列化可能(serializable)な一貫性が保証されます。一方、同一のバケットに書き込まれている場合、スナップショット隔離(snapshot isolation)の一貫性しか保証されません。つまり、テーブルのデータは両方のジョブの結果が混在する可能性がありますが、データ損失は発生しません。
マージエンジン
Paimon 結果テーブルが同一のプライマリキーを持つ複数の行を受信した場合、一意性を保つためにそれらを 1 行にマージします。merge-engine パラメーターを設定することで、マージの動作を制御できます。以下の表に各オプションを示します。
マージエンジン | 詳細 |
Deduplicate | Deduplicate はデフォルトのマージエンジンです。同一のプライマリキーを持つ行に対して、Paimon 結果テーブルは最も新しい行のみを保持し、残りを破棄します。 説明 最も新しい行が delete メッセージである場合、該当するプライマリキーを持つすべての行が破棄されます。 |
Partial update | Partial update を使用すると、複数のメッセージにわたってデータを段階的に更新できます。同一のプライマリキーを持つ新しい行は既存の行を上書きしますが、null の列は変更されません。 たとえば、Paimon 結果テーブルが以下の順序で行を受信したとします。
最初の列がプライマリキーである場合、最終的な結果は <1, 25.2, 10, 'This is a book'> となります。 説明
|
Aggregation | 一部のケースでは、集計値のみが必要になることがあります。Aggregation を使用すると、Paimon は指定された集計関数を用いて、同一のプライマリキーを持つ行をマージします。各プライマリキー以外の列に対しては、 price 列は max で集計され、sales 列は sum で集計されます。入力が <1, 23.0, 15> および <1, 30.2, 20> の場合、結果は <1, 30.2, 35> となります。サポートされる集計関数およびデータ型:
説明
|
増分データ生成メカニズム
changelog-producer パラメーターを設定することで、任意の入力データストリームに対して完全なチェンジログ(update_before および update_after レコードが一致)を生成できます。以下の表に、利用可能なすべてのチェンジログプロデューサーを示します。詳細については、「Apache Paimon ドキュメント」をご参照ください。
メカニズム | 詳細 |
None |
たとえば、下流のコンシューマーが列の合計を計算するとします。最新の値 5 のみが表示される場合、合計をどう更新すべきか判断できません。前の値が 4 であれば 1 を加算すべきですが、6 であれば 1 を減算すべきです。update_before データに敏感なコンシューマーは、none を避けるべきです。ただし、他のプロデューサーはパフォーマンスオーバーヘッドを伴います。 説明 下流のコンシューマー(たとえばデータベース)が update_before データを必要としない場合、none は許容されます。ご自身の要件に応じて選択してください。 |
Input |
したがって、このオプションは、入力ストリーム自体が CDC データなど完全なチェンジログである場合にのみ使用してください。 |
Lookup |
Full Compaction と比較して、Lookup はより新鮮なチェンジログを提供しますが、より多くのリソースを消費します。 分単位の更新など、高い新鮮度を要求する増分データにこの機能を推奨します。 |
Full Compaction |
Lookup と比較して、Full Compaction は新鮮度の低いチェンジログを提供しますが、既存のコンパクション作業に合わせて実行されるため、リソース使用量は少なくなります。 増分データの新鮮度要件が厳しくない場合(たとえば、時間単位の更新で十分な場合)にこのアプローチを推奨します。 |
書き込みモード
Paimon テーブルは以下の書き込みモードをサポートします。
モード | 詳細 |
Change-log | Change-log はデフォルトの書き込みモードです。プライマリキーに基づく insert、delete、update 操作をサポートします。また、このモードではマージエンジンおよびチェンジログプロデューサーも使用できます。 |
Append-only | Append-only モードは insert のみをサポートし、プライマリキーを使用しません。change-log モードよりも効率的です。中程度のデータ新鮮度(たとえば分単位)で十分な場合のメッセージキューの代替としてご利用ください。 詳細については、「Apache Paimon ドキュメント」をご参照ください。Append-only モードを使用する際は、以下の点にご注意ください。
|
CTAS および CDAS の対象
Paimon テーブルは、単一テーブルまたはデータベース全体のリアルタイム同期をサポートします。上流テーブルのスキーマ変更は、リアルタイムで Paimon テーブルに同期されます。詳細については、「Paimon テーブルの管理」および「Paimon Catalog の管理」をご参照ください。
データインジェスト
YAML ベースのデータインジェストジョブにおいて、Paimon コネクタを結果テーブルとして使用できます。
構文
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse構成オプション
パラメーター | 説明 | 必須 | データの型 | デフォルト値 | 備考 |
type | コネクタ種別。 | はい | STRING | なし | 固定値: |
name | シンク名。 | いいえ | STRING | なし | シンクの名前。 |
catalog.properties.metastore | Paimon Catalog の種別。 | いいえ | STRING | filesystem | 有効な値:
|
catalog.properties.* | Paimon Catalog を作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon Catalog の管理」をご参照ください。 |
table.properties.* | Paimon テーブルを作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon テーブルのオプション」をご参照ください。 |
catalog.properties.warehouse | ファイルストレージのルートディレクトリ。 | いいえ | STRING | なし | このパラメーターは、 |
commit.user-prefix | データファイルのコミット時に使用されるユーザー名プレフィックス。 | いいえ | STRING | なし | 説明 コミット失敗時に競合するジョブを特定できるよう、異なるジョブで異なるユーザー名を使用してください。 |
partition.key | 各パーティションテーブルのパーティションフィールド。 | いいえ | STRING | なし | テーブルは |
sink.cross-partition-upsert.tables | クロスパーティションアップサート(プライマリキーにすべてのパーティションフィールドが含まれていない)を必要とするテーブル。 | いいえ | STRING | なし | クロスパーティションアップサートを必要とするテーブルに適用されます。
重要
|
sink.commit.parallelism | コミット演算子の並列度。 | いいえ | INTEGER | なし | コミット演算子がボトルネックとなっている場合、この値を増加させることでパフォーマンスを向上させられます。 VVR 11.6 以降でのみサポートされます。 説明 このパラメーターを設定すると、演算子の並列度が変更されます。ステートフルジョブを再起動する際は、AllowNonRestoredState を有効化してください。 |
使用例
Paimon Catalog の種別に応じて、Paimon コネクタをデータインジェスト結果テーブルとして構成します。
Paimon Catalog を filesystem として使用し、Alibaba Cloud OSS に書き込む場合の構成例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxxcatalog.properties で始まるパラメーターの意味については、「Paimon Filesystem Catalog の作成」をご参照ください。
REST Catalog を使用し、Alibaba Cloud Data Lake Formation に書き込む場合の例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlf # (任意)読み取りパフォーマンスを向上させるため、削除ベクターを有効化します。 table.properties.deletion-vectors.enabled: truecatalog.properties で始まるパラメーターの意味については、「Flink CDC カタログ構成パラメーター」をご参照ください。
スキーマ進化
データインジェスト結果テーブルとして Paimon を使用する場合、以下のスキーマ進化イベントをサポートします。
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT(プライマリキー列の型変更は非対応)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
下流の Paimon テーブルが既に存在する場合、Paimon は既存のスキーマを使用して書き込みを行い、テーブルの再作成を試みません。
よくある質問
Paimon ジョブが「Heartbeat of TaskManager timed out」というエラーで失敗するのはなぜですか?
Paimon ジョブが「Sink materializer must not be used with Paimon sink」というエラーで失敗するのはなぜですか?
Paimon ジョブが「File deletion conflicts detected」または「LSM conflicts detected」というエラーで失敗するのはなぜですか?
Paimon ジョブが「File xxx not found, Possible causes」というエラーで失敗するのはなぜですか?