すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon を使用したストリーミングデータレイクハウス

最終更新日:Mar 12, 2026

最適な結果を得るには、Paimon Catalog とともに Paimon コネクタを使用してください。本トピックでは、ストリーミングデータレイクハウス向けに Paimon コネクタを活用する方法について説明します。

背景情報

Apache Paimon は、ストリーミング処理とバッチ処理を統合したデータレイクストレージフォーマットです。高スループットでの書き込みと低遅延でのクエリをサポートしています。Alibaba Cloud のオープンソースビッグデータプラットフォーム E-MapReduce 上で動作する主要なコンピュートエンジン(Flink、Spark、Hive、Trino)は、Paimon と良好に連携します。また、HDFS やクラウドベースの Object Storage Service 上で、独自のデータレイクストレージサービスを迅速に構築できます。その後、対応するコンピュートエンジンを接続して、データレイクの分析を実行できます。詳細については、「Apache Paimon」をご参照ください。

カテゴリ

詳細

サポートされているタイプ

ソーステーブル、ディメンションテーブル、および結果テーブル(データインジェスト先)

実行モード

ストリーミングモードおよびバッチモード

データフォーマット

非対応

固有の監視メトリクス

なし

API 種別

SQL および YAML ベースのデータインジェストジョブ

結果テーブルへのデータ更新および削除の対応

はい

主な特徴

Apache Paimon は以下のコア機能を提供します。

  • HDFS または Object Storage Service 上で、軽量かつ低コストのデータレイクストレージサービスを構築します。

  • ストリーミングモードおよびバッチモードの両方で、大規模なデータセットの読み書きを行います。

  • 数分から数秒単位のデータ新鮮度(freshness)で、バッチおよび OLAP クエリを実行します。

  • 増分データのインジェストおよび生成を実現し、従来のオフラインデータウェアハウスおよび現代的なストリーミングデータウェアハウスのすべてのレイヤーにおけるストレージとして機能します。

  • 事前集約により、ストレージコストおよび下流の計算負荷を削減します。

  • 履歴バージョンへのロールバックが可能です。

  • データを効率的にフィルター処理できます。

  • スキーマ進化をサポートします。

使用制限および推奨事項

  • Paimon コネクタは、Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。

  • 以下の表は、Paimon コミュニティ版と Realtime Compute for Apache Flink エンジン(VVR)のバージョン間の対応関係を示しています。

    Apache Paimon バージョン

    VVR バージョン

    1.3

    11.4

    1.2

    11.2 および 11.3

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7、8.0.8、8.0.9、および 8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

  • 同時書き込み時のストレージ推奨事項

    複数のジョブが同一の Paimon テーブルに対して同時に更新を行う場合、標準の OSS ストレージ(oss://)では、ファイル操作の原子性制限により、まれにコミット競合やジョブ失敗が発生する可能性があります。

    安定的で継続的な書き込みを確保するには、強力な原子性を保証するメタデータまたはストレージサービスを使用してください。Data Lake Formation (DLF) の使用を推奨します。Data Lake Formation (DLF) は、Paimon メタデータとストレージ管理を統合します。または、OSS-HDFS または HDFS を使用してください。

  • パラメーター設定の有効化

    Paimon テーブルの構成パラメーターを変更した後は、関連するジョブを再起動して、新しい設定を有効化してください。実行中のジョブは、そのような変更を動的に検出・読み込むことはできません。

  • パーティション削除後の物理クリーンアップ遅延

    DROP PARTITION を実行しても、基盤となる物理データファイルは即座に削除されません。
    この操作は論理削除のみを実行します。Paimon は、最新のスナップショットから該当パーティションのメタデータを削除します。Paimon はタイムトラベルをサポートしているため、履歴スナップショットは引き続き該当パーティションのデータファイルを参照します。物理ファイルは、該当パーティションを参照するすべてのスナップショットが有効期限切れとなり、Paimon のスナップショット有効期限切れメカニズムによってクリーンアップされた後にのみ削除されます。

SQL

SQL ジョブにおいて、Paimon コネクタをソーステーブルまたは結果テーブルとして使用できます。

構文

  • Paimon Catalog 内で Paimon テーブルを作成する場合は、connector パラメーターを省略します。構文は以下のとおりです。

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    説明

    Paimon Catalog 内に既に Paimon テーブルを作成済みの場合は、再作成せずにそのままご利用ください。

  • 他のカタログ内で Paimon の一時テーブルを作成する場合は、connector パラメーターおよび Paimon テーブルファイルのパスを指定します。構文は以下のとおりです。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- パスに Paimon テーブルデータが存在しない場合、自動的にファイルを作成します。
      ...
    );
    説明
    • パスの例: 'path' = 'oss://<bucket>/test/order.db/orders'.db サフィックスを省略しないでください。 Paimon は、このサフィックスを使用してデータベースを識別します。

    • 同一テーブルへ書き込みを行う複数のジョブは、同じパスを共有する必要があります。

    • パスが異なる場合、たとえ物理的な場所が同一であっても、別のテーブルと見なされます。たとえば、oss://b/testoss://b/test/ は末尾のスラッシュのみが異なりますが、異なるテーブルを指します。カタログ構成が不一致の場合、同時書き込み競合、コンパクション失敗、またはデータ損失が発生する可能性があります。

