すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ストリーミングデータレイクハウス Paimon

最終更新日:Feb 07, 2026

Paimon Catalog とともに、ストリーミングデータレイクハウスコネクタである Paimon コネクタを使用できます。このトピックでは、Paimon コネクタの使用方法について説明します。

背景情報

Apache Paimon は、ストリーミング処理とバッチ処理を統合するレイクストレージフォーマットです。高スループットでの書き込みと低遅延でのクエリをサポートしています。Alibaba Cloud のオープンソースビッグデータプラットフォーム E-MapReduce 上で一般的に使用される Flink、Spark、Hive、Trino などのコンピュートエンジンは、Apache Paimon と緊密に統合されています。Apache Paimon を使用して、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上にデータレイクストレージサービスを構築し、これらのコンピュートエンジンに接続してデータレイク分析を実行できます。詳細については、「Apache Paimon」をご参照ください。

カテゴリ

詳細

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト先

実行モード

ストリーミングモードおよびバッチモード

データフォーマット

未対応

特定の監視メトリック

なし

API タイプ

SQL、YAML データインジェストジョブ

結果テーブルでのデータ更新または削除のサポート

はい

機能

Apache Paimon は、以下の主要機能を提供します。

  • HDFS または OSS 上に、低コストで軽量なデータレイクストレージサービスを構築します。

  • 大規模データセットをストリーミングモードおよびバッチモードで読み書きします。

  • 秒単位から分単位のデータ鮮度で、バッチクエリおよびオンライン分析処理 (OLAP) クエリを実行します。

  • 増分データの消費および生成を行います。Paimon は、従来のオフラインデータウェアハウスおよび最新のストリーミングデータウェアハウスの両方のストレージレイヤーとして使用できます。

  • 事前集約により、ストレージコストおよびダウンストリームの計算負荷を削減します。

  • データの過去のバージョンを取得します。

  • データを効率的にフィルタリングします。

  • スキーマ進化をサポートします。

制限事項と推奨事項

  • Paimon コネクタは、Flink コンピュートエンジン Ververica Runtime (VVR) バージョン 6.0.6 以降でのみサポートされます。

  • 以下の表は、Paimon と VVR のバージョンマッピングを示しています。

    Apache Paimon バージョン

    VVR バージョン

    1.3

    11.4

    1.2

    11.2、11.3

    1.1

    11

    1.0

    8.0.11

    0.9

    8.0.7、8.0.8、8.0.9、および 8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

  • 同時書き込みシナリオにおけるストレージの推奨事項

    複数のジョブが同時に同じ Paimon テーブルを更新する場合、ファイル操作の原子性に関する制限により、標準的な OSS ストレージ (oss://) ではまれにコミット競合やジョブエラーが発生する可能性があります。

    常に安定した書き込みを保証するためには、強力な原子性保証を提供するメタデータまたはストレージサービスの使用を推奨します。特に、Paimon メタデータおよびストレージサービスを統合管理できる Data Lake Formation (DLF) の使用を優先的に推奨します。また、OSS-HDFS または HDFS を使用することも可能です。

SQL

Paimon コネクタは、SQL ジョブ内でソーステーブルまたは結果テーブルとして使用できます。

構文

  • Paimon Catalog 内に Paimon テーブルを作成する場合、connector パラメーターを指定する必要はありません。Paimon テーブルの作成構文は次のとおりです。

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    説明

    Paimon Catalog 内にすでに Paimon テーブルが作成済みの場合は、再度作成せずに直接使用できます。

  • 他のカタログ内に Paimon 一時テーブルを作成する場合は、connector パラメーターおよび Paimon テーブルのストレージパスを指定する必要があります。Paimon テーブルの作成構文は次のとおりです。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- 指定されたパスに Paimon テーブルのデータファイルが存在しない場合、自動的にファイルが作成されます。
      ...
    );
    説明
    • パスの例: 'path' = 'oss://<bucket>/test/order.db/orders'。Paimon は、データベースを識別するためにこの命名規則を使用するため、.db サフィックスを省略しないでください。

    • 同じテーブルに書き込む複数のジョブは、同じパス設定を使用する必要があります。

    • 2 つのパス値が異なる場合、Paimon はそれらを異なるテーブルと見なします。物理パスが同一であっても、Catalog 設定の不一致により、同時書き込み時の競合、コンパクション失敗、およびデータ損失が発生する可能性があります。たとえば、oss://b/testoss://b/test/ は末尾のスラッシュの有無により異なるパスと見なされますが、物理パスは同一です。

