全部產品
Search
文件中心

Data Lake Formation:Paimon主鍵表:配置與特性

更新時間:May 15, 2026

本文介紹 Paimon 主鍵表的核心配置,包括分桶方式、資料合併機制(merge-engine)、變更資料產生機制(changelog-producer)以及 DLF 儲存最佳化模式,協助您根據業務情境選擇合適的配置。

配置速查

類別

情境

配置建議

分桶方式

  • 可接受日常 1 ~ 3 分鐘,偶爾 5 ~ 10 分鐘延時。

  • 不同分區資料量差距較大。

  • 不想關心分桶數的計算。

延遲分桶(建表不指定bucket,預設)

需要流作業寫入的資料,commit 後馬上可見。

同時設定:

  • 固定分桶('bucket' = '<num>'

  • 'deletion-vectors.enabled' = 'false'(預設)

主鍵無法完全包含分區鍵,且表大小 < 100GB。

動態分桶('bucket' = '-1'

Deletion Vectors

需要使用 StarRocks 或 Hologres 等引擎進行高效能查詢。

'deletion-vectors.enabled' = 'true'

資料合併機制

新資料完全覆蓋老資料。

'merge-engine' = 'deduplicate'(預設)

  • 每條資料只更新部分列。

  • 需要進行多流打寬。

'merge-engine' = 'partial-update'

需要進行資料的彙總。

'merge-engine' = 'aggregation'

每個主鍵只想保留第一條資料。

'merge-engine' = 'first-row'

變更資料產生機制

下遊不進行流消費。

'changelog-producer' = 'none'(預設)

設定了 'merge-engine' = 'deduplicate'(預設),下遊不關心完整變更資料,只關心最新狀態。

  • 'changelog-producer' = 'none'(預設)

  • 'scan.remove-normalize' = 'true'

設定了 'merge-engine' = 'partial-update''merge-engine' = 'aggregation',下遊不關心完整變更資料,不需要每行完整的最新狀態,只關心變化情況。

'changelog-producer' = 'none'(預設)

上遊流入的是完整的變更資料(例如資料庫 binlog),設定了 'merge-engine' = 'deduplicate',下遊需要完整的變更資料。

'changelog-producer' = 'input'

其它下遊需要完整的變更資料的情況。

'changelog-producer' = 'lookup'

儲存最佳化模式

可接受日常 1 ~ 3 分鐘,偶爾 5 ~ 10 分鐘延時。

資源延時均衡(預設)

願意使用更多資源,以減少資源調整帶來的延時。

延時優先

可接受 30 分鐘左右延時,希望節省更多資源。

資源優先

建立主鍵表

如果在建立 Paimon 表時指定了主鍵(primary key),則該表就是 Paimon 主鍵表。例如,以下 SQL 陳述式將建立一張分區鍵為 dt,主鍵為 dt、shop_id 和 user_id 的主鍵表。

Flink SQL

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);

Spark SQL

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) PARTITIONED BY (dt) TBLPROPERTIES (
  'primary-key' = 'dt,shop_id,user_id'
);

主鍵表中,每行資料的主鍵各不相同。如果將多條具有相同主鍵的資料寫入主鍵表,將會根據資料合併機制對這些資料進行合并。

下面將逐一介紹分桶方式、Deletion Vectors、資料合併機制與變更資料產生機制等Paimon主鍵表核心特性的具體配置與使用方式。

分桶方式

分桶(bucket)是Paimon主鍵表讀寫操作的最小單元。非分區表的所有資料,以及分區表每個分區的資料,都會被進一步劃分到不同的分桶中,以便同一作業使用多個並發同時讀寫 Paimon 表,加快讀寫效率。

延遲分桶(預設)

建立 Paimon 主鍵表時,不在表參數中指定 bucket,或指定 'bucket' = '-2',將會建立延遲分桶的主鍵表。

