すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ストリーミングデータレイクハウス Paimon

最終更新日:Jan 22, 2026

Paimon コネクタは、ストリーミングデータレイクハウスコネクタであり、Paimon カタログとともに使用できます。このトピックでは、Paimon コネクタの使用方法について説明します。

背景情報

Apache Paimon は、ストリーミング処理とバッチ処理を統合するレイクストレージフォーマットです。高スループットの書き込みと低レイテンシーのクエリをサポートします。Alibaba Cloud オープンソースビッグデータプラットフォーム E-MapReduce 上の Flink、Spark、Hive、Trino などの一般的なコンピュートエンジンは、Paimon と緊密に統合されています。Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを構築し、これらのコンピュートエンジンに接続してデータレイク分析を実行できます。詳細については、「Apache Paimon」をご参照ください。

カテゴリ

詳細

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト先

実行モード

ストリーミングモードとバッチモード

データフォーマット

サポートされていません

特定の監視メトリック

なし

API タイプ

SQL、データインジェスト用の YAML ジョブ

結果テーブルのデータ更新または削除をサポート

はい

機能

Apache Paimon は、以下のコア機能を提供します:

  • HDFS または OSS に基づいて、低コストで軽量なデータレイクストレージサービスを構築します。

  • ストリーミングモードとバッチモードで大規模なデータセットを読み書きします。

  • 秒から分単位のデータ鮮度でバッチクエリとオンライン分析処理 (OLAP) クエリを実行します。

  • 増分データを消費および生成します。Paimon は、従来のオフラインデータウェアハウスおよび最新のストリーミングデータウェアハウスのストレージレイヤーとして使用できます。

  • データを事前集約して、ストレージコストと下流の計算負荷を削減します。

  • データの履歴バージョンを取得します。

  • データを効率的にフィルタリングします。

  • スキーマ進化をサポートします。

制限事項

  • Paimon コネクタは、Flink コンピュートエンジン Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。

  • 次の表に、Paimon と VVR のバージョンマッピングを示します。

    Apache Paimon バージョン

    VVR バージョン

    1.3

    11.4

    1.2

    11.2、11.3

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7, 8.0.8, 8.0.9, および 8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

SQL

Paimon コネクタは、SQL ジョブでソーステーブルまたは結果テーブルとして使用できます。

構文

  • Paimon カタログで Paimon テーブルを作成する場合、connector パラメーターを指定する必要はありません。Paimon テーブルを作成するための構文は次のとおりです:

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    説明

    Paimon カタログに Paimon テーブルをすでに作成している場合は、再度作成せずに直接使用できます。

  • 別のカタログで Paimon 一時テーブルを作成する場合は、connector パラメーターと Paimon テーブルのストレージパスを指定する必要があります。Paimon テーブルを作成するための構文は次のとおりです:

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- 指定されたパスに Paimon テーブルのデータファイルが存在しない場合、ファイルが自動的に作成されます。
      ...
    );
    説明
    • パスの例'path' = 'oss://<bucket>/test/order.db/orders'.db サフィックスを省略しないでください。Paimon はこの命名規則を使用してデータベースを識別します。

    • 同じテーブルに書き込む複数のジョブは、同じパス設定を使用する必要があります。

    • 2つのパス値が異なる場合、Paimon はそれらを異なるテーブルと見なします。物理パスが同じであっても、カタログ設定が一致しないと、同時書き込み競合、コンパクション失敗、およびデータ損失が発生する可能性があります。たとえば、oss://b/testoss://b/test/ は、物理パスが同じであっても、末尾のスラッシュのために異なるパスと見なされます。

WITH パラメーター

パラメーター

説明

データの型

必須

デフォルト値

connector

テーブルタイプ。

String

いいえ

なし

  • Paimon カタログで Paimon テーブルを作成する場合、このパラメーターを指定する必要はありません。

  • 別のカタログで Paimon 一時テーブルを作成する場合は、このパラメーターを paimon に設定します。

path

テーブルストレージパス。

String

いいえ

なし

  • Paimon カタログで Paimon テーブルを作成する場合、このパラメーターを指定する必要はありません。

  • 別のカタログで Paimon 一時テーブルを作成する場合、このパラメーターは HDFS または OSS 内のテーブルのストレージディレクトリを指定します。

auto-create