WITH パラメーター

パラメーター

説明

データの型

必須

デフォルト値

備考

connector

テーブルタイプ。

String

いいえ

なし

  • Paimon Catalog 内に Paimon テーブルを作成する場合、このパラメーターを指定する必要はありません。

  • 他のカタログ内に Paimon 一時テーブルを作成する場合、このパラメーターを paimon に設定します。

path

テーブルのストレージパス。

String

いいえ

なし

  • Paimon Catalog 内に Paimon テーブルを作成する場合、このパラメーターを指定する必要はありません。

  • 他のカタログ内に Paimon 一時テーブルを作成する場合、このパラメーターは HDFS または OSS 内のテーブルの保存ディレクトリを指定します。

auto-create

Paimon 一時テーブルを作成する際に、指定されたパスに Paimon テーブルファイルが存在しない場合に自動的にファイルを作成するかどうかを指定します。

Boolean

いいえ

false

有効値:

  • false (デフォルト):指定されたパスに Paimon テーブルファイルが存在しない場合、エラーが報告されます。

  • true:指定されたパスが存在しない場合、Flink システムが自動的に Paimon テーブルファイルを作成します。

bucket

各パーティション内のバケット数。

Integer

いいえ

1

Paimon テーブルに書き込まれるデータは、bucket-key に基づいて各バケットに分散されます。

説明

各バケットのデータ量は 5 GB 未満になるように推奨します。

bucket-key

バケット分割のためのキー列。

String

いいえ

なし

Paimon テーブルに書き込まれるデータを異なるバケットに分散させるために使用する列を指定します。

列名はカンマ (,) で区切ります。たとえば、'bucket-key' = 'order_id,cust_id' は order_id 列および cust_id 列の値に基づいてデータを分散させます。

説明
  • このパラメーターを指定しない場合、プライマリキーに基づいてデータが分散されます。

  • Paimon テーブルにプライマリキーが指定されていない場合、すべての列の値に基づいてデータが分散されます。

changelog-producer

増分データを生成するメカニズム。

String

いいえ

none

Paimon は、任意の入力データストリームに対して完全な増分データ (すべての update_after データに対応する update_before データが存在する) を生成できます。これは、ダウンストリームのコンシューマーにとって便利です。増分データ生成メカニズムの有効値は次のとおりです。

  • none (デフォルト):追加の増分データは生成されません。ダウンストリームは依然として Paimon テーブルをストリーミング読み取りできますが、読み取られる増分データは不完全です。update_after データのみが含まれ、対応する update_before データは含まれません。

  • Input:入力データストリームも増分データファイルに書き込まれ、そのファイルに増分データが含まれます。

  • full-compaction:完全コンパクションのたびに完全な増分データが生成されます。

  • lookup:各スナップショットのコミット前に完全な増分データが生成されます。

増分データ生成メカニズムの選択方法の詳細については、「増分データ生成メカニズム」をご参照ください。

full-compaction.delta-commits

完全コンパクションの最大間隔。

Integer

いいえ

なし

このパラメーターは、完全コンパクションを実行する必要があるスナップショットコミットの回数を指定します。

lookup.cache-max-memory-size

Paimon ディメンションテーブルのメモリキャッシュサイズ。

String

いいえ

256 MB

このパラメーターは、ディメンションテーブルおよび lookup changelog-producer の両方のキャッシュサイズに影響します。両方のメカニズムのキャッシュサイズは、このパラメーターによって設定されます。

merge-engine

同じプライマリキーを持つデータをマージするメカニズム。

String

いいえ

deduplicate