資料寫入延遲分桶表時,將首先進入對應分區的 bucket-postpone 臨時目錄中。隨後,DLF 最佳化作業決定各分區的分桶數,將資料寫入目標分桶目錄,並進行小檔案合并。

延遲分桶作為 DLF 的特色功能,具備如下優勢:

  • 自動確定分桶數:根據資料量、流量等指標自動計算,使用者無需手動設定。

  • 動態調整:當資料量等指標發生較大變化時,自動調整分桶數,無需人工幹預。

  • 分區級分桶:不同分區可使用不同的分桶數,適配分區大小不一的情況。

  • 並發寫入:多個作業可同時寫入而不產生衝突。

分桶數計算邏輯

以下列出 DLF 最佳化作業確定分桶數的公式,協助您估算相關分區的分桶數。

# 單條資料大小係數用於防止壓縮率過高,如資料中有很多空值的情況
單條資料大小係數 = min(1, 資料總條數 / 總檔案大小 / 表列數)

檔案大小相關分桶數 = max(
    分區檔案總大小 / (1.3 * 每分桶檔案大小),
    最大分桶檔案總大小 / (1.7 * 每分桶檔案大小)
)

# 最終分桶數將上取整至最近 2 的冪次,且最大值為 2048
最終分桶數 = max(
    檔案大小相關分桶數,
    分區資料總條數 / 每分桶資料條數,
    最大分桶資料總條數 / (1.7 * 每分桶資料條數),
    min(流量 / 每分桶流量, 4 * 檔案大小相關分桶數, 128)
)

每分桶檔案大小 = 768MB

每分桶流量 = 1536MB/h

######################################################################
# 未啟用 deletion vectors
# 未設定 'changelog-producer' = 'lookup'
######################################################################

每分桶資料條數 = +∞

######################################################################
# 其它情況
######################################################################

每分桶資料條數 = 40_000_000

當滿足以下條件時,DLF將會調整分桶數:

  • 分區資料總大小 > 2 * 每分桶檔案大小 * 分桶數

  • 某一分桶資料大小 > 2 * 每分桶檔案大小

  • 分區當前分桶數 < 0.125 * 目標分桶數,且分區大小不超過 64GB

資料的分發

預設情況下,DLF將根據每條資料主鍵的雜湊值,確定該資料屬於哪個分桶。

修改分發方式

如需修改資料的分發方式,可在建立 Paimon 主鍵表時指定 bucket-key 參數。例如,如果設定了 'bucket-key' = 'c1,c2',將根據每條資料 c1c2 兩列的值,確定該資料屬於哪個分桶。

說明
  • 不同列的名稱用英文逗號分隔。

  • 主鍵必須完整包含 bucket-key

  • bucket-key 列的取值應盡量均勻分布,避免資料集中在少數分桶中導致讀寫效能下降。

資料可見度

延遲分桶表的資料可見度取決於寫入方式。滿足以下條件時,DLF 將啟用直寫固定分桶最佳化(即跳過 bucket-postpone 臨時目錄,將資料直接寫入固定分桶),資料寫入並 commit 後即刻可見:

  • 主鍵表未啟用 deletion vectors,且通過 Flink 批作業或 Spark 作業執行 INSERT INTOINSERT OVERWRITE 操作。

  • 主鍵表已啟用 deletion vectors,且通過 Flink 批作業或 Spark 作業執行 INSERT OVERWRITE 操作。

其它情況下,資料需等待 DLF 處理後可見,通常在 1 ~ 3 分鐘內。若 DLF 進行中資源或分桶數調整,可能延長至 5 ~ 10 分鐘。可通過選擇特定的儲存最佳化模式減少調整頻率,詳見儲存最佳化模式

說明

