すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon コネクタ

最終更新日:Nov 09, 2025

Paimon コネクタは Apache Paimon カタログと一緒に使用することをお勧めします。このトピックでは、Paimon コネクタの使用方法について説明します。

背景情報

Apache Paimon は、ストリーミングモードとバッチモードでデータを処理できるデータレイクストレージです。Apache Paimon は、高スループットのデータ書き込みと低レイテンシのデータクエリをサポートします。Apache Paimon は、Flink、Spark、Hive、Trino など、Alibaba Cloud E-MapReduce (EMR) の一般的なコンピューティングエンジンと互換性があります。Apache Paimon を使用して、Apsara File Storage for HDFS (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを効率的にデプロイし、前述のコンピューティングエンジンに接続してデータレイク分析を実行できます。詳細については、「Apache Paimon」をご参照ください。

項目

説明

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、およびデータインジェストシンク

実行モード

ストリーミングモードとバッチモード

データフォーマット

N/A

メトリック

N/A

API タイプ

SQL API とデータインジェスト YAML API

結果テーブルでのデータの更新または削除

サポートされています

特徴

Apache Paimon は、次の特徴を提供します。

  • HDFS または OSS に基づく低コストで軽量なデータレイクストレージサービス。

  • ストリーミングモードとバッチモードでの大規模データセットに対する読み取りおよび書き込み操作。

  • 数分または数秒以内のバッチクエリおよびオンライン分析処理 (OLAP) クエリ。

  • 増分データの消費と生成。Apache Paimon は、従来のオフラインデータウェアハウスおよびストリーミングデータウェアハウスのストレージとして使用できます。

  • ストレージコストと下流のコンピューティングワークロードを削減するためのデータ事前集約。

  • 履歴バージョンのデータバックトラッキング。

  • 効率的なデータフィルタリング。

  • テーブルスキーマの変更。

制限事項

  • Ververica Runtime (VVR) 6.0.6 以降を使用する Realtime Compute for Apache Flink のみが Paimon コネクタをサポートします。

  • 次の表に、Apache Paimon と VVR の異なるバージョン間の互換性を示します。

    Apache Paimon バージョン

    VVR バージョン

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7、8.0.8、8.0.9、および 8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

SQL

Paimon コネクタは、SQL ドラフトで Paimon テーブルからデータを読み取ったり、Paimon テーブルにデータを書き込んだりするために使用できます。

構文

  • Apache Paimon カタログで Apache Paimon テーブルを作成する場合、connector オプションを指定する必要はありません。次のサンプルコードは、Apache Paimon カタログで Apache Paimon テーブルを作成するための構文を示しています。

CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
);
説明

Apache Paimon カタログに Apache Paimon テーブルを作成済みの場合は、テーブルを再作成することなく直接使用できます。

  • Apache Paimon 以外のストレージのカタログに一時的な Apache Paimon テーブルを作成する場合は、connector オプションと Apache Paimon テーブルのストレージパスを指定する必要があります。次のサンプルコードは、そのようなカタログで Apache Paimon テーブルを作成するための構文を示しています。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create'='true', -- 指定されたパスに Apache Paimon テーブルファイルが存在しない場合、ファイルが自動的に作成されます。
      ...
    );

WITH 句のコネクタオプション

オプション

説明

タイプ

必須

デフォルト値

備考

connector

テーブルのタイプ。

String

いいえ

デフォルト値なし

  • Apache Paimon カタログで Apache Paimon テーブルを作成する場合、このオプションを指定する必要はありません。

  • Apache Paimon 以外のストレージのカタログに一時的な Apache Paimon テーブルを作成する場合は、値を paimon に設定します。

path

テーブルのストレージパス。

String

いいえ

デフォルト値なし

  • Apache Paimon カタログで Apache Paimon テーブルを作成する場合、このオプションを指定する必要はありません。

  • Apache Paimon 以外のストレージのカタログに一時的な Apache Paimon テーブルを作成する場合は、このオプションをテーブルを保存する HDFS または OSS ディレクトリに設定します。

