すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon コネクタ

最終更新日:Dec 04, 2025

Paimon コネクタは、ストリーミングデータレイクハウスコネクタであり、Paimon カタログと併用できます。 このトピックでは、Paimon コネクタの使用方法について説明します。

背景情報

Apache Paimon は、ストリーミング処理とバッチ処理を統合するレイクストレージフォーマットです。 高スループットの書き込みと低レイテンシーのクエリをサポートします。 Alibaba Cloud のオープンソースビッグデータプラットフォームである E-MapReduce 上の Flink、Spark、Hive、Trino などの一般的なコンピュートエンジンは、Paimon と緊密に統合されています。 Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを構築し、これらのコンピュートエンジンに接続してデータレイク分析を行うことができます。 詳細については、「Apache Paimon」をご参照ください。

カテゴリ

詳細

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト先

実行モード

ストリーミングモードとバッチモード

データフォーマット

サポートされていません

特定のモニタリングメトリック

なし

API タイプ

SQL、データインジェスト用の YAML ジョブ

結果テーブルでのデータの更新または削除のサポート

はい

機能

Apache Paimon は、以下のコア機能を提供します。

  • HDFS または OSS に基づいて、低コストで軽量なデータレイクストレージサービスを構築します。

  • ストリーミングモードとバッチモードで大規模なデータセットを読み書きします。

  • 秒から分単位のデータ鮮度でバッチクエリとオンライン分析処理 (OLAP) クエリを実行します。

  • 増分データを消費および生成します。 Paimon は、従来のオフラインデータウェアハウスと最新のストリーミングデータウェアハウスのストレージレイヤーとして使用できます。

  • データを事前集約して、ストレージコストと下流のコンピューティング負荷を削減します。

  • データの履歴バージョンを取得します。

  • データを効率的にフィルタリングします。

  • スキーマ進化をサポートします。

制限事項

  • Paimon コネクタは、Flink コンピュートエンジンの Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。

  • 次の表に、Paimon と VVR のバージョンマッピングを示します。

    Apache Paimon バージョン

    VVR バージョン

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7、8.0.8、8.0.9、8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

SQL

Paimon コネクタは、SQL ジョブでソーステーブルまたは結果テーブルとして使用できます。

構文

  • Paimon カタログで Paimon テーブルを作成する場合、connector パラメーターを指定する必要はありません。 Paimon テーブルを作成するための構文は次のとおりです。

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    説明

    Paimon カタログに Paimon テーブルをすでに作成している場合は、再度作成することなく直接使用できます。

  • 別のカタログで Paimon 一時テーブルを作成する場合は、connector パラメーターと Paimon テーブルのストレージパスを指定する必要があります。 Paimon テーブルを作成するための構文は次のとおりです。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- 指定されたパスに Paimon テーブルデータファイルが存在しない場合、ファイルが自動的に作成されます。
      ...
    );
    説明
    • パスの例'path' = 'oss://<bucket>/test/order.db/orders'.db サフィックスを省略しないでください。 Paimon はこの命名規則を使用してデータベースを識別します。

    • 同じテーブルに書き込む複数のジョブは、同じパス構成を使用する必要があります。

    • 2 つのパス値が異なる場合、Paimon はそれらを異なるテーブルと見なします。 物理パスが同じであっても、カタログ構成が一致しないと、同時書き込みの競合、コンパクションの失敗、データ損失が発生する可能性があります。 たとえば、oss://b/testoss://b/test/ は、物理パスが同じであっても、末尾のスラッシュがあるため異なるパスと見なされます。

WITH パラメーター

パラメーター

説明

データの型

必須

デフォルト値

connector

テーブルのタイプ。

String

いいえ

なし

  • Paimon カタログで Paimon テーブルを作成する場合、このパラメーターを指定する必要はありません。

  • 別のカタログで Paimon 一時テーブルを作成する場合は、このパラメーターを paimon に設定します。

path

テーブルのストレージパス。

String

いいえ

なし

  • Paimon カタログで Paimon テーブルを作成する場合、このパラメーターを指定する必要はありません。

  • 別のカタログで Paimon 一時テーブルを作成する場合、このパラメーターは HDFS または OSS 内のテーブルのストレージディレクトリを指定します。

auto-create

Paimon 一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合にファイルを自動的に作成するかどうかを指定します。

ブール値

いいえ

false