WITH パラメーター

パラメーター

説明

データの型

必須

デフォルト値

備考

connector

テーブル種別。

String

いいえ

なし

  • Paimon Catalog 内で Paimon テーブルを作成する場合は、このパラメーターを省略します。

  • 他のカタログ内で Paimon の一時テーブルを作成する場合は、このパラメーターを paimon に設定します。

path

テーブルのストレージパス。

String

いいえ

なし

  • Paimon Catalog 内で Paimon テーブルを作成する場合は、このパラメーターを省略します。

  • 他のカタログ内で Paimon の一時テーブルを作成する場合は、HDFS または OSS 上のテーブル格納ディレクトリを指定します。

auto-create

Paimon の一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合、自動的にファイルを作成します。

Boolean

いいえ

false

有効な値:

  • false(デフォルト):パスに Paimon テーブルファイルが存在しない場合、エラーを返します。

  • true:パスが存在しない場合、Flink が自動的に Paimon テーブルファイルを作成します。

file.format

テーブル内のデータファイルのストレージクラス。

String

いいえ

parquet

有効な値:

  • orc

  • parquet

  • avro

  • lance(VVR 11.6 以降)

bucket

パーティションあたりのバケット数。

Integer

いいえ

1

Paimon テーブルへの書き込みデータは、bucket-key に基づいてバケット間で分散されます。

説明

バケットサイズは 5 GB 未満を推奨します。

bucket-key

バケットキー列。

String

いいえ

なし

データがバケット間でどのように分散されるかを決定する列を指定します。

列名は英語のカンマ(,)で区切ります。たとえば、'bucket-key' = 'order_id,cust_id' とすると、order_id 列および cust_id 列の値に基づいてデータが離散化されます。

説明
  • 省略した場合、Paimon はプライマリキーを使用してデータを分散します。

  • プライマリキーが定義されていない場合、Paimon はすべての列を使用します。

changelog-producer

増分データを生成するメカニズム。

String

いいえ

none

Paimon は、任意の入力データストリームに対して、完全なチェンジログ(update_before および update_after レコードが一致)を生成できます。これにより、下流のコンシューマーが更新を正しく処理できるようになります。有効な値:

  • none(デフォルト):追加のチェンジログレコードは生成しません。下流のコンシューマーは依然としてストリーミングモードで Paimon テーブルを読み取れますが、チェンジログは不完全です(update_after レコードのみで、update_before レコードはありません)。

  • Input:入力データストリームを増分データファイルに二重書き込みすることで、増分データとして利用します。

  • full-compaction:各フルコンパクション時に完全なチェンジログを生成します。

  • lookup:各スナップショットコミット前に完全なチェンジログを生成します。

チェンジログプロデューサーの選択に関するガイドラインについては、「チェンジログプロデューサー」をご参照ください。

full-compaction.delta-commits

フルコンパクション間の最大間隔。

Integer

いいえ

なし

このパラメーターは、フルコンパクションを実行するまでのスナップショットコミット数を設定します。

lookup.cache-max-memory-size

Paimon ディメンションテーブルのメモリキャッシュサイズ。

String

いいえ

256 MB

この設定は、ディメンションテーブルのキャッシュサイズおよび lookup changelog-producer のキャッシュサイズの両方に適用されます。

merge-engine

同一のプライマリキーを持つ行をマージするメカニズム。

String

いいえ

deduplicate

有効な値:

  • deduplicate:最も新しい行のみを保持します。

  • Partial update:最新のデータから null でない列を使用して、同一のプライマリキーを持つ既存のデータを更新し、他の列は変更しません。

  • aggregation:指定された集計関数を使用してデータを事前集約します。