auto-create

一時的な Apache Paimon テーブルを作成するときに、指定されたパスに Apache Paimon テーブルファイルが存在しない場合に Apache Paimon テーブルファイルを自動的に作成するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false (デフォルト): 指定されたパスに Apache Paimon テーブルファイルが存在しない場合、エラーが返されます。

  • true: 指定されたパスが存在しない場合、システムは自動的に Apache Paimon テーブルファイルを作成します。

bucket

各パーティションのバケット数。

Integer

いいえ

1

Apache Paimon テーブルに書き込まれるデータは、bucket-key オプションで指定された列に基づいて各バケットに分散されます。

説明

各バケットのデータは 5 GB 未満のサイズにすることをお勧めします。

bucket-key

バケットキー列。

String

いいえ

デフォルト値なし

Apache Paimon テーブルに書き込まれるデータが異なるバケットに分散される基準となる列。

列名をコンマ (,) で区切ります。たとえば、'bucket-key' = 'order_id,cust_id' は、データが order_id 列と cust_id 列に基づいてバケットに分散されることを示します。

説明
  • このオプションを指定しない場合、データはプライマリキーに基づいて分散されます。

  • Apache Paimon テーブルにプライマリキーが指定されていない場合、データはすべての列の値に基づいて分散されます。

changelog-producer

増分データ生成メカニズム。

String

いいえ

none

Apache Paimon は、下流のデータ消費を容易にするために、任意の入力データストリームに対して完全な増分データを生成できます。各 UPDATE_AFTER データレコードは、UPDATE_BEFORE データレコードに対応します。有効な値:

  • none (デフォルト): 増分データは生成されません。下流のコンシューマーは、ストリーミングモードで Apache Paimon テーブルからデータを読み取ることができます。ただし、下流のコンシューマーによって読み取られる増分データには UPDATE_AFTER データのみが含まれ、UPDATE_BEFORE データは含まれません。

  • input: 入力データストリームは、デュアルライトモードで増分データとして増分データファイルに書き込まれます。

  • full-compaction: 完全なコンパクションが実行されるたびに、完全な増分データが生成されます。

  • lookup: コミットセーブポイントが実行される前に、完全な増分データが生成されます。

増分データ生成メカニズムの選択方法の詳細については、このトピックの「増分データ生成メカニズム」セクションをご参照ください。

full-compaction.delta-commits

完全なコンパクションが実行される最大間隔。

Integer

いいえ

デフォルト値なし

コミットセーブポイントの数がこのオプションの値に達すると、完全なコンパクションが確実にトリガーされます。

lookup.cache-max-memory-size

Apache Paimon ディメンションテーブルのメモリキャッシュサイズ。

String

いいえ

256 MB

このオプションの値は、ディメンションテーブルとルックアップチェンジログプロデューサーの両方のキャッシュサイズを決定します。

merge-engine

同じプライマリキーを持つデータをマージするメカニズム。

String

いいえ

deduplicate

有効な値:

  • deduplicate: 最新のデータレコードのみが保持されます。

  • partial-update: 最新のデータと同じプライマリキーを持つ既存のデータは、null でない列で最新のデータによって上書きされます。他の列のデータは変更されません。

  • aggregation: 事前集約を実行するために集計関数が指定されます。

データマージメカニズムの詳細については、このトピックの「データマージメカニズム」セクションをご参照ください。

partial-update.ignore-delete

削除メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: 削除メッセージは無視されます。

  • false: 削除メッセージは無視されません。sequence.field または他のオプションを構成して、シンクが削除メッセージをどのように処理するかを設定する必要があります。そうしないと、IllegalStateException や IllegalArgumentException のようなエラーが発生します。

