Paimon Catalog とともに、ストリーミングデータレイクハウスコネクタである Paimon コネクタを使用できます。このトピックでは、Paimon コネクタの使用方法について説明します。
背景情報
Apache Paimon は、ストリーミング処理とバッチ処理を統合するレイクストレージフォーマットです。高スループットでの書き込みと低遅延でのクエリをサポートしています。Alibaba Cloud のオープンソースビッグデータプラットフォーム E-MapReduce 上で一般的に使用される Flink、Spark、Hive、Trino などのコンピュートエンジンは、Apache Paimon と緊密に統合されています。Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを構築し、これらのコンピュートエンジンに接続してデータレイク分析を実行できます。詳細については、「Apache Paimon」をご参照ください。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト先 |
実行モード | ストリーミングモードおよびバッチモード |
データフォーマット | 未対応 |
特定の監視メトリック | なし |
API タイプ | SQL、YAML データインジェストジョブ |
結果テーブルでのデータ更新または削除のサポート | はい |
機能
Apache Paimon は、以下の主要機能を提供します。
HDFS または OSS 上に、低コストで軽量なデータレイクストレージサービスを構築します。
大規模データセットをストリーミングモードおよびバッチモードで読み書きします。
秒単位から分単位のデータ鮮度で、バッチクエリおよびオンライン分析処理 (OLAP) クエリを実行します。
増分データの消費および生成を行います。Paimon は、従来のオフラインデータウェアハウスおよび最新のストリーミングデータウェアハウスの両方のストレージレイヤーとして使用できます。
事前集約により、ストレージコストおよびダウンストリームの計算負荷を削減します。
データの過去のバージョンを取得します。
データを効率的にフィルタリングします。
スキーマ進化をサポートします。
制限事項と推奨事項
Paimon コネクタは、Flink コンピュートエンジン Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。
以下の表は、Paimon と VVR のバージョンマッピングを示しています。
Apache Paimon バージョン
VVR バージョン
1.3
11.4
1.2
11.2、11.3
1.1
11
1.0
8.0.11
0.9
8.0.7、8.0.8、8.0.9、および 8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
同時書き込みシナリオにおけるストレージの推奨事項
複数のジョブが同時に同じ Paimon テーブルを更新する場合、ファイル操作の原子性に関する制限により、標準的な OSS ストレージ (oss://) ではまれにコミット競合やジョブエラーが発生する可能性があります。
常に安定した書き込みを保証するためには、強力な原子性保証を提供するメタデータまたはストレージサービスの使用を推奨します。特に、Paimon メタデータおよびストレージサービスを統合管理できる Data Lake Formation (DLF) の使用を優先的に推奨します。また、OSS-HDFS または HDFS を使用することも可能です。
SQL
Paimon コネクタは、SQL ジョブ内でソーステーブルまたは結果テーブルとして使用できます。
構文
Paimon Catalog 内に Paimon テーブルを作成する場合、
connectorパラメーターを指定する必要はありません。Paimon テーブルの作成構文は次のとおりです。CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );説明Paimon Catalog 内にすでに Paimon テーブルが作成済みの場合は、再度作成せずに直接使用できます。
他のカタログ内に Paimon 一時テーブルを作成する場合は、connector パラメーターおよび Paimon テーブルのストレージパスを指定する必要があります。Paimon テーブルの作成構文は次のとおりです。
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- 指定されたパスに Paimon テーブルのデータファイルが存在しない場合、自動的にファイルが作成されます。 ... );説明パスの例:
'path' = 'oss://<bucket>/test/order.db/orders'。Paimon は、データベースを識別するためにこの命名規則を使用するため、.dbサフィックスを省略しないでください。同じテーブルに書き込む複数のジョブは、同じパス設定を使用する必要があります。
2 つのパス値が異なる場合、Paimon はそれらを異なるテーブルと見なします。物理パスが同一であっても、Catalog 設定の不一致により、同時書き込み時の競合、コンパクション失敗、およびデータ損失が発生する可能性があります。たとえば、
oss://b/testとoss://b/test/は末尾のスラッシュの有無により異なるパスと見なされますが、物理パスは同一です。
WITH パラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
connector | テーブルタイプ。 | String | いいえ | なし |
|
path | テーブルのストレージパス。 | String | いいえ | なし |
|
auto-create | Paimon 一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合に自動的にファイルを作成するかどうかを指定します。 | Boolean | いいえ | false | 有効値:
|
bucket | 各パーティション内のバケット数。 | Integer | いいえ | 1 | Paimon テーブルに書き込まれるデータは、 説明 各バケットのデータ量は 5 GB 未満になるように推奨します。 |
bucket-key | バケット分割のためのキー列。 | String | いいえ | なし | Paimon テーブルに書き込まれるデータを異なるバケットに分散させるために使用する列を指定します。 列名はカンマ (,) で区切ります。たとえば、 説明
|
changelog-producer | 増分データを生成するメカニズム。 | String | いいえ | none | Paimon は、任意の入力データストリームに対して完全な増分データ (すべての update_after データに対応する update_before データが存在する) を生成できます。これは、ダウンストリームのコンシューマーにとって便利です。増分データ生成メカニズムの有効値は次のとおりです。
増分データ生成メカニズムの選択方法の詳細については、「増分データ生成メカニズム」をご参照ください。 |
full-compaction.delta-commits | 完全コンパクションの最大間隔。 | Integer | いいえ | なし | このパラメーターは、完全コンパクションを実行する必要があるスナップショットコミットの回数を指定します。 |
lookup.cache-max-memory-size | Paimon ディメンションテーブルのメモリキャッシュサイズ。 | String | いいえ | 256 MB | このパラメーターは、ディメンションテーブルおよび lookup changelog-producer の両方のキャッシュサイズに影響します。両方のメカニズムのキャッシュサイズは、このパラメーターによって設定されます。 |
merge-engine | 同じプライマリキーを持つデータをマージするメカニズム。 | String | いいえ | deduplicate | 有効値:
データマージメカニズムの詳細な分析については、「データマージメカニズム」をご参照ください。 |
partial-update.ignore-delete | 削除 (-D) メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効値:
説明
|
ignore-delete | 削除 (-D) メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効値は partial-update.ignore-delete と同じです。 説明
|
partition.default-name | パーティションのデフォルト名。 | String | いいえ | __DEFAULT_PARTITION__ | パーティションキー列の値が NULL または空文字列の場合、このデフォルト名がパーティション名として使用されます。 |
partition.expiration-check-interval | 期限切れパーティションをチェックする間隔。 | String | いいえ | 1h | 詳細については、「自動パーティション有効期限の設定方法」をご参照ください。 |
partition.expiration-time | パーティションの有効期限時間。 | String | いいえ | なし | パーティションの生存時間がこの値を超えると、パーティションは有効期限切れになります。デフォルトでは、パーティションは有効期限切れになりません。 パーティションの生存時間は、そのパーティション値から計算されます。詳細については、「自動パーティション有効期限の設定方法」をご参照ください。 |
partition.timestamp-formatter | 時間文字列を UNIX タイムスタンプに変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティション生存時間を抽出するためのフォーマットを設定します。詳細については、「自動パーティション有効期限の設定方法」をご参照ください。 |
partition.timestamp-pattern | パーティション値をタイムスタンプ文字列に変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティション生存時間を抽出するためのフォーマットを設定します。詳細については、「自動パーティション有効期限の設定方法」をご参照ください。 |
scan.bounded.watermark | Paimon ソーステーブルによって生成されるデータのウォーターマークがこの値を超えると、Paimon ソーステーブルはデータ生成を停止します。 | Long | いいえ | なし | なし。 |
scan.mode | Paimon ソーステーブルのコンシューマーオフセットを指定します。 | String | いいえ | default | 詳細については、「Paimon ソーステーブルのコンシューマーオフセットの設定方法」をご参照ください。 |
scan.snapshot-id | Paimon ソーステーブルが消費を開始するスナップショットを指定します。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマーオフセットの設定方法」をご参照ください。 |
scan.timestamp-millis | Paimon ソーステーブルが消費を開始する時点を指定します。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマーオフセットの設定方法」をご参照ください。 |
snapshot.num-retained.max | 有効期限切れにならずに保持される最新スナップショットの最大数。 | Integer | いいえ | 2147483647 | この設定または `snapshot.time-retained` のいずれかが満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは有効期限切れになります。 |
snapshot.num-retained.min | 有効期限切れにならずに保持される最新スナップショットの最小数。 | Integer | いいえ | 10 | なし。 |
snapshot.time-retained | スナップショットが有効期限切れになるまでの期間。 | String | いいえ | 1h | この設定または `snapshot.num-retained.max` のいずれかが満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは有効期限切れになります。 |
write-mode | Paimon テーブルの書き込みモード。 | String | いいえ | change-log | 有効値:
書き込みモードの詳細については、「書き込みモード」をご参照ください。 |
scan.infer-parallelism | Paimon ソーステーブルの並行度を自動的に推測するかどうかを指定します。 | Boolean | いいえ | true | 有効値:
|
scan.parallelism | Paimon ソーステーブルの並行度。 | Integer | いいえ | なし | 説明 タブで、リソースモード がエキスパートに設定されている場合、このパラメーターは有効になりません。 |
sink.parallelism | Paimon 結果テーブルの並行度。 | Integer | いいえ | なし | 説明 タブで、リソースモード がエキスパートに設定されている場合、このパラメーターは有効になりません。 |
sink.clustering.by-columns | Paimon 結果テーブルへの書き込み時にクラスタリング列を指定します。 | String | いいえ | なし | Paimon append-only テーブル (非プライマリキーテーブル) では、バッチジョブでこのパラメーターを設定すると、クラスタリング書き込み機能が有効になります。この機能は、特定の列の値範囲に基づいてデータをクラスタリングし、テーブルのクエリ速度を向上させます。 複数の列名はカンマ (,) で区切ります。たとえば、 クラスタリングの詳細については、公式 Apache Paimon ドキュメントをご参照ください。 |
sink.delete-strategy | リトラクション (-D/-U) メッセージを正しく処理していることを保証するための検証ポリシーを設定します。 | Enum | いいえ | NONE | 検証ポリシーの有効値およびリトラクションメッセージを処理する際のシンクオペレーターの期待される動作は次のとおりです。
説明
|
設定項目の詳細については、公式 Apache Paimon ドキュメントをご参照ください。
機能の詳細
データ鮮度および一貫性保証
Paimon 結果テーブルは、Flink ジョブの各チェックポイント中に、書き込まれたデータをコミットするために 2 相コミットプロトコルを使用します。そのため、データ鮮度は Flink ジョブのチェックポイント間隔と同じになります。各コミットにより、最大 2 つのスナップショットが生成されます。
2 つの Flink ジョブが同時に同じ Paimon テーブルに書き込む場合、2 つのジョブからのデータが同じバケットに書き込まれない場合、直列化可能性の一貫性が保証されます。2 つのジョブからのデータが同じバケットに書き込まれる場合、スナップショット分離の一貫性のみが保証されます。つまり、テーブル内のデータは両方のジョブの結果が混在したものになる可能性がありますが、データが失われることはありません。
データマージの仕組み
Paimon 結果テーブルが同じプライマリキーを持つ複数のレコードを受信すると、これらのレコードを単一のレコードにマージして、プライマリキーの一意性を維持します。merge-engine パラメーターを設定することで、データマージ動作を指定できます。以下の表は、データマージメカニズムを示しています。
マージメカニズム | 詳細 |
Deduplicate | Deduplicate メカニズムは、デフォルトのデータマージメカニズムです。同じプライマリキーを持つ複数のレコードについて、Paimon 結果テーブルは最新のレコードのみを保持し、他のレコードは破棄します。 説明 最新のレコードが削除メッセージの場合、そのプライマリキーを持つすべてのレコードが破棄されます。 |
Partial Update | Partial Update メカニズムを指定することで、複数のメッセージを通じてデータを段階的に更新し、最終的に完全なデータを取得できます。具体的には、同じプライマリキーを持つ新しいデータは古いデータを上書きしますが、NULL 値を持つ列は上書きされません。 たとえば、Paimon 結果テーブルが次の 3 つのレコードを順番に受信するとします。
最初の列がプライマリキーです。最終的な結果は <1, 25.2, 10, 'This is a book'> になります。 説明
|
Aggregation | 一部のシナリオでは、集計値のみに関心がある場合があります。Aggregation メカニズムは、指定した集計関数に基づいて同じプライマリキーを持つデータを集約します。プライマリキーの一部ではない各列について、 price 列は max 関数を使用して集約され、sales 列は sum 関数を使用して集約されます。<1, 23.0, 15> および <1, 30.2, 20> の 2 つの入力レコードが与えられた場合、最終的な結果は <1, 30.2, 35> になります。現在サポートされている集計関数および対応するデータの型は次のとおりです。
説明
|
増分データ生成メカニズム
changelog-producer パラメーターを適切な増分データ生成メカニズムに設定することで、Paimon は任意の入力データストリームに対して完全な増分データを生成できます。完全な増分データとは、すべての update_after データに対応する update_before データが存在することを意味します。以下に、すべての増分データ生成メカニズムを示します。詳細については、公式 Apache Paimon ドキュメントをご参照ください。
メカニズム | 詳細 |
None |
たとえば、ダウンストリームのコンシューマーが列の合計を計算する必要があるとします。コンシューマーが最新のデータ (5) のみを確認した場合、合計をどのように更新すべきかを判断できません。前のデータが 4 だった場合、合計を 1 増やす必要があります。前のデータが 6 だった場合、合計を 1 減らす必要があります。このようなコンシューマーは update_before データに敏感です。none に設定しないことを推奨します。ただし、他のメカニズムはパフォーマンスオーバーヘッドを伴います。 説明 データベースなど、update_before データに敏感でないダウンストリームのコンシューマーを使用する場合、増分データ生成メカニズムを `none` に設定できます。そのため、実際のニーズに基づいてメカニズムを設定することを推奨します。 |
Input |
したがって、このメカニズムは、入力データストリーム自体が完全な増分データ (Change Data Capture (CDC) データなど) である場合にのみ使用できます。 |
Lookup |
以下で説明する Full Compaction メカニズムと比較して、Lookup メカニズムは増分データの生成においてより優れたタイムリー性を提供しますが、全体的により多くのリソースを消費します。 分単位のデータ鮮度など、データ鮮度の要件が高い場合にこのメカニズムの使用を推奨します。 |
Full Compaction |
Lookup メカニズムと比較して、Full Compaction メカニズムは増分データの生成においてタイムリー性が劣ります。ただし、データの完全コンパクションプロセスを利用し、追加の計算を生成しないため、全体的により少ないリソースを消費します。 時間単位のデータ鮮度など、データ鮮度の要件が高くない場合にこのメカニズムの使用を推奨します。 |
書き込みモード
Paimon テーブルは現在、以下の書き込みモードをサポートしています。
モード | 詳細 |
Change-log | Change-log 書き込みモードは、Paimon テーブルのデフォルトの書き込みモードです。このモードは、プライマリキーに基づいてデータの挿入、削除、および更新をサポートします。また、このモードでは、前述のデータマージおよび増分データ生成メカニズムも使用できます。 |
Append-only | Append-only 書き込みモードは、データ挿入のみをサポートし、プライマリキーをサポートしません。このモードは change-log モードよりも効率的であり、分単位のデータ鮮度など、データ鮮度の要件が高くないシナリオでメッセージキューの代替として使用できます。 Append-only 書き込みモードの詳細については、公式 Apache Paimon ドキュメントをご参照ください。Append-only 書き込みモードを使用する際は、以下の点に注意してください。
|
CTAS および CDAS の宛先として
Paimon テーブルは、単一テーブルレベルまたはデータベース全体レベルでデータのリアルタイム同期をサポートします。同期中にアップストリームテーブルのスキーマが変更された場合、その変更も Paimon テーブルにリアルタイムで同期されます。詳細については、「Paimon テーブルの管理」および「Paimon Catalog の管理」をご参照ください。
データインジェスト
Paimon コネクタは、YAML ジョブ開発でデータインジェストのシンクとして使用できます。
構文
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse設定項目
パラメーター | 説明 | 必須 | データの型 | デフォルト値 | 備考 |
type | コネクタタイプ。 | はい | STRING | なし | 静的フィールドは |
name | シンク名。 | いいえ | STRING | なし | シンクの名前。 |
catalog.properties.metastore | Paimon Catalog のタイプ。 | いいえ | STRING | filesystem | 有効値:
|
catalog.properties.* | Paimon Catalog を作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon Catalog の管理」をご参照ください。 |
table.properties.* | Paimon テーブルを作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon テーブルオプション」をご参照ください。 |
catalog.properties.warehouse | ファイルストレージのルートディレクトリ。 | いいえ | STRING | なし | このパラメーターは、 |
commit.user-prefix | データファイルをコミットする際のユーザー名プレフィックス。 | いいえ | STRING | なし | 説明 コミット競合が発生した場合に競合ジョブを簡単に特定できるように、異なるジョブに異なるユーザー名を設定することを推奨します。 |
partition.key | 各パーティションテーブルのパーティションフィールド。 | いいえ | STRING | なし | 異なるテーブルはセミコロン ( |
sink.cross-partition-upsert.tables | プライマリキーにすべてのパーティションフィールドが含まれていないため、パーティション間の更新を必要とするテーブルを指定します。 | いいえ | STRING | なし | パーティション間で更新するテーブル。
重要
|
例
以下の例は、Paimon Catalog タイプに基づいて Paimon をデータインジェストシンクとして設定する方法を示しています。
Paimon Catalog が filesystem の場合、Alibaba Cloud OSS への書き込みの設定例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxxcatalog.properties で始まるパラメーターの詳細については、「Paimon Filesystem Catalog の作成」をご参照ください。
Paimon Catalog が rest の場合、Alibaba Cloud Data Lake Formation への書き込みの設定例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlf # (オプション) 削除ベクターを有効にして読み取りパフォーマンスを向上させます。 table.properties.deletion-vectors.enabled: truecatalog.properties で始まるパラメーターの詳細については、「Flink CDC Catalog 設定パラメーター」をご参照ください。
スキーマ進化
現在、データインジェストシンクとしての Paimon は、以下のスキーマ進化イベントをサポートしています。
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT (プライマリキー列の型の変更はサポートされていません)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
ダウンストリームの Paimon テーブルがすでに存在する場合、その既存のスキーマが書き込みに使用されます。システムは再度テーブルを作成しようとしません。