マージエンジンの詳細については、「マージエンジン」をご参照ください。

partial-update.ignore-delete

delete(-D)メッセージを無視するかどうか。

Boolean

いいえ

false

有効な値:

  • true:delete メッセージを無視します。

  • false:delete メッセージを無視しません。delete データのハンドリングは、sequence.field などのパラメーターで設定してください。設定しない場合、ジョブで IllegalStateException または IllegalArgumentException がスローされる可能性があります。

説明
  • このパラメーターは、VVR 8.0.6 以前の partial-update シナリオ(merge-engine = partial-update)でのみ適用されます。

  • VVR 8.0.7 以降では、すべてのシナリオでこのパラメーターが機能し、ignore-delete パラメーターと同様の動作をします。代わりに ignore-delete を使用してください。

  • このパラメーターを有効化する前に、delete メッセージが想定されるジョブのセマンティクスと一致するかどうかを評価してください。一致しない場合は、エラーをスローする方が安全です。

ignore-delete

delete(-D)メッセージを無視するかどうか。

Boolean

いいえ

false

partial-update.ignore-delete と同じ有効な値です。

説明
  • VVR 8.0.7 以降でのみサポートされます。

  • partial-update.ignore-delete と機能的に同一です。ignore-delete のみを設定し、両方を設定しないでください。

partition.default-name

デフォルトのパーティション名。

String

いいえ

__DEFAULT_PARTITION__

パーティション列の値が null または空文字列の場合に使用されるパーティション名です。

partition.expiration-check-interval

期限切れパーティションをチェックする頻度。

String

いいえ

1h

詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。

partition.expiration-time

パーティションの保持期間。

String

いいえ

なし

この期間経過後にパーティションが有効期限切れとなります。デフォルトでは、パーティションは有効期限切れになりません。

期間はパーティション値から算出されます。詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。

partition.timestamp-formatter

タイム文字列をフォーマット済みのタイムスタンプに変換します。

String

いいえ

なし

パーティション値からパーティションの経過時間を抽出するために使用されるフォーマットを指定します。詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。

partition.timestamp-pattern

パーティション値をタイムスタンプ文字列に変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティションの経過時間を抽出するために使用されるフォーマットを指定します。詳細については、「自動パーティション有効期限切れの設定方法」をご参照ください。

scan.bounded.watermark

Paimon ソーステーブルの Watermark がこの値を超えると、読み取りを停止します。

Long

いいえ

なし

なし。

scan.mode

Paimon ソーステーブルのコンシューマオフセット。

String

いいえ

default

詳細については、「Paimon ソーステーブルのコンシューマオフセットの設定方法」をご参照ください。

scan.snapshot-id

読み取り開始対象のスナップショット ID。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマオフセットの設定方法」をご参照ください。

scan.timestamp-millis

読み取り開始対象のタイムスタンプ。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマオフセットの設定方法」をご参照ください。

snapshot.num-retained.max

保持する最新のスナップショットの最大数。

Integer

いいえ

2147483647

スナップショットは、この制限または snapshot.time-retained のいずれかに該当し、かつ snapshot.num-retained.min を満たす場合に有効期限切れとなります。

snapshot.num-retained.min

保持する最新のスナップショットの最小数。

Integer

いいえ

10

なし。

snapshot.time-retained

スナップショットの有効期限までの時間。

String

いいえ

1h

スナップショットは、この制限または snapshot.num-retained.max のいずれかに該当し、かつ snapshot.num-retained.min を満たす場合に有効期限切れとなります。

write-mode

Paimon テーブルの書き込みモード。

String

いいえ

change-log

有効な値:

  • change-log:プライマリキーに基づく insert、delete、update 操作をサポートします。

  • 追加専用: Paimon テーブルは挿入操作のみを許可し、プライマリキーをサポートしません。このモードは、変更ログモードよりも効率的です。

詳細については、「書き込みモード」をご参照ください。

scan.infer-parallelism

Paimon ソーステーブルの並列度を自動的に推論するかどうか。

Boolean

いいえ

true

有効な値:

  • true:バケット数から並列度を推論します。

  • false:VVP で設定されたデフォルトの並列度を使用します。エキスパートモードでは、ユーザーが指定した並列度を使用します。

scan.parallelism

Paimon ソーステーブルの並列度。

Integer

いいえ

なし

説明

このパラメーターは、デプロイメントの詳細 > リソース構成 タブで リソース割り当てモード がエキスパートモードに設定されている場合、効果がありません。