説明
  • VVR 8.0.6 以前を使用する Realtime Compute for Apache Flink では、このオプションは merge-engine = partial-update が構成されている場合にのみ有効です。

  • VVR 8.0.7 以降を使用する Realtime Compute for Apache Flink では、このオプションは部分更新以外のシナリオにも適用でき、機能的には ignore-delete オプションと同等です。この場合、代わりに ignore-delete を使用することをお勧めします。

  • 削除メッセージを無視する必要があるかどうかは、実際のシナリオによって異なります。ビジネス要件に基づいてこのオプションを構成する必要があります。

ignore-delete

削除メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

その有効な値は partial-update.ignore-delete と同じです。

説明
  • Ververica Runtime (VVR) 8.0.7 以降を使用する Realtime Compute for Apache Flink のみがこのオプションをサポートします。

  • このオプションは、partial-update.ignore-delete オプションと機能的に同等です。partial-update.ignore-delete オプションの代わりに ignore-delete オプションを使用し、両方のオプションを同時に構成することは避けてください。

partition.default-name

パーティションのデフォルト名。

String

いいえ

__DEFAULT_PARTITION__

パーティションキー列の値が null または空の文字列の場合、このオプションの値がパーティション名として使用されます。

partition.expiration-check-interval

システムがパーティションの有効期限切れをチェックする間隔。

String

いいえ

1h

詳細については、「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」をご参照ください。

partition.expiration-time

パーティションの有効期間。

String

いいえ

デフォルト値なし

パーティションが存在する期間がこのオプションの値を超えると、パーティションは有効期限切れになります。デフォルトでは、パーティションは決して有効期限切れになりません。

パーティションが存在する期間は、パーティションの値に基づいて計算されます。詳細については、「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」をご参照ください。

partition.timestamp-formatter

時間文字列をタイムスタンプに変換するために使用されるパターン。

String

いいえ

デフォルト値なし

このオプションは、パーティション値からパーティションが存在する期間を抽出するために使用されるパターンを指定します。詳細については、「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」をご参照ください。

partition.timestamp-pattern

パーティション値を時間文字列に変換するために使用されるパターン。

String

いいえ

デフォルト値なし

このオプションは、パーティション値からパーティションが存在する期間を抽出するために使用されるパターンを指定します。詳細については、「アップストリームおよびダウンストリームストレージに関するよくある質問」トピックの「自動パーティションの有効期限切れを構成するにはどうすればよいですか?」セクションをご参照ください。

scan.bounded.watermark

有界ストリーミングモードの終了条件。Apache Paimon ソーステーブルのデータのウォーターマークがこのオプションの値を超えると、Apache Paimon ソーステーブルのデータの生成が終了します。

Long

いいえ

デフォルト値なし

N/A。

scan.mode

Apache Paimon ソーステーブルのコンシューマオフセット。

String

いいえ

default

詳細については、「Apache Paimon ソーステーブルのコンシューマオフセットを指定するにはどうすればよいですか?」をご参照ください。

scan.snapshot-id

Apache Paimon ソーステーブルがデータの消費を開始するセーブポイントの ID。

Integer

いいえ

デフォルト値なし

詳細については、「Apache Paimon ソーステーブルのコンシューマオフセットを指定するにはどうすればよいですか?」をご参照ください。

scan.timestamp-millis

Apache Paimon ソーステーブルがデータの消費を開始する時点。

Integer

いいえ

デフォルト値なし

詳細については、「Apache Paimon ソーステーブルのコンシューマオフセットを指定するにはどうすればよいですか?」をご参照ください。

snapshot.num-retained.max

保持できる最新のセーブポイントの最大数。

Integer

いいえ

2147483647

セーブポイントの有効期限切れは、snapshot.num-retained.max オプションまたは snapshot.time-retained オプションで指定された条件が満たされ、かつ snapshot.num-retained.min オプションで指定された条件が満たされた場合にのみトリガーされます。

snapshot.num-retained.min

保持できる最新のセーブポイントの最小数。

Integer

いいえ

10

N/A。

snapshot.time-retained

セーブポイントを保持できる期間。

String

いいえ

1h

セーブポイントの有効期限切れは、snapshot.num-retained.max オプションまたは snapshot.time-retained オプションで指定された条件が満たされ、かつ snapshot.num-retained.min オプションで指定された条件が満たされた場合にのみトリガーされます。

