全部产品
Search
文档中心

数据湖构建:Paimon主键表:配置与特性

更新时间:May 15, 2026

本文介绍 Paimon 主键表的核心配置,包括分桶方式、数据合并机制(merge-engine)、变更数据产生机制(changelog-producer)以及 DLF 存储优化模式,帮助您根据业务场景选择合适的配置。

配置速查

类别

场景

配置建议

分桶方式

  • 可接受日常 1 ~ 3 分钟,偶尔 5 ~ 10 分钟延时。

  • 不同分区数据量差距较大。

  • 不想关心分桶数的计算。

延迟分桶(建表不指定bucket,默认)

需要流作业写入的数据,commit 后马上可见。

同时设置:

  • 固定分桶('bucket' = '<num>'

  • 'deletion-vectors.enabled' = 'false'(默认)

主键无法完全包含分区键,且表大小 < 100GB。

动态分桶('bucket' = '-1'

Deletion Vectors

需要使用 StarRocks 或 Hologres 等引擎进行高性能查询。

'deletion-vectors.enabled' = 'true'

数据合并机制

新数据完全覆盖老数据。

'merge-engine' = 'deduplicate'(默认)

  • 每条数据只更新部分列。

  • 需要进行多流打宽。

'merge-engine' = 'partial-update'

需要进行数据的聚合。

'merge-engine' = 'aggregation'

每个主键只想保留第一条数据。

'merge-engine' = 'first-row'

变更数据产生机制

下游不进行流消费。

'changelog-producer' = 'none'(默认)

设置了 'merge-engine' = 'deduplicate'(默认),下游不关心完整变更数据,只关心最新状态。

  • 'changelog-producer' = 'none'(默认)

  • 'scan.remove-normalize' = 'true'

设置了 'merge-engine' = 'partial-update''merge-engine' = 'aggregation',下游不关心完整变更数据,不需要每行完整的最新状态,只关心变化情况。

'changelog-producer' = 'none'(默认)

上游流入的是完整的变更数据(例如数据库 binlog),设置了 'merge-engine' = 'deduplicate',下游需要完整的变更数据。

'changelog-producer' = 'input'

其它下游需要完整的变更数据的情况。

'changelog-producer' = 'lookup'

存储优化模式

可接受日常 1 ~ 3 分钟,偶尔 5 ~ 10 分钟延时。

资源延时均衡(默认)

愿意使用更多资源,以减少资源调整带来的延时。

延时优先

可接受 30 分钟左右延时,希望节省更多资源。

资源优先

创建主键表

如果在创建 Paimon 表时指定了主键(primary key),则该表就是 Paimon 主键表。例如,以下 SQL 语句将创建一张分区键为 dt,主键为 dt、shop_id 和 user_id 的主键表。

Flink SQL

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);

Spark SQL

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) PARTITIONED BY (dt) TBLPROPERTIES (
  'primary-key' = 'dt,shop_id,user_id'
);

主键表中,每行数据的主键各不相同。如果将多条具有相同主键的数据写入主键表,将会根据数据合并机制对这些数据进行合并。

下面将逐一介绍分桶方式、Deletion Vectors、数据合并机制与变更数据产生机制等Paimon主键表核心特性的具体配置与使用方式。

分桶方式

分桶(bucket)是Paimon主键表读写操作的最小单元。非分区表的所有数据,以及分区表每个分区的数据,都会被进一步划分到不同的分桶中,以便同一作业使用多个并发同时读写 Paimon 表,加快读写效率。

延迟分桶(默认)

创建 Paimon 主键表时,不在表参数中指定 bucket,或指定 'bucket' = '-2',将会创建延迟分桶的主键表。

数据写入延迟分桶表时,将首先进入对应分区的 bucket-postpone 临时目录中。随后,DLF 优化作业决定各分区的分桶数,将数据写入目标分桶目录,并进行小文件合并。

延迟分桶作为 DLF 的特色功能,具备如下优势:

  • 自动确定分桶数:根据数据量、流量等指标自动计算,用户无需手动设置。

  • 动态调整:当数据量等指标发生较大变化时,自动调整分桶数,无需人工干预。

  • 分区级分桶:不同分区可使用不同的分桶数,适配分区大小不一的情况。

  • 并发写入:多个作业可同时写入而不产生冲突。

分桶数计算逻辑