有効値:

  • false (デフォルト): 指定されたパスに Paimon テーブルファイルが存在しない場合、エラーが報告されます。

  • true: 指定されたパスが存在しない場合、Flink システムは自動的に Paimon テーブルファイルを作成します。

bucket

各パーティションのバケット数。

Integer

いいえ

1

Paimon テーブルに書き込まれたデータは、bucket-key に基づいて各バケットに分散されます。

説明

各バケットのデータ量は 5 GB 未満にすることを推奨します。

bucket-key

バケット化のためのキー列。

String

いいえ

なし

Paimon テーブルに書き込まれるデータを、どの列に基づいて異なるバケットに分散するかを指定します。

列名はカンマ (,) で区切ります。例: 'bucket-key' = 'order_id,cust_id' は、order_id 列と cust_id 列の値に基づいてデータを分散します。

説明
  • このパラメーターが指定されていない場合、データはプライマリキーに基づいて分散されます。

  • Paimon テーブルにプライマリキーが指定されていない場合、データはすべての列の値に基づいて分散されます。

changelog-producer

増分データを生成するメカニズム。

String

いいえ

none

Paimon は、任意の入力データストリームに対して完全な増分データ (すべての `update_after` データに対応する `update_before` データがある) を生成でき、下流のコンシューマーにとって便利です。 増分データ生成メカニズムの有効値:

  • none (デフォルト): 追加の増分データは生成されません。 下流は引き続き Paimon テーブルをストリーム読み取りできますが、読み取られる増分データは不完全です。 `update_after` データのみを含み、対応する `update_before` データは含まれません。

  • input: 入力データストリームは、メインテーブルと増分データファイルの両方に書き込まれます。

  • full-compaction: 各完全なコンパクション中に完全な増分データが生成されます。

  • lookup: 各スナップショットがコミットされる前に完全な増分データが生成されます。

増分データを生成するメカニズムの選択方法の詳細については、「増分データ生成メカニズム」をご参照ください。

full-compaction.delta-commits

完全なコンパクションの最大間隔。

Integer

いいえ

なし

このパラメーターは、完全なコンパクションを実行する必要があるスナップショットのコミット数を指定します。

lookup.cache-max-memory-size

Paimon ディメンションテーブルのメモリキャッシュサイズ。

String

いいえ

256 MB

このパラメーターは、ディメンションテーブルと lookup changelog-producer の両方のキャッシュサイズに影響します。 両方のメカニズムのキャッシュサイズは、このパラメーターによって構成されます。

merge-engine

同じプライマリキーを持つデータをマージするメカニズム。

String

いいえ

deduplicate

有効値:

  • deduplicate: 最新のレコードのみが保持されます。

  • partial-update: 同じプライマリキーを持つ既存のデータを、最新データの null でない列で更新します。 他の列は変更されません。

  • aggregation: 指定された集計関数を使用して事前集約を実行します。

データマージメカニズムの詳細な分析については、「データマージメカニズム」をご参照ください。

partial-update.ignore-delete

削除 (-D) メッセージを無視するかどうかを指定します。

ブール値

いいえ

false

有効値:

  • true: 削除メッセージを無視します。

  • false: 削除メッセージを無視しません。 sequence.field などの設定項目を使用して、削除データを処理するシンクのポリシーを設定する必要があります。 そうしないと、IllegalStateException または IllegalArgumentException エラーがスローされる可能性があります。

説明
  • Realtime Compute for Apache Flink VVR 8.0.6 以前のバージョンでは、このパラメーターは merge-engine = partial-update の部分更新シナリオでのみ有効です。

  • Realtime Compute for Apache Flink VVR 8.0.7 以降のバージョンでは、このパラメーターは非部分更新シナリオと互換性があり、ignore-delete パラメーターと同じ機能を持ちます。 代わりに ignore-delete を使用することを推奨します。

  • このパラメーターを有効にするかどうかは、ビジネスシナリオと削除データが期待どおりであるかどうかに基づいて決定してください。 削除データによって表されるジョブのセマンティクスが期待どおりでない場合は、エラーをスローする方が良い選択です。

ignore-delete

削除 (-D) メッセージを無視するかどうかを指定します。

ブール値

いいえ

false

有効値は partial-update.ignore-delete と同じです。

説明
  • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.7 以降でのみサポートされます。

  • このパラメーターは `partial-update.ignore-delete` と同じ機能を持ちます。 `ignore-delete` パラメーターを使用し、両方のパラメーターを同時に設定しないことを推奨します。

partition.default-name

パーティションのデフォルト名。

String

いいえ