write-mode

Apache Paimon テーブルの書き込みモード。

String

いいえ

change-log

有効な値:

  • change-log: データは、プライマリキーに基づいて Apache Paimon テーブルに挿入、削除、および更新されます。

  • append-only: Apache Paimon テーブルはデータの挿入のみを許可し、プライマリキーに基づく操作はサポートしません。このモードは、change-log モードよりも効率的です。

書き込みモードの詳細については、「書き込みモード」をご参照ください。

scan.infer-parallelism

Apache Paimon ソーステーブルの並列処理の次数を自動的に推測するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true: Apache Paimon ソーステーブルの並列処理は、バケット数に基づいて自動的に推測されます。

  • false: Ververica Platform (VVP) に基づいて構成されたデフォルトの並列処理の次数が使用されます。リソース構成がエキスパートモードの場合、構成された並列処理の次数が使用されます。

scan.parallelism

Apache Paimon ソーステーブルの並列処理の次数。

Integer

いいえ

デフォルト値なし

説明

このオプションはエキスパートモードでは有効になりません。[Configuration] > [Resources] に移動して、[mode] オプションの値を確認してください。

sink.parallelism

Apache Paimon 結果テーブルの並列処理。

Integer

いいえ

デフォルト値なし

説明

このオプションはエキスパートモードでは有効になりません。[Configuration] > [Resources] に移動して、[mode] オプションの値を確認してください。

sink.clustering.by-columns

Apache Paimon 結果テーブルにデータを書き込むために使用されるクラスタリング列。

String

いいえ

デフォルト値なし

プライマリキーのない Apache Paimon append-only テーブルの場合、バッチデプロイメントでこのオプションを指定して、データ書き込みのクラスタリング機能を有効にできます。この機能が有効になると、データはサイズによって特定の列にクラスタリングされて表示されます。これにより、テーブルのクエリ速度が向上します。

複数の列名をコンマ (,) で区切ります。例: 'col1,col2'

クラスタリング機能の詳細については、「クラスタリング」をご参照ください。

sink.delete-strategy

システムがリトラクション (削除および更新前) メッセージを処理するための検証戦略を指定します。

Enum

いいえ

NONE

有効な値:

  • NONE (デフォルト): 検証は実行されません。

  • IGNORE_DELETE: シンクオペレーターはリトラクションメッセージを無視する必要があります。

  • NON_PK_FIELD_TO_NULL: シンクオペレーターは update_before メッセージを無視する必要があります。delete メッセージを受信した場合、対応するプライマリキー値を保持し、非プライマリキー列の値を削除する必要があります。

    この値は、データが複数のシンクから単一のテーブルに書き込まれる部分更新シナリオに適しています。

  • DELETE_ROW_ON_PK: シンクオペレーターは update_before メッセージを無視し、delete メッセージに対応するデータレコードを削除する必要があります。

  • CHANGELOG_STANDARD: シンクオペレーターは、受信した update_before および delete メッセージに対応するデータレコードを削除する必要があります。

説明
  • Ververica Runtime (VVR) 8.0.8 以降を使用する Realtime Compute for Apache Flink のみがこのオプションをサポートします。

  • ignore-delete や merge-engine などのオプションを構成して、Paimon Sink がリトラクションメッセージを処理する方法を管理できます。このオプションは、リトラクションの動作が期待どおりであるかどうかを確認するために使用されます。実際のリトラクション動作が期待から逸脱している場合、Paimon Sink は問題のある操作を防ぎ、エラーを報告します。エラーは、ignore-delete や merge-engine などのオプションを調整してリトラクション動作を修正するようにガイドします。

説明

設定項目の詳細については、「Configuration」をご参照ください。

特徴

データの鮮度と一貫性の保証