關於直寫固定分桶最佳化:

  • 控制參數'postpone.batch-write-fixed-bucket',Boolean類型,取值為true(啟用)或false(不啟用)。

  • 支援引擎版本:Realtime Compute Flink 版 VVR 11.4 及以上、EMR Serverless Spark esr-4.7.0 及以上支援啟用直寫固定分桶最佳化。

  • 新分區寫入分桶數:啟用直寫固定分桶最佳化時,若寫入新分區,則新分區的分桶數將自動化佈建為 Flink 作業的並發數或 Spark 作業的 partition 數,上限值為 2048。分桶數後續可能被 DLF 最佳化作業調整。

  • 已有分區寫入衝突:啟用直寫固定分桶最佳化時,若寫入已存在的分區,且在寫入過程中 DLF 調整了該分區的分桶數,寫入作業將衝突失敗。可設定 'postpone.batch-write-fixed-bucket' = 'false' 關閉此最佳化,防止衝突的產生。

  • Deletion vectors 表:啟用 deletion vectors 的表,通過 Flink 批作業或 Spark 作業進行 INSERT INTO 操作時,建議關閉直寫固定分桶最佳化。

資料一致性

  • 同一作業寫入:對於同一使用者作業寫入的資料,DLF 最佳化作業將按寫入順序處理資料,即保證 sequential 層級的一致性。

    說明

    從 checkpoint 或 savepoint 重啟,且並發數不變的作業,也認為與重啟前是同一使用者作業。

  • 不同作業寫入:對於不同使用者作業寫入的資料,Paimon 表的最終狀態可能是兩個作業結果資料的混合,但不會有資料丟失,即保證 snapshot isolation 層級的一致性。如果對資料的合并順序有需求,可以設定 sequence field 以指定資料的合并順序,詳見處理亂序資料

固定分桶

建立 Paimon 主鍵表時,在表參數中指定 'bucket' = '<num>',即可指定非分區表的分桶數為 <num>,或分區表單個分區的分桶數為 <num>。其中,<num> 是一個大於 0 的整數。

說明

分桶數過少會限制作業實際並發數,同時導致單個分桶資料量過大,降低讀寫效能,因此分桶數不宜太小。然而,分桶數過大會造成小檔案數量過多。建議每個分桶的資料大小在 1 GB 左右。

資料的分發

與延遲分桶一致,詳見資料的分發

資料可見度

固定分桶表的資料可見度與 deletion vectors 有關:

  • 未啟用 deletion vectors 的表,資料寫入並 commit 之後馬上可見。

  • 啟用 deletion vectors 的表,寫入的資料需要等待 DLF 最佳化作業處理後才能讀取。一般情況下,資料將在 1 ~ 3 分鐘後可見。

動態分桶

建立 Paimon 主鍵表時,在表參數中指定 'bucket' = '-1',將會建立動態分桶的 Paimon 表。

說明

使用限制:

  • 動態分桶表不支援多個作業同時寫入。

  • 對於 100 GB 以上的表,動態分桶效能損耗較大,不建議使用。

資料的分發

動態分桶表會先將資料寫入已有的分桶中,當分桶的資料量超過限制時,再自動建立新的分桶。以下表參數將會影響動態分桶的行為:

參數

資料類型

預設值

備忘

dynamic-bucket.target-row-num

Long

2000000

每個分桶最多儲存幾條資料。

dynamic-bucket.initial-buckets

Integer

-

初始的分桶數。如果不設定,初始將建立等同於 writer 運算元並發數的分桶。

動態分桶表的更新行為和資源消耗取決於主鍵與分區鍵的關係

  • 主鍵完全包含分區鍵 → 非跨分區更新(僅消耗堆記憶體)

  • 主鍵不完全包含分區鍵 → 跨分區更新(需要 RocksDB,效能開銷更大)

非跨分區更新

對於主鍵完全包含分區鍵的動態分桶表,Paimon 可以確定該主鍵屬於哪個分區,但無法確定屬於哪個分桶,因此需要使用額外的堆記憶體建立索引,以維護主鍵與分桶編號的映射關係。具體來說,每 1 億條主鍵將額外消耗 1GB 的堆記憶體。只有當前正在寫入的分區才會消耗堆記憶體,歷史分區中的主鍵不會消耗堆記憶體。