sink.parallelism

Paimon 結果テーブルの並列度。

Integer

いいえ

なし

説明

このパラメーターは、デプロイメントの詳細 > リソース構成 タブで リソース割り当てモード がエキスパートモードに設定されている場合、効果がありません。

sink.clustering.by-columns

Paimon 結果テーブルのクラスタリング列。

String

いいえ

なし

Paimon の append-only テーブル(プライマリキーを持たないテーブル) の場合、バッチジョブでこのパラメーターを設定してクラスタリング書き込みを有効化できます。クラスタリングは、指定された列の値範囲に基づいてデータをグループ化することで、クエリ速度を向上させます。

列名はカンマ(,)で区切ります。例:'col1,col2'

詳細については、「Apache Paimon ドキュメント」をご参照ください。

sink.delete-strategy

retract(-D/-U)メッセージを正しく処理するための検証戦略。

​​

Enum

いいえ

NONE

有効な戦略および必要な sink 動作:

  • NONE(デフォルト):検証を行いません。

  • IGNORE_DELETE:sink は -U および -D メッセージを無視し、retract しません。

  • NON_PK_FIELD_TO_NULL:sink は -U メッセージを無視します。-D メッセージを受信した場合、プライマリキーは保持しつつ、プライマリキー以外のフィールドを null にします。

    これは、複数の sink が同一テーブルに書き込む部分更新のシナリオで有用です。

  • DELETE_ROW_ON_PK:sink は -U メッセージを無視します。-D メッセージを受信した場合、該当するプライマリキーに一致する行を削除します。

  • CHANGELOG_STANDARD:sink は -U および -D メッセージを受信した場合、該当するプライマリキーに一致する行を削除します。

説明
  • VVR 8.0.8 以降でのみサポートされます。

  • 実際の動作は、ignore-delete や merge-engine などの他のパラメーターに依存します。このパラメーターは、これらの設定が選択した戦略と整合しているかを検証します。整合していない場合、検証に失敗し、エラーメッセージが修正方法を案内します。

説明

その他の構成オプションについては、「Apache Paimon ドキュメント」をご参照ください。

機能の詳細

データ新鮮度および一貫性保証

Paimon 結果テーブルは、Flink ジョブの各チェックポイントで二相コミットプロトコルを使用してデータをコミットします。そのため、データ新鮮度は Flink ジョブのチェックポイント間隔に対応します。各コミットでは、最大で 2 つのスナップショットが生成されます。

2 つの Flink ジョブが同一の Paimon テーブルに同時に書き込む場合、データが異なるバケットに書き込まれていれば直列化可能(serializable)な一貫性が保証されます。一方、同一のバケットに書き込まれている場合、スナップショット隔離(snapshot isolation)の一貫性しか保証されません。つまり、テーブルのデータは両方のジョブの結果が混在する可能性がありますが、データ損失は発生しません。

マージエンジン

Paimon 結果テーブルが同一のプライマリキーを持つ複数の行を受信した場合、一意性を保つためにそれらを 1 行にマージします。merge-engine パラメーターを設定することで、マージの動作を制御できます。以下の表に各オプションを示します。

マージエンジン

詳細

Deduplicate

Deduplicate はデフォルトのマージエンジンです。同一のプライマリキーを持つ行に対して、Paimon 結果テーブルは最も新しい行のみを保持し、残りを破棄します。

説明

最も新しい行が delete メッセージである場合、該当するプライマリキーを持つすべての行が破棄されます。

Partial update

Partial update を使用すると、複数のメッセージにわたってデータを段階的に更新できます。同一のプライマリキーを持つ新しい行は既存の行を上書きしますが、null の列は変更されません。

たとえば、Paimon 結果テーブルが以下の順序で行を受信したとします。

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

最初の列がプライマリキーである場合、最終的な結果は <1, 25.2, 10, 'This is a book'> となります。

説明
  • 部分更新の結果をストリーム読み取りするには、changelog-producer パラメーターを lookup または full-compaction に設定する必要があります。

  • Partial update は delete メッセージを処理しません。partial-update.ignore-delete を設定して無視してください。

Aggregation