Apache Paimon 結果テーブルは、Flink デプロイメントでチェックポイントが生成されるたびに、2 フェーズコミットプロトコル (2PC) を使用して書き込まれたデータをコミットします。したがって、データの鮮度は Flink デプロイメントのチェックポイント間隔に基づいています。データがコミットされるたびに最大 2 つのセーブポイントが生成される可能性があります。

2 つの Flink デプロイメントが同時に Apache Paimon テーブルにデータを書き込むが、異なるバケットにデータを書き込む場合、シリアライズ可能な一貫性が保証されます。Flink デプロイメントが同じバケットにデータを書き込む場合、セーブポイントの分離レベルの一貫性のみが保証されます。この場合、テーブルには 2 つのデプロイメントの結果が含まれる可能性がありますが、データは失われません。

データマージメカニズム

Apache Paimon 結果テーブルが同じプライマリキーを持つ複数のデータレコードを受信すると、Apache Paimon 結果テーブルはデータレコードを 1 つのデータレコードにマージして、プライマリキーの一意性を保証します。merge-engine オプションを指定して、データマージメカニズムを指定できます。次の表に、さまざまなデータマージメカニズムを示します。

データマージメカニズム

説明

deduplicate

これは merge-engine オプションのデフォルト値です。データマージメカニズムが deduplicate で、複数のデータレコードが同じプライマリキーを持つ場合、Apache Paimon 結果テーブルは最新のデータレコードのみを保持し、他のデータレコードは破棄します。

説明

最新のデータレコードが削除メッセージの場合、同じプライマリキーを持つすべてのデータレコードが破棄されます。

partial-update

partial-update メカニズムを使用すると、複数のメッセージを使用してデータを更新し、最終的に完全なデータを取得できます。既存のデータと同じプライマリキーを持つ新しいデータは、既存のデータを上書きします。NULL 値を持つ列は既存のデータを上書きできません。

たとえば、Apache Paimon 結果テーブルは次のデータレコードを順番に受信します。

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

最初の列がプライマリキーの場合、最終結果は <1, 25.2, 10, 'This is a book'> になります。

説明
  • Apache Paimon 結果テーブルが partial-update メカニズムを使用して取得した結果をストリーミングモードで読み取る場合は、changelog-producer オプションを lookup または full-compaction に設定する必要があります。

  • partial-update メカニズムを使用する場合、削除メッセージは処理できません。partial-update.ignore-delete オプションを true に設定して、削除メッセージを無視できます。

aggregation

特定のシナリオでは、集約された値のみが必要になる場合があります。集約メカニズムは、指定された集計関数に基づいて同じプライマリキーを持つデータを集約できます。プライマリキーではない各列に集計関数を指定するには、fields.<field-name>.aggregate-function を使用する必要があります。プライマリキーではない列に集計関数を指定しない場合、その列はデフォルトで last_non_null_value 集計関数を使用します。次のサンプルコードは、集約メカニズムを使用して Apache Paimon テーブルを作成する方法の例を示しています。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

この例では、price 列のデータは max 関数に基づいて集約され、sales 列のデータは sum 関数に基づいて集約されます。入力データレコードが <1, 23.0, 15> と <1, 30.2, 20> の場合、結果は <1, 30.2, 35> になります。サポートされている集計関数とデータ型のマッピング:

  • sum: DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、および DOUBLE

  • min および max: DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、および TIMESTAMP_LTZ

  • last_value および last_non_null_value: すべてのデータ型

  • listagg: STRING

  • bool_and および bool_or: BOOLEAN

説明
  • sum 関数のみがデータのリトラクションと削除をサポートします。特定の列でリトラクションと削除メッセージを無視する場合は、'fields.${field_name}.ignore-retract'='true' を指定できます。

  • Apache Paimon 結果テーブルがストリーミングモードで集約結果を読み取る場合は、changelog-producer オプションを lookup または full-compaction に設定する必要があります。

増分データ生成メカニズム

増分データ生成メカニズムは、changelog-producer オプションによって指定されます。Apache Paimon は、任意の入力データストリームに対して完全な増分データを生成できます。各 UPDATE_AFTER データレコードは、UPDATE_BEFORE データレコードに対応します。次の表に、すべての増分データ生成メカニズムを示します。詳細については、「Configuration」をご参照ください。

