すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon テーブルへのデータの書き込みと消費

最終更新日:Jan 14, 2026

このトピックでは、Realtime Compute for Apache Flink の開発コンソールを使用して Paimon テーブルにデータを挿入、更新、上書き、または削除する方法について説明します。また、Paimon テーブルからデータを消費し、コンシューマオフセットを指定する方法についても説明します。

前提条件

Paimon Catalog と Paimon テーブルが作成済みであること。詳細については、「Paimon Catalog の管理」をご参照ください。

制限事項

Ververica Runtime (VVR) 8.0.5 以降を使用する Realtime Compute for Apache Flink のみが Paimon テーブルをサポートします。

Paimon テーブルへのデータの書き込み

CTAS 文または CDAS 文を使用したデータとスキーマ変更の同期

詳細については、「Paimon カタログの管理」をご参照ください。

INSERT INTO 文を使用したデータの挿入または更新

INSERT INTO 文を使用して、Paimon テーブルにデータを挿入または更新できます。

INSERT OVERWRITE 文を使用したデータの上書き

データの上書きとは、データをクリアして再書き込みすることです。INSERT OVERWRITE 文を使用して、Paimon テーブル全体または特定のパーティションを上書きできます。以下の例に SQL 文を示します。

説明
  • INSERT OVERWRITE 文はバッチジョブでのみサポートされます。

  • デフォルトでは、INSERT OVERWRITE 操作は changelog データを生成しません。削除およびインポートされたデータは、ダウンストリームのストリーミングジョブでは消費できません。このタイプのデータを消費するには、「INSERT OVERWRITE 文の結果のストリーミングと消費」をご参照ください。

  • 非パーティションテーブル my_table 全体を上書きします。

    INSERT OVERWRITE my_table SELECT ...;
  • my_table テーブルの dt=20240108,hh=06 パーティションを上書きします。

    INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
  • my_table テーブルのパーティションを動的に上書きします。SELECT 文の結果に現れるパーティションが上書きされます。他のパーティションは変更されません。

    INSERT OVERWRITE my_table SELECT ...;
  • パーティションテーブル my_table 全体を上書きします。

    INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;

DELETE 文を使用したデータの削除

DELETE 文を使用して、Paimon プライマリキーテーブルからデータを削除できます。DELETE 文は データ探索 でのみ実行できます。

-- my_table テーブルから currency = 'UNKNOWN' のすべてのデータを削除します。
DELETE FROM my_table WHERE currency = 'UNKNOWN';

DELETE メッセージのフィルタリング

デフォルトでは、Paimon プライマリキーテーブルを使用する場合、DELETE メッセージは対応するプライマリキーを持つデータを削除します。Paimon テーブルでこれらのメッセージを処理したくない場合は、SQL ヒントを使用して次のパラメーターを `true` に設定してフィルターできます。

パラメーター

説明

データ型

デフォルト値

ignore-delete

DELETE メッセージをフィルターするかどうかを指定します。

ブール値

false

結果テーブルの並列度の調整

SQL ヒントを使用して次のパラメーターを設定し、結果テーブルオペレーターの並列度を手動で調整できます。

パラメーター

説明

データ型

デフォルト値

sink.parallelism

Paimon 結果テーブル オペレーターの並列度を手動で設定します。

整数

なし

たとえば、次の SQL 文は、Paimon 結果テーブルオペレーターの並列度を 10 に手動で設定します。

INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;

Paimon テーブルからのデータの消費

ストリーミングジョブ

説明

ストリーミングジョブによって消費される Paimon プライマリキーテーブルの場合、changelog プロデューサーを設定する必要があります。

デフォルトでは、ストリーミングジョブの Paimon ソースオペレーターは、ジョブの開始時にまず Paimon テーブルから完全データを生成します。その後、オペレーターはその時点から Paimon テーブルの増分データを生成します。

指定したオフセットからのデータの消費

Paimon テーブルから指定したオフセットのデータを消費するには、次のいずれかの方法を使用します。

  • ジョブの開始時に Paimon テーブルの完全データではなく、後続の増分データのみを消費したい場合は、SQL ヒントを使用して 'scan.mode' = 'latest' を設定します。

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
  • 完全データを消費せず、指定した時点からの増分データのみを消費したい場合は、SQL ヒントを使用して scan.timestamp-millis パラメーターを設定します。このパラメーターは、UNIX エポック (1970-01-01 00:00:00 UTC) から目的の時点までに経過したミリ秒数を指定します。

    SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
  • 指定した時点以降に書き込まれた完全データを消費し、その後も継続して増分データを消費したい場合は、次の 2 つの操作のいずれかを実行できます。

    説明

    この消費方法では、指定した時点以降に変更されたデータファイルを読み取ります。small ファイルのコンパクションのため、データファイルには指定した時点より前に書き込まれた少量のデータが含まれている場合があります。必要に応じて、SQL ジョブに WHERE フィルター条件を追加してデータをフィルターできます。

    • SQL ヒントは設定しません。ジョブを開始するときに、[ソース開始時刻の指定] を選択し、時刻情報を指定します。image.png

    • SQL ヒントを使用して scan.file-creation-time-millis パラメーターを設定します。

      SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
  • 完全データを消費せず、特定のスナップショットから増分データのみを消費したい場合は、SQL ヒントを使用して scan.snapshot-id パラメーターを設定します。このパラメーターはスナップショットの ID を指定します。

    SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
  • 特定のスナップショットの完全データを消費し、その後も継続して増分データを消費したい場合は、SQL ヒントを使用して 'scan.mode' = 'from-snapshot-full'scan.snapshot-id パラメーターを設定します。scan.snapshot-id パラメーターはスナップショットの ID を指定します。

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;