以下列出 DLF 优化作业确定分桶数的公式,帮助您估算相关分区的分桶数。

# 单条数据大小系数用于防止压缩率过高,如数据中有很多空值的情况
单条数据大小系数 = min(1, 数据总条数 / 总文件大小 / 表列数)

文件大小相关分桶数 = max(
    分区文件总大小 / (1.3 * 每分桶文件大小),
    最大分桶文件总大小 / (1.7 * 每分桶文件大小)
)

# 最终分桶数将上取整至最近 2 的幂次,且最大值为 2048
最终分桶数 = max(
    文件大小相关分桶数,
    分区数据总条数 / 每分桶数据条数,
    最大分桶数据总条数 / (1.7 * 每分桶数据条数),
    min(流量 / 每分桶流量, 4 * 文件大小相关分桶数, 128)
)

每分桶文件大小 = 768MB

每分桶流量 = 1536MB/h

######################################################################
# 未启用 deletion vectors
# 未设置 'changelog-producer' = 'lookup'
######################################################################

每分桶数据条数 = +∞

######################################################################
# 其它情况
######################################################################

每分桶数据条数 = 40_000_000

当满足以下条件时,DLF将会调整分桶数:

  • 分区数据总大小 > 2 * 每分桶文件大小 * 分桶数

  • 某一分桶数据大小 > 2 * 每分桶文件大小

  • 分区当前分桶数 < 0.125 * 目标分桶数,且分区大小不超过 64GB

数据的分发

默认情况下,DLF将根据每条数据主键的哈希值,确定该数据属于哪个分桶。

修改分发方式

如需修改数据的分发方式,可在创建 Paimon 主键表时指定 bucket-key 参数。例如,如果设置了 'bucket-key' = 'c1,c2',将根据每条数据 c1c2 两列的值,确定该数据属于哪个分桶。

说明
  • 不同列的名称用英文逗号分隔。

  • 主键必须完整包含 bucket-key

  • bucket-key 列的取值应尽量均匀分布,避免数据集中在少数分桶中导致读写性能下降。

数据可见性

延迟分桶表的数据可见性取决于写入方式。满足以下条件时,DLF 将启用直写固定分桶优化(即跳过 bucket-postpone 临时目录,将数据直接写入固定分桶),数据写入并 commit 后即刻可见:

  • 主键表未启用 deletion vectors,且通过 Flink 批作业或 Spark 作业执行 INSERT INTOINSERT OVERWRITE 操作。

  • 主键表已启用 deletion vectors,且通过 Flink 批作业或 Spark 作业执行 INSERT OVERWRITE 操作。

其它情况下,数据需等待 DLF 处理后可见,通常在 1 ~ 3 分钟内。若 DLF 正在进行资源或分桶数调整,可能延长至 5 ~ 10 分钟。可通过选择特定的存储优化模式减少调整频率,详见存储优化模式

说明

关于直写固定分桶优化:

  • 控制参数'postpone.batch-write-fixed-bucket',Boolean类型,取值为true(启用)或false(不启用)。

  • 支持引擎版本:实时计算 Flink 版 VVR 11.4 及以上、EMR Serverless Spark esr-4.7.0 及以上支持启用直写固定分桶优化。

  • 新分区写入分桶数:启用直写固定分桶优化时,若写入新分区,则新分区的分桶数将自动设置为 Flink 作业的并发数或 Spark 作业的 partition 数,上限值为 2048。分桶数后续可能被 DLF 优化作业调整。

  • 已有分区写入冲突:启用直写固定分桶优化时,若写入已存在的分区,且在写入过程中 DLF 调整了该分区的分桶数,写入作业将冲突失败。可设置 'postpone.batch-write-fixed-bucket' = 'false' 关闭此优化,防止冲突的产生。

  • Deletion vectors 表:启用 deletion vectors 的表,通过 Flink 批作业或 Spark 作业进行 INSERT INTO 操作时,建议关闭直写固定分桶优化。

数据一致性

  • 同一作业写入:对于同一用户作业写入的数据,DLF 优化作业将按写入顺序处理数据,即保证 sequential 级别的一致性。

    说明

    从 checkpoint 或 savepoint 重启,且并发数不变的作业,也认为与重启前是同一用户作业。

  • 不同作业写入:对于不同用户作业写入的数据,Paimon 表的最终状态可能是两个作业结果数据的混合,但不会有数据丢失,即保证 snapshot isolation 级别的一致性。如果对数据的合并顺序有需求,可以设置 sequence field 以指定数据的合并顺序,详见处理乱序数据