有効値:

  • deduplicate:最新のレコードのみを保持します。

  • partial-update:同じプライマリキーを持つ既存データを、最新データの NULL でない列で更新します。他の列は変更されません。

  • aggregation:指定された集計関数を使用して事前集約を実行します。

データマージメカニズムの詳細な分析については、「データマージメカニズム」をご参照ください。

partial-update.ignore-delete

削除 (-D) メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効値:

  • true:削除メッセージを無視します。

  • false:削除メッセージを無視しません。削除データを処理するためのポリシーを、sequence.field などの設定項目を使用してシンク側で設定する必要があります。設定しない場合、IllegalStateException または IllegalArgumentException エラーがスローされる可能性があります。

説明
  • Realtime Compute for Apache Flink VVR 8.0.6 以前では、このパラメーターは merge-engine = partial-update の部分更新シナリオでのみ有効です。

  • Realtime Compute for Apache Flink VVR 8.0.7 以降では、このパラメーターは部分更新以外のシナリオとも互換性があり、ignore-delete パラメーターと同じ機能を持ちます。ignore-delete の使用を推奨します。

  • ビジネスシナリオおよび削除データの期待値に基づいて、このパラメーターを有効にするかどうかを判断してください。削除データが表すジョブセマンティクスが期待通りでない場合、エラーをスローすることがより適切な選択です。

ignore-delete

削除 (-D) メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効値は partial-update.ignore-delete と同じです。

説明
  • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.7 以降でのみサポートされます。

  • このパラメーターは `partial-update.ignore-delete` と同じ機能を持ちます。`ignore-delete` パラメーターの使用を推奨し、両方のパラメーターを同時に設定しないでください。

partition.default-name

パーティションのデフォルト名。

String

いいえ

__DEFAULT_PARTITION__

パーティションキー列の値が NULL または空文字列の場合、このデフォルト名がパーティション名として使用されます。

partition.expiration-check-interval

期限切れパーティションをチェックする間隔。

String

いいえ

1h

詳細については、「自動パーティション有効期限の設定方法」をご参照ください。

partition.expiration-time

パーティションの有効期限時間。

String

いいえ

なし

パーティションの生存時間がこの値を超えると、パーティションは有効期限切れになります。デフォルトでは、パーティションは有効期限切れになりません。

パーティションの生存時間は、そのパーティション値から計算されます。詳細については、「自動パーティション有効期限の設定方法」をご参照ください。

partition.timestamp-formatter

時間文字列を UNIX タイムスタンプに変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティション生存時間を抽出するためのフォーマットを設定します。詳細については、「自動パーティション有効期限の設定方法」をご参照ください。

partition.timestamp-pattern

パーティション値をタイムスタンプ文字列に変換するためのフォーマット文字列。

String

いいえ

なし

パーティション値からパーティション生存時間を抽出するためのフォーマットを設定します。詳細については、「自動パーティション有効期限の設定方法」をご参照ください。

scan.bounded.watermark

Paimon ソーステーブルによって生成されるデータのウォーターマークがこの値を超えると、Paimon ソーステーブルはデータ生成を停止します。

Long

いいえ

なし

なし。

scan.mode

Paimon ソーステーブルのコンシューマーオフセットを指定します。

String

いいえ

default

詳細については、「Paimon ソーステーブルのコンシューマーオフセットの設定方法」をご参照ください。

scan.snapshot-id

Paimon ソーステーブルが消費を開始するスナップショットを指定します。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマーオフセットの設定方法」をご参照ください。

scan.timestamp-millis

Paimon ソーステーブルが消費を開始する時点を指定します。

Integer

いいえ

なし

詳細については、「Paimon ソーステーブルのコンシューマーオフセットの設定方法」をご参照ください。

snapshot.num-retained.max

有効期限切れにならずに保持される最新スナップショットの最大数。

Integer

いいえ

2147483647

この設定または `snapshot.time-retained` のいずれかが満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは有効期限切れになります。

snapshot.num-retained.min

有効期限切れにならずに保持される最新スナップショットの最小数。

Integer

いいえ

10

なし。

snapshot.time-retained

スナップショットが有効期限切れになるまでの期間。

String

いいえ