Paimon 一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合にファイルを自動的に作成するかどうかを指定します。

Boolean

いいえ

false

有効値:

  • false (デフォルト):指定されたパスに Paimon テーブルファイルが存在しない場合、エラーが報告されます。

  • true:指定されたパスが存在しない場合、Flink システムは自動的に Paimon テーブルファイルを作成します。

bucket

各パーティションのバケット数。

Integer

いいえ

1

Paimon テーブルに書き込まれたデータは、bucket-key に基づいて各バケットに分散されます。

説明

各バケットのデータ量は 5 GB 未満にすることを推奨します。

bucket-key

バケット化のためのキー列。

String

いいえ

なし

Paimon テーブルに書き込まれるデータを異なるバケットに分散させるための基準となる列を指定します。

列名はカンマ (,) で区切ります。例:'bucket-key' = 'order_id,cust_id' は、order_id と cust_id 列の値に基づいてデータを分散します。

説明
  • このパラメーターが指定されていない場合、データはプライマリキーに基づいて分散されます。

  • Paimon テーブルにプライマリキーが指定されていない場合、データはすべての列の値に基づいて分散されます。

changelog-producer

増分データを生成するメカニズム。

String

いいえ

none

Paimon は、任意の入力データストリームに対して完全な増分データ (すべての update_after データに対応する update_before データがある) を生成でき、下流のコンシューマーにとって便利です。増分データ生成メカニズムの有効値:

  • none (デフォルト):追加の増分データは生成されません。下流は引き続き Paimon テーブルをストリーム読み取りできますが、読み取られる増分データは不完全です。対応する update_before データなしで、update_after データのみが含まれます。

  • Input:入力データストリームは、増分データを含む増分データファイルにも書き込まれます。

  • full-compaction:各フルコンパクション中に完全な増分データが生成されます。

  • lookup:各スナップショットがコミットされる前に完全な増分データが生成されます。

増分データ生成メカニズムの選択方法の詳細については、「増分データ生成メカニズム」をご参照ください。

full-compaction.delta-commits

フルコンパクションの最大間隔。

Integer

いいえ

なし

このパラメーターは、フルコンパクションを実行する必要があるスナップショットコミットの数を指定します。

lookup.cache-max-memory-size

Paimon ディメンションテーブルのメモリキャッシュサイズ。

String

いいえ

256 MB

このパラメーターは、ディメンションテーブルと lookup changelog-producer の両方のキャッシュサイズに影響します。両方のメカニズムのキャッシュサイズは、このパラメーターによって設定されます。

merge-engine

同じプライマリキーを持つデータをマージするメカニズム。

String

いいえ

deduplicate

有効値:

  • deduplicate:最新のレコードのみが保持されます。

  • partial-update:同じプライマリキーを持つ既存のデータを、最新データの非 null 列で更新します。他の列は変更されません。

  • aggregation:指定された集計関数を使用して事前集約を実行します。

データマージメカニズムの詳細な分析については、「データマージメカニズム」をご参照ください。

partial-update.ignore-delete

削除 (-D) メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効値:

  • true:削除メッセージを無視します。

  • false:削除メッセージを無視しません。sequence.field などの設定項目を使用して、シンクが削除データを処理するポリシーを設定する必要があります。そうしないと、IllegalStateException または IllegalArgumentException エラーがスローされる可能性があります。

説明
  • Realtime Compute for Apache Flink VVR 8.0.6 以前では、このパラメーターは merge-engine = partial-update の部分更新シナリオでのみ有効です。

  • Realtime Compute for Apache Flink VVR 8.0.7 以降では、このパラメーターは非部分更新シナリオと互換性があり、ignore-delete パラメーターと同じ機能を持ちます。ignore-delete を代わりに使用することを推奨します。

  • このパラメーターを有効にするかどうかは、ビジネスシナリオと削除データが期待どおりであるかどうかに基づいて決定してください。削除データが表すジョブのセマンティクスが期待どおりでない場合、エラーをスローする方が良い選択です。

ignore-delete

削除 (-D) メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効値は partial-update.ignore-delete と同じです。

説明
  • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.7 以降でのみサポートされます。

  • このパラメーターは `partial-update.ignore-delete` と同じ機能を持ちます。`ignore-delete` パラメーターを使用し、両方のパラメーターを同時に設定しないことを推奨します。

partition.default-name

