本文為您介紹如何在Realtime Compute開發控制台向Paimon表中插入、更新、覆寫或刪除資料,以及從Paimon表消費資料,並指定消費位點。
前提條件
已建立Paimon Catalog和Paimon表,詳情請參見管理Paimon Catalog。
使用限制
僅Realtime Compute引擎VVR 8.0.5及以上版本支援Paimon表。
向Paimon表寫入資料
通過CTAS/CDAS語句同步資料及表結構變更
詳情請參見管理Paimon Catalog。
通過INSERT INTO語句插入或更新資料
您可以通過INSERT INTO語句,直接向Paimon表插入或更新資料。
Paimon主鍵表可以接受所有類型(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)的訊息,相同主鍵的資料在寫入後會根據資料合併機制進行合并。
Paimon Append Only表(非主鍵表)只能接受INSERT類型的訊息。
通過INSERT OVERWRITE語句覆寫資料
覆寫是指清空並重新寫入資料。您可以通過INSERT OVERWRITE語句覆寫整張Paimon表或覆寫特定分區,SQL語句樣本如下。
僅批作業支援INSERT OVERWRITE語句。
預設情況下,INSERT OVERWRITE操作不會產生變更資料,刪除與匯入的資料無法被下遊流式消費。如果您需要消費此類資料,請參見流式消費INSERT OVERWRITE語句的結果。
my_table表是非分區表,覆寫整張my_table表。
INSERT OVERWRITE my_table SELECT ...;my_table表是分區表,覆寫my_table表中的
dt=20240108,hh=06分區。INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;my_table表是分區表,動態覆寫my_table表中的分區,即SELECT語句結果中出現的分區都會被覆寫,其它分區保持不變。
INSERT OVERWRITE my_table SELECT ...;my_table表是分區表,覆寫整張my_table表。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
通過DELETE語句刪除資料
您可以通過DELETE語句從Paimon主鍵表中刪除資料。DELETE語句只能在資料查詢中執行。
--從my_table表中刪除所有currency = 'UNKNOWN'的資料。
DELETE FROM my_table WHERE currency = 'UNKNOWN';過濾刪除訊息
使用Paimon主鍵表時,預設情況下,類型為DELETE的訊息會將Paimon表中對應主鍵的資料刪除。如果您不希望Paimon表處理此類訊息,可以通過SQL hint將以下參數設定為true,過濾刪除訊息。
參數 | 說明 | 資料類型 | 預設值 |
ignore-delete | 是否過濾刪除訊息。 | Boolean | false |
調整結果表的並發數
您可以通過SQL hint設定以下參數,手動調整結果表運算元的並發數。
參數 | 說明 | 資料類型 | 預設值 |
sink.parallelism | 手動設定Paimon結果表運算元的並發數。 | Integer | 無 |
例如,以下SQL將手動設定Paimon結果表運算元並發數為10。
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;從Paimon表消費資料
通過流作業消費Paimon表
通過流作業消費的Paimon主鍵表需要設定變更資料產生機制。
預設情況下,流作業中的Paimon源表運算元將首先產出作業啟動時刻Paimon表中的全量資料,之後產出從作業啟動時刻開始Paimon表中的增量資料。
從指錨點消費Paimon表
您可以通過以下方式從指錨點消費Paimon表:
如果您不需要消費作業啟動時刻Paimon表中的全量資料,只需要消費後續的增量資料,可通過SQL Hint設定
'scan.mode' = 'latest'。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;如果您不想要消費全量資料,只想消費從指定時間點開始的增量資料,可通過SQL Hint設定
scan.timestamp-millis參數。參數值表示從Unix Epoch(1970-01-01 00:00:00 UTC)開始到指定時間點經過的毫秒數。SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;如果您想要消費從指定時間點之後寫入的全量資料,並持續消費後續的增量資料,可以從以下兩種操作中選擇一種。
說明此類消費方式將讀取在指定時間點之後修改的資料檔案。由於小檔案合并,資料檔案中可能包含少量在指定時間點之前寫入的資料。您可以根據業務需求,在SQL作業中添加WHERE 過濾條件對資料進行過濾。
不設定任何SQL Hint,在啟動作業時,選擇指定源表開始時間並指定具體的時間資訊。