1h

この設定または `snapshot.num-retained.max` のいずれかが満たされ、かつ `snapshot.num-retained.min` も満たされた場合、スナップショットは有効期限切れになります。

write-mode

Paimon テーブルの書き込みモード。

String

いいえ

change-log

有効値:

  • change-log:Paimon テーブルは、プライマリキーに基づいてデータの挿入、削除、および更新をサポートします。

  • append-only:Paimon テーブルはデータ挿入のみを受け付け、プライマリキーをサポートしません。このモードは change-log モードよりも効率的です。

書き込みモードの詳細については、「書き込みモード」をご参照ください。

scan.infer-parallelism

Paimon ソーステーブルの並行度を自動的に推測するかどうかを指定します。

Boolean

いいえ

true

有効値:

  • true:Paimon ソーステーブルの並行度は、バケット数に基づいて自動的に推測されます。

  • false:VVP で設定されたデフォルトの並行度が使用されます。エキスパートモードでは、ユーザーが設定した並行度が使用されます。

scan.parallelism

Paimon ソーステーブルの並行度。

Integer

いいえ

なし

説明

デプロイメントの詳細 > リソース構成 タブで、リソースモード がエキスパートに設定されている場合、このパラメーターは有効になりません。

sink.parallelism

Paimon 結果テーブルの並行度。

Integer

いいえ

なし

説明

デプロイメントの詳細 > リソース構成 タブで、リソースモード がエキスパートに設定されている場合、このパラメーターは有効になりません。

sink.clustering.by-columns

Paimon 結果テーブルへの書き込み時にクラスタリング列を指定します。

String

いいえ

なし

Paimon append-only テーブル (非プライマリキーテーブル) では、バッチジョブでこのパラメーターを設定すると、クラスタリング書き込み機能が有効になります。この機能は、特定の列の値範囲に基づいてデータをクラスタリングし、テーブルのクエリ速度を向上させます。

複数の列名はカンマ (,) で区切ります。たとえば、'col1,col2' のように指定します。

クラスタリングの詳細については、公式 Apache Paimon ドキュメントをご参照ください。

sink.delete-strategy​

リトラクション (-D/-U) メッセージを正しく処理していることを保証するための検証ポリシーを設定します。

​​

Enum

いいえ

NONE

検証ポリシーの有効値およびリトラクションメッセージを処理する際のシンクオペレーターの期待される動作は次のとおりです。​

  • ​NONE (デフォルト):検証は実行されません。​

  • IGNORE_DELETE:シンクオペレーターは -U および -D メッセージを無視します。リトラクションは発生しません。

  • NON_PK_FIELD_TO_NULL:シンクオペレーターは -U メッセージを無視します。ただし、-D メッセージを受信した場合、プライマリキー値を保持し、スキーマ内の他の非プライマリキー値をリトラクトします。

    これは、複数のシンクが同時に同じテーブルに書き込みを行う部分更新シナリオで主に使用されます。​

  • DELETE_ROW_ON_PK:シンクオペレーターは -U メッセージを無視します。ただし、-D メッセージを受信した場合、プライマリキーに対応する行を削除します。​

  • CHANGELOG_STANDARD:シンクオペレーターは、-U または -D データを受信した場合、プライマリキーに対応する行を削除します。​

説明
  • このパラメーターは、Realtime Compute for Apache Flink VVR 8.0.8 以降でのみサポートされます。

  • Paimon シンクがリトラクションメッセージを処理する際の動作は、実際には `ignore-delete` や `merge-engine` などの他の設定項目によって決定されます。この設定項目は、その動作に直接影響を与えるものではなく、期待されるポリシーに動作が合致しているかどうかを検証するものです。動作が期待されるポリシーに合致しない場合、検証ステップが終了し、ジョブのエラーメッセージに `ignore-delete` や `merge-engine` などの他の設定項目をどのように修正すれば期待に合致するかが表示されます。

説明

設定項目の詳細については、公式 Apache Paimon ドキュメントをご参照ください。

機能の詳細

データ鮮度および一貫性保証