固定分桶

创建 Paimon 主键表时,在表参数中指定 'bucket' = '<num>',即可指定非分区表的分桶数为 <num>,或分区表单个分区的分桶数为 <num>。其中,<num> 是一个大于 0 的整数。

说明

分桶数过少会限制作业实际并发数,同时导致单个分桶数据量过大,降低读写性能,因此分桶数不宜太小。然而,分桶数过大会造成小文件数量过多。建议每个分桶的数据大小在 1 GB 左右。

数据的分发

与延迟分桶一致,详见数据的分发

数据可见性

固定分桶表的数据可见性与 deletion vectors 有关:

  • 未启用 deletion vectors 的表,数据写入并 commit 之后马上可见。

  • 启用 deletion vectors 的表,写入的数据需要等待 DLF 优化作业处理后才能读取。一般情况下,数据将在 1 ~ 3 分钟后可见。

动态分桶

创建 Paimon 主键表时,在表参数中指定 'bucket' = '-1',将会创建动态分桶的 Paimon 表。

说明

使用限制:

  • 动态分桶表不支持多个作业同时写入。

  • 对于 100 GB 以上的表,动态分桶性能损耗较大,不建议使用。

数据的分发

动态分桶表会先将数据写入已有的分桶中,当分桶的数据量超过限制时,再自动创建新的分桶。以下表参数将会影响动态分桶的行为:

参数

数据类型

默认值

备注

dynamic-bucket.target-row-num

Long

2000000

每个分桶最多存储几条数据。

dynamic-bucket.initial-buckets

Integer

-

初始的分桶数。如果不设置,初始将创建等同于 writer 算子并发数的分桶。

动态分桶表的更新行为和资源消耗取决于主键与分区键的关系

  • 主键完全包含分区键 → 非跨分区更新(仅消耗堆内存)

  • 主键不完全包含分区键 → 跨分区更新(需要 RocksDB,性能开销更大)

非跨分区更新

对于主键完全包含分区键的动态分桶表,Paimon 可以确定该主键属于哪个分区,但无法确定属于哪个分桶,因此需要使用额外的堆内存创建索引,以维护主键与分桶编号的映射关系。具体来说,每 1 亿条主键将额外消耗 1GB 的堆内存。只有当前正在写入的分区才会消耗堆内存,历史分区中的主键不会消耗堆内存。

除堆内存的消耗外,相比其它分桶方式,主键完全包含分区键的动态分桶表不会有明显的性能损失。

跨分区更新

对于主键不完全包含分区键的动态分桶表,Paimon 无法根据主键确定该数据属于哪个分区的哪个分桶,因此需要使用 rocksdb 维护主键与分区以及分桶编号的映射关系。相比固定分桶而言,数据量较大的表可能产生明显的性能损失。另外,因为作业启动时需要将映射关系全量加载至 rocksdb 中,作业的启动速度也会变慢。

此外,数据合并机制也影响动态分桶下的跨分区更新行为。具体来说:

  • deduplicate:数据将会从老分区删除,并插入新分区。

  • aggregationpartial-update:数据将会直接在老分区中更新,无视新数据的分区键。

  • first-row:如果相同主键的数据已经存在,则新数据将被直接丢弃。

数据可见性

与固定分桶表一致,详见数据可见性

Deletion Vectors

创建 Paimon 表时,在表参数中设置 'deletion-vectors.enabled' = 'true',即可启用 Deletion Vectors。

在小文件合并的过程中额外生成 Deletion Vectors,可以提升 Paimon 主键表的查询性能,适用于查询性能敏感,或读多写少的场景。

说明

数据合并机制

如果将多条具有相同主键的数据写入 Paimon 主键表,Paimon 将会根据表参数中设置的 merge-engine 参数对这些数据进行合并。该参数的取值有 deduplicate(默认值)、partial-updateaggregationfirst-row

处理乱序数据

默认情况下,Paimon 会按照数据的输入顺序确定合并的顺序,最后写入 Paimon 的数据会被认为是最新数据。如果输入数据流存在乱序数据,也可以通过在表参数中指定 'sequence.field' = '<column-name>',具有相同主键的数据将按 <column-name> 这一列的值从小到大进行合并。