一部のケースでは、集計値のみが必要になることがあります。Aggregation を使用すると、Paimon は指定された集計関数を用いて、同一のプライマリキーを持つ行をマージします。各プライマリキー以外の列に対しては、fields.<field-name>.aggregate-function を使用して集計関数を定義します。定義しない場合、Paimon は last_non_null_value を使用します。例:

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price 列は max で集計され、sales 列は sum で集計されます。入力が <1, 23.0, 15> および <1, 30.2, 20> の場合、結果は <1, 30.2, 35> となります。サポートされる集計関数およびデータ型:

  • sum:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE

  • min および max:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ

  • last_value および last_non_null_value:すべての型

  • listagg:STRING

  • bool_and および bool_or:BOOLEAN

説明
  • retract および delete 操作をサポートするのは sum のみです。他の関数はサポートしません。特定の列について retract を無視するには、'fields.${field_name}.ignore-retract'='true' を設定してください。

  • 集計結果をストリーミング読み取りするには、changelog-producer パラメーターを lookup または full-compaction に設定する必要があります。

増分データ生成メカニズム

changelog-producer パラメーターを設定することで、任意の入力データストリームに対して完全なチェンジログ(update_before および update_after レコードが一致)を生成できます。以下の表に、利用可能なすべてのチェンジログプロデューサーを示します。詳細については、「Apache Paimon ドキュメント」をご参照ください。

メカニズム

詳細

None

changelog-producer が `none` (デフォルト値) に設定されている場合、ダウンストリームの Paimon ソーステーブルは、同じプライマリキーのデータの最新の状態しか見ることができません。しかし、この最新の状態は、正しい計算に必要な完全な増分データをダウンストリームのコンシューマーに提供しません。これは、コンシューマーは対応するデータが削除されたかどうか、または最新のデータが何かは判断できますが、変更前のデータが何であったかを知ることはできないためです。

たとえば、下流のコンシューマーが列の合計を計算するとします。最新の値 5 のみが表示される場合、合計をどう更新すべきか判断できません。前の値が 4 であれば 1 を加算すべきですが、6 であれば 1 を減算すべきです。update_before データに敏感なコンシューマーは、none を避けるべきです。ただし、他のプロデューサーはパフォーマンスオーバーヘッドを伴います。

説明

下流のコンシューマー(たとえばデータベース)が update_before データを必要としない場合、none は許容されます。ご自身の要件に応じて選択してください。

Input

changelog-producer を input に設定すると、Paimon サイクテーブルは入力データストリームを増分データとして増分データファイルに 2 回書き込みます。

したがって、このオプションは、入力ストリーム自体が CDC データなど完全なチェンジログである場合にのみ使用してください。

Lookup

changelog-producer を `lookup` に設定すると、Paimon シンクテーブルは、ディメンションテーブルと同様のポイントクエリメカニズムを使用して、各スナップショットのコミット前に現在のスナップショット用の完全な増分データを生成します。このメカニズムは、入力データが完全であるかどうかに関係なく、完全な増分データを生成できます。

Full Compaction と比較して、Lookup はより新鮮なチェンジログを提供しますが、より多くのリソースを消費します。

分単位の更新など、高い新鮮度を要求する増分データにこの機能を推奨します。

Full Compaction

changelog-producer`full-compaction` に設定すると、Paimon 結果テーブルは各フルコンパクションの際に完全な増分データを生成します。このメカニズムでは、入力データがすでに完全な増分データであるかどうかに関係なく、完全な増分データが生成されます。フルコンパクションの間隔は、full-compaction.delta-commits パラメーターで指定されます。

Lookup と比較して、Full Compaction は新鮮度の低いチェンジログを提供しますが、既存のコンパクション作業に合わせて実行されるため、リソース使用量は少なくなります。

増分データの新鮮度要件が厳しくない場合(たとえば、時間単位の更新で十分な場合)にこのアプローチを推奨します。

書き込みモード

Paimon テーブルは以下の書き込みモードをサポートします。

モード

詳細

Change-log

Change-log はデフォルトの書き込みモードです。プライマリキーに基づく insert、delete、update 操作をサポートします。また、このモードではマージエンジンおよびチェンジログプロデューサーも使用できます。

Append-only

Append-only モードは insert のみをサポートし、プライマリキーを使用しません。change-log モードよりも効率的です。中程度のデータ新鮮度(たとえば分単位)で十分な場合のメッセージキューの代替としてご利用ください。