Paimon 結果テーブルは、Flink ジョブの各チェックポイント中に、書き込まれたデータをコミットするために 2 相コミットプロトコルを使用します。そのため、データ鮮度は Flink ジョブのチェックポイント間隔と同じになります。各コミットにより、最大 2 つのスナップショットが生成されます。

2 つの Flink ジョブが同時に同じ Paimon テーブルに書き込む場合、2 つのジョブからのデータが同じバケットに書き込まれない場合、直列化可能性の一貫性が保証されます。2 つのジョブからのデータが同じバケットに書き込まれる場合、スナップショット分離の一貫性のみが保証されます。つまり、テーブル内のデータは両方のジョブの結果が混在したものになる可能性がありますが、データが失われることはありません。

データマージの仕組み

Paimon 結果テーブルが同じプライマリキーを持つ複数のレコードを受信すると、これらのレコードを単一のレコードにマージして、プライマリキーの一意性を維持します。merge-engine パラメーターを設定することで、データマージ動作を指定できます。以下の表は、データマージメカニズムを示しています。

マージメカニズム

詳細

Deduplicate

Deduplicate メカニズムは、デフォルトのデータマージメカニズムです。同じプライマリキーを持つ複数のレコードについて、Paimon 結果テーブルは最新のレコードのみを保持し、他のレコードは破棄します。

説明

最新のレコードが削除メッセージの場合、そのプライマリキーを持つすべてのレコードが破棄されます。

Partial Update

Partial Update メカニズムを指定することで、複数のメッセージを通じてデータを段階的に更新し、最終的に完全なデータを取得できます。具体的には、同じプライマリキーを持つ新しいデータは古いデータを上書きしますが、NULL 値を持つ列は上書きされません。

たとえば、Paimon 結果テーブルが次の 3 つのレコードを順番に受信するとします。

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

最初の列がプライマリキーです。最終的な結果は <1, 25.2, 10, 'This is a book'> になります。

説明
  • Partial Update の結果をストリーミング読み取りするには、changelog-producer パラメーターを `lookup` または `full-compaction` に設定する必要があります。

  • Partial Update メカニズムは削除メッセージを処理できません。partial-update.ignore-delete パラメーターを設定して、削除メッセージを無視できます。

Aggregation

一部のシナリオでは、集計値のみに関心がある場合があります。Aggregation メカニズムは、指定した集計関数に基づいて同じプライマリキーを持つデータを集約します。プライマリキーの一部ではない各列について、fields.<field-name>.aggregate-function を使用して集計関数を指定する必要があります。指定しない場合、その列にはデフォルトで last_non_null_value 集計関数が使用されます。たとえば、次の Paimon テーブル定義を考えてみます。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price 列は max 関数を使用して集約され、sales 列は sum 関数を使用して集約されます。<1, 23.0, 15> および <1, 30.2, 20> の 2 つの入力レコードが与えられた場合、最終的な結果は <1, 30.2, 35> になります。現在サポートされている集計関数および対応するデータの型は次のとおりです。

  • sum:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE をサポートします。

  • min および max:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ をサポートします。

  • last_value および last_non_null_value:すべてのデータの型をサポートします。

  • listagg:STRING をサポートします。

  • bool_and および bool_or:BOOLEAN をサポートします。

説明
  • sum 関数のみがデータのリトラクションおよび削除をサポートします。他の集計関数はサポートしません。特定の列でリトラクションおよび削除メッセージを無視する必要がある場合は、'fields.${field_name}.ignore-retract'='true' を設定できます。

  • Aggregation の結果をストリーミング読み取りするには、changelog-producer パラメーターを `lookup` または `full-compaction` に設定する必要があります。

増分データ生成メカニズム

changelog-producer パラメーターを適切な増分データ生成メカニズムに設定することで、Paimon は任意の入力データストリームに対して完全な増分データを生成できます。完全な増分データとは、すべての update_after データに対応する update_before データが存在することを意味します。以下に、すべての増分データ生成メカニズムを示します。詳細については、公式 Apache Paimon ドキュメントをご参照ください。

メカニズム

詳細

None