可以作为 sequence field 的数据类型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP、TIMESTAMP_LTZ。

deduplicate(默认)

设置 'merge-engine' = 'deduplicate' 后,对于多条具有相同主键的数据,Paimon 主键表仅会保留最新一条数据,并丢弃其它具有相同主键的数据。如果最新数据是一条 delete 消息,所有具有该主键的数据都会被丢弃。

考虑以下创建 Paimon 表的 Flink SQL 语句:

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'deduplicate' -- deduplicate 是默认值,可以不设置
);

依次写入以下数据:

  • +I(1, 2.0, 'apple')

  • +I(1, 4.0, 'banana')

  • +I(1, 8.0, 'cherry')

SELECT * FROM T WHERE k = 1 将查询到 (1, 8.0, 'cherry') 这条数据。

依次写入以下数据:

  • +I(1, 2.0, 'apple')

  • +I(1, 4.0, 'banana')

  • -D(1, 4.0, 'banana')

SELECT * FROM T WHERE k = 1 将查不到任何数据。

aggregation

设置 'merge-engine' = 'aggregation' 后,对于多条具有相同主键的数据,Paimon 主键表将会根据您指定的聚合函数进行聚合。

对于不属于主键的每一列,都需要通过 fields.<field-name>.aggregate-function 指定一个聚合函数,否则该列将默认使用 last_non_null_value 聚合函数。

例如,考虑以下创建 Paimon 表的 Flink SQL 语句:

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price 列将会使用 max 函数进行聚合,而 sales 列将会使用 sum 函数进行聚合。依次写入以下数据:

  • +I(1, 23.0, 15)

  • +I(1, 30.2, 20)

SELECT * FROM T WHERE product_id = 1 将查询到 (1, 30.2, 35) 这条数据。

支持的聚合函数

  • sum:求和函数。支持的数据类型为 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE。

  • product:求乘积函数。支持的数据类型为 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE。

  • max:求最大值的函数。支持的数据类型为 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。

  • min:求最小值的函数。支持的数据类型为 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。

  • first_value:返回第一次输入的值,包括 null。支持所有数据类型。

  • first_not_null_value:返回第一次输入的非 null 值。支持所有数据类型。

  • last_value:返回最新输入的值,包括 null。支持所有数据类型。

  • last_non_null_value(默认):返回最新输入的非 null 值。支持所有数据类型。

  • listagg:将输入的字符串依次用英文逗号连接。例如,输入的字符串为 applebananacherry,将返回 apple,banana,cherry。支持 STRING 类型。

  • bool_and:对所有输入的值求 and。支持 BOOLEAN 类型。

  • bool_or:对所有输入的值求 or。支持 BOOLEAN 类型。

  • rbm32:合并多个 32 位 RoaringBitmap。支持 VARBINARY 类型。

  • rbm64:合并多个 64 位 RoaringBitmap。支持 VARBINARY 类型。

  • nested_update:将多行聚合到一个 ARRAY 中。通过设置表参数 'fields.<field-name>.nested-key' = 'pk0,pk1,...',还可以将 ARRAY 中的数据根据 nested key 进行去重(保留最后一条)。若未设置 nested key,则每条新数据将追加在 ARRAY 末尾。

  • nested_partial_update:与 nested_update 类似,但会根据 nested key 对 ARRAY 中的数据进行部分列更新。必须设置表参数 'fields.<field-name>.nested-key' = 'pk0,pk1,...'

  • collect:将多个 ARRAY 合并成一个。通过设置表参数 'fields.<field-name>.distinct' = 'true',可以对 ARRAY 中的数据进行去重。

  • merge_map:将多个 MAP 合并成一个。相同 key 的数据将保留最后一个 value。

说明

上述聚合函数中,只有 sumproductlast_valuelast_non_null_valuenested_updatecollectmerge_map 支持回撤消息(update_before 与 delete 消息)。可以设置 'fields.<field-name>.ignore-retract' = 'true' 使对应列忽略回撤消息。

聚合函数使用举例

例如,使用nested_update聚合函数合并多个子订单记录:

-- 主订单表
CREATE TABLE orders (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING
);

-- 子订单表
CREATE TABLE sub_orders (
  order_id BIGINT,
  sub_order_id INT,
  product_name STRING,
  price BIGINT,
  PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
);