__DEFAULT_PARTITION__

パーティションキー列の値が null または空の文字列の場合、このデフォルト名がパーティション名として使用されます。

partition.expiration-check-interval

期限切れのパーティションをチェックする間隔。

String

いいえ

1h

詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。

partition.expiration-time

パーティションの有効期限。

String

いいえ

なし

パーティションの生存時間がこの値を超えると、パーティションは期限切れになります。 デフォルトでは、パーティションは期限切れになりません。

パーティションの生存時間は、そのパーティション値から計算されます。 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。

partition.timestamp-formatter

時間文字列を UNIX タイムスタンプに変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。

partition.timestamp-pattern

パーティション値を時間文字列に変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティションの生存時間を抽出するためのフォーマットを設定します。 詳細については、「自動パーティション期限切れを設定するにはどうすればよいですか?」をご参照ください。

scan.bounded.watermark

Paimon ソーステーブルによって生成されたデータのウォーターマークがこの値を超えると、Paimon ソーステーブルはデータの生成を停止します。

Long

いいえ

なし

なし。

scan.mode

Paimon ソーステーブルのコンシューマオフセットを指定します。

String

いいえ

default

詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。

scan.snapshot-id

Paimon ソーステーブルが消費を開始するスナップショットを指定します。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。

scan.timestamp-millis

Paimon ソーステーブルが消費を開始する時点を指定します。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマオフセットを設定するにはどうすればよいですか?」をご参照ください。

snapshot.num-retained.max

期限切れにせずに保持する最新のスナップショットの最大数。

Integer

いいえ

2147483647

この設定または `snapshot.time-retained` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。

snapshot.num-retained.min

期限切れにせずに保持する最新のスナップショットの最小数。

Integer

いいえ

10

なし。

snapshot.time-retained

スナップショットが期限切れになるまでの期間。

String

いいえ

1h

この設定または `snapshot.num-retained.max` が満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは期限切れになります。

write-mode

Paimon テーブルの書き込みモード。

String

いいえ

change-log

有効値:

  • change-log: Paimon テーブルは、プライマリキーに基づいてデータの挿入、削除、更新をサポートします。

  • append-only: Paimon テーブルはデータの挿入のみを受け付け、プライマリキーをサポートしません。 このモードは change-log モードよりも効率的です。

書き込みモードの詳細については、「書き込みモード」をご参照ください。

scan.infer-parallelism

Paimon ソーステーブルの並列度を自動的に推測するかどうかを指定します。

ブール値

いいえ

true

有効値:

  • true: Paimon ソーステーブルの並列度は、バケット数に基づいて自動的に推測されます。

  • false: VVP で設定されたデフォルトの並列度が使用されます。 エキスパートモードでは、ユーザーが設定した並列度が使用されます。

scan.parallelism

Paimon ソーステーブルの並列度。

Integer

いいえ

なし

説明

[デプロイメント詳細] > [リソース設定] タブで、[リソースモード] が [エキスパート] に設定されている場合、このパラメーターは有効になりません。

sink.parallelism

Paimon 結果テーブルの並列度。

Integer

いいえ

なし

説明

[デプロイメント詳細] > [リソース設定] タブで、[リソースモード] が [エキスパート] に設定されている場合、このパラメーターは有効になりません。

sink.clustering.by-columns

Paimon 結果テーブルへの書き込みのためのクラスタリング列を指定します。

String

いいえ

なし

Paimon append-only テーブル (非プライマリキーテーブル) の場合、バッチジョブでこのパラメーターを設定すると、クラスタリング書き込み機能が有効になります。 この機能は、特定列のデータを値の範囲でクラスタリングし、テーブルのクエリ速度を向上させます。

複数の列名をカンマ (,) で区切ります。例: 'col1,col2'

クラスタリングの詳細については、Apache Paimon 公式ドキュメントをご参照ください。

sink.delete-strategy​

システムがリトラクション (-D/-U) メッセージを正しく処理することを保証するための検証ポリシーを設定します。

​​

Enum

いいえ

NONE

