Paimon コネクタは Apache Paimon カタログと一緒に使用することをお勧めします。このトピックでは、Paimon コネクタの使用方法について説明します。
背景情報
Apache Paimon は、ストリーミングモードとバッチモードでデータを処理できるデータレイクストレージです。Apache Paimon は、高スループットのデータ書き込みと低レイテンシのデータクエリをサポートします。Apache Paimon は、Flink、Spark、Hive、Trino など、Alibaba Cloud E-MapReduce (EMR) の一般的なコンピューティングエンジンと互換性があります。Apache Paimon を使用して、Apsara File Storage for HDFS (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを効率的にデプロイし、前述のコンピューティングエンジンに接続してデータレイク分析を実行できます。詳細については、「Apache Paimon」をご参照ください。
項目 | 説明 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、およびデータインジェストシンク |
実行モード | ストリーミングモードとバッチモード |
データフォーマット | N/A |
メトリック | N/A |
API タイプ | SQL API とデータインジェスト YAML API |
結果テーブルでのデータの更新または削除 | サポートされています |
特徴
Apache Paimon は、次の特徴を提供します。
HDFS または OSS に基づく低コストで軽量なデータレイクストレージサービス。
ストリーミングモードとバッチモードでの大規模データセットに対する読み取りおよび書き込み操作。
数分または数秒以内のバッチクエリおよびオンライン分析処理 (OLAP) クエリ。
増分データの消費と生成。Apache Paimon は、従来のオフラインデータウェアハウスおよびストリーミングデータウェアハウスのストレージとして使用できます。
ストレージコストと下流のコンピューティングワークロードを削減するためのデータ事前集約。
履歴バージョンのデータバックトラッキング。
効率的なデータフィルタリング。
テーブルスキーマの変更。
制限事項
Ververica Runtime (VVR) 6.0.6 以降を使用する Realtime Compute for Apache Flink のみが Paimon コネクタをサポートします。
次の表に、Apache Paimon と VVR の異なるバージョン間の互換性を示します。
Apache Paimon バージョン
VVR バージョン
1.1
11
1.0
8.0.11
0.9
8.0.7、8.0.8、8.0.9、および 8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
SQL
Paimon コネクタは、SQL ドラフトで Paimon テーブルからデータを読み取ったり、Paimon テーブルにデータを書き込んだりするために使用できます。
構文
Apache Paimon カタログで Apache Paimon テーブルを作成する場合、
connectorオプションを指定する必要はありません。次のサンプルコードは、Apache Paimon カタログで Apache Paimon テーブルを作成するための構文を示しています。
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
...
);Apache Paimon カタログに Apache Paimon テーブルを作成済みの場合は、テーブルを再作成することなく直接使用できます。
Apache Paimon 以外のストレージのカタログに一時的な Apache Paimon テーブルを作成する場合は、connector オプションと Apache Paimon テーブルのストレージパスを指定する必要があります。次のサンプルコードは、そのようなカタログで Apache Paimon テーブルを作成するための構文を示しています。
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create'='true', -- 指定されたパスに Apache Paimon テーブルファイルが存在しない場合、ファイルが自動的に作成されます。 ... );
WITH 句のコネクタオプション
オプション | 説明 | タイプ | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | String | いいえ | デフォルト値なし |
|
path | テーブルのストレージパス。 | String | いいえ | デフォルト値なし |
|
auto-create | 一時的な Apache Paimon テーブルを作成するときに、指定されたパスに Apache Paimon テーブルファイルが存在しない場合に Apache Paimon テーブルファイルを自動的に作成するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
bucket | 各パーティションのバケット数。 | Integer | いいえ | 1 | Apache Paimon テーブルに書き込まれるデータは、 説明 各バケットのデータは 5 GB 未満のサイズにすることをお勧めします。 |
bucket-key | バケットキー列。 | String | いいえ | デフォルト値なし | Apache Paimon テーブルに書き込まれるデータが異なるバケットに分散される基準となる列。 列名をコンマ (,) で区切ります。たとえば、 説明
|
changelog-producer | 増分データ生成メカニズム。 | String | いいえ | none | Apache Paimon は、下流のデータ消費を容易にするために、任意の入力データストリームに対して完全な増分データを生成できます。各 UPDATE_AFTER データレコードは、UPDATE_BEFORE データレコードに対応します。有効な値:
増分データ生成メカニズムの選択方法の詳細については、このトピックの「増分データ生成メカニズム」セクションをご参照ください。 |
full-compaction.delta-commits | 完全なコンパクションが実行される最大間隔。 | Integer | いいえ | デフォルト値なし | コミットセーブポイントの数がこのオプションの値に達すると、完全なコンパクションが確実にトリガーされます。 |
lookup.cache-max-memory-size | Apache Paimon ディメンションテーブルのメモリキャッシュサイズ。 | String | いいえ | 256 MB | このオプションの値は、ディメンションテーブルとルックアップチェンジログプロデューサーの両方のキャッシュサイズを決定します。 |
merge-engine | 同じプライマリキーを持つデータをマージするメカニズム。 | String | いいえ | deduplicate | 有効な値:
データマージメカニズムの詳細については、このトピックの「データマージメカニズム」セクションをご参照ください。 |
partial-update.ignore-delete | 削除メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明
|
ignore-delete | 削除メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | その有効な値は partial-update.ignore-delete と同じです。 説明
|
partition.default-name | パーティションのデフォルト名。 | String | いいえ | __DEFAULT_PARTITION__ | パーティションキー列の値が null または空の文字列の場合、このオプションの値がパーティション名として使用されます。 |
partition.expiration-check-interval | システムがパーティションの有効期限切れをチェックする間隔。 | String | いいえ | 1h | 詳細については、「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」をご参照ください。 |
partition.expiration-time | パーティションの有効期間。 | String | いいえ | デフォルト値なし | パーティションが存在する期間がこのオプションの値を超えると、パーティションは有効期限切れになります。デフォルトでは、パーティションは決して有効期限切れになりません。 パーティションが存在する期間は、パーティションの値に基づいて計算されます。詳細については、「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」をご参照ください。 |
partition.timestamp-formatter | 時間文字列をタイムスタンプに変換するために使用されるパターン。 | String | いいえ | デフォルト値なし | このオプションは、パーティション値からパーティションが存在する期間を抽出するために使用されるパターンを指定します。詳細については、「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」をご参照ください。 |
partition.timestamp-pattern | パーティション値を時間文字列に変換するために使用されるパターン。 | String | いいえ | デフォルト値なし | このオプションは、パーティション値からパーティションが存在する期間を抽出するために使用されるパターンを指定します。詳細については、「アップストリームおよびダウンストリームストレージに関するよくある質問」トピックの「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」セクションをご参照ください。 |
scan.bounded.watermark | 有界ストリーミングモードの終了条件。Apache Paimon ソーステーブルのデータのウォーターマークがこのオプションの値を超えると、Apache Paimon ソーステーブルのデータの生成が終了します。 | Long | いいえ | デフォルト値なし | N/A。 |
scan.mode | Apache Paimon ソーステーブルのコンシューマオフセット。 | String | いいえ | default | 詳細については、「Apache Paimon ソーステーブルのコンシューマオフセットを指定するにはどうすればよいですか?」をご参照ください。 |
scan.snapshot-id | Apache Paimon ソーステーブルがデータの消費を開始するセーブポイントの ID。 | Integer | いいえ | デフォルト値なし | 詳細については、「Apache Paimon ソーステーブルのコンシューマオフセットを指定するにはどうすればよいですか?」をご参照ください。 |
scan.timestamp-millis | Apache Paimon ソーステーブルがデータの消費を開始する時点。 | Integer | いいえ | デフォルト値なし | 詳細については、「Apache Paimon ソーステーブルのコンシューマオフセットを指定するにはどうすればよいですか?」をご参照ください。 |
snapshot.num-retained.max | 保持できる最新のセーブポイントの最大数。 | Integer | いいえ | 2147483647 | セーブポイントの有効期限切れは、snapshot.num-retained.max オプションまたは snapshot.time-retained オプションで指定された条件が満たされ、かつ snapshot.num-retained.min オプションで指定された条件が満たされた場合にのみトリガーされます。 |
snapshot.num-retained.min | 保持できる最新のセーブポイントの最小数。 | Integer | いいえ | 10 | N/A。 |
snapshot.time-retained | セーブポイントを保持できる期間。 | String | いいえ | 1h | セーブポイントの有効期限切れは、snapshot.num-retained.max オプションまたは snapshot.time-retained オプションで指定された条件が満たされ、かつ snapshot.num-retained.min オプションで指定された条件が満たされた場合にのみトリガーされます。 |
write-mode | Apache Paimon テーブルの書き込みモード。 | String | いいえ | change-log | 有効な値:
書き込みモードの詳細については、「書き込みモード」をご参照ください。 |
scan.infer-parallelism | Apache Paimon ソーステーブルの並列処理の次数を自動的に推測するかどうかを指定します。 | Boolean | いいえ | true | 有効な値:
|
scan.parallelism | Apache Paimon ソーステーブルの並列処理の次数。 | Integer | いいえ | デフォルト値なし | 説明 このオプションはエキスパートモードでは有効になりません。 に移動して、[mode] オプションの値を確認してください。 |
sink.parallelism | Apache Paimon 結果テーブルの並列処理。 | Integer | いいえ | デフォルト値なし | 説明 このオプションはエキスパートモードでは有効になりません。 に移動して、[mode] オプションの値を確認してください。 |
sink.clustering.by-columns | Apache Paimon 結果テーブルにデータを書き込むために使用されるクラスタリング列。 | String | いいえ | デフォルト値なし | プライマリキーのない Apache Paimon append-only テーブルの場合、バッチデプロイメントでこのオプションを指定して、データ書き込みのクラスタリング機能を有効にできます。この機能が有効になると、データはサイズによって特定の列にクラスタリングされて表示されます。これにより、テーブルのクエリ速度が向上します。 複数の列名をコンマ (,) で区切ります。例: クラスタリング機能の詳細については、「クラスタリング」をご参照ください。 |
sink.delete-strategy | システムがリトラクション (削除および更新前) メッセージを処理するための検証戦略を指定します。
| Enum | いいえ | NONE | 有効な値:
説明
|
設定項目の詳細については、「Configuration」をご参照ください。
特徴
データの鮮度と一貫性の保証
Apache Paimon 結果テーブルは、Flink デプロイメントでチェックポイントが生成されるたびに、2 フェーズコミットプロトコル (2PC) を使用して書き込まれたデータをコミットします。したがって、データの鮮度は Flink デプロイメントのチェックポイント間隔に基づいています。データがコミットされるたびに最大 2 つのセーブポイントが生成される可能性があります。
2 つの Flink デプロイメントが同時に Apache Paimon テーブルにデータを書き込むが、異なるバケットにデータを書き込む場合、シリアライズ可能な一貫性が保証されます。Flink デプロイメントが同じバケットにデータを書き込む場合、セーブポイントの分離レベルの一貫性のみが保証されます。この場合、テーブルには 2 つのデプロイメントの結果が含まれる可能性がありますが、データは失われません。
データマージメカニズム
Apache Paimon 結果テーブルが同じプライマリキーを持つ複数のデータレコードを受信すると、Apache Paimon 結果テーブルはデータレコードを 1 つのデータレコードにマージして、プライマリキーの一意性を保証します。merge-engine オプションを指定して、データマージメカニズムを指定できます。次の表に、さまざまなデータマージメカニズムを示します。
データマージメカニズム | 説明 |
deduplicate | これは merge-engine オプションのデフォルト値です。データマージメカニズムが deduplicate で、複数のデータレコードが同じプライマリキーを持つ場合、Apache Paimon 結果テーブルは最新のデータレコードのみを保持し、他のデータレコードは破棄します。 説明 最新のデータレコードが削除メッセージの場合、同じプライマリキーを持つすべてのデータレコードが破棄されます。 |
partial-update | partial-update メカニズムを使用すると、複数のメッセージを使用してデータを更新し、最終的に完全なデータを取得できます。既存のデータと同じプライマリキーを持つ新しいデータは、既存のデータを上書きします。NULL 値を持つ列は既存のデータを上書きできません。 たとえば、Apache Paimon 結果テーブルは次のデータレコードを順番に受信します。
最初の列がプライマリキーの場合、最終結果は <1, 25.2, 10, 'This is a book'> になります。 説明
|
aggregation | 特定のシナリオでは、集約された値のみが必要になる場合があります。集約メカニズムは、指定された集計関数に基づいて同じプライマリキーを持つデータを集約できます。プライマリキーではない各列に集計関数を指定するには、 この例では、price 列のデータは max 関数に基づいて集約され、sales 列のデータは sum 関数に基づいて集約されます。入力データレコードが <1, 23.0, 15> と <1, 30.2, 20> の場合、結果は <1, 30.2, 35> になります。サポートされている集計関数とデータ型のマッピング:
説明
|
増分データ生成メカニズム
増分データ生成メカニズムは、changelog-producer オプションによって指定されます。Apache Paimon は、任意の入力データストリームに対して完全な増分データを生成できます。各 UPDATE_AFTER データレコードは、UPDATE_BEFORE データレコードに対応します。次の表に、すべての増分データ生成メカニズムを示します。詳細については、「Configuration」をご参照ください。
増分データ生成メカニズム | 説明 |
None | None は たとえば、下流のコンシューマーが列の合計を計算したい場合、コンシューマーが最新の値 5 のみを取得した場合、下流のコンシューマーは合計をどのように更新するかを判断できません。元の値が 4 の場合、合計は 1 増加する必要があります。元の値が 6 の場合、合計は 1 減少する必要があります。このタイプのコンシューマーは UPDATE_BEFORE データに敏感です。このタイプのコンシューマーの場合、changelog-producer オプションを none に設定しないことをお勧めします。ただし、他の増分データ生成メカニズムはパフォーマンスの低下を引き起こす可能性があります。 説明 下流のコンシューマーが UPDATE_BEFORE データに敏感でないデータベースである場合は、changelog-producer オプションを none に設定できます。ビジネス要件に基づいてこのオプションを指定することをお勧めします。 |
Input |
したがって、この増分データ生成メカニズムは、Change Data Capture (CDC) データなどの入力データストリームが完全である場合にのみ使用できます。 |
Lookup |
lookup メカニズムは、増分データの生成において full-compaction メカニズムよりも効率的です。ただし、lookup メカニズムはより多くのリソースを消費します。 増分データの鮮度に対する要件が高いシナリオでは、lookup メカニズムを使用することをお勧めします。たとえば、数分以内の増分データが必要です。 |
Full Compaction |
lookup メカニズムと比較して、完全なコンパクションメカニズムは増分データの生成効率が低くなります。ただし、完全なコンパクションメカニズムは、データの完全なコンパクションプロセスに基づいて追加の計算を引き起こさないため、消費されるリソースが少なくなります。 増分データの鮮度に対する要件が高くないシナリオでは、完全なコンパクションメカニズムを使用することをお勧めします。たとえば、数時間以内の増分データが必要です。 |
書き込みモード
次の表に、Apache Paimon テーブルでサポートされている書き込みモードを示します。
書き込みモード | 説明 |
Change-log | change-log は、Apache Paimon テーブルのデフォルトの書き込みモードです。change-log モードでは、テーブルのプライマリキーに基づいて Apache Paimon テーブルにデータを挿入、削除、および更新できます。change-log モードでは、データマージメカニズムと増分データ生成メカニズムも使用できます。 |
Append-only | append-only モードでは、Apache Paimon テーブルはデータの挿入のみを許可し、プライマリキーに基づく操作はサポートしません。append-only モードは、change-log モードよりも効率的です。append-only モードでは、Apache Paimon テーブルは、データの鮮度要件が高くないシナリオで Message Queue の代替として使用できます。たとえば、数時間以内のデータが必要です。 append-only モードの詳細については、「Configuration」をご参照ください。append-only モードを使用する場合は、次の点に注意してください。
|
CTAS および CDAS ベースのデータ同期
CREATE TABLE AS および CREATE DATABASE AS 文を実行して、単一のテーブルまたはデータベース全体から Paimon テーブルにデータとスキーマの変更をリアルタイムで同期できます。詳細については、「Paimon カタログの管理」トピックの「Apache Paimon テーブルの管理」セクションをご参照ください。
データインジェスト
YAML ドラフトで Paimon コネクタを使用して、Paimon テーブルにデータを書き込むことができます。
構文
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouseコネクタオプション
オプション | 説明 | 必須 | データの型 | デフォルト値 | 備考 |
type | コネクタタイプ。 | サポートされています | STRING | デフォルト値なし | 値を |
name | シンクの名前。 | いいえ | STRING | デフォルト値なし | N/A. |
catalog.properties.metastore | Paimon カタログのメタストアタイプ。 | いいえ | STRING | filesystem | 有効な値:
|
catalog.properties.* | Paimon カタログのオプションをパイプラインに渡します。 | いいえ | STRING | デフォルト値なし | 詳細については、「Paimon カタログの管理」をご参照ください。 |
table.properties.* | Paimon テーブルのオプションをパイプラインに渡します。 | いいえ | STRING | デフォルト値なし | 詳細については、「Paimon テーブルオプション」をご参照ください。 |
catalog.properties.warehouse | カタログのウェアハウスルートパス。 | いいえ | STRING | デフォルト値なし | このオプションは、 |
commit.user | データファイルをコミットするためのユーザー名。 | いいえ | STRING | デフォルト値なし | 説明 コミットの競合が発生した場合に問題のあるデプロイメントを迅速に特定するために、各デプロイメントに一意のユーザー名を使用することをお勧めします。 |
partition.key | 各パーティションテーブルのパーティションキー。 | いいえ | STRING | デフォルト値なし | テーブルはセミコロン |
サンプルコード
Paimon コネクタを使用して、Paimon カタログのタイプに基づいて Paimon テーブルにデータをインジェストできます。
Paimon カタログタイプが filesystem の場合、Alibaba Cloud OSS にデータを書き込みます。サンプルコード:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: ${mysql.source.table}
server-id: 8601-8604
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://default/test
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: xxxxxxxx
catalog.properties.fs.oss.accessKeySecret: xxxxxxxxcatalog.properties で始まるオプションの詳細については、「Paimon Filesystem カタログの作成」をご参照ください。
Paimon カタログタイプが rest の場合、DLF にデータを書き込みます。サンプルコード:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: ${mysql.source.table}
server-id: 8601-8604
sink:
type: paimon
name: Paimon Sink
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlfcatalog.properties で始まるオプションの詳細については、「Flink CDC を使用した DLF へのアクセス」をご参照ください。