除堆記憶體的消耗外,相比其它分桶方式,主鍵完全包含分區鍵的動態分桶表不會有明顯的效能損失。

跨分區更新

對於主鍵不完全包含分區鍵的動態分桶表,Paimon 無法根據主鍵確定該資料屬於哪個分區的哪個分桶,因此需要使用 rocksdb 維護主鍵與分區以及分桶編號的映射關係。相比固定分桶而言,資料量較大的表可能產生明顯的效能損失。另外,因為作業啟動時需要將映射關係全量載入至 rocksdb 中,作業的啟動速度也會變慢。

此外,資料合併機制也影響動態分桶下的跨分區更新行為。具體來說:

  • deduplicate:資料將會從老分區刪除,並插入新分區。

  • aggregationpartial-update:資料將會直接在老分區中更新,無視新資料的分區鍵。

  • first-row:如果相同主鍵的資料已經存在,則新資料將被直接丟棄。

資料可見度

與固定分桶表一致,詳見資料可見度

Deletion Vectors

建立 Paimon 表時,在表參數中設定 'deletion-vectors.enabled' = 'true',即可啟用 Deletion Vectors。

在小檔案合并的過程中額外產生 Deletion Vectors,可以提升 Paimon 主鍵表的查詢效能,適用於查詢效能敏感,或讀多寫少的情境。

說明

資料合併機制

如果將多條具有相同主鍵的資料寫入 Paimon 主鍵表,Paimon 將會根據表參數中設定的 merge-engine 參數對這些資料進行合并。該參數的取值有 deduplicate(預設值)、partial-updateaggregationfirst-row

處理亂序資料

預設情況下,Paimon 會按照資料的輸入順序確定合并的順序,最後寫入 Paimon 的資料會被認為是最新資料。如果輸入資料流存在亂序資料,也可以通過在表參數中指定 'sequence.field' = '<column-name>',具有相同主鍵的資料將按 <column-name> 這一列的值從小到大進行合并。

可以作為 sequence field 的資料類型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP、TIMESTAMP_LTZ。

deduplicate(預設)

設定 'merge-engine' = 'deduplicate' 後,對於多條具有相同主鍵的資料,Paimon 主鍵表僅會保留最新一條資料,並丟棄其它具有相同主鍵的資料。如果最新資料是一條 delete 訊息,所有具有該主鍵的資料都會被丟棄。

考慮以下建立 Paimon 表的 Flink SQL 陳述式:

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'deduplicate' -- deduplicate 是預設值,可以不設定
);

依次寫入以下資料:

  • +I(1, 2.0, 'apple')

  • +I(1, 4.0, 'banana')

  • +I(1, 8.0, 'cherry')

SELECT * FROM T WHERE k = 1 將查詢到 (1, 8.0, 'cherry') 這條資料。

依次寫入以下資料:

  • +I(1, 2.0, 'apple')

  • +I(1, 4.0, 'banana')

  • -D(1, 4.0, 'banana')

SELECT * FROM T WHERE k = 1 將查不到任何資料。

aggregation

設定 'merge-engine' = 'aggregation' 後,對於多條具有相同主鍵的資料,Paimon 主鍵表將會根據您指定的彙總函式進行彙總。

對於不屬於主鍵的每一列,都需要通過 fields.<field-name>.aggregate-function 指定一個彙總函式,否則該列將預設使用 last_non_null_value 彙總函式。

例如,考慮以下建立 Paimon 表的 Flink SQL 陳述式:

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price 列將會使用 max 函數進行彙總,而 sales 列將會使用 sum 函數進行彙總。依次寫入以下資料:

  • +I(1, 23.0, 15)

  • +I(1, 30.2, 20)

SELECT * FROM T WHERE product_id = 1 將查詢到 (1, 30.2, 35) 這條資料。