-- 宽表
-- 将同一主订单的所有子订单聚合到 sub_orders 中
-- 同时,sub_orders 中的结果将根据 sub_order_id 进行去重
CREATE TABLE order_wide (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_name STRING,
  address STRING,
  sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>
) WITH (
  'merge-engine' = 'aggregation',
  'fields.sub_orders.aggregate-function' = 'nested_update',
  'fields.sub_orders.nested-key' = 'sub_order_id'
);

INSERT INTO order_wide

SELECT
  order_id,
  user_name,
  address,
  CAST (NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>)
FROM orders

UNION ALL

SELECT
  order_id,
  CAST (NULL AS STRING),
  CAST (NULL AS STRING),
  ARRAY[ROW(sub_order_id, product_name, price)]
FROM sub_orders;

假设主订单表中的数据为:

  • (1, 'Alice', 'add1'),

  • (2, 'Bob', 'add2')

子订单表中的数据为:

  • (1, 11, 'apple', 10),

  • (1, 12, 'banana', 20),

  • (2, 21, 'cherry', 30),

  • (2, 22, 'peach', 40),

  • (2, 21, 'cherry', 50)

那么宽表的最终数据为:

  • (1, 'Alice', 'add1', [(11, 'apple', 10), (12, 'banana', 20)])

  • (2, 'Bob', 'add2', [(21, 'cherry', 50), (22, 'peach', 40)])

partial-update

设置 'merge-engine' = 'partial-update' 后,可以通过多条消息对数据进行逐步更新,并最终得到完整的数据。具体来说,具有相同主键的新数据将会覆盖原来的数据,但新数据中值为 null 的列不会覆盖已有值。

说明

partial-update 无法处理 delete 与 update_before 消息。设置 'ignore-delete' = 'true' 可忽略这两类消息。

例如,考虑以下创建 Paimon 表的 Flink SQL 语句:

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

依次写入以下数据:

  • +I(1, 23.0, 10, NULL)

  • +I(1, NULL, NULL, 'This is a book')

  • +I(1, 25.2, NULL, NULL)

SELECT * FROM T WHERE k = 1 将查询到 (1, 25.2, 10, 'This is a book') 这条数据。

为不同列分别指定合并顺序

在 partial-update 合并机制中,除了 sequence field 之外,您也可以通过 sequence group 为不同列分别指定合并顺序。该功能可用于在打宽场景下处理乱序数据,为来自不同源表的列分别指定合并顺序。

例如,考虑以下创建 Paimon 表的 Flink SQL 语句。

CREATE TABLE T (
  k INT,
  a STRING,
  b STRING,
  g_1 INT,
  c STRING,
  d STRING,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields.g_1.sequence-group' = 'a,b',
  'fields.g_2.sequence-group' = 'c,d'
);

a、b 两列以 g_1 列的值作为合并顺序依据(值越大表示数据越新),c、d 两列以 g_2 列的值作为合并顺序依据。

同时进行数据的打宽与聚合

在 partial-update 合并机制中,可以在表参数中设置 fields.<field-name>.aggregate-function,为 <field-name> 列指定聚合函数,对该列的值进行聚合。

说明
  • <field-name> 列需要属于某个 sequence group。

  • aggregation 合并机制支持的聚合函数均可使用。

例如,考虑以下创建 Paimon 表的 Flink SQL 语句。

CREATE TABLE T (
  k INT,
  a STRING,
  b INT,
  g_1 INT,
  c STRING,
  d INT,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields.g_1.sequence-group' = 'a,b',
  'fields.b.aggregate-function' = 'max',
  'fields.g_2.sequence-group' = 'c,d',
  'fields.d.aggregate-function' = 'sum'
);

a、b 两列以 g_1 列的值作为合并顺序依据(值越大表示数据越新),其中 a 列保留最新的非 null 值(未指定,默认使用last_non_null_value聚合函数),b 列保留输入的最大值。c、d 两列以 g_2 列的值作为合并顺序依据,其中 c 列保留最新的非 null 值,d 列求出输入数据的和。

first-row

设置 'merge-engine' = 'first-row' 后,Paimon 只会保留相同主键数据中的第一条。与 deduplicate 合并机制相比,first-row 只会产生 insert 类型的变更数据,且变更数据的产出效率更高。

说明

