このトピックでは、Realtime Compute for Apache Flink の開発コンソールを使用して Paimon テーブルにデータを挿入、更新、上書き、または削除する方法について説明します。また、Paimon テーブルからデータを消費し、コンシューマオフセットを指定する方法についても説明します。
前提条件
Paimon Catalog と Paimon テーブルが作成済みであること。詳細については、「Paimon Catalog の管理」をご参照ください。
制限事項
Ververica Runtime (VVR) 8.0.5 以降を使用する Realtime Compute for Apache Flink のみが Paimon テーブルをサポートします。
Paimon テーブルへのデータの書き込み
CTAS 文または CDAS 文を使用したデータとスキーマ変更の同期
詳細については、「Paimon カタログの管理」をご参照ください。
INSERT INTO 文を使用したデータの挿入または更新
INSERT INTO 文を使用して、Paimon テーブルにデータを挿入または更新できます。
Paimon プライマリキーテーブルは、INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE を含むすべてのタイプのメッセージを受け入れることができます。同じプライマリキーを持つデータは、データマージメカニズムに基づいて書き込み後にマージされます。
Paimon 追加専用テーブル (非プライマリキーテーブル) は、INSERT メッセージのみを受け入れることができます。
INSERT OVERWRITE 文を使用したデータの上書き
データの上書きとは、データをクリアして再書き込みすることです。INSERT OVERWRITE 文を使用して、Paimon テーブル全体または特定のパーティションを上書きできます。以下の例に SQL 文を示します。
INSERT OVERWRITE 文はバッチジョブでのみサポートされます。
デフォルトでは、INSERT OVERWRITE 操作は changelog データを生成しません。削除およびインポートされたデータは、ダウンストリームのストリーミングジョブでは消費できません。このタイプのデータを消費するには、「INSERT OVERWRITE 文の結果のストリーミングと消費」をご参照ください。
非パーティションテーブル my_table 全体を上書きします。
INSERT OVERWRITE my_table SELECT ...;my_table テーブルの
dt=20240108,hh=06パーティションを上書きします。INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;my_table テーブルのパーティションを動的に上書きします。SELECT 文の結果に現れるパーティションが上書きされます。他のパーティションは変更されません。
INSERT OVERWRITE my_table SELECT ...;パーティションテーブル my_table 全体を上書きします。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
DELETE 文を使用したデータの削除
DELETE 文を使用して、Paimon プライマリキーテーブルからデータを削除できます。DELETE 文は データ探索 でのみ実行できます。
-- my_table テーブルから currency = 'UNKNOWN' のすべてのデータを削除します。
DELETE FROM my_table WHERE currency = 'UNKNOWN';DELETE メッセージのフィルタリング
デフォルトでは、Paimon プライマリキーテーブルを使用する場合、DELETE メッセージは対応するプライマリキーを持つデータを削除します。Paimon テーブルでこれらのメッセージを処理したくない場合は、SQL ヒントを使用して次のパラメーターを `true` に設定してフィルターできます。
パラメーター | 説明 | データ型 | デフォルト値 |
ignore-delete | DELETE メッセージをフィルターするかどうかを指定します。 | ブール値 | false |
結果テーブルの並列度の調整
SQL ヒントを使用して次のパラメーターを設定し、結果テーブルオペレーターの並列度を手動で調整できます。
パラメーター | 説明 | データ型 | デフォルト値 |
sink.parallelism | Paimon 結果テーブル オペレーターの並列度を手動で設定します。 | 整数 | なし |
たとえば、次の SQL 文は、Paimon 結果テーブルオペレーターの並列度を 10 に手動で設定します。
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;Paimon テーブルからのデータの消費
ストリーミングジョブ
ストリーミングジョブによって消費される Paimon プライマリキーテーブルの場合、changelog プロデューサーを設定する必要があります。
デフォルトでは、ストリーミングジョブの Paimon ソースオペレーターは、ジョブの開始時にまず Paimon テーブルから完全データを生成します。その後、オペレーターはその時点から Paimon テーブルの増分データを生成します。
指定したオフセットからのデータの消費
Paimon テーブルから指定したオフセットのデータを消費するには、次のいずれかの方法を使用します。
ジョブの開始時に Paimon テーブルの完全データではなく、後続の増分データのみを消費したい場合は、SQL ヒントを使用して
'scan.mode' = 'latest'を設定します。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;完全データを消費せず、指定した時点からの増分データのみを消費したい場合は、SQL ヒントを使用して
scan.timestamp-millisパラメーターを設定します。このパラメーターは、UNIX エポック (1970-01-01 00:00:00 UTC) から目的の時点までに経過したミリ秒数を指定します。SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;指定した時点以降に書き込まれた完全データを消費し、その後も継続して増分データを消費したい場合は、次の 2 つの操作のいずれかを実行できます。
説明この消費方法では、指定した時点以降に変更されたデータファイルを読み取ります。small ファイルのコンパクションのため、データファイルには指定した時点より前に書き込まれた少量のデータが含まれている場合があります。必要に応じて、SQL ジョブに WHERE フィルター条件を追加してデータをフィルターできます。
SQL ヒントは設定しません。ジョブを開始するときに、[ソース開始時刻の指定] を選択し、時刻情報を指定します。