コンシューマー ID の指定

コンシューマー ID は Paimon テーブルの消費の進捗を保存し、主に次のシナリオで使用されます。

  • SQL ジョブの計算ロジックを変更すると、ジョブのトポロジーが変更される可能性があり、Flink ステートから消費の進捗を回復できなくなります。コンシューマー ID を設定すると、その ID の消費の進捗が Paimon テーブルのメタデータファイルに保存されます。これにより、後でジョブがステートレスモードで開始された場合でも、中断した時点から消費を再開できます。

  • コンシューマー ID を設定すると、未消費のスナップショットは有効期限が切れても削除されません。これにより、消費速度がスナップショットの有効期限切れの速度に追いつかない場合に発生するエラーを防ぐことができます。

ストリーミングジョブの Paimon ソースオペレーターにコンシューマー ID を割り当てるには、consumer-id パラメーターを設定します。値は任意の文字列にすることができます。コンシューマー ID を初めて作成する場合、その開始コンシューマオフセットは「指定したオフセットからのデータの消費」のルールによって決定されます。その後、同じコンシューマー ID を使用し続けると、Paimon テーブルからの消費を再開できます。

たとえば、次の SQL 文は、Paimon ソースオペレーターに test-id という名前のコンシューマー ID を設定する方法を示しています。コンシューマー ID に対応するコンシューマオフセットをリセットしたい場合は、'consumer.ignore-progress' = 'true' を設定することもできます。

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
説明

コンシューマー ID の未消費のスナップショットは有効期限が切れても削除されないため、古いコンシューマー ID をタイムリーにクリアしないと、スナップショットとそれに対応する履歴データファイルが削除されず、ストレージ領域を占有し続けます。consumer.expiration-time テーブルパラメーターを設定して、指定した期間使用されていないコンシューマー ID をクリアできます。たとえば、'consumer.expiration-time' = '3d' は、3 日間使用されていないコンシューマー ID をクリアします。

INSERT OVERWRITE 文の結果のストリーミングと消費

デフォルトでは、INSERT OVERWRITE 操作は changelog データを生成しません。削除およびインポートされたデータは、ダウンストリームのストリーミングジョブでは消費できません。このタイプのデータを消費するには、SQL ヒントを使用してストリーミングジョブで 'streaming-read-overwrite' = 'true' を設定します。

SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;

バッチジョブ

デフォルトでは、バッチジョブの Paimon ソースオペレーターは最新のスナップショットを読み取り、Paimon テーブルの最新の状態データを出力します。

バッチタイムトラベル

SQL ヒントを使用して scan.timestamp-millis パラメーターを設定し、その時点での Paimon テーブルの状態をクエリできます。このパラメーターは、UNIX エポック (1970-01-01 00:00:00 UTC) から指定した時点までに経過したミリ秒数を指定します。

SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

SQL ヒントを使用して scan.snapshot-id パラメーターを設定し、スナップショットが生成された時点での Paimon テーブルの状態をクエリできます。このパラメーターはスナップショットの ID を指定します。

SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;

2 つのスナップショット間のデータ変更のクエリ

2 つのスナップショット間の Paimon テーブルのデータ変更をクエリしたい場合は、SQL ヒントを使用して incremental-between パラメーターを設定できます。たとえば、スナップショット 20 とスナップショット 12 の間に変更されたすべてのデータを表示するには、次の SQL 文を使用します。

SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
説明

バッチジョブは DELETE メッセージの消費をサポートしていないため、これらのメッセージはデフォルトで破棄されます。バッチジョブで DELETE メッセージを消費したい場合は、Audit Log システムテーブルをクエリできます。たとえば、SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */; のようにします。

ソーステーブルの並列度の調整

デフォルトでは、Paimon はパーティションやバケットの数などの情報に基づいて、ソースオペレーターの並列度を自動的に推測します。SQL ヒントを使用して次のパラメーターを設定し、ソースオペレーターの並列度を手動で調整できます。

パラメーター

データ型

デフォルト値

備考

scan.parallelism

整数

なし

Paimon ソースオペレーターの並列度を手動で設定します。

scan.infer-parallelism

ブール値

true

Paimon ソースオペレーターの並列度を自動的に推測するかどうかを指定します。

scan.infer-parallelism.max

整数

1024

自動的に推測される Paimon ソースオペレーターの並列度の上限。

以下は、Paimon ソースオペレーターの並列度を 10 に手動で設定する SQL 文の例です。

SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;

Paimon テーブルのディメンションテーブルとしての使用

Paimon テーブルはディメンションテーブルとしても使用できます。ディメンションテーブル JOIN の構文の詳細については、「ディメンションテーブル JOIN 文」をご参照ください。

半構造化データ型 VARIANT の書き込みと消費

Ververica Runtime (VVR) 11.1 以降では、Paimon テーブルに半構造化データ型 VARIANT が導入されました。この型を使用すると、PARSE_JSON または TRY_PARSE_JSON を使用して VARCHAR 型の JSON 文字列を VARIANT データ型に変換できます。VARIANT 型のデータを直接書き込んで消費することで、JSON データのクエリと処理のパフォーマンスが大幅に向上します。

以下は SQL 文のサンプルです。

CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
  k BIGINT,
  info VARIANT
);

INSERT INTO `my-catalog`.`my_db`.`my_tbl` 
SELECT k, PARSE_JSON(jsonStr) FROM T;

関連ドキュメント