パーティションのデフォルト名。

String

いいえ

__DEFAULT_PARTITION__

パーティションキー列の値が null または空の文字列の場合、このデフォルト名がパーティション名として使用されます。

partition.expiration-check-interval

期限切れのパーティションをチェックする間隔。

String

いいえ

1h

詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。

partition.expiration-time

パーティションの有効期限。

String

いいえ

なし

パーティションの生存時間がこの値を超えると、パーティションは期限切れになります。デフォルトでは、パーティションは決して期限切れになりません。

パーティションの生存時間は、そのパーティション値から計算されます。詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。

partition.timestamp-formatter

時間文字列を UNIX タイムスタンプに変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。

partition.timestamp-pattern

パーティション値を時間文字列に変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。詳細については、「パーティションの自動有効期限切れを設定するにはどうすればよいですか?」をご参照ください。

scan.bounded.watermark

Paimon ソーステーブルによって生成されたデータのウォーターマークがこの値を超えると、Paimon ソーステーブルはデータの生成を停止します。

Long

いいえ

なし

なし。

scan.mode

Paimon ソーステーブルのコンシューマオフセットを指定します。

String

いいえ

default

詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。

scan.snapshot-id

Paimon ソーステーブルが消費を開始するスナップショットを指定します。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。

scan.timestamp-millis

Paimon ソーステーブルが消費を開始する時点を指定します。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。

snapshot.num-retained.max

期限切れにならずに保持する最新のスナップショットの最大数。

Integer

いいえ

2147483647

この設定または `snapshot.time-retained` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。

snapshot.num-retained.min

期限切れにならずに保持する最新のスナップショットの最小数。

Integer

いいえ

10

なし。

snapshot.time-retained

スナップショットが期限切れになるまでの期間。

String

いいえ

1h

この設定または `snapshot.num-retained.max` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。

write-mode

Paimon テーブルの書き込みモード。

String

いいえ

change-log

有効値:

  • change-log:Paimon テーブルは、プライマリキーに基づいてデータの挿入、削除、更新をサポートします。

  • append-only:Paimon テーブルはデータの挿入のみを受け付け、プライマリキーをサポートしません。このモードは change-log モードよりも効率的です。

書き込みモードの詳細については、「書き込みモード」をご参照ください。

scan.infer-parallelism

Paimon ソーステーブルの並列度を自動的に推論するかどうかを指定します。

Boolean

いいえ

true

有効値:

  • true:Paimon ソーステーブルの並列度は、バケット数に基づいて自動的に推論されます。

  • false:VVP で設定されたデフォルトの並列度が使用されます。エキスパートモードでは、ユーザーが設定した並列度が使用されます。

scan.parallelism

Paimon ソーステーブルの並列度。

Integer

いいえ

なし

説明

デプロイメント詳細 > リソース設定 タブで、リソースモード が [エキスパート] に設定されている場合、このパラメーターは有効になりません。

sink.parallelism

Paimon 結果テーブルの並列度。

Integer

いいえ

なし

説明

デプロイメント詳細 > リソース設定 タブで、リソースモード が [エキスパート] に設定されている場合、このパラメーターは有効になりません。

sink.clustering.by-columns

Paimon 結果テーブルへの書き込みのためのクラスタリング列を指定します。

String

いいえ

なし

Paimon append-only テーブル (非プライマリキーテーブル) の場合、バッチジョブでこのパラメーターを設定すると、クラスタリング書き込み機能が有効になります。この機能は、特定列のデータ値の範囲に基づいてクラスタリングを行い、テーブルのクエリ速度を向上させます。

複数の列名はカンマ (,) で区切ります。例:'col1,col2'

クラスタリングの詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。

sink.delete-strategy​

システムがリトラクション (-D/-U) メッセージを正しく処理することを保証するための検証ポリシーを設定します。

​​

Enum

いいえ

NONE

