Paimon コネクタは、ストリーミングデータレイクハウスコネクタであり、Paimon カタログと併用できます。 このトピックでは、Paimon コネクタの使用方法について説明します。
背景情報
Apache Paimon は、ストリーミング処理とバッチ処理を統合するレイクストレージフォーマットです。 高スループットの書き込みと低レイテンシーのクエリをサポートします。 Alibaba Cloud のオープンソースビッグデータプラットフォームである E-MapReduce 上の Flink、Spark、Hive、Trino などの一般的なコンピュートエンジンは、Paimon と緊密に統合されています。 Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを構築し、これらのコンピュートエンジンに接続してデータレイク分析を行うことができます。 詳細については、「Apache Paimon」をご参照ください。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト先 |
実行モード | ストリーミングモードとバッチモード |
データフォーマット | サポートされていません |
特定のモニタリングメトリック | なし |
API タイプ | SQL、データインジェスト用の YAML ジョブ |
結果テーブルでのデータの更新または削除のサポート | はい |
機能
Apache Paimon は、以下のコア機能を提供します。
HDFS または OSS に基づいて、低コストで軽量なデータレイクストレージサービスを構築します。
ストリーミングモードとバッチモードで大規模なデータセットを読み書きします。
秒から分単位のデータ鮮度でバッチクエリとオンライン分析処理 (OLAP) クエリを実行します。
増分データを消費および生成します。 Paimon は、従来のオフラインデータウェアハウスと最新のストリーミングデータウェアハウスのストレージレイヤーとして使用できます。
データを事前集約して、ストレージコストと下流のコンピューティング負荷を削減します。
データの履歴バージョンを取得します。
データを効率的にフィルタリングします。
スキーマ進化をサポートします。
制限事項
Paimon コネクタは、Flink コンピュートエンジンの Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。
次の表に、Paimon と VVR のバージョンマッピングを示します。
Apache Paimon バージョン
VVR バージョン
1.1
11
1.0
8.0.11
0.9
8.0.7、8.0.8、8.0.9、8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
SQL
Paimon コネクタは、SQL ジョブでソーステーブルまたは結果テーブルとして使用できます。
構文
Paimon カタログで Paimon テーブルを作成する場合、
connectorパラメーターを指定する必要はありません。 Paimon テーブルを作成するための構文は次のとおりです。CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );説明Paimon カタログに Paimon テーブルをすでに作成している場合は、再度作成することなく直接使用できます。
別のカタログで Paimon 一時テーブルを作成する場合は、connector パラメーターと Paimon テーブルのストレージパスを指定する必要があります。 Paimon テーブルを作成するための構文は次のとおりです。
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- 指定されたパスに Paimon テーブルデータファイルが存在しない場合、ファイルが自動的に作成されます。 ... );説明パスの例:
'path' = 'oss://<bucket>/test/order.db/orders'。.dbサフィックスを省略しないでください。 Paimon はこの命名規則を使用してデータベースを識別します。同じテーブルに書き込む複数のジョブは、同じパス構成を使用する必要があります。
2 つのパス値が異なる場合、Paimon はそれらを異なるテーブルと見なします。 物理パスが同じであっても、カタログ構成が一致しないと、同時書き込みの競合、コンパクションの失敗、データ損失が発生する可能性があります。 たとえば、
oss://b/testとoss://b/test/は、物理パスが同じであっても、末尾のスラッシュがあるため異なるパスと見なされます。
WITH パラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 注 |
connector | テーブルのタイプ。 | String | いいえ | なし |
|
path | テーブルのストレージパス。 | String | いいえ | なし |
|
auto-create | Paimon 一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合にファイルを自動的に作成するかどうかを指定します。 | ブール値 | いいえ | false | 有効値:
|
bucket | 各パーティションのバケット数。 | Integer | いいえ | 1 | Paimon テーブルに書き込まれたデータは、 説明 各バケットのデータ量は 5 GB 未満にすることを推奨します。 |
bucket-key | バケット化のためのキー列。 | String | いいえ | なし | Paimon テーブルに書き込まれるデータを、どの列に基づいて異なるバケットに分散するかを指定します。 列名はカンマ (,) で区切ります。例: 説明
|
changelog-producer | 増分データを生成するメカニズム。 | String | いいえ | none | Paimon は、任意の入力データストリームに対して完全な増分データ (すべての `update_after` データに対応する `update_before` データがある) を生成でき、下流のコンシューマーにとって便利です。 増分データ生成メカニズムの有効値:
増分データを生成するメカニズムの選択方法の詳細については、「増分データ生成メカニズム」をご参照ください。 |
full-compaction.delta-commits | 完全なコンパクションの最大間隔。 | Integer | いいえ | なし | このパラメーターは、完全なコンパクションを実行する必要があるスナップショットのコミット数を指定します。 |
lookup.cache-max-memory-size | Paimon ディメンションテーブルのメモリキャッシュサイズ。 | String | いいえ | 256 MB | このパラメーターは、ディメンションテーブルと lookup changelog-producer の両方のキャッシュサイズに影響します。 両方のメカニズムのキャッシュサイズは、このパラメーターによって構成されます。 |
merge-engine | 同じプライマリキーを持つデータをマージするメカニズム。 | String | いいえ | deduplicate | 有効値:
データマージメカニズムの詳細な分析については、「データマージメカニズム」をご参照ください。 |
partial-update.ignore-delete | 削除 (-D) メッセージを無視するかどうかを指定します。 | ブール値 | いいえ | false | 有効値:
説明
|
ignore-delete | 削除 (-D) メッセージを無視するかどうかを指定します。 | ブール値 | いいえ | false | 有効値は partial-update.ignore-delete と同じです。 説明
|
partition.default-name | パーティションのデフォルト名。 | String | いいえ | __DEFAULT_PARTITION__ | パーティションキー列の値が null または空の文字列の場合、このデフォルト名がパーティション名として使用されます。 |
partition.expiration-check-interval | 期限切れのパーティションをチェックする間隔。 | String | いいえ | 1h | 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
partition.expiration-time | パーティションの有効期限。 | String | いいえ | なし | パーティションの生存時間がこの値を超えると、パーティションは期限切れになります。 デフォルトでは、パーティションは期限切れになりません。 パーティションの生存時間は、そのパーティション値から計算されます。 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
partition.timestamp-formatter | 時間文字列を UNIX タイムスタンプに変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
partition.timestamp-pattern | パーティション値を時間文字列に変換するためのフォーマット文字列。 | String | いいえ | なし | パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。 |
scan.bounded.watermark | Paimon ソーステーブルによって生成されたデータのウォーターマークがこの値を超えると、Paimon ソーステーブルはデータの生成を停止します。 | Long | いいえ | なし | なし。 |
scan.mode | Paimon ソーステーブルのコンシューマオフセットを指定します。 | String | いいえ | default | 詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。 |
scan.snapshot-id | Paimon ソーステーブルが消費を開始するスナップショットを指定します。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。 |
scan.timestamp-millis | Paimon ソーステーブルが消費を開始する時点を指定します。 | Integer | いいえ | なし | 詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。 |
snapshot.num-retained.max | 期限切れにせずに保持する最新のスナップショットの最大数。 | Integer | いいえ | 2147483647 | この設定または `snapshot.time-retained` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。 |
snapshot.num-retained.min | 期限切れにせずに保持する最新のスナップショットの最小数。 | Integer | いいえ | 10 | なし。 |
snapshot.time-retained | スナップショットが期限切れになるまでの期間。 | String | いいえ | 1h | この設定または `snapshot.num-retained.max` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。 |
write-mode | Paimon テーブルの書き込みモード。 | String | いいえ | change-log | 有効値:
書き込みモードの詳細については、「書き込みモード」をご参照ください。 |
scan.infer-parallelism | Paimon ソーステーブルの並列度を自動的に推測するかどうかを指定します。 | ブール値 | いいえ | true | 有効値:
|
scan.parallelism | Paimon ソーステーブルの並列度。 | Integer | いいえ | なし | 説明 タブで、[リソースモード] が [エキスパート] に設定されている場合、このパラメーターは有効になりません。 |
sink.parallelism | Paimon 結果テーブルの並列度。 | Integer | いいえ | なし | 説明 タブで、[リソースモード] が [エキスパート] に設定されている場合、このパラメーターは有効になりません。 |
sink.clustering.by-columns | Paimon 結果テーブルへの書き込みのためのクラスタリング列を指定します。 | String | いいえ | なし | Paimon append-only テーブル (非プライマリキーテーブル) の場合、バッチジョブでこのパラメーターを設定すると、クラスタリング書き込み機能が有効になります。 この機能は、特定列のデータを値の範囲でクラスタリングし、テーブルのクエリ速度を向上させます。 複数の列名をカンマ (,) で区切ります。例: クラスタリングの詳細については、Apache Paimon 公式ドキュメントをご参照ください。 |
sink.delete-strategy | システムがリトラクション (-D/-U) メッセージを正しく処理することを保証するための検証ポリシーを設定します。 | Enum | いいえ | NONE | 検証ポリシーの有効値と、リトラクションメッセージを処理するシンクオペレーターの期待される動作は次のとおりです:
説明
|
設定項目の詳細については、Apache Paimon 公式ドキュメントをご参照ください。
機能詳細
データの鮮度と一貫性の保証
Paimon 結果テーブルは、2 フェーズコミットプロトコルを使用して、Flink ジョブの各チェックポイント中に書き込まれたデータをコミットします。 したがって、データの鮮度は Flink ジョブのチェックポイント間隔と同じです。 各コミットは最大 2 つのスナップショットを生成します。
2 つの Flink ジョブが同時に同じ Paimon テーブルに書き込む場合、2 つのジョブのデータが同じバケットに書き込まれない場合は、シリアライズ可能な一貫性が保証されます。 2 つのジョブのデータが同じバケットに書き込まれる場合は、スナップショット分離の一貫性のみが保証されます。 これは、テーブル内のデータが両方のジョブの結果の混合である可能性があることを意味しますが、データは失われません。
データマージメカニズム
Paimon 結果テーブルが同じプライマリキーを持つ複数のレコードを受信すると、これらのレコードを 1 つのレコードにマージして、プライマリキーの一意性を維持します。 merge-engine パラメーターを設定することで、データマージの動作を指定できます。 次の表に、データマージメカニズムを示します。
メカニズム | 詳細 |
Deduplicate | Deduplicate メカニズムは、デフォルトのデータマージメカニズムです。 同じプライマリキーを持つ複数のレコードの場合、Paimon 結果テーブルは最新のレコードのみを保持し、その他は破棄します。 説明 最新のレコードが削除メッセージの場合、そのプライマリキーを持つすべてのレコードが破棄されます。 |
部分更新 | partial-update メカニズムを指定することで、複数のメッセージを介してデータを段階的に更新し、最終的に完全なデータを取得できます。 具体的には、同じプライマリキーを持つ新しいデータは古いデータを上書きしますが、null 値を持つ列は上書きされません。 たとえば、Paimon 結果テーブルが次の 3 つのレコードを順番に受信するとします。
最初の列はプライマリキーです。 最終結果は <1, 25.2, 10, 'This is a book'> です。 説明
|
集約 | 一部のシナリオでは、集約された値のみに関心がある場合があります。 集約メカニズムは、指定した集計関数に基づいて、同じプライマリキーを持つデータを集約します。 プライマリキーの一部ではない各列について、 price 列は max 関数を使用して集計され、sales 列は sum 関数を使用して集計されます。 2 つの入力レコード <1, 23.0, 15> と <1, 30.2, 20> が与えられた場合、最終結果は <1, 30.2, 35> です。 現在サポートされている集計関数とそれに対応するデータ型は次のとおりです。
説明
|
増分データ生成メカニズム
changelog-producer パラメーターを適切な増分データ生成メカニズムに設定することで、Paimon は任意の入力データストリームに対して完全な増分データを生成できます。 完全な増分データとは、すべての `update_after` データに対応する `update_before` データがあることを意味します。 以下に、すべての増分データ生成メカニズムをリストします。 詳細については、Apache Paimon 公式ドキュメントをご参照ください。
メカニズム | 詳細 |
None |
たとえば、下流のコンシューマーが列の合計を計算する必要があるとします。 コンシューマーが最新のデータである 5 しか見ない場合、合計をどのように更新するかを判断できません。 以前のデータが 4 だった場合は、合計を 1 増やす必要があります。 以前のデータが 6 だった場合は、合計を 1 減らす必要があります。 このようなコンシューマーは `update_before` データに敏感です。 増分データ生成メカニズムを `none` に設定しないことを推奨します。 ただし、他のメカニズムではパフォーマンスのオーバーヘッドが発生します。 説明 データベースなどの下流のコンシューマーが `update_before` データに敏感でない場合は、増分データ生成メカニズムを `none` に設定できます。 したがって、実際のニーズに基づいてメカニズムを設定することを推奨します。 |
Input |
したがって、このメカニズムは、Change Data Capture (CDC) データなど、入力データストリーム自体が完全な増分データである場合にのみ使用できます。 |
Lookup |
以下で説明する Full Compaction メカニズムと比較して、Lookup メカニズムは増分データを生成するためのタイムラインが優れていますが、全体的に多くのリソースを消費します。 分単位の鮮度など、データ鮮度に対する要件が高い場合に、このメカニズムを使用することを推奨します。 |
Full Compaction |
Lookup メカニズムと比較して、Full Compaction メカニズムは増分データを生成するためのタイムラインが劣ります。 ただし、データの完全なコンパクションプロセスを利用し、余分な計算を生成しないため、全体的に消費するリソースが少なくなります。 時間単位の鮮度など、データ鮮度に対する要件が高くない場合に、このメカニズムを使用することを推奨します。 |
書き込みモード
Paimon テーブルは現在、次の書き込みモードをサポートしています。
モード | 詳細 |
Change-log | change-log 書き込みモードは、Paimon テーブルのデフォルトの書き込みモードです。 このモードは、プライマリキーに基づいてデータの挿入、削除、更新をサポートします。 このモードでは、前述のデータマージおよび増分データ生成メカニズムも使用できます。 |
Append-only | append-only 書き込みモードは、データの挿入のみをサポートし、プライマリキーをサポートしません。 このモードは change-log モードよりも効率的であり、分単位の鮮度など、データ鮮度の要件が高くないシナリオでメッセージキューの代替として使用できます。 append-only 書き込みモードの詳細については、Apache Paimon 公式ドキュメントをご参照ください。 append-only 書き込みモードを使用する場合は、次の点にご注意ください。
|
CTAS および CDAS の宛先として
Paimon テーブルは、単一テーブルまたはデータベース全体のレベルでのデータのリアルタイム同期をサポートします。 同期中に上流テーブルのスキーマが変更された場合、その変更は Paimon テーブルにもリアルタイムで同期されます。 詳細については、「Paimon テーブルの管理」および「Paimon カタログの管理」をご参照ください。
データインジェスト
Paimon コネクタは、データインジェストのための YAML ジョブ開発でシンクとして使用できます。
構文
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse設定項目
パラメーター | 説明 | 必須 | データの型 | デフォルト値 | 注 |
type | コネクタのタイプ。 | はい | STRING | なし | このパラメーターを |
name | シンク名。 | いいえ | STRING | なし | シンクの名前。 |
catalog.properties.metastore | Paimon カタログのタイプ。 | いいえ | STRING | filesystem | 有効値:
|
catalog.properties.* | Paimon カタログを作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon カタログの管理」をご参照ください。 |
table.properties.* | Paimon テーブルを作成するためのパラメーター。 | いいえ | STRING | なし | 詳細については、「Paimon テーブルオプション」をご参照ください。 |
catalog.properties.warehouse | ファイルストレージのルートディレクトリ。 | いいえ | STRING | なし | このパラメーターは、 |
commit.user-prefix | データファイルをコミットするためのユーザー名プレフィックス。 | いいえ | STRING | なし | 説明 コミットの競合が発生したときに競合するジョブを簡単に見つけられるように、異なるジョブに異なるユーザー名を設定することを推奨します。 |
partition.key | 各パーティションテーブルのパーティションフィールド。 | いいえ | STRING | なし | 異なるテーブルの区切りにはセミコロン ( |
sink.cross-partition-upsert.tables | パーティション間の更新が必要なテーブルを指定します。プライマリキーにはすべてのパーティションフィールドが含まれていません。 | いいえ | STRING | なし | パーティションをまたいで更新するテーブル。
重要
|
例
次の例は、Paimon カタログのタイプに基づいて、Paimon をデータインジェストシンクとして設定する方法を示しています。
Paimon カタログが filesystem の場合に Alibaba Cloud OSS に書き込むための設定例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxxcatalog.properties で始まるパラメーターの詳細については、「Paimon Filesystem カタログの作成」をご参照ください。
Paimon カタログが rest の場合に Alibaba Cloud DLF 2.5 に書き込むための設定例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlfcatalog.properties で始まるパラメーターの詳細については、「Flink CDC カタログ設定パラメーター」をご参照ください。
スキーマ進化
現在、データインジェストシンクとしての Paimon は、次のスキーマ進化イベントをサポートしています。
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT (プライマリキー列の型の変更はサポートされていません)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
下流の Paimon テーブルがすでに存在する場合、その既存のスキーマが書き込みに使用されます。 システムはテーブルを再度作成しようとはしません。
よくある質問
Paimon テーブルにデータを書き込むときに「Heartbeat of TaskManager timed out」エラーが発生するのはなぜですか?
Paimon テーブルにデータを書き込むときに「Sink materializer must not be used with Paimon sink」エラーが発生するのはなぜですか?
Paimon テーブルにデータを書き込むときに「File deletion conflicts detected」または「LSM conflicts detected」エラーが発生するのはなぜですか?
Paimon テーブルからデータを読み取るときに「File xxx not found, Possible causes」エラーが発生するのはなぜですか?