使用限制:

  • first-row 无法处理 delete 与 update_before 消息。设置 'ignore-delete' = 'true' 可忽略这两类消息。

  • first-row 合并机制不支持指定 sequence field。

例如,考虑以下创建 Paimon 表的 Flink SQL 语句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

依次写入以下数据:

  • +I(1, 2.0, 'apple')

  • +I(1, 4.0, 'banana')

  • +I(1, 8.0, 'cherry')

SELECT * FROM T WHERE k = 1 将查询到 (1, 2.0, 'apple') 这条数据。

变更数据产生机制

通过设置表参数中的 changelog-producer 参数,Paimon 将会以不同的方式产生变更数据,供下游进行流式读取。该参数常用的取值有 none(默认值)、inputlookup

none(默认)

适用场景:下游不进行流式读取,或下游不关心完整的变更数据,只关心增量或最新状态的场景。

设置 'changelog-producer' = 'none' 后,下游流读到的是每个 Paimon snapshot 中包含的增量数据,且增量数据中可能多次包含同一主键的值。具体来说:

  • 对于 'merge-engine' = 'deduplicate',下游将读到完整的一行。

  • 对于 'merge-engine' = 'partial-update',下游只能读到被更新的列,其它列的值均为 null。

  • 对于 'merge-engine' = 'aggregation',下游只能读到被更新的列的增量值,其它列的值均为 null。

说明

使用 'changelog-producer' = 'none' 进行流式读取时,建议同时设置 'scan.remove-normalize' = 'true',以清除下游 Flink 作业的 normalize 算子。normalize 算子通过 Flink 状态将变更数据补充完整,计算开销较高。对于不关心完整变更数据的场景,无需该算子;对于关心完整变更数据的场景,建议配置 'changelog-producer' = 'input''changelog-producer' = 'lookup'

input

适用场景: 'merge-engine' = 'deduplicate',输入数据流本身是完整的变更数据时(例如数据库的 binlog)且下游关心完整的变更数据的场景。

设置 'changelog-producer' = 'input' 后,Paimon 主键表会直接将输入的消息作为变更数据传递给下游消费者。

由于 input 机制不涉及额外的计算,因此它的效率比 lookup 更高。

lookup

适用场景:下游关心完整的变更数据的所有场景。

设置 'changelog-producer' = 'lookup' 后,Paimon 主键表会通过批量点查的方式,在优化作业每次 commit 之前触发小文件合并,并利用小文件合并的结果产生完整的变更数据。无论输入数据流是否为完整的变更数据,都可以使用这一变更数据产生机制。

由于 lookup 机制涉及额外的计算,因此它的效率比 input 更低,但适用场景更加广泛。

说明

减少无效变更数据的数量:

默认情况下,即使更新后的数据与更新之前相同,Paimon 仍然会产生变更数据。如果您希望消除此类无效的变更数据,可以在表参数中设置 'changelog-producer.row-deduplicate' = 'true'。该参数仅对 lookup 机制有效。

由于设置该参数后需要引入额外的计算对比更新前后的值,因此建议仅在无效变更数据较多的情况下使用该参数。

存储优化模式

DLF 提供了多种存储优化模式,方便用户根据不同场景,平衡数据的可见性与资源的消耗。此参数在 DLF 控制台配置,详情请参见查看与配置存储优化策略。DLF支持以下存储优化模式:

  • 动态资源 - 资源延时均衡(默认):对于延迟分桶表,延时一般为 1 ~ 3 分钟,但出现资源调整或分桶数调整时,延迟可能为 5 ~ 10 分钟。对于其它启用了 deletion vectors 的表,延时一般为 1 ~ 3 分钟。

  • 动态资源 - 延时优先:资源消耗一般是均衡模式的 2 倍,通过较多资源提升优化作业的处理速度,并尽可能减少资源调整的情况。

  • 动态资源 - 资源优先:通过数据反压驱动优化作业,资源消耗一般是均衡模式的 1/3 ~ 1/2,但延时可能达到 30 分钟。

  • 固定资源:优化作业将持续运行,不再因为流量变化或写入停止而调整作业。用户可以设置作业使用的资源量、小文件合并间隔,以及延迟分桶表的分桶数。建议仅在动态资源模式延时较高的情况下用于追数据,不建议日常使用。

    说明
    • 小文件合并间隔指的是优化作业的 commit 间隔。设置后,优化作业仍将持续运行,不会转为定时调度。

    • 设置了延迟分桶表的分桶数后,新分区将直接采用该分桶数,且不进行调整。