SQL ヒントを使用して
scan.file-creation-time-millisパラメーターを設定します。SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
完全データを消費せず、特定のスナップショットから増分データのみを消費したい場合は、SQL ヒントを使用して
scan.snapshot-idパラメーターを設定します。このパラメーターはスナップショットの ID を指定します。SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;特定のスナップショットの完全データを消費し、その後も継続して増分データを消費したい場合は、SQL ヒントを使用して
'scan.mode' = 'from-snapshot-full'とscan.snapshot-idパラメーターを設定します。scan.snapshot-idパラメーターはスナップショットの ID を指定します。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
コンシューマー ID の指定
コンシューマー ID は Paimon テーブルの消費の進捗を保存し、主に次のシナリオで使用されます。
SQL ジョブの計算ロジックを変更すると、ジョブのトポロジーが変更される可能性があり、Flink ステートから消費の進捗を回復できなくなります。コンシューマー ID を設定すると、その ID の消費の進捗が Paimon テーブルのメタデータファイルに保存されます。これにより、後でジョブがステートレスモードで開始された場合でも、中断した時点から消費を再開できます。
コンシューマー ID を設定すると、未消費のスナップショットは有効期限が切れても削除されません。これにより、消費速度がスナップショットの有効期限切れの速度に追いつかない場合に発生するエラーを防ぐことができます。
ストリーミングジョブの Paimon ソースオペレーターにコンシューマー ID を割り当てるには、consumer-id パラメーターを設定します。値は任意の文字列にすることができます。コンシューマー ID を初めて作成する場合、その開始コンシューマオフセットは「指定したオフセットからのデータの消費」のルールによって決定されます。その後、同じコンシューマー ID を使用し続けると、Paimon テーブルからの消費を再開できます。
たとえば、次の SQL 文は、Paimon ソースオペレーターに test-id という名前のコンシューマー ID を設定する方法を示しています。コンシューマー ID に対応するコンシューマオフセットをリセットしたい場合は、'consumer.ignore-progress' = 'true' を設定することもできます。
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;コンシューマー ID の未消費のスナップショットは有効期限が切れても削除されないため、古いコンシューマー ID をタイムリーにクリアしないと、スナップショットとそれに対応する履歴データファイルが削除されず、ストレージ領域を占有し続けます。consumer.expiration-time テーブルパラメーターを設定して、指定した期間使用されていないコンシューマー ID をクリアできます。たとえば、'consumer.expiration-time' = '3d' は、3 日間使用されていないコンシューマー ID をクリアします。
INSERT OVERWRITE 文の結果のストリーミングと消費
デフォルトでは、INSERT OVERWRITE 操作は changelog データを生成しません。削除およびインポートされたデータは、ダウンストリームのストリーミングジョブでは消費できません。このタイプのデータを消費するには、SQL ヒントを使用してストリーミングジョブで 'streaming-read-overwrite' = 'true' を設定します。
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;バッチジョブ
デフォルトでは、バッチジョブの Paimon ソースオペレーターは最新のスナップショットを読み取り、Paimon テーブルの最新の状態データを出力します。
バッチタイムトラベル
SQL ヒントを使用して scan.timestamp-millis パラメーターを設定し、その時点での Paimon テーブルの状態をクエリできます。このパラメーターは、UNIX エポック (1970-01-01 00:00:00 UTC) から指定した時点までに経過したミリ秒数を指定します。
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;SQL ヒントを使用して scan.snapshot-id パラメーターを設定し、スナップショットが生成された時点での Paimon テーブルの状態をクエリできます。このパラメーターはスナップショットの ID を指定します。
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;2 つのスナップショット間のデータ変更のクエリ
2 つのスナップショット間の Paimon テーブルのデータ変更をクエリしたい場合は、SQL ヒントを使用して incremental-between パラメーターを設定できます。たとえば、スナップショット 20 とスナップショット 12 の間に変更されたすべてのデータを表示するには、次の SQL 文を使用します。
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;バッチジョブは DELETE メッセージの消費をサポートしていないため、これらのメッセージはデフォルトで破棄されます。バッチジョブで DELETE メッセージを消費したい場合は、Audit Log システムテーブルをクエリできます。たとえば、SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */; のようにします。
ソーステーブルの並列度の調整
デフォルトでは、Paimon はパーティションやバケットの数などの情報に基づいて、ソースオペレーターの並列度を自動的に推測します。SQL ヒントを使用して次のパラメーターを設定し、ソースオペレーターの並列度を手動で調整できます。
パラメーター | データ型 | デフォルト値 | 備考 |
scan.parallelism | 整数 | なし | Paimon ソースオペレーターの並列度を手動で設定します。 |
scan.infer-parallelism | ブール値 | true | Paimon ソースオペレーターの並列度を自動的に推測するかどうかを指定します。 |
scan.infer-parallelism.max | 整数 | 1024 | 自動的に推測される Paimon ソースオペレーターの並列度の上限。 |
以下は、Paimon ソースオペレーターの並列度を 10 に手動で設定する SQL 文の例です。
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;Paimon テーブルのディメンションテーブルとしての使用
Paimon テーブルはディメンションテーブルとしても使用できます。ディメンションテーブル JOIN の構文の詳細については、「ディメンションテーブル JOIN 文」をご参照ください。
半構造化データ型 VARIANT の書き込みと消費
Ververica Runtime (VVR) 11.1 以降では、Paimon テーブルに半構造化データ型 VARIANT が導入されました。この型を使用すると、PARSE_JSON または TRY_PARSE_JSON を使用して VARCHAR 型の JSON 文字列を VARIANT データ型に変換できます。VARIANT 型のデータを直接書き込んで消費することで、JSON データのクエリと処理のパフォーマンスが大幅に向上します。
以下は SQL 文のサンプルです。
CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
k BIGINT,
info VARIANT
);
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
SELECT k, PARSE_JSON(jsonStr) FROM T;関連ドキュメント
Paimon テーブルにデータを書き込む、または Paimon テーブルからデータを消費する際に、SQL ヒントを使用してテーブルパラメーターを一時的に変更できます。詳細については、「Paimon テーブルの管理」をご参照ください。
Paimon プライマリキーテーブルと追加専用テーブルの基本的な属性と特徴の詳細については、「Paimon プライマリキーテーブルと追加専用テーブル」をご参照ください。
さまざまなシナリオにおける Paimon プライマリキーテーブルと追加専用テーブルの一般的な最適化の詳細については、「Paimon のパフォーマンス最適化」をご参照ください。
Paimon テーブルの消費はスナップショットに依存します。スナップショットの有効期限が短すぎるか、消費ジョブの効率が低い場合、消費中のスナップショットが有効期限切れで削除されることがあります。これにより、消費ジョブで
File xxx not found, Possible causesエラーが報告されます。この問題の解決方法については、「Paimon テーブルからデータを読み取るジョブで「File xxx not found, Possible causes」エラーが報告される」をご参照ください。