本文介绍 Paimon 主键表的核心配置,包括分桶方式、数据合并机制(merge-engine)、变更数据产生机制(changelog-producer)以及 DLF 存储优化模式,帮助您根据业务场景选择合适的配置。
配置速查
类别 | 场景 | 配置建议 |
分桶方式 |
| 延迟分桶(建表不指定 |
需要流作业写入的数据,commit 后马上可见。 | 同时设置:
| |
主键无法完全包含分区键,且表大小 < 100GB。 | 动态分桶( | |
Deletion Vectors | 需要使用 StarRocks 或 Hologres 等引擎进行高性能查询。 |
|
数据合并机制 | 新数据完全覆盖老数据。 |
|
|
| |
需要进行数据的聚合。 |
| |
每个主键只想保留第一条数据。 |
| |
变更数据产生机制 | 下游不进行流消费。 |
|
设置了 |
| |
设置了 |
| |
上游流入的是完整的变更数据(例如数据库 binlog),设置了 |
| |
其它下游需要完整的变更数据的情况。 |
| |
存储优化模式 | 可接受日常 1 ~ 3 分钟,偶尔 5 ~ 10 分钟延时。 | 资源延时均衡(默认) |
愿意使用更多资源,以减少资源调整带来的延时。 | 延时优先 | |
可接受 30 分钟左右延时,希望节省更多资源。 | 资源优先 |
创建主键表
如果在创建 Paimon 表时指定了主键(primary key),则该表就是 Paimon 主键表。例如,以下 SQL 语句将创建一张分区键为 dt,主键为 dt、shop_id 和 user_id 的主键表。
Flink SQL
CREATE TABLE T (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);Spark SQL
CREATE TABLE T (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT
) PARTITIONED BY (dt) TBLPROPERTIES (
'primary-key' = 'dt,shop_id,user_id'
);主键表中,每行数据的主键各不相同。如果将多条具有相同主键的数据写入主键表,将会根据数据合并机制对这些数据进行合并。
下面将逐一介绍分桶方式、Deletion Vectors、数据合并机制与变更数据产生机制等Paimon主键表核心特性的具体配置与使用方式。
分桶方式
分桶(bucket)是Paimon主键表读写操作的最小单元。非分区表的所有数据,以及分区表每个分区的数据,都会被进一步划分到不同的分桶中,以便同一作业使用多个并发同时读写 Paimon 表,加快读写效率。
延迟分桶(默认)
创建 Paimon 主键表时,不在表参数中指定 bucket,或指定 'bucket' = '-2',将会创建延迟分桶的主键表。
数据写入延迟分桶表时,将首先进入对应分区的 bucket-postpone 临时目录中。随后,DLF 优化作业决定各分区的分桶数,将数据写入目标分桶目录,并进行小文件合并。
延迟分桶作为 DLF 的特色功能,具备如下优势:
自动确定分桶数:根据数据量、流量等指标自动计算,用户无需手动设置。
动态调整:当数据量等指标发生较大变化时,自动调整分桶数,无需人工干预。
分区级分桶:不同分区可使用不同的分桶数,适配分区大小不一的情况。
并发写入:多个作业可同时写入而不产生冲突。
数据的分发
默认情况下,DLF将根据每条数据主键的哈希值,确定该数据属于哪个分桶。
数据可见性
延迟分桶表的数据可见性取决于写入方式。满足以下条件时,DLF 将启用直写固定分桶优化(即跳过 bucket-postpone 临时目录,将数据直接写入固定分桶),数据写入并 commit 后即刻可见:
主键表未启用 deletion vectors,且通过 Flink 批作业或 Spark 作业执行
INSERT INTO或INSERT OVERWRITE操作。主键表已启用 deletion vectors,且通过 Flink 批作业或 Spark 作业执行
INSERT OVERWRITE操作。
其它情况下,数据需等待 DLF 处理后可见,通常在 1 ~ 3 分钟内。若 DLF 正在进行资源或分桶数调整,可能延长至 5 ~ 10 分钟。可通过选择特定的存储优化模式减少调整频率,详见存储优化模式。
关于直写固定分桶优化:
控制参数:
'postpone.batch-write-fixed-bucket',Boolean类型,取值为true(启用)或false(不启用)。支持引擎版本:实时计算 Flink 版 VVR 11.4 及以上、EMR Serverless Spark esr-4.7.0 及以上支持启用直写固定分桶优化。
新分区写入分桶数:启用直写固定分桶优化时,若写入新分区,则新分区的分桶数将自动设置为 Flink 作业的并发数或 Spark 作业的 partition 数,上限值为 2048。分桶数后续可能被 DLF 优化作业调整。
已有分区写入冲突:启用直写固定分桶优化时,若写入已存在的分区,且在写入过程中 DLF 调整了该分区的分桶数,写入作业将冲突失败。可设置
'postpone.batch-write-fixed-bucket' = 'false'关闭此优化,防止冲突的产生。Deletion vectors 表:启用 deletion vectors 的表,通过 Flink 批作业或 Spark 作业进行
INSERT INTO操作时,建议关闭直写固定分桶优化。
数据一致性
同一作业写入:对于同一用户作业写入的数据,DLF 优化作业将按写入顺序处理数据,即保证 sequential 级别的一致性。
说明从 checkpoint 或 savepoint 重启,且并发数不变的作业,也认为与重启前是同一用户作业。
不同作业写入:对于不同用户作业写入的数据,Paimon 表的最终状态可能是两个作业结果数据的混合,但不会有数据丢失,即保证 snapshot isolation 级别的一致性。如果对数据的合并顺序有需求,可以设置 sequence field 以指定数据的合并顺序,详见处理乱序数据。
固定分桶
创建 Paimon 主键表时,在表参数中指定 'bucket' = '<num>',即可指定非分区表的分桶数为 <num>,或分区表单个分区的分桶数为 <num>。其中,<num> 是一个大于 0 的整数。
分桶数过少会限制作业实际并发数,同时导致单个分桶数据量过大,降低读写性能,因此分桶数不宜太小。然而,分桶数过大会造成小文件数量过多。建议每个分桶的数据大小在 1 GB 左右。
数据的分发
与延迟分桶一致,详见数据的分发。
数据可见性
固定分桶表的数据可见性与 deletion vectors 有关:
未启用 deletion vectors 的表,数据写入并 commit 之后马上可见。
启用 deletion vectors 的表,写入的数据需要等待 DLF 优化作业处理后才能读取。一般情况下,数据将在 1 ~ 3 分钟后可见。
动态分桶
创建 Paimon 主键表时,在表参数中指定 'bucket' = '-1',将会创建动态分桶的 Paimon 表。
使用限制:
动态分桶表不支持多个作业同时写入。
对于 100 GB 以上的表,动态分桶性能损耗较大,不建议使用。
数据的分发
动态分桶表会先将数据写入已有的分桶中,当分桶的数据量超过限制时,再自动创建新的分桶。以下表参数将会影响动态分桶的行为:
参数 | 数据类型 | 默认值 | 备注 |
| Long | 2000000 | 每个分桶最多存储几条数据。 |
| Integer | - | 初始的分桶数。如果不设置,初始将创建等同于 writer 算子并发数的分桶。 |
动态分桶表的更新行为和资源消耗取决于主键与分区键的关系:
非跨分区更新
对于主键完全包含分区键的动态分桶表,Paimon 可以确定该主键属于哪个分区,但无法确定属于哪个分桶,因此需要使用额外的堆内存创建索引,以维护主键与分桶编号的映射关系。具体来说,每 1 亿条主键将额外消耗 1GB 的堆内存。只有当前正在写入的分区才会消耗堆内存,历史分区中的主键不会消耗堆内存。
除堆内存的消耗外,相比其它分桶方式,主键完全包含分区键的动态分桶表不会有明显的性能损失。
跨分区更新
对于主键不完全包含分区键的动态分桶表,Paimon 无法根据主键确定该数据属于哪个分区的哪个分桶,因此需要使用 rocksdb 维护主键与分区以及分桶编号的映射关系。相比固定分桶而言,数据量较大的表可能产生明显的性能损失。另外,因为作业启动时需要将映射关系全量加载至 rocksdb 中,作业的启动速度也会变慢。
此外,数据合并机制也影响动态分桶下的跨分区更新行为。具体来说:
deduplicate:数据将会从老分区删除,并插入新分区。aggregation与partial-update:数据将会直接在老分区中更新,无视新数据的分区键。first-row:如果相同主键的数据已经存在,则新数据将被直接丢弃。
数据可见性
与固定分桶表一致,详见数据可见性。
Deletion Vectors
创建 Paimon 表时,在表参数中设置 'deletion-vectors.enabled' = 'true',即可启用 Deletion Vectors。
在小文件合并的过程中额外生成 Deletion Vectors,可以提升 Paimon 主键表的查询性能,适用于查询性能敏感,或读多写少的场景。
数据合并机制
如果将多条具有相同主键的数据写入 Paimon 主键表,Paimon 将会根据表参数中设置的 merge-engine 参数对这些数据进行合并。该参数的取值有 deduplicate(默认值)、partial-update、aggregation 与 first-row。
处理乱序数据
默认情况下,Paimon 会按照数据的输入顺序确定合并的顺序,最后写入 Paimon 的数据会被认为是最新数据。如果输入数据流存在乱序数据,也可以通过在表参数中指定 'sequence.field' = '<column-name>',具有相同主键的数据将按 <column-name> 这一列的值从小到大进行合并。
可以作为 sequence field 的数据类型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP、TIMESTAMP_LTZ。
deduplicate(默认)
设置 'merge-engine' = 'deduplicate' 后,对于多条具有相同主键的数据,Paimon 主键表仅会保留最新一条数据,并丢弃其它具有相同主键的数据。如果最新数据是一条 delete 消息,所有具有该主键的数据都会被丢弃。
考虑以下创建 Paimon 表的 Flink SQL 语句:
CREATE TABLE T (
k INT,
v1 DOUBLE,
v2 STRING,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'deduplicate' -- deduplicate 是默认值,可以不设置
);依次写入以下数据:
+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry')
则 SELECT * FROM T WHERE k = 1 将查询到 (1, 8.0, 'cherry') 这条数据。
依次写入以下数据:
+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')-D(1, 4.0, 'banana')
则 SELECT * FROM T WHERE k = 1 将查不到任何数据。
aggregation
设置 'merge-engine' = 'aggregation' 后,对于多条具有相同主键的数据,Paimon 主键表将会根据您指定的聚合函数进行聚合。
对于不属于主键的每一列,都需要通过 fields.<field-name>.aggregate-function 指定一个聚合函数,否则该列将默认使用 last_non_null_value 聚合函数。
例如,考虑以下创建 Paimon 表的 Flink SQL 语句:
CREATE TABLE T (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);price 列将会使用 max 函数进行聚合,而 sales 列将会使用 sum 函数进行聚合。依次写入以下数据:
+I(1, 23.0, 15)+I(1, 30.2, 20)
则 SELECT * FROM T WHERE product_id = 1 将查询到 (1, 30.2, 35) 这条数据。
支持的聚合函数
sum:求和函数。支持的数据类型为 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE。product:求乘积函数。支持的数据类型为 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE。max:求最大值的函数。支持的数据类型为 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。min:求最小值的函数。支持的数据类型为 CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ。first_value:返回第一次输入的值,包括 null。支持所有数据类型。first_not_null_value:返回第一次输入的非 null 值。支持所有数据类型。last_value:返回最新输入的值,包括 null。支持所有数据类型。last_non_null_value(默认):返回最新输入的非 null 值。支持所有数据类型。listagg:将输入的字符串依次用英文逗号连接。例如,输入的字符串为apple、banana、cherry,将返回apple,banana,cherry。支持 STRING 类型。bool_and:对所有输入的值求 and。支持 BOOLEAN 类型。bool_or:对所有输入的值求 or。支持 BOOLEAN 类型。rbm32:合并多个 32 位 RoaringBitmap。支持 VARBINARY 类型。rbm64:合并多个 64 位 RoaringBitmap。支持 VARBINARY 类型。nested_update:将多行聚合到一个 ARRAY 中。通过设置表参数'fields.<field-name>.nested-key' = 'pk0,pk1,...',还可以将 ARRAY 中的数据根据 nested key 进行去重(保留最后一条)。若未设置 nested key,则每条新数据将追加在 ARRAY 末尾。nested_partial_update:与nested_update类似,但会根据 nested key 对 ARRAY 中的数据进行部分列更新。必须设置表参数'fields.<field-name>.nested-key' = 'pk0,pk1,...'。collect:将多个 ARRAY 合并成一个。通过设置表参数'fields.<field-name>.distinct' = 'true',可以对 ARRAY 中的数据进行去重。merge_map:将多个 MAP 合并成一个。相同 key 的数据将保留最后一个 value。
上述聚合函数中,只有 sum、product、last_value、last_non_null_value、nested_update、collect、merge_map 支持回撤消息(update_before 与 delete 消息)。可以设置 'fields.<field-name>.ignore-retract' = 'true' 使对应列忽略回撤消息。
聚合函数使用举例
例如,使用nested_update聚合函数合并多个子订单记录:
-- 主订单表
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
user_name STRING,
address STRING
);
-- 子订单表
CREATE TABLE sub_orders (
order_id BIGINT,
sub_order_id INT,
product_name STRING,
price BIGINT,
PRIMARY KEY (order_id, sub_order_id) NOT ENFORCED
);
-- 宽表
-- 将同一主订单的所有子订单聚合到 sub_orders 中
-- 同时,sub_orders 中的结果将根据 sub_order_id 进行去重
CREATE TABLE order_wide (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
user_name STRING,
address STRING,
sub_orders ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>
) WITH (
'merge-engine' = 'aggregation',
'fields.sub_orders.aggregate-function' = 'nested_update',
'fields.sub_orders.nested-key' = 'sub_order_id'
);
INSERT INTO order_wide
SELECT
order_id,
user_name,
address,
CAST (NULL AS ARRAY<ROW<sub_order_id BIGINT, product_name STRING, price BIGINT>>)
FROM orders
UNION ALL
SELECT
order_id,
CAST (NULL AS STRING),
CAST (NULL AS STRING),
ARRAY[ROW(sub_order_id, product_name, price)]
FROM sub_orders;假设主订单表中的数据为:
(1, 'Alice', 'add1'),
(2, 'Bob', 'add2')
子订单表中的数据为:
(1, 11, 'apple', 10),
(1, 12, 'banana', 20),
(2, 21, 'cherry', 30),
(2, 22, 'peach', 40),
(2, 21, 'cherry', 50)
那么宽表的最终数据为:
(1, 'Alice', 'add1', [(11, 'apple', 10), (12, 'banana', 20)])
(2, 'Bob', 'add2', [(21, 'cherry', 50), (22, 'peach', 40)])
partial-update
设置 'merge-engine' = 'partial-update' 后,可以通过多条消息对数据进行逐步更新,并最终得到完整的数据。具体来说,具有相同主键的新数据将会覆盖原来的数据,但新数据中值为 null 的列不会覆盖已有值。
partial-update 无法处理 delete 与 update_before 消息。设置 'ignore-delete' = 'true' 可忽略这两类消息。
例如,考虑以下创建 Paimon 表的 Flink SQL 语句:
CREATE TABLE T (
k INT,
v1 DOUBLE,
v2 BIGINT,
v3 STRING,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update'
);依次写入以下数据:
+I(1, 23.0, 10, NULL)+I(1, NULL, NULL, 'This is a book')+I(1, 25.2, NULL, NULL)
则 SELECT * FROM T WHERE k = 1 将查询到 (1, 25.2, 10, 'This is a book') 这条数据。
为不同列分别指定合并顺序
在 partial-update 合并机制中,除了 sequence field 之外,您也可以通过 sequence group 为不同列分别指定合并顺序。该功能可用于在打宽场景下处理乱序数据,为来自不同源表的列分别指定合并顺序。
例如,考虑以下创建 Paimon 表的 Flink SQL 语句。
CREATE TABLE T (
k INT,
a STRING,
b STRING,
g_1 INT,
c STRING,
d STRING,
g_2 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.g_1.sequence-group' = 'a,b',
'fields.g_2.sequence-group' = 'c,d'
);a、b 两列以 g_1 列的值作为合并顺序依据(值越大表示数据越新),c、d 两列以 g_2 列的值作为合并顺序依据。
同时进行数据的打宽与聚合
在 partial-update 合并机制中,可以在表参数中设置 fields.<field-name>.aggregate-function,为 <field-name> 列指定聚合函数,对该列的值进行聚合。
<field-name>列需要属于某个 sequence group。aggregation 合并机制支持的聚合函数均可使用。
例如,考虑以下创建 Paimon 表的 Flink SQL 语句。
CREATE TABLE T (
k INT,
a STRING,
b INT,
g_1 INT,
c STRING,
d INT,
g_2 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'fields.g_1.sequence-group' = 'a,b',
'fields.b.aggregate-function' = 'max',
'fields.g_2.sequence-group' = 'c,d',
'fields.d.aggregate-function' = 'sum'
);a、b 两列以 g_1 列的值作为合并顺序依据(值越大表示数据越新),其中 a 列保留最新的非 null 值(未指定,默认使用last_non_null_value聚合函数),b 列保留输入的最大值。c、d 两列以 g_2 列的值作为合并顺序依据,其中 c 列保留最新的非 null 值,d 列求出输入数据的和。
first-row
设置 'merge-engine' = 'first-row' 后,Paimon 只会保留相同主键数据中的第一条。与 deduplicate 合并机制相比,first-row 只会产生 insert 类型的变更数据,且变更数据的产出效率更高。
使用限制:
first-row 无法处理 delete 与 update_before 消息。设置
'ignore-delete' = 'true'可忽略这两类消息。first-row 合并机制不支持指定 sequence field。
例如,考虑以下创建 Paimon 表的 Flink SQL 语句。
CREATE TABLE T (
k INT,
v1 DOUBLE,
v2 STRING,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'first-row'
);依次写入以下数据:
+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry')
则 SELECT * FROM T WHERE k = 1 将查询到 (1, 2.0, 'apple') 这条数据。
变更数据产生机制
通过设置表参数中的 changelog-producer 参数,Paimon 将会以不同的方式产生变更数据,供下游进行流式读取。该参数常用的取值有 none(默认值)、input 与 lookup。
none(默认)
适用场景:下游不进行流式读取,或下游不关心完整的变更数据,只关心增量或最新状态的场景。
设置 'changelog-producer' = 'none' 后,下游流读到的是每个 Paimon snapshot 中包含的增量数据,且增量数据中可能多次包含同一主键的值。具体来说:
对于
'merge-engine' = 'deduplicate',下游将读到完整的一行。对于
'merge-engine' = 'partial-update',下游只能读到被更新的列,其它列的值均为 null。对于
'merge-engine' = 'aggregation',下游只能读到被更新的列的增量值,其它列的值均为 null。
使用 'changelog-producer' = 'none' 进行流式读取时,建议同时设置 'scan.remove-normalize' = 'true',以清除下游 Flink 作业的 normalize 算子。normalize 算子通过 Flink 状态将变更数据补充完整,计算开销较高。对于不关心完整变更数据的场景,无需该算子;对于关心完整变更数据的场景,建议配置 'changelog-producer' = 'input' 或 'changelog-producer' = 'lookup'。
input
适用场景: 'merge-engine' = 'deduplicate',输入数据流本身是完整的变更数据时(例如数据库的 binlog)且下游关心完整的变更数据的场景。
设置 'changelog-producer' = 'input' 后,Paimon 主键表会直接将输入的消息作为变更数据传递给下游消费者。
由于 input 机制不涉及额外的计算,因此它的效率比 lookup 更高。
lookup
适用场景:下游关心完整的变更数据的所有场景。
设置 'changelog-producer' = 'lookup' 后,Paimon 主键表会通过批量点查的方式,在优化作业每次 commit 之前触发小文件合并,并利用小文件合并的结果产生完整的变更数据。无论输入数据流是否为完整的变更数据,都可以使用这一变更数据产生机制。
由于 lookup 机制涉及额外的计算,因此它的效率比 input 更低,但适用场景更加广泛。
减少无效变更数据的数量:
默认情况下,即使更新后的数据与更新之前相同,Paimon 仍然会产生变更数据。如果您希望消除此类无效的变更数据,可以在表参数中设置 'changelog-producer.row-deduplicate' = 'true'。该参数仅对 lookup 机制有效。
由于设置该参数后需要引入额外的计算对比更新前后的值,因此建议仅在无效变更数据较多的情况下使用该参数。
存储优化模式
DLF 提供了多种存储优化模式,方便用户根据不同场景,平衡数据的可见性与资源的消耗。此参数在 DLF 控制台配置,详情请参见查看与配置存储优化策略。DLF支持以下存储优化模式:
动态资源 - 资源延时均衡(默认):对于延迟分桶表,延时一般为 1 ~ 3 分钟,但出现资源调整或分桶数调整时,延迟可能为 5 ~ 10 分钟。对于其它启用了 deletion vectors 的表,延时一般为 1 ~ 3 分钟。
动态资源 - 延时优先:资源消耗一般是均衡模式的 2 倍,通过较多资源提升优化作业的处理速度,并尽可能减少资源调整的情况。
动态资源 - 资源优先:通过数据反压驱动优化作业,资源消耗一般是均衡模式的 1/3 ~ 1/2,但延时可能达到 30 分钟。
固定资源:优化作业将持续运行,不再因为流量变化或写入停止而调整作业。用户可以设置作业使用的资源量、小文件合并间隔,以及延迟分桶表的分桶数。建议仅在动态资源模式延时较高的情况下用于追数据,不建议日常使用。
说明小文件合并间隔指的是优化作业的 commit 间隔。设置后,优化作业仍将持续运行,不会转为定时调度。
设置了延迟分桶表的分桶数后,新分区将直接采用该分桶数,且不进行调整。