支援的彙總函式

  • sum:求和函數。支援的資料類型為 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE。

  • product:求乘積函數。支援的資料類型為 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE。

  • max:求最大值的函數。支援的資料類型為 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。

  • min:求最小值的函數。支援的資料類型為 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。

  • first_value:返回第一次輸入的值,包括 null。支援所有資料類型。

  • first_not_null_value:返回第一次輸入的非 null 值。支援所有資料類型。

  • last_value:返回最新輸入的值,包括 null。支援所有資料類型。

  • last_non_null_value(預設):返回最新輸入的非 null 值。支援所有資料類型。

  • listagg:將輸入的字串依次用英文逗號串連。例如,輸入的字串為 applebananacherry,將返回 apple,banana,cherry。支援 STRING 類型。

  • bool_and:對所有輸入的值求 and。支援 BOOLEAN 類型。

  • bool_or:對所有輸入的值求 or。支援 BOOLEAN 類型。

  • rbm32:合并多個 32 位 RoaringBitmap。支援 VARBINARY 類型。

  • rbm64:合并多個 64 位元 RoaringBitmap。支援 VARBINARY 類型。

  • nested_update:將多行彙總到一個 ARRAY 中。通過設定表參數 'fields.<field-name>.nested-key' = 'pk0,pk1,...',還可以將 ARRAY 中的資料根據 nested key 進行去重(保留最後一條)。若未設定 nested key,則每條新資料將追加在 ARRAY 末尾。

  • nested_partial_update:與 nested_update 類似,但會根據 nested key 對 ARRAY 中的資料進行部分列更新。必須設定表參數 'fields.<field-name>.nested-key' = 'pk0,pk1,...'

  • collect:將多個 ARRAY 合并成一個。通過設定表參數 'fields.<field-name>.distinct' = 'true',可以對 ARRAY 中的資料進行去重。

  • merge_map:將多個 MAP 合并成一個。相同 key 的資料將保留最後一個 value。

說明

上述彙總函式中,只有 sumproductlast_valuelast_non_null_valuenested_updatecollectmerge_map 支援回撤訊息(update_before 與 delete 訊息)。可以設定 'fields.<field-name>.ignore-retract' = 'true' 使對應列忽略回撤訊息。

彙總函式使用舉例

例如,使用nested_update彙總函式合并多個子訂單記錄:

-- 主訂單表
CREATE TABLE orders (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING
);

-- 子訂單表
CREATE TABLE sub_orders (
  order_id BIGINT,
  sub_order_id INT,
  product_name STRING,
  price BIGINT,
  PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
);

-- 寬表
-- 將同一主訂單的所有子訂單彙總到 sub_orders 中
-- 同時,sub_orders 中的結果將根據 sub_order_id 進行去重
CREATE TABLE order_wide (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING,
  sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>
) WITH (
  'merge-engine' = 'aggregation',
  'fields.sub_orders.aggregate-function' = 'nested_update',
  'fields.sub_orders.nested-key' = 'sub_order_id'
);

INSERT INTO order_wide

SELECT
  order_id,
  user_name,
  address,
  CAST (NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>)
FROM orders

UNION ALL

SELECT
  order_id,
  CAST (NULL AS STRING),
  CAST (NULL AS STRING),
  ARRAY[ROW(sub_order_id, product_name, price)]
FROM sub_orders;

假設主訂單表中的資料為:

  • (1, 'Alice', 'add1'),

  • (2, 'Bob', 'add2')

子訂單表中的資料為:

  • (1, 11, 'apple', 10),

  • (1, 12, 'banana', 20),

  • (2, 21, 'cherry', 30),

  • (2, 22, 'peach', 40),

  • (2, 21, 'cherry', 50)

那麼寬表的最終資料為:

  • (1, 'Alice', 'add1', [(11, 'apple', 10), (12, 'banana', 20)])

  • (2, 'Bob', 'add2', [(21, 'cherry', 50), (22, 'peach', 40)])

partial-update

設定 'merge-engine' = 'partial-update' 後,可以通過多條訊息對資料進行逐步更新,並最終得到完整的資料。具體來說,具有相同主鍵的新資料將會覆蓋原來的資料,但新資料中值為 null 的列不會覆蓋已有值。