検証ポリシーの有効値と、リトラクションメッセージを処理するシンクオペレーターの期待される動作は次のとおりです:​

  • ​NONE (デフォルト): 検証は実行されません。​

  • IGNORE_DELETE: シンクオペレーターは -U および -D メッセージを無視する必要があります。 リトラクションは発生しません。

  • NON_PK_FIELD_TO_NULL: シンクオペレーターは -U メッセージを無視します。ただし、-D メッセージを受信すると、プライマリキー値を保持し、スキーマ内の他の非プライマリキー値を取り消します。

    これは主に、複数のシンクが部分的な更新のために同じテーブルに同時に書き込むシナリオで使用されます。

  • DELETE_ROW_ON_PK: シンクオペレーターは -U メッセージを無視する必要があります。 ただし、-D メッセージを受信すると、プライマリキーに対応する行を削除します。​

  • CHANGELOG_STANDARD: シンクオペレーターは、-U または -D データを受信したときに、プライマリキーに対応する行を削除する必要があります。​

説明
  • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.8 以降でのみサポートされます。

  • Paimon シンクがリトラクションメッセージを処理するときの動作は、実際には `ignore-delete` や `merge-engine` などの他の設定項目によって決定されます。 この設定項目は、この動作に直接影響しません。 代わりに、動作が期待されるポリシーを満たしているかどうかを検証します。 動作が期待されるポリシーを満たしていない場合、検証ステップは終了し、ジョブのエラーメッセージは、期待を満たすために `ignore-delete` や `merge-engine` などの他の設定項目をどのように変更するかを促します。

説明

設定項目の詳細については、Apache Paimon 公式ドキュメントをご参照ください。

機能詳細

データの鮮度と一貫性の保証

Paimon 結果テーブルは、2 フェーズコミットプロトコルを使用して、Flink ジョブの各チェックポイント中に書き込まれたデータをコミットします。 したがって、データの鮮度は Flink ジョブのチェックポイント間隔と同じです。 各コミットは最大 2 つのスナップショットを生成します。

2 つの Flink ジョブが同時に同じ Paimon テーブルに書き込む場合、2 つのジョブのデータが同じバケットに書き込まれない場合は、シリアライズ可能な一貫性が保証されます。 2 つのジョブのデータが同じバケットに書き込まれる場合は、スナップショット分離の一貫性のみが保証されます。 これは、テーブル内のデータが両方のジョブの結果の混合である可能性があることを意味しますが、データは失われません。

データマージメカニズム

Paimon 結果テーブルが同じプライマリキーを持つ複数のレコードを受信すると、これらのレコードを 1 つのレコードにマージして、プライマリキーの一意性を維持します。 merge-engine パラメーターを設定することで、データマージの動作を指定できます。 次の表に、データマージメカニズムを示します。

メカニズム

詳細

Deduplicate

Deduplicate メカニズムは、デフォルトのデータマージメカニズムです。 同じプライマリキーを持つ複数のレコードの場合、Paimon 結果テーブルは最新のレコードのみを保持し、その他は破棄します。

説明

最新のレコードが削除メッセージの場合、そのプライマリキーを持つすべてのレコードが破棄されます。

部分更新

partial-update メカニズムを指定することで、複数のメッセージを介してデータを段階的に更新し、最終的に完全なデータを取得できます。 具体的には、同じプライマリキーを持つ新しいデータは古いデータを上書きしますが、null 値を持つ列は上書きされません。

たとえば、Paimon 結果テーブルが次の 3 つのレコードを順番に受信するとします。

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

最初の列はプライマリキーです。 最終結果は <1, 25.2, 10, 'This is a book'> です。

説明
  • partial-update の結果をストリーム読み取りするには、changelog-producer パラメーターを `lookup` または `full-compaction` に設定する必要があります。

  • partial-update メカニズムは削除メッセージを処理できません。 partial-update.ignore-delete パラメーターを設定して、削除メッセージを無視できます。

集約

一部のシナリオでは、集約された値のみに関心がある場合があります。 集約メカニズムは、指定した集計関数に基づいて、同じプライマリキーを持つデータを集約します。 プライマリキーの一部ではない各列について、fields.<field-name>.aggregate-function を使用して集計関数を指定する必要があります。 そうしないと、その列にはデフォルトで last_non_null_value 集計関数が使用されます。 たとえば、次の Paimon テーブル定義を考えます。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price 列は max 関数を使用して集計され、sales 列は sum 関数を使用して集計されます。 2 つの入力レコード <1, 23.0, 15> と <1, 30.2, 20> が与えられた場合、最終結果は <1, 30.2, 35> です。 現在サポートされている集計関数とそれに対応するデータ型は次のとおりです。

  • sum: DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE をサポートします。

  • min および max: DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ をサポートします。

  • last_value および last_non_null_value: すべてのデータ型をサポートします。

  • listagg: STRING をサポートします。

  • bool_and および bool_or: BOOLEAN をサポートします。