通過SQL Hint設定
scan.file-creation-time-millis參數。SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
如果您不想要消費全量資料,只想消費從特定快照檔案開始的增量資料,可通過SQL Hint設定
scan.snapshot-id參數,參數值是指定快照檔案的編號。SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;如果您想要消費特定快照檔案的全量資料,並持續消費後續的增量資料,可通過SQL hint設定
'scan.mode' = 'from-snapshot-full'和scan.snapshot-id參數,scan.snapshot-id參數值是指定快照檔案的編號。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
指定Consumer ID
Consumer ID可以儲存Paimon表的消費進度,主要用於以下情境:
如果您修改了SQL作業的計算邏輯,可能會導致作業拓撲發生變化,無法從Flink狀態中恢複消費進度。設定Consumer ID可以將此ID對應的消費進度儲存在Paimon表的中繼資料檔案中,即使後續無狀態啟動作業,也能從中斷的位點繼續消費Paimon表。
設定Consumer ID後,未被消費過的快照檔案不會因到期而被刪除,可以防止因消費速度跟不上快照到期速度導致的報錯。
通過設定consumer-id參數,您可以給流作業中的Paimon源表運算元賦予一個Consumer ID,其值可以是任意的字串。Consumer ID第一次建立時,它的起始消費位點根據從指錨點消費Paimon表中的規則確定。後續只要繼續使用相同的Consumer ID,即可恢複Paimon表的消費進度。
例如,為Paimon源表運算元設定名為test-id的Consumer ID的SQL語句樣本如下。如果您想要重設某個Consumer ID對應的消費位點,可以額外設定'consumer.ignore-progress' = 'true'。
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;由於未被Consumer ID消費過的快照檔案不會因到期而被刪除,如果不及時清理廢棄的Consumer ID,快照檔案及其對應的歷史資料檔案將永遠不會被刪除,會佔用儲存空間。您可以設定consumer.expiration-time表參數,將超過規定時間不使用的Cconsumer ID清理掉。例如,'consumer.expiration-time' = '3d'表示將3天未使用的Consumer ID清理掉。
流式消費INSERT OVERWRITE語句的結果
預設情況下,INSERT OVERWRITE操作不會產生變更資料,刪除與匯入的資料無法被下遊流式消費。如果您需要消費此類資料,可以在流式消費作業中通過SQL Hint配置'streaming-read-overwrite' = 'true'。
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;通過批作業消費Paimon表
預設情況下,批作業中的Paimon源表運算元將讀取最新的快照檔案,輸出Paimon表的最新狀態資料。
Batch Time Travel
通過SQL Hint設定scan.timestamp-millis參數,即可查詢Paimon表在該時間點的狀態。參數值表示從Unix Epoch(1970-01-01 00:00:00 UTC)開始到指定時間點經過的毫秒數。
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;通過SQL Hint設定scan.snapshot-id參數,即可查詢Paimon表在該快照檔案產生時的狀態。參數值為指定快照檔案的編號。
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;查詢兩次快照之間的資料變化
如果您想要查詢兩次快照間Paimon表中資料發生的變化,可以通過SQL Hint設定incremental-between參數。例如,查看20號快照檔案和12號快照檔案間發生變化的所有資料,SQL語句樣本如下。
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;由於批作業不支援消費Delete類型的訊息,預設情況下此類訊息將會被丟棄。如果您想要在批作業中消費Delete類型的訊息,請查詢Audit Log系統資料表。例如SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */;。
調整源表的並發數
預設情況下,Paimon根據分區數以及分桶數等資訊自動推斷源表運算元的並發數。您可以通過SQL Hint設定以下參數,手動調整源表運算元的並發數。
參數 | 資料類型 | 預設值 | 備忘 |
scan.parallelism | Integer | 無 | 手動設定Paimon源表運算元的並發數。 |
scan.infer-parallelism | Boolean | true | 是否自動推斷Paimon源表運算元的並發數。 |
scan.infer-parallelism.max | Integer | 1024 | Paimon源表運算元自動推斷出的並發數上限。 |
手動設定Paimon源表運算元並發數為10的SQL語句樣本如下。
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;使用Paimon維表
Paimon也可以作為維表使用。關於維表JOIN的文法,詳情請參見維表JOIN語句。
寫入和消費半結構化資料類型VARIANT
在Realtime Compute引擎VVR 11.1及以上版本,Paimon表引入半結構化資料類型 VARIANT,支援通過PARSE_JSON/TRY_PARSE_JSON將VARCHAR類型的JSON字串轉換為VARIANT資料類型。直接寫入和消費VARIANT類型的資料將顯著提升JSON資料的查詢和處理效能。
樣本SQL如下:
CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
k BIGINT,
info VARIANT
);
INSERT INTO `my-catalog`.`my_db`.`my_tbl`
SELECT k, PARSE_JSON(jsonStr) FROM T;相關文檔
Paimon表資料寫入和消費時,支援使用SQL Hint臨時修改表參數,詳情請參見管理Paimon表。
Paimon主鍵表和Append表的基本特性與功能,詳情請參見Paimon主鍵表和Append Only表。
不同情境下Paimon主鍵表和Append Scalable表的常用最佳化,詳情請參見Paimon效能最佳化。
Paimon表的消費依賴快照檔案,快照到期時間太短或消費作業效率低會導致正在消費的快照檔案因到期被刪除,消費作業出現
File xxx not found, Possible causes報錯,解決方案請參見讀Paimon作業出現File xxx not found, Possible causes。