詳細については、「Apache Paimon ドキュメント」をご参照ください。Append-only モードを使用する際は、以下の点にご注意ください。

  • bucket-key パラメーターを必要に応じて設定してください。設定しない場合、Paimon はすべての列を使用してデータをバケット間で分散するため、計算効率が低下します。

  • Append-only モードでは、出力順序が一定程度保たれます。出力順序は以下のように決定されます。

    1. 2 行が異なるパーティションに属する場合、scan.plan-sort-partition が設定されている場合は、パーティション値が小さい方のパーティションから出力されます。設定されていない場合は、作成日時が早い方のパーティションから出力されます。

    2. 2 行が同一のパーティションおよび同一のバケットに属する場合、先に書き込まれた行が先に出力されます。

    3. 2 行が同一のパーティションだが異なるバケットに属する場合、バケットは個別のスレッドで処理されるため、出力順序は保証されません。

CTAS および CDAS の対象

Paimon テーブルは、単一テーブルまたはデータベース全体のリアルタイム同期をサポートします。上流テーブルのスキーマ変更は、リアルタイムで Paimon テーブルに同期されます。詳細については、「Paimon テーブルの管理」および「Paimon Catalog の管理」をご参照ください。

データインジェスト

YAML ベースのデータインジェストジョブにおいて、Paimon コネクタを結果テーブルとして使用できます。

構文

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

構成オプション

パラメーター

説明

必須

データの型

デフォルト値

備考

type

コネクタ種別。

はい

STRING

なし

固定値:paimon

name

シンク名。

いいえ

STRING

なし

シンクの名前。

catalog.properties.metastore

Paimon Catalog の種別。

いいえ

STRING

filesystem

有効な値:

  • filesystem(デフォルト)

  • rest(DLF のみ対応。DLF-Legacy は非対応)

catalog.properties.*

Paimon Catalog を作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon Catalog の管理」をご参照ください。

table.properties.*

Paimon テーブルを作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon テーブルのオプション」をご参照ください。

catalog.properties.warehouse

ファイルストレージのルートディレクトリ。

いいえ

STRING

なし

このパラメーターは、catalog.properties.metastore がファイルシステムに設定されている場合にのみ有効になります。

commit.user-prefix

データファイルのコミット時に使用されるユーザー名プレフィックス。

いいえ

STRING

なし

説明

コミット失敗時に競合するジョブを特定できるよう、異なるジョブで異なるユーザー名を使用してください。

partition.key

各パーティションテーブルのパーティションフィールド。

いいえ

STRING

なし

テーブルは ; で区切り、フィールドは , で区切り、テーブルとフィールドは : で区切ります。例:testdb.table1:id1,id2;testdb.table2:name

sink.cross-partition-upsert.tables

クロスパーティションアップサート(プライマリキーにすべてのパーティションフィールドが含まれていない)を必要とするテーブル。

いいえ

STRING

なし

クロスパーティションアップサートを必要とするテーブルに適用されます。

  • 形式:テーブル名をセミコロン ; で区切ります。

  • パフォーマンス上のヒント:この操作は大量の計算リソースを消費します。これらのテーブル専用のジョブを作成してください。

重要
  • 該当するすべてのテーブルをリストしてください。テーブルを省略すると、重複データが発生する可能性があります。

sink.commit.parallelism

コミット演算子の並列度。

いいえ

INTEGER

なし

コミット演算子がボトルネックとなっている場合、この値を増加させることでパフォーマンスを向上させられます。

VVR 11.6 以降でのみサポートされます。

説明

このパラメーターを設定すると、演算子の並列度が変更されます。ステートフルジョブを再起動する際は、AllowNonRestoredState を有効化してください。

使用例

Paimon Catalog の種別に応じて、Paimon コネクタをデータインジェスト結果テーブルとして構成します。

  • Paimon Catalog を filesystem として使用し、Alibaba Cloud OSS に書き込む場合の構成例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    catalog.properties で始まるパラメーターの意味については、「Paimon Filesystem Catalog の作成」をご参照ください。

  • REST Catalog を使用し、Alibaba Cloud Data Lake Formation に書き込む場合の例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (任意)読み取りパフォーマンスを向上させるため、削除ベクターを有効化します。
      table.properties.deletion-vectors.enabled: true

    catalog.properties で始まるパラメーターの意味については、「Flink CDC カタログ構成パラメーター」をご参照ください。

スキーマ進化

データインジェスト結果テーブルとして Paimon を使用する場合、以下のスキーマ進化イベントをサポートします。

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT(プライマリキー列の型変更は非対応)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

説明

下流の Paimon テーブルが既に存在する場合、Paimon は既存のスキーマを使用して書き込みを行い、テーブルの再作成を試みません。

よくある質問