説明
  • sum 関数のみがデータのリトラクションと削除をサポートします。 他の集計関数はサポートしません。 特定の列でリトラクションと削除メッセージを無視する必要がある場合は、'fields.${field_name}.ignore-retract'='true' を設定できます。

  • 集約の結果をストリーム読み取りするには、changelog-producer パラメーターを `lookup` または `full-compaction` に設定する必要があります。

増分データ生成メカニズム

changelog-producer パラメーターを適切な増分データ生成メカニズムに設定することで、Paimon は任意の入力データストリームに対して完全な増分データを生成できます。 完全な増分データとは、すべての `update_after` データに対応する `update_before` データがあることを意味します。 以下に、すべての増分データ生成メカニズムをリストします。 詳細については、Apache Paimon 公式ドキュメントをご参照ください。

メカニズム

詳細

None

changelog-producer を `none` (デフォルト値) に設定すると、下流の Paimon ソーステーブルは、同じプライマリキーのデータの最新の状態しか見ることができません。 ただし、この最新の状態では、下流のコンシューマーが正しい計算のために完全な増分データを簡単に理解することはできません。 これは、対応するデータが削除されたか、最新のデータが何かを判断できるだけで、変更前のデータが何であったかは判断できないためです。

たとえば、下流のコンシューマーが列の合計を計算する必要があるとします。 コンシューマーが最新のデータである 5 しか見ない場合、合計をどのように更新するかを判断できません。 以前のデータが 4 だった場合は、合計を 1 増やす必要があります。 以前のデータが 6 だった場合は、合計を 1 減らす必要があります。 このようなコンシューマーは `update_before` データに敏感です。 増分データ生成メカニズムを `none` に設定しないことを推奨します。 ただし、他のメカニズムではパフォーマンスのオーバーヘッドが発生します。

説明

データベースなどの下流のコンシューマーが `update_before` データに敏感でない場合は、増分データ生成メカニズムを `none` に設定できます。 したがって、実際のニーズに基づいてメカニズムを設定することを推奨します。

Input

changelog-producer を `input` に設定すると、Paimon 結果テーブルは入力データストリームをメインテーブルと増分データファイルの両方に書き込みます。

したがって、このメカニズムは、Change Data Capture (CDC) データなど、入力データストリーム自体が完全な増分データである場合にのみ使用できます。

Lookup

changelog-producer を `lookup` に設定すると、Paimon 結果テーブルは、ディメンションテーブルのポイントクエリに似たメカニズムを使用して、各コミット前に現在のスナップショットの完全な増分データを生成します。 このメカニズムは、入力データが完全な増分データであるかどうかに関係なく、完全な増分データを生成できます。

以下で説明する Full Compaction メカニズムと比較して、Lookup メカニズムは増分データを生成するためのタイムラインが優れていますが、全体的に多くのリソースを消費します。

分単位の鮮度など、データ鮮度に対する要件が高い場合に、このメカニズムを使用することを推奨します。

Full Compaction

changelog-producer を `full-compaction` に設定すると、Paimon 結果テーブルは各完全なコンパクション中に完全な増分データを生成します。 このメカニズムは、入力データが完全な増分データであるかどうかに関係なく、完全な増分データを生成できます。 完全なコンパクションの間隔は、full-compaction.delta-commits パラメーターによって指定されます。

Lookup メカニズムと比較して、Full Compaction メカニズムは増分データを生成するためのタイムラインが劣ります。 ただし、データの完全なコンパクションプロセスを利用し、余分な計算を生成しないため、全体的に消費するリソースが少なくなります。

時間単位の鮮度など、データ鮮度に対する要件が高くない場合に、このメカニズムを使用することを推奨します。

書き込みモード

Paimon テーブルは現在、次の書き込みモードをサポートしています。

モード

詳細

Change-log

change-log 書き込みモードは、Paimon テーブルのデフォルトの書き込みモードです。 このモードは、プライマリキーに基づいてデータの挿入、削除、更新をサポートします。 このモードでは、前述のデータマージおよび増分データ生成メカニズムも使用できます。

Append-only

append-only 書き込みモードは、データの挿入のみをサポートし、プライマリキーをサポートしません。 このモードは change-log モードよりも効率的であり、分単位の鮮度など、データ鮮度の要件が高くないシナリオでメッセージキューの代替として使用できます。

