Paimon コネクタは、ストリーミングデータレイクハウスコネクタであり、Paimon カタログとともに使用できます。このトピックでは、Paimon コネクタの使用方法について説明します。
背景情報
Apache Paimon は、ストリーミング処理とバッチ処理を統合するレイクストレージフォーマットです。高スループットの書き込みと低レイテンシーのクエリをサポートします。Alibaba Cloud オープンソースビッグデータプラットフォーム E-MapReduce 上の Flink、Spark、Hive、Trino などの一般的なコンピュートエンジンは、Paimon と緊密に統合されています。Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを構築し、これらのコンピュートエンジンに接続してデータレイク分析を実行できます。詳細については、「Apache Paimon」をご参照ください。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト先 |
実行モード | ストリーミングモードとバッチモード |
データフォーマット | サポートされていません |
特定の監視メトリック | なし |
API タイプ | SQL、データインジェスト用の YAML ジョブ |
結果テーブルのデータ更新または削除をサポート | はい |
機能
Apache Paimon は、以下のコア機能を提供します:
HDFS または OSS に基づいて、低コストで軽量なデータレイクストレージサービスを構築します。
ストリーミングモードとバッチモードで大規模なデータセットを読み書きします。
秒から分単位のデータ鮮度でバッチクエリとオンライン分析処理 (OLAP) クエリを実行します。
増分データを消費および生成します。Paimon は、従来のオフラインデータウェアハウスおよび最新のストリーミングデータウェアハウスのストレージレイヤーとして使用できます。
データを事前集約して、ストレージコストと下流の計算負荷を削減します。
データの履歴バージョンを取得します。
データを効率的にフィルタリングします。
スキーマ進化をサポートします。
制限事項
Paimon コネクタは、Flink コンピュートエンジン Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。
次の表に、Paimon と VVR のバージョンマッピングを示します。
Apache Paimon バージョン
VVR バージョン
1.3
11.4
1.2
11.2、11.3
1.1
11
1.0
8.0.11
0.9
8.0.7, 8.0.8, 8.0.9, および 8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
SQL
Paimon コネクタは、SQL ジョブでソーステーブルまたは結果テーブルとして使用できます。
構文
Paimon カタログで Paimon テーブルを作成する場合、
connectorパラメーターを指定する必要はありません。Paimon テーブルを作成するための構文は次のとおりです:CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );説明Paimon カタログに Paimon テーブルをすでに作成している場合は、再度作成せずに直接使用できます。
別のカタログで Paimon 一時テーブルを作成する場合は、connector パラメーターと Paimon テーブルのストレージパスを指定する必要があります。Paimon テーブルを作成するための構文は次のとおりです:
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- 指定されたパスに Paimon テーブルのデータファイルが存在しない場合、ファイルが自動的に作成されます。 ... );説明パスの例:
'path' = 'oss://<bucket>/test/order.db/orders'。.dbサフィックスを省略しないでください。Paimon はこの命名規則を使用してデータベースを識別します。同じテーブルに書き込む複数のジョブは、同じパス設定を使用する必要があります。
2つのパス値が異なる場合、Paimon はそれらを異なるテーブルと見なします。物理パスが同じであっても、カタログ設定が一致しないと、同時書き込み競合、コンパクション失敗、およびデータ損失が発生する可能性があります。たとえば、
oss://b/testとoss://b/test/は、物理パスが同じであっても、末尾のスラッシュのために異なるパスと見なされます。
WITH パラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 注 |
connector | テーブルタイプ。 | String | いいえ | なし |
|
path | テーブルストレージパス。 | String | いいえ | なし |
|
auto-create | Paimon 一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合にファイルを自動的に作成するかどうかを指定します。 | Boolean | いいえ | false | 有効値:
|
bucket | 各パーティションのバケット数。 | Integer | いいえ | 1 | Paimon テーブルに書き込まれたデータは、 説明 各バケットのデータ量は 5 GB 未満にすることを推奨します。 |
bucket-key | バケット化のためのキー列。 | String | いいえ | なし | Paimon テーブルに書き込まれるデータを異なるバケットに分散させるための基準となる列を指定します。 列名はカンマ (,) で区切ります。例: 説明
|
changelog-producer | 増分データを生成するメカニズム。 | String | いいえ | none | Paimon は、任意の入力データストリームに対して完全な増分データ (すべての update_after データに対応する update_before データがある) を生成でき、下流のコンシューマーにとって便利です。増分データ生成メカニズムの有効値:
増分データ生成メカニズムの選択方法の詳細については、「増分データ生成メカニズム」をご参照ください。 |
full-compaction.delta-commits | フルコンパクションの最大間隔。 | Integer | いいえ | なし | このパラメーターは、フルコンパクションを実行する必要があるスナップショットコミットの数を指定します。 |
lookup.cache-max-memory-size | Paimon ディメンションテーブルのメモリキャッシュサイズ。 | String | いいえ | 256 MB | このパラメーターは、ディメンションテーブルと lookup changelog-producer の両方のキャッシュサイズに影響します。両方のメカニズムのキャッシュサイズは、このパラメーターによって設定されます。 |
merge-engine | 同じプライマリキーを持つデータをマージするメカニズム。 | String | いいえ | deduplicate | 有効値:
データマージメカニズムの詳細な分析については、「データマージメカニズム」をご参照ください。 |
partial-update.ignore-delete | 削除 (-D) メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効値:
説明
|
ignore-delete | 削除 (-D) メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効値は partial-update.ignore-delete と同じです。 説明
|
partition.default-name | パーティションのデフォルト名。 | String | いいえ | __DEFAULT_PARTITION__ | パーティションキー列の値が null または空の文字列の場合、このデフォルト名がパーティション名として使用されます。 |
partition.expiration-check-interval | 期限切れのパーティションをチェックする間隔。 | String | いいえ | 1h | 詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
partition.expiration-time | パーティションの有効期限。 | String | いいえ | なし | パーティションの生存時間がこの値を超えると、パーティションは期限切れになります。デフォルトでは、パーティションは決して期限切れになりません。 パーティションの生存時間は、そのパーティション値から計算されます。詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
partition.timestamp-formatter | 時間文字列を UNIX タイムスタンプに変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
partition.timestamp-pattern | パーティション値を時間文字列に変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
scan.bounded.watermark | Paimon ソーステーブルによって生成されたデータのウォーターマークがこの値を超えると、Paimon ソーステーブルはデータの生成を停止します。 | Long | いいえ | なし | なし。 |
scan.mode | Paimon ソーステーブルのコンシューマオフセットを指定します。 | String | いいえ | default | 詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。 |
scan.snapshot-id | Paimon ソーステーブルが消費を開始するスナップショットを指定します。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。 |
scan.timestamp-millis | Paimon ソーステーブルが消費を開始する時点を指定します。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。 |
snapshot.num-retained.max | 期限切れにならずに保持する最新のスナップショットの最大数。 | Integer | いいえ | 2147483647 | この設定または `snapshot.time-retained` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。 |
snapshot.num-retained.min | 期限切れにならずに保持する最新のスナップショットの最小数。 | Integer | いいえ | 10 | なし。 |
snapshot.time-retained | スナップショットが期限切れになるまでの期間。 | String | いいえ | 1h | この設定または `snapshot.num-retained.max` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。 |
write-mode | Paimon テーブルの書き込みモード。 | String | いいえ | change-log | 有効値:
書き込みモードの詳細については、「書き込みモード」をご参照ください。 |
scan.infer-parallelism | Paimon ソーステーブルの並列度を自動的に推論するかどうかを指定します。 | Boolean | いいえ | true | 有効値:
|
scan.parallelism | Paimon ソーステーブルの並列度。 | Integer | いいえ | なし | 説明 タブで、リソースモード が [エキスパート] に設定されている場合、このパラメーターは有効になりません。 |
sink.parallelism | Paimon 結果テーブルの並列度。 | Integer | いいえ | なし | 説明 タブで、リソースモード が [エキスパート] に設定されている場合、このパラメーターは有効になりません。 |
sink.clustering.by-columns | Paimon 結果テーブルへの書き込みのためのクラスタリング列を指定します。 | String | いいえ | なし | Paimon append-only テーブル (非プライマリキーテーブル) の場合、バッチジョブでこのパラメーターを設定すると、クラスタリング書き込み機能が有効になります。この機能は、特定列のデータ値の範囲に基づいてクラスタリングを行い、テーブルのクエリ速度を向上させます。 複数の列名はカンマ (,) で区切ります。例: クラスタリングの詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。 |
sink.delete-strategy | システムがリトラクション (-D/-U) メッセージを正しく処理することを保証するための検証ポリシーを設定します。 | Enum | いいえ | NONE | 検証ポリシーの有効値と、リトラクションメッセージを処理するシンクオペレーターの期待される動作は次のとおりです:
説明
|
設定項目の詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。
機能詳細
データ鮮度と一貫性の保証
Paimon 結果テーブルは、2フェーズコミットプロトコルを使用して、Flink ジョブの各チェックポイント中に書き込まれたデータをコミットします。したがって、データ鮮度は Flink ジョブのチェックポイント間隔と同じです。各コミットは最大で2つのスナップショットを生成します。
2つの Flink ジョブが同時に同じ Paimon テーブルに書き込む場合、2つのジョブのデータが同じバケットに書き込まれなければ、シリアライズ可能な一貫性が保証されます。2つのジョブのデータが同じバケットに書き込まれる場合、スナップショット分離の一貫性のみが保証されます。これは、テーブル内のデータが両方のジョブの結果の混合になる可能性があることを意味しますが、データは失われません。
データマージメカニズム
Paimon 結果テーブルが同じプライマリキーを持つ複数のレコードを受信した場合、これらのレコードを単一のレコードにマージして、プライマリキーの一意性を維持します。merge-engine パラメーターを設定することで、データマージの動作を指定できます。次の表に、データマージメカニズムを示します。
マージメカニズム | 詳細 |
重複排除 | 重複排除メカニズムは、デフォルトのデータマージメカニズムです。同じプライマリキーを持つ複数のレコードに対して、Paimon 結果テーブルは最新のレコードのみを保持し、その他は破棄します。 説明 最新のレコードが削除メッセージの場合、そのプライマリキーを持つすべてのレコードが破棄されます。 |
部分更新 | partial-update メカニズムを指定することで、複数のメッセージを通じてデータを段階的に更新し、最終的に完全なデータを取得できます。具体的には、同じプライマリキーを持つ新しいデータは古いデータを上書きしますが、null 値を持つ列は上書きされません。 たとえば、Paimon 結果テーブルが次の3つのレコードを順に受信するとします:
最初の列はプライマリキーです。最終結果は <1, 25.2, 10, 'This is a book'> となります。 説明
|
集約 | 一部のシナリオでは、集約された値のみに関心がある場合があります。集約メカニズムは、指定した集計関数に基づいて同じプライマリキーを持つデータを集約します。プライマリキーの一部ではない各列について、 price 列は max 関数で集約され、sales 列は sum 関数で集約されます。2つの入力レコード <1, 23.0, 15> と <1, 30.2, 20> が与えられた場合、最終結果は <1, 30.2, 35> となります。現在サポートされている集計関数とそれに対応するデータ型は次のとおりです:
説明
|
増分データ生成メカニズム
changelog-producer パラメーターを適切な増分データ生成メカニズムに設定することで、Paimon は任意の入力データストリームに対して完全な増分データを生成できます。完全な増分データとは、すべての update_after データに対応する update_before データがあることを意味します。以下にすべての増分データ生成メカニズムをリストします。詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。
メカニズム | 詳細 |
None |
たとえば、下流のコンシューマーが列の合計を計算する必要があるとします。コンシューマーが最新のデータである 5 しか見ない場合、合計をどのように更新すればよいか判断できません。以前のデータが 4 であれば、合計を 1 増やすべきです。以前のデータが 6 であれば、合計を 1 減らすべきです。このようなコンシューマーは update_before データに敏感です。増分データ生成メカニズムを `none` に設定しないことを推奨します。ただし、他のメカニズムはパフォーマンスのオーバーヘッドを伴います。 説明 データベースなどの下流のコンシューマーが update_before データに敏感でない場合は、増分データ生成メカニズムを `none` に設定できます。したがって、実際のニーズに基づいてメカニズムを設定することを推奨します。 |
Input |
したがって、このメカニズムは、入力データストリーム自体が変更データキャプチャ (CDC) データなどの完全な増分データである場合にのみ使用できます。 |
Lookup |
以下で説明する Full Compaction メカニズムと比較して、Lookup メカニズムは増分データの生成においてより良いタイムラインを提供しますが、全体的により多くのリソースを消費します。 分単位の鮮度など、データ鮮度に対する要件が高い場合にこのメカニズムを使用することを推奨します。 |
Full Compaction |
Lookup メカニズムと比較して、Full Compaction メカニズムは増分データの生成におけるタイムラインが劣ります。しかし、データのフルコンパクションプロセスを利用するため、追加の計算を生成せず、全体的により少ないリソースを消費します。 時間単位の鮮度など、データ鮮度に対する要件が高くない場合にこのメカニズムを使用することを推奨します。 |
書き込みモード
Paimon テーブルは現在、以下の書き込みモードをサポートしています。
モード | 詳細 |
Change-log | change-log 書き込みモードは、Paimon テーブルのデフォルトの書き込みモードです。このモードは、プライマリキーに基づいてデータの挿入、削除、更新をサポートします。また、このモードでは前述のデータマージおよび増分データ生成メカニズムも使用できます。 |
Append-only | append-only 書き込みモードは、データの挿入のみをサポートし、プライマリキーをサポートしません。このモードは change-log モードよりも効率的であり、分単位の鮮度など、データ鮮度の要件が高くないシナリオでメッセージキューの代替として使用できます。 append-only 書き込みモードの詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。append-only 書き込みモードを使用する際は、次の点に注意してください:
|
CTAS および CDAS の宛先として
Paimon テーブルは、単一テーブルまたはデータベース全体のレベルでデータのリアルタイム同期をサポートします。同期中に上流テーブルのスキーマが変更された場合、その変更は Paimon テーブルにもリアルタイムで同期されます。詳細については、「Paimon テーブルの管理」および「Paimon カタログの管理」をご参照ください。
データインジェスト
Paimon コネクタは、データインジェストのための YAML ジョブ開発でシンクとして使用できます。
構文
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse設定項目
パラメーター | 説明 | 必須 | データの型 | デフォルト値 | 注 |
type | コネクタタイプ。 | はい | STRING | なし | 静的フィールドは |
name | シンク名。 | いいえ | STRING | なし | シンクの名前。 |
catalog.properties.metastore | Paimon カタログのタイプ。 | いいえ | STRING | filesystem | 有効値:
|
catalog.properties.* | Paimon カタログを作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon カタログの管理」をご参照ください。 |
table.properties.* | Paimon テーブルを作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon テーブルオプション」をご参照ください。 |
catalog.properties.warehouse | ファイルストレージのルートディレクトリ。 | いいえ | STRING | なし | このパラメーターは、 |
commit.user-prefix | データファイルをコミットするためのユーザー名プレフィックス。 | いいえ | STRING | なし | 説明 コミット競合が発生した際に競合するジョブを容易に特定できるよう、異なるジョブには異なるユーザー名を設定することを推奨します。 |
partition.key | 各パーティションテーブルのパーティションフィールド。 | いいえ | STRING | なし | 異なるテーブルはセミコロン ( |
sink.cross-partition-upsert.tables | プライマリキーにすべてのパーティションフィールドが含まれていない、パーティション間の更新が必要なテーブルを指定します。 | いいえ | STRING | なし | パーティション間で更新するテーブル。
重要
|
例
以下の例は、Paimon カタログのタイプに基づいて、Paimon をデータインジェストシンクとして設定する方法を示しています。
Paimon カタログが filesystem の場合に Alibaba Cloud OSS に書き込むための設定例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxxcatalog.properties で始まるパラメーターの詳細については、「Paimon Filesystem カタログの作成」をご参照ください。
Paimon カタログが rest の場合に Alibaba Cloud DLF 2.5 に書き込むための設定例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlfcatalog.properties で始まるパラメーターの詳細については、「Flink CDC カタログ設定パラメーター」をご参照ください。
スキーマ進化
現在、データインジェストシンクとしての Paimon は、以下のスキーマ進化イベントをサポートしています:
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT (プライマリキー列の型変更はサポートされていません)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
下流の Paimon テーブルが既に存在する場合、その既存のスキーマが書き込みに使用されます。システムはテーブルを再度作成しようとはしません。