說明

partial-update 無法處理 delete 與 update_before 訊息。設定 'ignore-delete' = 'true' 可忽略這兩類訊息。

例如,考慮以下建立 Paimon 表的 Flink SQL 陳述式:

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

依次寫入以下資料:

  • +I(1, 23.0, 10, NULL)

  • +I(1, NULL, NULL, 'This is a book')

  • +I(1, 25.2, NULL, NULL)

SELECT * FROM T WHERE k = 1 將查詢到 (1, 25.2, 10, 'This is a book') 這條資料。

為不同列分別指定合并順序

在 partial-update 合并機制中,除了 sequence field 之外,您也可以通過 sequence group 為不同列分別指定合并順序。該功能可用於在打寬情境下處理亂序資料,為來自不同源表的列分別指定合并順序。

例如,考慮以下建立 Paimon 表的 Flink SQL 陳述式。

CREATE TABLE T (
  k INT,
  a STRING,
  b STRING,
  g_1 INT,
  c STRING,
  d STRING,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields.g_1.sequence-group' = 'a,b',
  'fields.g_2.sequence-group' = 'c,d'
);

a、b 兩列以 g_1 列的值作為合并順序依據(值越大表示資料越新),c、d 兩列以 g_2 列的值作為合并順序依據。

同時進行資料的打寬與彙總

在 partial-update 合并機制中,可以在表參數中設定 fields.<field-name>.aggregate-function,為 <field-name> 列指定彙總函式,對該列的值進行彙總。

說明
  • <field-name> 列需要屬於某個 sequence group。

  • aggregation 合并機制支援的彙總函式均可使用。

例如,考慮以下建立 Paimon 表的 Flink SQL 陳述式。

CREATE TABLE T (
  k INT,
  a STRING,
  b INT,
  g_1 INT,
  c STRING,
  d INT,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields.g_1.sequence-group' = 'a,b',
  'fields.b.aggregate-function' = 'max',
  'fields.g_2.sequence-group' = 'c,d',
  'fields.d.aggregate-function' = 'sum'
);

a、b 兩列以 g_1 列的值作為合并順序依據(值越大表示資料越新),其中 a 列保留最新的非 null 值(未指定,預設使用last_non_null_value彙總函式),b 列保留輸入的最大值。c、d 兩列以 g_2 列的值作為合并順序依據,其中 c 列保留最新的非 null 值,d 列求出輸入資料的和。

first-row

設定 'merge-engine' = 'first-row' 後,Paimon 只會保留相同主鍵資料中的第一條。與 deduplicate 合并機制相比,first-row 只會產生 insert 類型的變更資料,且變更資料的產出效率更高。

說明

使用限制:

  • first-row 無法處理 delete 與 update_before 訊息。設定 'ignore-delete' = 'true' 可忽略這兩類訊息。

  • first-row 合并機制不支援指定 sequence field。

例如,考慮以下建立 Paimon 表的 Flink SQL 陳述式。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

依次寫入以下資料:

  • +I(1, 2.0, 'apple')

  • +I(1, 4.0, 'banana')

  • +I(1, 8.0, 'cherry')

SELECT * FROM T WHERE k = 1 將查詢到 (1, 2.0, 'apple') 這條資料。

變更資料產生機制

通過設定表參數中的 changelog-producer 參數,Paimon 將會以不同的方式產生變更資料,供下遊進行流式讀取。該參數常用的取值有 none(預設值)、inputlookup

none(預設)

適用情境:下遊不進行流式讀取,或下遊不關心完整的變更資料,只關心增量或最新狀態的情境。

設定 'changelog-producer' = 'none' 後,下遊流讀到的是每個 Paimon snapshot 中包含的增量資料,且增量資料中可能多次包含同一主鍵的值。具體來說:

  • 對於 'merge-engine' = 'deduplicate',下遊將讀到完整的一行。

  • 對於 'merge-engine' = 'partial-update',下遊只能讀到被更新的列,其它列的值均為 null。

  • 對於 'merge-engine' = 'aggregation',下遊只能讀到被更新的列的增量值,其它列的值均為 null。