changelog-producer を `none` (デフォルト値) に設定すると、ダウンストリームの Paimon ソーステーブルは、同じプライマリキーを持つデータの最新状態のみを確認できます。ただし、この最新状態では、ダウンストリームのコンシューマーが正しい計算を行うための完全な増分データを簡単に理解することはできません。これは、対応するデータが削除されたのか、最新のデータが何かを判断できますが、変更前のデータが何だったかを判断できないためです。

たとえば、ダウンストリームのコンシューマーが列の合計を計算する必要があるとします。コンシューマーが最新のデータ (5) のみを確認した場合、合計をどのように更新すべきかを判断できません。前のデータが 4 だった場合、合計を 1 増やす必要があります。前のデータが 6 だった場合、合計を 1 減らす必要があります。このようなコンシューマーは update_before データに敏感です。none に設定しないことを推奨します。ただし、他のメカニズムはパフォーマンスオーバーヘッドを伴います。

説明

データベースなど、update_before データに敏感でないダウンストリームのコンシューマーを使用する場合、増分データ生成メカニズムを `none` に設定できます。そのため、実際のニーズに基づいてメカニズムを設定することを推奨します。

Input

changelog-producer を `input` に設定すると、Paimon 結果テーブルは入力データストリームをメインテーブルおよび増分データファイルの両方に書き込みます。

したがって、このメカニズムは、入力データストリーム自体が完全な増分データ (Change Data Capture (CDC) データなど) である場合にのみ使用できます。

Lookup

changelog-producer を `lookup` に設定すると、Paimon 結果テーブルは、各コミット前に現在のスナップショットの完全な増分データを生成するために、ディメンションテーブルのポイントクエリに類似したメカニズムを使用します。このメカニズムは、入力データが完全な増分データであるかどうかに関係なく、完全な増分データを生成できます。

以下で説明する Full Compaction メカニズムと比較して、Lookup メカニズムは増分データの生成においてより優れたタイムリー性を提供しますが、全体的により多くのリソースを消費します。

分単位のデータ鮮度など、データ鮮度の要件が高い場合にこのメカニズムの使用を推奨します。

Full Compaction

changelog-producer を `full-compaction` に設定すると、Paimon 結果テーブルは各完全コンパクション中に完全な増分データを生成します。このメカニズムは、入力データが完全な増分データであるかどうかに関係なく、完全な増分データを生成できます。完全コンパクションの間隔は、full-compaction.delta-commits パラメーターで指定されます。

Lookup メカニズムと比較して、Full Compaction メカニズムは増分データの生成においてタイムリー性が劣ります。ただし、データの完全コンパクションプロセスを利用し、追加の計算を生成しないため、全体的により少ないリソースを消費します。

時間単位のデータ鮮度など、データ鮮度の要件が高くない場合にこのメカニズムの使用を推奨します。

書き込みモード

Paimon テーブルは現在、以下の書き込みモードをサポートしています。

モード

詳細

Change-log

Change-log 書き込みモードは、Paimon テーブルのデフォルトの書き込みモードです。このモードは、プライマリキーに基づいてデータの挿入、削除、および更新をサポートします。また、このモードでは、前述のデータマージおよび増分データ生成メカニズムも使用できます。

Append-only

Append-only 書き込みモードは、データ挿入のみをサポートし、プライマリキーをサポートしません。このモードは change-log モードよりも効率的であり、分単位のデータ鮮度など、データ鮮度の要件が高くないシナリオでメッセージキューの代替として使用できます。

Append-only 書き込みモードの詳細については、公式 Apache Paimon ドキュメントをご参照ください。Append-only 書き込みモードを使用する際は、以下の点に注意してください。

  • 実際のニーズに基づいて bucket-key パラメーターを設定することを推奨します。設定しない場合、Paimon テーブルはすべての列の値に基づいてバケット分割を行い、効率が低下します。

  • Append-only 書き込みモードは、ある程度データの出力順序を保証できます。具体的な出力順序は次のとおりです。

    1. 2 つのレコードが異なるパーティションに属する場合、scan.plan-sort-partition パラメーターが設定されている場合、パーティション値が小さいレコードが先に出力されます。設定されていない場合、先に作成されたパーティションのデータが先に出力されます。

    2. 2 つのレコードが同じパーティションの同じバケットに属する場合、先に書き込まれたデータが先に出力されます。

    3. 2 つのレコードが同じパーティションの異なるバケットに属する場合、出力順序は保証されません。これは、異なるバケットが異なる並行タスクによって処理されるためです。