検証ポリシーの有効値と、リトラクションメッセージを処理するシンクオペレーターの期待される動作は次のとおりです:

  • ​NONE (デフォルト):検証は実行されません。

  • IGNORE_DELETE:シンクオペレーターは -U および -D メッセージを無視する必要があります。リトラクションは発生しません。

  • NON_PK_FIELD_TO_NULL:シンクオペレーターは -U メッセージを無視する必要があります。ただし、-D メッセージを受信すると、プライマリキーの値を保持し、スキーマ内の他の非プライマリキーの値をリトラクトします。

    これは主に、複数のシンクが部分更新のために同時に同じテーブルに書き込むシナリオで使用されます。

  • DELETE_ROW_ON_PK:シンクオペレーターは -U メッセージを無視する必要があります。ただし、-D メッセージを受信すると、プライマリキーに対応する行を削除します。

  • CHANGELOG_STANDARD:シンクオペレーターは、-U または -D データのいずれかを受信したときに、プライマリキーに対応する行を削除する必要があります。

説明
  • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.8 以降でのみサポートされます。

  • リトラクションメッセージを処理する際の Paimon シンクの動作は、実際には `ignore-delete` や `merge-engine` などの他の設定項目によって決定されます。この設定項目は、この動作に直接影響しません。代わりに、動作が期待されるポリシーを満たしているかどうかを検証します。動作が期待されるポリシーを満たしていない場合、検証ステップは終了し、ジョブのエラーメッセージで `ignore-delete` や `merge-engine` などの他の設定項目を期待どおりに修正する方法が示されます。

説明

設定項目の詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。

機能詳細

データ鮮度と一貫性の保証

Paimon 結果テーブルは、2フェーズコミットプロトコルを使用して、Flink ジョブの各チェックポイント中に書き込まれたデータをコミットします。したがって、データ鮮度は Flink ジョブのチェックポイント間隔と同じです。各コミットは最大で2つのスナップショットを生成します。

2つの Flink ジョブが同時に同じ Paimon テーブルに書き込む場合、2つのジョブのデータが同じバケットに書き込まれなければ、シリアライズ可能な一貫性が保証されます。2つのジョブのデータが同じバケットに書き込まれる場合、スナップショット分離の一貫性のみが保証されます。これは、テーブル内のデータが両方のジョブの結果の混合になる可能性があることを意味しますが、データは失われません。

データマージメカニズム

Paimon 結果テーブルが同じプライマリキーを持つ複数のレコードを受信した場合、これらのレコードを単一のレコードにマージして、プライマリキーの一意性を維持します。merge-engine パラメーターを設定することで、データマージの動作を指定できます。次の表に、データマージメカニズムを示します。

マージメカニズム

詳細

重複排除

重複排除メカニズムは、デフォルトのデータマージメカニズムです。同じプライマリキーを持つ複数のレコードに対して、Paimon 結果テーブルは最新のレコードのみを保持し、その他は破棄します。

説明

最新のレコードが削除メッセージの場合、そのプライマリキーを持つすべてのレコードが破棄されます。

部分更新

partial-update メカニズムを指定することで、複数のメッセージを通じてデータを段階的に更新し、最終的に完全なデータを取得できます。具体的には、同じプライマリキーを持つ新しいデータは古いデータを上書きしますが、null 値を持つ列は上書きされません。

たとえば、Paimon 結果テーブルが次の3つのレコードを順に受信するとします:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

最初の列はプライマリキーです。最終結果は <1, 25.2, 10, 'This is a book'> となります。

説明
  • partial-update の結果をストリーム読み取りするには、changelog-producer パラメーターを `lookup` または `full-compaction` に設定する必要があります。

  • partial-update メカニズムは削除メッセージを処理できません。partial-update.ignore-delete パラメーターを設定して削除メッセージを無視できます。

集約

一部のシナリオでは、集約された値のみに関心がある場合があります。集約メカニズムは、指定した集計関数に基づいて同じプライマリキーを持つデータを集約します。プライマリキーの一部ではない各列について、fields.<field-name>.aggregate-function を使用して集計関数を指定する必要があります。そうしないと、その列にはデフォルトで last_non_null_value 集計関数が使用されます。たとえば、次の Paimon テーブル定義を考えます。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price 列は max 関数で集約され、sales 列は sum 関数で集約されます。2つの入力レコード <1, 23.0, 15> と <1, 30.2, 20> が与えられた場合、最終結果は <1, 30.2, 35> となります。現在サポートされている集計関数とそれに対応するデータ型は次のとおりです:

  • sum:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE をサポートします。

  • min および max:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ をサポートします。

  • last_value および last_non_null_value:すべてのデータ型をサポートします。

  • listagg:STRING をサポートします。

  • bool_and および bool_or:BOOLEAN をサポートします。