増分データ生成メカニズム

説明

None

None は changelog-producer オプションのデフォルト値です。デフォルト値を使用する場合、下流のコンシューマーの Apache Paimon ソーステーブルは、特定のデータレコードが同じプライマリキーを持つ場合にのみデータの最新状況を取得できます。この場合、下流のコンシューマーはデータを効果的に計算するために完全な増分データを学習できません。下流のコンシューマーは最新のデータを表示し、既存のデータが削除されたかどうかを判断できますが、削除されたデータに関する詳細情報を学習することはできません。

たとえば、下流のコンシューマーが列の合計を計算したい場合、コンシューマーが最新の値 5 のみを取得した場合、下流のコンシューマーは合計をどのように更新するかを判断できません。元の値が 4 の場合、合計は 1 増加する必要があります。元の値が 6 の場合、合計は 1 減少する必要があります。このタイプのコンシューマーは UPDATE_BEFORE データに敏感です。このタイプのコンシューマーの場合、changelog-producer オプションを none に設定しないことをお勧めします。ただし、他の増分データ生成メカニズムはパフォーマンスの低下を引き起こす可能性があります。

説明

下流のコンシューマーが UPDATE_BEFORE データに敏感でないデータベースである場合は、changelog-producer オプションを none に設定できます。ビジネス要件に基づいてこのオプションを指定することをお勧めします。

Input

changelog-producer オプションを input に設定すると、Apache Paimon 結果テーブルは入力データストリームをデュアルライトモードで増分データとして増分データファイルに書き込みます。

したがって、この増分データ生成メカニズムは、Change Data Capture (CDC) データなどの入力データストリームが完全である場合にのみ使用できます。

Lookup

changelog-producer オプションを lookup に設定すると、Apache Paimon 結果テーブルは、ディメンションテーブルに似たポイントクエリメカニズムを使用して、コミットセーブポイントが実行される前のセーブポイントに対応する完全な増分データを生成します。増分データ生成メカニズムにより、Apache Paimon 結果テーブルは、入力増分データが完全であるかどうかに関係なく、完全な増分データを生成できます。

lookup メカニズムは、増分データの生成において full-compaction メカニズムよりも効率的です。ただし、lookup メカニズムはより多くのリソースを消費します。

増分データの鮮度に対する要件が高いシナリオでは、lookup メカニズムを使用することをお勧めします。たとえば、数分以内の増分データが必要です。

Full Compaction

changelog-producer オプションを full-compaction に設定すると、Apache Paimon 結果テーブルは、完全なコンパクションが実行されるたびに完全な増分データを生成します。増分データ生成メカニズムにより、Apache Paimon 結果テーブルは、入力増分データが完全であるかどうかに関係なく、完全な増分データを生成できます。完全なコンパクションが実行される時間間隔は、full-compaction.delta-commits オプションによって指定されます。

lookup メカニズムと比較して、完全なコンパクションメカニズムは増分データの生成効率が低くなります。ただし、完全なコンパクションメカニズムは、データの完全なコンパクションプロセスに基づいて追加の計算を引き起こさないため、消費されるリソースが少なくなります。

増分データの鮮度に対する要件が高くないシナリオでは、完全なコンパクションメカニズムを使用することをお勧めします。たとえば、数時間以内の増分データが必要です。

書き込みモード

次の表に、Apache Paimon テーブルでサポートされている書き込みモードを示します。

書き込みモード

説明

Change-log

change-log は、Apache Paimon テーブルのデフォルトの書き込みモードです。change-log モードでは、テーブルのプライマリキーに基づいて Apache Paimon テーブルにデータを挿入、削除、および更新できます。change-log モードでは、データマージメカニズムと増分データ生成メカニズムも使用できます。

Append-only