說明

使用 'changelog-producer' = 'none' 進行流式讀取時,建議同時設定 'scan.remove-normalize' = 'true',以清除下遊 Flink 作業的 normalize 運算元。normalize 運算元通過 Flink 狀態將變更資料補充完整,計算開銷較高。對於不關心完整變更資料的情境,無需該運算元;對於關心完整變更資料的情境,建議配置 'changelog-producer' = 'input''changelog-producer' = 'lookup'

input

適用情境: 'merge-engine' = 'deduplicate',輸入資料流本身是完整的變更資料時(例如資料庫的 binlog)且下遊關心完整的變更資料的情境。

設定 'changelog-producer' = 'input' 後,Paimon 主鍵表會直接將輸入的訊息作為變更資料傳遞給下遊消費者。

由於 input 機制不涉及額外的計算,因此它的效率比 lookup 更高。

lookup

適用情境:下遊關心完整的變更資料的所有情境。

設定 'changelog-producer' = 'lookup' 後,Paimon 主鍵表會通過批量點查的方式,在最佳化作業每次 commit 之前觸發小檔案合并,並利用小檔案合并的結果產生完整的變更資料。無論輸入資料流是否為完整的變更資料,都可以使用這一變更資料產生機制。

由於 lookup 機制涉及額外的計算,因此它的效率比 input 更低,但適用情境更加廣泛。

說明

減少無效變更資料的數量:

預設情況下,即使更新後的資料與更新之前相同,Paimon 仍然會產生變更資料。如果您希望消除此類無效的變更資料,可以在表參數中設定 'changelog-producer.row-deduplicate' = 'true'。該參數僅對 lookup 機制有效。

由於設定該參數後需要引入額外的計算對比更新前後的值,因此建議僅在無效變更資料較多的情況下使用該參數。

儲存最佳化模式

DLF 提供了多種儲存最佳化模式,方便使用者根據不同情境,平衡資料的可見度與資源的消耗。此參數在 DLF 控制台配置,詳情請參見查看與配置儲存最佳化策略。DLF支援以下儲存最佳化模式:

  • 動態資源 - 資源延時均衡(預設):對於延遲分桶表,延時一般為 1 ~ 3 分鐘,但出現資源調整或分桶數調整時,延遲可能為 5 ~ 10 分鐘。對於其它啟用了 deletion vectors 的表,延時一般為 1 ~ 3 分鐘。

  • 動態資源 - 延時優先:資源消耗一般是均衡模式的 2 倍,通過較多資源提升最佳化作業的處理速度,並儘可能減少資源調整的情況。

  • 動態資源 - 資源優先:通過資料反壓驅動最佳化作業,資源消耗一般是均衡模式的 1/3 ~ 1/2,但延時可能達到 30 分鐘。

  • 固定資源:最佳化作業將持續運行,不再因為流量變化或寫入停止而調整作業。使用者可以設定作業使用的資源量、小檔案合并間隔,以及延遲分桶表的分桶數。建議僅在動態資源模式延時較高的情況下用於追資料,不建議日常使用。

    說明
    • 小檔案合并間隔指的是最佳化作業的 commit 間隔。設定後,最佳化作業仍將持續運行,不會轉為定時調度。

    • 設定了延遲分桶表的分桶數後,新分區將直接採用該分桶數,且不進行調整。

儲存最佳化資源消耗

以下列出不同情境下的 DLF 最佳化作業資源計算公式,協助您估算相關資源消耗。實際資源消耗情況以賬單為準。

資源延時均衡,模式倍率 = 1
延時優先,模式倍率 = 0.5
資源優先,模式倍率 = 4