存储优化资源消耗

以下列出不同场景下的 DLF 优化作业资源计算公式,帮助您估算相关资源消耗。实际资源消耗情况以账单为准。

资源延时均衡,模式倍率 = 1
延时优先,模式倍率 = 0.5
资源优先,模式倍率 = 4

# 单条数据大小系数用于防止压缩率过高,如数据中有很多空值的情况
单条数据大小系数 = min(1, 数据总条数 / 总文件大小 / 表列数)

######################################################################
# 未启用 deletion vectors
# 未设置 'changelog-producer' = 'lookup'
######################################################################

并发数 = max(
    流量 / 每并发流量,
    活跃的分桶数 / (0.5 * 每并发分桶数)
)

每并发流量 = 12GB/h * 模式倍率 * 单条数据大小系数

每并发分桶数 = 512 * 模式倍率 * 单条数据大小系数

######################################################################
# 启用 deletion vectors
# 未设置 merge-engine,或设置 'merge-engine' = 'deduplicate'
# 未设置 'changelog-producer' = 'lookup'
######################################################################

并发数 = max(
    流量 / 每并发流量,
    活跃的分桶数 / 每并发分桶数,
    活跃分区 level > 0 文件总大小 / 每并发 lookup file 大小
)

每并发流量 = 3GB/h * 模式倍率 * 单条数据大小系数

每并发分桶数 = 128 * 模式倍率 * 单条数据大小系数

# 除主键占比较大的表以外,该系数对资源消耗的影响较小
每并发 lookup file 大小 = 35GB / lookup 磁盘系数
# DLF 优化作业每 CU 提供 50GB 本地磁盘,该系数用于防止磁盘不足
# lookup file 缓存大小一般约为 2 * 主键列数 / 表列数
lookup file 缓存系数 = min(
    1,
    活跃分区 lookup file 缓存总大小 / 活跃分区 level > 1 文件总大小
)

######################################################################
# 其它情况
######################################################################

并发数 = max(
    流量 / 每并发流量,
    活跃的分桶数 / (0.5 * 每并发分桶数),
    活跃分区 level > 0 文件总大小 / 每并发 lookup file 大小
)

每并发流量 = 3GB/h * 模式倍率 * 单条数据大小系数

每并发分桶数 = 64 * 模式倍率 * 单条数据大小系数

每并发 lookup file 大小 = max(35GB / lookup 磁盘系数, 4GB)
# DLF 优化作业每 CU 提供 50GB 本地磁盘,该系数用于防止磁盘不足
# 该系数一般在 1.5 ~ 2.5 之间
lookup 磁盘系数 = max(
    1,
    活跃分区 lookup file 缓存总大小 / 活跃分区 level > 1 文件总大小
)

当满足以下条件时,将会调整 DLF 优化作业的资源:

  • 某一并发活跃的分桶数 > 每并发分桶数

  • 流量 > 2 * 每并发流量 * 并发数

  • 流量 < 0.5 * 本次作业最大流量,且持续 12 分钟以上

资源分配补充说明

说明
  • 资源用量排查:单张表资源偏高通常因分区粒度过细,导致活跃分区数量激增。建议检查并优化分区策略。

  • 每并发资源:默认 1CU。若出现内存不足,DLF 可能上调至 2CU 或 4CU。可在DLF控制台表详情页面的健康诊断 > 事件中心查看相关事件。

  • Job Manager 资源:每个作业需额外 1CU 的 job manager 节点。并发较高或内存不足时,可能上调至 2CU 或 4CU。

  • 小表共享机制:对于延迟分桶表,满足特定条件的多张小表(至多 32 张)可以共享同一优化作业以节省资源。具体数量与表本身的大小、表的写入时机相关。小表判定条件如下:

    • 存储优化模式选择“资源延时均衡”或“资源优先”。

    • 同时活跃分区数不足 8 个。

    • 不满足存储优化资源消耗“其它情况”的表,同时活跃分区的总大小不超过 16GB;或满足存储优化资源消耗“其它情况”的表,同时活跃分区的总大小不超过 4GB。

  • 资源优先模式:受反压驱动,短时内资源消耗可能偏高,但总体为均衡模式的 1/3 ~ 1/2。