append-only モードでは、Apache Paimon テーブルはデータの挿入のみを許可し、プライマリキーに基づく操作はサポートしません。append-only モードは、change-log モードよりも効率的です。append-only モードでは、Apache Paimon テーブルは、データの鮮度要件が高くないシナリオで Message Queue の代替として使用できます。たとえば、数時間以内のデータが必要です。

append-only モードの詳細については、「Configuration」をご参照ください。append-only モードを使用する場合は、次の点に注意してください。

  • ビジネス要件に基づいて bucket-key オプションを指定することをお勧めします。そうしないと、Apache Paimon テーブルのデータがすべての列の値に基づいてバケットに分散されます。これにより、コンピューティング効率が低下します。

  • append-only モードは、次のルールに基づいてデータ生成順序を保証します。

    1. 2 つのデータレコードが異なるパーティションから来ており、scan.plan-sort-partition オプションが指定されている場合、パーティション値が小さいデータが優先的に生成されます。そうでない場合は、先に作成されたパーティションのデータが優先的に生成されます。

    2. 2 つのデータレコードが同じパーティションの同じバケットから来ている場合、先に書き込まれたデータが優先的に生成されます。

    3. 2 つのデータレコードが同じパーティションの異なるバケットから来ている場合、異なるバケットのデータは異なる並列サブタスクで処理されるため、データ生成順序は保証できません。

CTAS および CDAS ベースのデータ同期

CREATE TABLE AS および CREATE DATABASE AS 文を実行して、単一のテーブルまたはデータベース全体から Paimon テーブルにデータとスキーマの変更をリアルタイムで同期できます。詳細については、「Paimon カタログの管理」トピックの「Apache Paimon テーブルの管理」セクションをご参照ください。

データインジェスト

YAML ドラフトで Paimon コネクタを使用して、Paimon テーブルにデータを書き込むことができます。

構文

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

コネクタオプション

オプション

説明

必須

データの型

デフォルト値

備考

type

コネクタタイプ。

サポートされています

STRING

デフォルト値なし

値を paimon に設定します。

name

シンクの名前。

いいえ

STRING

デフォルト値なし

N/A.

catalog.properties.metastore

Paimon カタログのメタストアタイプ。

いいえ

STRING

filesystem

有効な値:

  • filesystem

  • dlf-paimon (DLF-Legacy はサポートされていません)

catalog.properties.*

Paimon カタログのオプションをパイプラインに渡します。

いいえ

STRING

デフォルト値なし

詳細については、「Paimon カタログの管理」をご参照ください。

table.properties.*

Paimon テーブルのオプションをパイプラインに渡します。

いいえ

STRING

デフォルト値なし

詳細については、「Paimon テーブルオプション」をご参照ください。

catalog.properties.warehouse

カタログのウェアハウスルートパス。

いいえ

STRING

デフォルト値なし

このオプションは、catalog.properties.metastore が filesystem に設定されている場合にのみ有効です。

commit.user

データファイルをコミットするためのユーザー名。

いいえ

STRING

デフォルト値なし

説明

コミットの競合が発生した場合に問題のあるデプロイメントを迅速に特定するために、各デプロイメントに一意のユーザー名を使用することをお勧めします。

partition.key

各パーティションテーブルのパーティションキー。

いいえ

STRING

デフォルト値なし

テーブルはセミコロン (;) で区切り、フィールドはコンマ (,) で区切り、テーブルとそのフィールドはコロン (:) で区切ります。例: testdb.table1:id1,id2;testdb.table2:name

サンプルコード

Paimon コネクタを使用して、Paimon カタログのタイプに基づいて Paimon テーブルにデータをインジェストできます。

  • Paimon カタログタイプが filesystem の場合、Alibaba Cloud OSS にデータを書き込みます。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

catalog.properties で始まるオプションの詳細については、「Paimon Filesystem カタログの作成」をご参照ください。

  • Paimon カタログタイプが rest の場合、DLF にデータを書き込みます。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf

catalog.properties で始まるオプションの詳細については、「Flink CDC を使用した DLF へのアクセス」をご参照ください。

よくある質問