CTAS および CDAS の宛先として

Paimon テーブルは、単一テーブルレベルまたはデータベース全体レベルでデータのリアルタイム同期をサポートします。同期中にアップストリームテーブルのスキーマが変更された場合、その変更も Paimon テーブルにリアルタイムで同期されます。詳細については、「Paimon テーブルの管理」および「Paimon Catalog の管理」をご参照ください。

データインジェスト

Paimon コネクタは、YAML ジョブ開発でデータインジェストのシンクとして使用できます。

構文

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

設定項目

パラメーター

説明

必須

データの型

デフォルト値

備考

type

コネクタタイプ。

はい

STRING

なし

静的フィールドは paimon に設定されます。

name

シンク名。

いいえ

STRING

なし

シンクの名前。

catalog.properties.metastore

Paimon Catalog のタイプ。

いいえ

STRING

filesystem

有効値:

  • filesystem (デフォルト)

  • rest (DLF のみサポート。DLF-Legacy はサポートされません。)

catalog.properties.*

Paimon Catalog を作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon Catalog の管理」をご参照ください。

table.properties.*

Paimon テーブルを作成するためのパラメーター。

いいえ

STRING

なし

詳細については、「Paimon テーブルオプション」をご参照ください。

catalog.properties.warehouse

ファイルストレージのルートディレクトリ。

いいえ

STRING

なし

このパラメーターは、catalog.properties.metastore が `filesystem` に設定されている場合にのみ有効になります。

commit.user-prefix

データファイルをコミットする際のユーザー名プレフィックス。

いいえ

STRING

なし

説明

コミット競合が発生した場合に競合ジョブを簡単に特定できるように、異なるジョブに異なるユーザー名を設定することを推奨します。

partition.key

各パーティションテーブルのパーティションフィールド。

いいえ

STRING

なし

異なるテーブルはセミコロン (;) で区切り、異なるフィールドはカンマ (,) で区切り、テーブルとフィールドはコロン (:) で区切ります。例:testdb.table1:id1,id2;testdb.table2:name

sink.cross-partition-upsert.tables

プライマリキーにすべてのパーティションフィールドが含まれていないため、パーティション間の更新を必要とするテーブルを指定します。

いいえ

STRING

なし

パーティション間で更新するテーブル。

  • 形式:テーブル名はセミコロン (;) で区切ります。

  • パフォーマンスに関する推奨事項:この操作は大量の計算リソースを消費します。これらのテーブルに対して個別のジョブを作成してください。

重要
  • 該当するすべてのテーブルをリストアップしてください。テーブル名を省略すると、データが重複する可能性があります。

以下の例は、Paimon Catalog タイプに基づいて Paimon をデータインジェストシンクとして設定する方法を示しています。

  • Paimon Catalog が filesystem の場合、Alibaba Cloud OSS への書き込みの設定例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    catalog.properties で始まるパラメーターの詳細については、「Paimon Filesystem Catalog の作成」をご参照ください。

  • Paimon Catalog が rest の場合、Alibaba Cloud Data Lake Formation への書き込みの設定例:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (オプション) 削除ベクターを有効にして読み取りパフォーマンスを向上させます。
      table.properties.deletion-vectors.enabled: true

    catalog.properties で始まるパラメーターの詳細については、「Flink CDC Catalog 設定パラメーター」をご参照ください。

スキーマ進化

現在、データインジェストシンクとしての Paimon は、以下のスキーマ進化イベントをサポートしています。

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT (プライマリキー列の型の変更はサポートされていません)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

説明

ダウンストリームの Paimon テーブルがすでに存在する場合、その既存のスキーマが書き込みに使用されます。システムは再度テーブルを作成しようとしません。

よくある質問