説明
  • sum 関数のみがデータのリトラクションと削除をサポートします。他の集計関数はサポートしません。特定の列でリトラクションと削除メッセージを無視する必要がある場合は、'fields.${field_name}.ignore-retract'='true' を設定できます。

  • 集約の結果をストリーム読み取りするには、changelog-producer パラメーターを `lookup` または `full-compaction` に設定する必要があります。

増分データ生成メカニズム

changelog-producer パラメーターを適切な増分データ生成メカニズムに設定することで、Paimon は任意の入力データストリームに対して完全な増分データを生成できます。完全な増分データとは、すべての update_after データに対応する update_before データがあることを意味します。以下にすべての増分データ生成メカニズムをリストします。詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。

メカニズム

詳細

None

changelog-producer を `none` (デフォルト値) に設定すると、下流の Paimon ソーステーブルは同じプライマリキーのデータの最新の状態しか見ることができません。しかし、この最新の状態では、下流のコンシューマーが正しい計算のために完全な増分データを簡単に理解することはできません。これは、対応するデータが削除されたか、最新のデータが何かを判断できるだけで、変更前のデータが何であったかは判断できないためです。

たとえば、下流のコンシューマーが列の合計を計算する必要があるとします。コンシューマーが最新のデータである 5 しか見ない場合、合計をどのように更新すればよいか判断できません。以前のデータが 4 であれば、合計を 1 増やすべきです。以前のデータが 6 であれば、合計を 1 減らすべきです。このようなコンシューマーは update_before データに敏感です。増分データ生成メカニズムを `none` に設定しないことを推奨します。ただし、他のメカニズムはパフォーマンスのオーバーヘッドを伴います。

説明

データベースなどの下流のコンシューマーが update_before データに敏感でない場合は、増分データ生成メカニズムを `none` に設定できます。したがって、実際のニーズに基づいてメカニズムを設定することを推奨します。

Input

changelog-producer を `input` に設定すると、Paimon 結果テーブルは入力データストリームをメインテーブルと増分データファイルの両方に書き込みます。

したがって、このメカニズムは、入力データストリーム自体が変更データキャプチャ (CDC) データなどの完全な増分データである場合にのみ使用できます。

Lookup

changelog-producer を `lookup` に設定すると、Paimon 結果テーブルはディメンションテーブルのポイントクエリに似たメカニズムを使用して、各コミット前に現在のスナップショットの完全な増分データを生成します。このメカニズムは、入力データが完全な増分データであるかどうかに関係なく、完全な増分データを生成できます。

以下で説明する Full Compaction メカニズムと比較して、Lookup メカニズムは増分データの生成においてより良いタイムラインを提供しますが、全体的により多くのリソースを消費します。

分単位の鮮度など、データ鮮度に対する要件が高い場合にこのメカニズムを使用することを推奨します。

Full Compaction

changelog-producer を `full-compaction` に設定すると、Paimon 結果テーブルは各フルコンパクション中に完全な増分データを生成します。このメカニズムは、入力データが完全な増分データであるかどうかに関係なく、完全な増分データを生成できます。フルコンパクションの間隔は full-compaction.delta-commits パラメーターで指定されます。

Lookup メカニズムと比較して、Full Compaction メカニズムは増分データの生成におけるタイムラインが劣ります。しかし、データのフルコンパクションプロセスを利用するため、追加の計算を生成せず、全体的により少ないリソースを消費します。

時間単位の鮮度など、データ鮮度に対する要件が高くない場合にこのメカニズムを使用することを推奨します。

書き込みモード

Paimon テーブルは現在、以下の書き込みモードをサポートしています。

モード

詳細

Change-log

change-log 書き込みモードは、Paimon テーブルのデフォルトの書き込みモードです。このモードは、プライマリキーに基づいてデータの挿入、削除、更新をサポートします。また、このモードでは前述のデータマージおよび増分データ生成メカニズムも使用できます。

Append-only

append-only 書き込みモードは、データの挿入のみをサポートし、プライマリキーをサポートしません。このモードは change-log モードよりも効率的であり、分単位の鮮度など、データ鮮度の要件が高くないシナリオでメッセージキューの代替として使用できます。