# 單條資料大小係數用於防止壓縮率過高,如資料中有很多空值的情況
單條資料大小係數 = min(1, 資料總條數 / 總檔案大小 / 表列數)

######################################################################
# 未啟用 deletion vectors
# 未設定 'changelog-producer' = 'lookup'
######################################################################

並發數 = max(
    流量 / 每並發流量,
    活躍的分桶數 / (0.5 * 每並發分桶數)
)

每並發流量 = 12GB/h * 模式倍率 * 單條資料大小係數

每並發分桶數 = 512 * 模式倍率 * 單條資料大小係數

######################################################################
# 啟用 deletion vectors
# 未設定 merge-engine,或設定 'merge-engine' = 'deduplicate'
# 未設定 'changelog-producer' = 'lookup'
######################################################################

並發數 = max(
    流量 / 每並發流量,
    活躍的分桶數 / 每並發分桶數,
    活躍分區 level > 0 檔案總大小 / 每並發 lookup file 大小
)

每並發流量 = 3GB/h * 模式倍率 * 單條資料大小係數

每並發分桶數 = 128 * 模式倍率 * 單條資料大小係數

# 除主鍵佔比較大的表以外,該係數對資源消耗的影響較小
每並發 lookup file 大小 = 35GB / lookup 磁碟係數
# DLF 最佳化作業每 CU 提供 50GB 本地磁碟,該係數用於防止磁碟不足
# lookup file 緩衝大小一般約為 2 * 主鍵列數 / 表列數
lookup file 緩衝係數 = min(
    1,
    活躍分區 lookup file 緩衝總大小 / 活躍分區 level > 1 檔案總大小
)

######################################################################
# 其它情況
######################################################################

並發數 = max(
    流量 / 每並發流量,
    活躍的分桶數 / (0.5 * 每並發分桶數),
    活躍分區 level > 0 檔案總大小 / 每並發 lookup file 大小
)

每並發流量 = 3GB/h * 模式倍率 * 單條資料大小係數

每並發分桶數 = 64 * 模式倍率 * 單條資料大小係數

每並發 lookup file 大小 = max(35GB / lookup 磁碟係數, 4GB)
# DLF 最佳化作業每 CU 提供 50GB 本地磁碟,該係數用於防止磁碟不足
# 該係數一般在 1.5 ~ 2.5 之間
lookup 磁碟係數 = max(
    1,
    活躍分區 lookup file 緩衝總大小 / 活躍分區 level > 1 檔案總大小
)

當滿足以下條件時,將會調整 DLF 最佳化作業的資源:

  • 某一併發活躍的分桶數 > 每並發分桶數

  • 流量 > 2 * 每並發流量 * 並發數

  • 流量 < 0.5 * 本次作業最大流量,且持續 12 分鐘以上

資源分派補充說明

說明
  • 資源用量排查:單張表資源偏高通常因分區粒度過細,導致活躍分區數量激增。建議檢查並最佳化分區策略。

  • 每並發資源:預設 1CU。若出現記憶體不足,DLF 可能上調至 2CU 或 4CU。可在DLF控制台表詳情頁面的健康診斷 > 事件中心查看相關事件。

  • Job Manager 資源:每個作業需額外 1CU 的 job manager 節點。並發較高或記憶體不足時,可能上調至 2CU 或 4CU。

  • 小表共用機制:對於延遲分桶表,滿足特定條件的多張小表(至多 32 張)可以共用同一最佳化作業以節省資源。具體數量與表本身的大小、表的寫入時機相關。小表判定條件如下:

    • 儲存最佳化模式選擇“資源延時均衡”或“資源優先”。

    • 同時活躍分區數不足 8 個。

    • 不滿足儲存最佳化資源消耗“其它情況”的表,同時活躍分區的總大小不超過 16GB;或滿足儲存最佳化資源消耗“其它情況”的表,同時活躍分區的總大小不超過 4GB。

  • 資源優先模式:受反壓驅動,短時內資源消耗可能偏高,但總體為均衡模式的 1/3 ~ 1/2。