append-only 書き込みモードの詳細については、Apache Paimon 公式ドキュメントをご参照ください。 append-only 書き込みモードを使用する場合は、次の点にご注意ください。

  • 実際のニーズに基づいて bucket-key パラメーターを設定することを推奨します。 そうしないと、Paimon テーブルはすべての列の値に基づいてバケット化を実行し、効率が低下します。

  • append-only 書き込みモードは、ある程度データの出力順序を保証できます。 具体的な出力順序は次のとおりです。

    1. 2 つのレコードが異なるパーティションからのものである場合、scan.plan-sort-partition パラメーターが設定されていれば、パーティション値が小さいレコードが先に出力されます。 そうでない場合は、先に作成されたパーティションのデータが先に出力されます。

    2. 2 つのレコードが同じパーティションの同じバケットからのものである場合、先に書き込まれたデータが先に出力されます。

    3. 2 つのレコードが同じパーティションの異なるバケットからのものである場合、異なるバケットは異なる同時タスクによって処理されるため、それらの出力順序は保証されません。

CTAS および CDAS の宛先として

Paimon テーブルは、単一テーブルまたはデータベース全体のレベルでのデータのリアルタイム同期をサポートします。 同期中に上流テーブルのスキーマが変更された場合、その変更は Paimon テーブルにもリアルタイムで同期されます。 詳細については、「Paimon テーブルの管理」および「Paimon カタログの管理」をご参照ください。

データインジェスト

Paimon コネクタは、データインジェストのための YAML ジョブ開発でシンクとして使用できます。

構文

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

設定項目

パラメーター

説明

必須

データの型

デフォルト値

type

コネクタのタイプ。

はい

STRING

なし

このパラメーターを paimon に設定します。

name

シンク名。

いいえ

STRING

なし

シンクの名前。

catalog.properties.metastore

Paimon カタログのタイプ。

いいえ

STRING

filesystem

有効値:

  • filesystem (デフォルト)

  • rest (DLF のみがサポートされます。 DLF-Legacy はサポートされていません。)

catalog.properties.*

Paimon カタログを作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon カタログの管理」をご参照ください。

table.properties.*

Paimon テーブルを作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon テーブルオプション」をご参照ください。

catalog.properties.warehouse

ファイルストレージのルートディレクトリ。

いいえ

STRING

なし

このパラメーターは、catalog.properties.metastore が `filesystem` に設定されている場合にのみ有効です。

commit.user-prefix

データファイルをコミットするためのユーザー名プレフィックス。

いいえ

STRING

なし

説明

コミットの競合が発生したときに競合するジョブを簡単に見つけられるように、異なるジョブに異なるユーザー名を設定することを推奨します。

partition.key

各パーティションテーブルのパーティションフィールド。

いいえ

STRING

なし

異なるテーブルの区切りにはセミコロン (;) を、異なるフィールドの区切りにはコンマ (,) を、テーブルとフィールドの区切りにはコロン (:) を使用します。例: testdb.table1:id1,id2;testdb.table2:name

sink.cross-partition-upsert.tables

パーティション間の更新が必要なテーブルを指定します。プライマリキーにはすべてのパーティションフィールドが含まれていません。

いいえ

STRING

なし

パーティションをまたいで更新するテーブル。

  • フォーマット: セミコロン (;) を使用してテーブル名を区切ります。

  • パフォーマンスに関する推奨事項: この操作は大量の計算リソースを消費します。 これらのテーブルには個別のジョブを作成してください。

重要
  • 対象となるすべてのテーブルをリストアップしてください。 テーブル名を省略すると、データが重複します。

次の例は、Paimon カタログのタイプに基づいて、Paimon をデータインジェストシンクとして設定する方法を示しています。

  • Paimon カタログが filesystem の場合に Alibaba Cloud OSS に書き込むための設定例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    catalog.properties で始まるパラメーターの詳細については、「Paimon Filesystem カタログの作成」をご参照ください。

  • Paimon カタログが rest の場合に Alibaba Cloud DLF 2.5 に書き込むための設定例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf

    catalog.properties で始まるパラメーターの詳細については、「Flink CDC カタログ設定パラメーター」をご参照ください。

スキーマ進化

現在、データインジェストシンクとしての Paimon は、次のスキーマ進化イベントをサポートしています。

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT (プライマリキー列の型の変更はサポートされていません)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

説明

下流の Paimon テーブルがすでに存在する場合、その既存のスキーマが書き込みに使用されます。 システムはテーブルを再度作成しようとはしません。

よくある質問