このトピックでは、Realtime Compute for Apache Flink の開発コンソールを使用して、Apache Paimon(Paimon)テーブルで挿入、更新、上書き、および削除操作を実行する方法について説明します。また、特定のオフセットに基づいて Paimon テーブルからデータを使用する方法についても説明します。
前提条件
Paimon カタログと Paimon テーブルが作成されていること。詳細については、「Apache Paimon カタログの管理」をご参照ください。
制限事項
Ververica Runtime(VVR) 8.0.5 以降を使用する Realtime Compute for Apache Flink のみ、Paimon テーブルをサポートしています。
Paimon テーブルへのデータの書き込み
CTAS または CDAS ステートメントを使用してデータとスキーマの変更を同期する
CREATE TABLE AS(CTAS)ステートメントまたは CREATE DATABASE AS(CDAS)ステートメントの使用方法については、「Apache Paimon カタログの管理」をご参照ください。
INSERT INTO ステートメントを使用してデータを挿入または更新する
プライマリキーテーブルは、すべての種類の変更(INSERT、UPDATE_BEFORE、UPDATE_AFTER、および DELETE)を入力としてサポートしています。書き込まれた特定のデータレコードが同じプライマリキーを持っている場合、マージエンジンの設定に基づいてマージされます。
追加専用テーブル(プライマリキーなし)は、INSERT タイプの変更のみを入力としてサポートしています。
INSERT OVERWRITE ステートメントを使用してデータを上書きする
上書きとは、古いデータを新しいデータで置き換えるプロセスです。古いデータは削除され、アクセスできなくなります。 INSERT OVERWRITE ステートメントを使用して、Paimon テーブルを部分的または完全に上書きできます。次の例では、my_table という名前のテーブルを使用します。
INSERT OVERWRITE ステートメントは、バッチデプロイメントにのみ適用されます。
デフォルトでは、INSERT OVERWRITE ステートメントによる変更は、ストリーミングモードのダウンストリームオペレーターでは使用できません。ストリーミングモードで変更を使用する方法については、「INSERT OVERWRITE ステートメントの結果の使用」をご参照ください。
パーティション化されていないテーブルのすべてのデータを上書きします。
INSERT OVERWRITE my_table SELECT ...;テーブルのパーティション(
dt=20240108,hh=06パーティションなど)を上書きします。INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;テーブルの特定のパーティションを上書きします。上書きするパーティションを SELECT ステートメントで指定します。
INSERT OVERWRITE my_table SELECT ...;パーティション化されたテーブルのすべてのデータを上書きします。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
DELETE ステートメントを使用してデータを削除する
DELETE ステートメントを使用して、プライマリキーテーブルからデータを削除できます。 DELETE ステートメントは、スクリプトで実行する必要があります。
-- 通貨フィールドが 'UNKNOWN' であるすべてのデータレコードを my_table テーブルから削除します。
DELETE FROM my_table WHERE currency = 'UNKNOWN';DELETE 変更を無視する
プライマリキーテーブルが DELETE 変更を受信した場合、対応するデータはデフォルトで削除されます。 DELETE 変更を無視してデータの削除を防ぐには、SQL ヒントを使用して ignore-delete パラメーターを true に設定します。
パラメーター | 説明 | データ型 | デフォルト値 |
ignore-delete | アップストリームからの DELETE 変更を無視するかどうかを指定します。 | ブール値 | false |
シンクの並列度を調整する
SQL ヒントを使用して、シンクオペレーターの並列度を手動で変更できます。次の表にパラメーターを示します。
パラメーター | 説明 | データ型 | デフォルト値 |
sink.parallelism | シンクオペレーターの並列度。 | 整数 | 該当なし |
次の例では、シンクオペレーターの並列度を 10 に設定しています。
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;Paimon テーブルからデータを使用する
ストリーミングデプロイメントを使用する
ストリーミングデプロイメントを使用してプライマリキーテーブルからデータを使用する前に、changelog プロデューサーの設定を完了してください。
デフォルトでは、ソースオペレーターは、デプロイメントの開始時に Paimon テーブルのすべてのデータを生成し、Paimon テーブルの後続の増分データを生成し続けます。
コンシューマーオフセットを設定する
特定のオフセットに基づいて Paimon テーブルからデータを使用するには、次の方法を使用します。
デプロイメントの開始時刻以降の増分データのみを読み取る場合は、SQL ヒントを使用して
'scan.mode' = 'latest'を指定します。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;特定の時点以降の増分データのみを読み取る場合は、SQL ヒントを使用して
scan.timestamp-millisパラメーターを指定します。 このパラメーターは、エポックタイム UTC 1970-01-01 00:00:00 から経過したミリ秒数を指定します。SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;特定の時点から書き込まれたすべてのデータと後続の増分データを使用する場合は、次の方法を使用します。
説明Paimon はダウンストリームの使用のために小さなファイルを圧縮するため、ソースオペレーターが読み取るデータファイルには、指定された時点より前に書き込まれた少量のデータが含まれている場合があります。 SQL ステートメントで WHERE 条件を使用して、ビジネス要件に基づいてデータをフィルタリングできます。
GUI でデプロイメントを開始するときに、ソースの開始時刻を指定します
パラメーターを設定します。 SQL ヒントを使用して
scan.file-creation-time-millisパラメーターを指定します。SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
特定のスナップショットファイルから始まる増分データのみを読み取る場合は、SQL ヒントを使用して
scan.snapshot-idパラメーターを指定します。 このパラメーターは、使用するスナップショットファイルの ID を指定します。SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;特定のスナップショットファイルに含まれるすべてのデータと後続の増分データを読み取る場合は、SQL ヒントを使用して
'scan.mode' = 'from-snapshot-full'構成を含め、scan.snapshot-idパラメーターを指定します。scan.snapshot-idパラメーターは、使用するスナップショットファイルの ID を指定します。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
コンシューマー ID を指定する
コンシューマー ID は、Paimon テーブルのコンシューマーオフセットを記録するために使用されます。
SQL ステートメントの計算ロジックを変更すると、デプロイメントトポロジが変更され、Realtime Compute for Apache Flink によって保存された状態からコンシューマーオフセットを復元できない場合があります。この場合、コンシューマー ID を指定して、コンシューマーオフセットを Paimon テーブルのメタデータファイルに保存できます。これにより、デプロイメントがステートレスに再起動された場合でも、コンシューマーは以前のオフセットから再開できます。
期限切れのスナップショットファイルが使用される前に削除されると、エラーが発生します。コンシューマー ID を指定すると、使用されていない期限切れのスナップショットファイルの削除を防ぐことができます。
ソースオペレーターのコンシューマ ID を指定するには、consumer-id パラメーターを文字列に設定します。 コンシューマ ID を初めて指定すると、コンシューマオフセットを構成する で説明されているように、コンシューマオフセットが決定されます。 後続の操作で同じコンシューマ ID を指定すると、コンシューマは Paimon テーブルの保存されたコンシューマオフセットから再開できます。
たとえば、次の SQL ステートメントを使用して、ソースオペレーターに test-id という名前のコンシューマ ID を指定できます。 コンシューマ ID のコンシューマオフセットをリセットする場合は、'consumer.ignore-progress' = 'true' を指定します。
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */; // test-id をコンシューマー ID として使用してクエリを実行します。コンシューマ ID を使用すると、スナップショットファイルと対応する既存のデータファイルは期限切れ後も削除されません。 これにより、ストレージスペースが無駄になる可能性があります。 前述の問題を解決するには、consumer.expiration-time パラメーターを指定して、非アクティブなコンシューマ ID を削除します。 たとえば、'consumer.expiration-time' = '3d' は、3 日連続で使用されていないコンシューマ ID を削除することを指定します。
INSERT OVERWRITE ステートメントの結果の使用
デフォルトでは、INSERT OVERWRITE ステートメントによる変更は、ストリーミングモードのダウンストリームオペレーターでは使用できません。 変更を使用する場合は、SQL ヒントを使用して 'streaming-read-overwrite' = 'true' を指定します。
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;バッチデプロイを使用する
既定では、バッチデプロイのソース演算子は、Paimon テーブルの最新の スナップショット ファイルを読み取り、最新のステータス データを生成します。
バッチタイムトラベル
特定の時点での Paimon テーブルの状態をクエリするには、SQL ヒントを使用して scan.timestamp-millis パラメーターを指定します。 このパラメーターは、エポックタイム UTC 1970-01-01 00:00:00 から経過したミリ秒数を指定します。
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;特定のスナップショットファイルが生成されたときの Paimon テーブルの状態をクエリするには、SQL ヒントを使用して scan.snapshot-id パラメーターを指定します。 このパラメーターは、使用するスナップショットファイルの ID を指定します。
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;2 つのスナップショット間の増分変更をクエリする
2 つの特定のスナップショット間の Paimon テーブルへの増分変更をクエリするには、SQL ヒントを使用して incremental-between パラメーターを指定します。 たとえば、次の SQL ステートメントを実行して、スナップショット 20 とスナップショット 12 の間の変更されたデータをクエリできます。
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;デフォルトでは、バッチデプロイメントでは DELETE 変更は破棄されます。 バッチデプロイメントで DELETE 変更を使用する場合は、Paimon によって提供される 監査ログテーブル をクエリします。 例: SELECT * FROM 't$audit_log ' /*+ OPTIONS('incremental-between' = '12,20') */;。
ソース並列度を調整する
デフォルトでは、Paimon はパーティション数やバケット数などの情報に基づいてソース演算子の並列度を自動的に導出します。SQL ヒントを使用して、ソース演算子の並列度を手動で変更できます。
パラメーター | データ型 | デフォルト値 | 備考 |
scan.parallelism | Integer | N/A | ソースオペレーターの並列度。 |
scan.infer-parallelism | Boolean | true | ソースオペレーターの並列度を自動的に導出するかどうかを指定します。 |
scan.infer-parallelism.max | Integer | 1024 | ソースオペレーターによって導出される最大並列度。 |
次の例では、ソース演算子の並列度は 10 に設定されています。
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */; // 並列度を 10 に設定してテーブル t から全列を選択します。ディメンションテーブルとしての機能
Paimonテーブルをディメンションテーブルとして使用できます。JOIN句の記述方法については、構文 をご参照ください。
VARIANT データの読み取りと書き込み
VVR 11.1 以降では、Paimon テーブルは半構造化 VARIANT タイプをサポートしています。 PARSE_JSON または TRY_PARSE_JSON を使用して、JSON 形式の VARCHAR データを VARIANT に変換できます。 VARIANT タイプは、JSON 形式のデータのクエリと処理のパフォーマンスを大幅に向上させます。
サンプルコード:
CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
k BIGINT,
info VARIANT
);
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
SELECT k, PARSE_JSON(jsonStr) FROM T;参照資料
Paimon テーブルで書き込みまたは読み取り操作を実行する場合、SQL ヒントを使用してテーブルパラメーターを一時的に変更できます。 詳細については、「Apache Paimon テーブルの管理」をご参照ください。
プライマリキーテーブルと追加専用テーブルの機能については、「プライマリキーテーブルと追加専用テーブル」をご参照ください。
プライマリキーテーブルと追加スケーラブルテーブルを最適化する方法については、「パフォーマンスの最適化」をご参照ください。
コンシューマーは、スナップショットファイルを使用して Paimon テーブルにアクセスします。スナップショットファイルの保持期間が短いか、消費効率が低い場合、使用中のスナップショットファイルが削除される可能性があります。その結果、
File xxx not found, Possible causesエラーメッセージが表示されます。この問題の解決方法については、「デプロイメントで Apache Paimon ソーステーブルからデータを読み取るときに「File xxx not found, Possible causes」エラーメッセージが表示された場合はどうすればよいですか?」をご参照ください。