append-only 書き込みモードの詳細については、「Apache Paimon 公式ドキュメント」をご参照ください。append-only 書き込みモードを使用する際は、次の点に注意してください:

  • 実際のニーズに基づいて bucket-key パラメーターを設定することを推奨します。そうしないと、Paimon テーブルはすべての列の値に基づいてバケット化を行い、効率が低下します。

  • append-only 書き込みモードは、ある程度データの出力順序を保証できます。具体的な出力順序は次のとおりです:

    1. 2つのレコードが異なるパーティションからのものである場合、scan.plan-sort-partition パラメーターが設定されていれば、パーティション値が小さいレコードが先に出力されます。そうでない場合は、先に作成されたパーティションのデータが先に出力されます。

    2. 2つのレコードが同じパーティションの同じバケットからのものである場合、先に書き込まれたデータが先に出力されます。

    3. 2つのレコードが同じパーティションの異なるバケットからのものである場合、異なるバケットは異なる同時実行タスクによって処理されるため、出力順序は保証されません。

CTAS および CDAS の宛先として

Paimon テーブルは、単一テーブルまたはデータベース全体のレベルでデータのリアルタイム同期をサポートします。同期中に上流テーブルのスキーマが変更された場合、その変更は Paimon テーブルにもリアルタイムで同期されます。詳細については、「Paimon テーブルの管理」および「Paimon カタログの管理」をご参照ください。

データインジェスト

Paimon コネクタは、データインジェストのための YAML ジョブ開発でシンクとして使用できます。

構文

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

設定項目

パラメーター

説明

必須

データの型

デフォルト値

type

コネクタタイプ。

はい

STRING

なし

静的フィールドは paimon に設定されます。

name

シンク名。

いいえ

STRING

なし

シンクの名前。

catalog.properties.metastore

Paimon カタログのタイプ。

いいえ

STRING

filesystem

有効値:

  • filesystem (デフォルト)

  • rest (DLF のみがサポートされます。DLF-Legacy はサポートされません。)

catalog.properties.*

Paimon カタログを作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon カタログの管理」をご参照ください。

table.properties.*

Paimon テーブルを作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon テーブルオプション」をご参照ください。

catalog.properties.warehouse

ファイルストレージのルートディレクトリ。

いいえ

STRING

なし

このパラメーターは、catalog.properties.metastore が `filesystem` に設定されている場合にのみ有効です。

commit.user-prefix

データファイルをコミットするためのユーザー名プレフィックス。

いいえ

STRING

なし

説明

コミット競合が発生した際に競合するジョブを容易に特定できるよう、異なるジョブには異なるユーザー名を設定することを推奨します。

partition.key

各パーティションテーブルのパーティションフィールド。

いいえ

STRING

なし

異なるテーブルはセミコロン (;) で、異なるフィールドはカンマ (,) で、テーブルとフィールドはコロン (:) で区切ります。例:testdb.table1:id1,id2;testdb.table2:name

sink.cross-partition-upsert.tables

プライマリキーにすべてのパーティションフィールドが含まれていない、パーティション間の更新が必要なテーブルを指定します。

いいえ

STRING

なし

パーティション間で更新するテーブル。

  • フォーマット:テーブル名はセミコロン (;) で区切ります。

  • パフォーマンスに関する推奨事項:この操作は大量の計算リソースを消費します。これらのテーブルには個別のジョブを作成してください。

重要
  • 対象となるすべてのテーブルをリストアップしてください。テーブル名を省略すると、データの重複が発生します。

以下の例は、Paimon カタログのタイプに基づいて、Paimon をデータインジェストシンクとして設定する方法を示しています。

  • Paimon カタログが filesystem の場合に Alibaba Cloud OSS に書き込むための設定例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    catalog.properties で始まるパラメーターの詳細については、「Paimon Filesystem カタログの作成」をご参照ください。

  • Paimon カタログが rest の場合に Alibaba Cloud DLF 2.5 に書き込むための設定例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf

    catalog.properties で始まるパラメーターの詳細については、「Flink CDC カタログ設定パラメーター」をご参照ください。

スキーマ進化

現在、データインジェストシンクとしての Paimon は、以下のスキーマ進化イベントをサポートしています:

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT (プライマリキー列の型変更はサポートされていません)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

説明

下流の Paimon テーブルが既に存在する場合、その既存のスキーマが書き込みに使用されます。システムはテーブルを再度作成しようとはしません。

よくある質問