本文为您介绍如何使用实时数仓Hologres连接器。
背景信息
实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
|
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
功能 | 详情 |
支持以非Binlog或Binlog方式读取Hologres数据,兼容CDC与非CDC模式。 | |
支持全量、增量及全增量一体化消费。 | |
支持忽略新数据、整行替换或仅更新指定字段。 | |
只更新修改部分列的数据,而非整行更新。 | |
支持消费物理分区表Binlog,单个作业即可监听全部分区及新增分区。支持消费逻辑分区表Binlog。 | |
支持向分区表的父表写入数据,并可自动创建对应的子分区表。 | |
实时同步单表或整库 | 支持实时同步单表或整库级别数据 ,具备以下关键特性:
|
前提条件
已创建Hologres表,详情请参见创建Hologres表。
使用限制
通用:
Hologres连接器不支持访问Hologres外部表。关于Hologres外部表详情请参见基于HoloWeb创建MaxCompute外部表。
暂不支持实时消费TIMESTAMP类型的数据,因此创建Hologres表时,请使用TIMESTAMPTZ类型。
连接器目前的已知缺陷以及版本功能发布记录详见Hologres Connector Release Note。
源表独有:
Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。
支持实时消费Hologres数据,详情请参见实时计算Flink版实时消费Hologres。
VVR 8.0以下版本Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?。
维表独有:
维表建议使用主键作为Join条件,对于此类主键点查的维表,创建Hologres表时建议选择行存模式,列存模式对于点查场景性能开销较大。选择行存模式创建维表时必须设置主键,并且将主键设置为Clustering Key才可以工作。详情请参见CREATE TABLE。
如果业务需要,无法使用主键作为Join条件,对于此类非主键点查的维表(即一对多的查询),创建Hologres表时建议选择列存模式,并合理设置分布键Distribution Key以及Event Time Column(Segment Key)以优化查询性能,详情请参见表存储格式:列存、行存、行列共存。
注意事项
Hologres 与 VVR 版本消费模式兼容性及限制表
源表
VVR版本
Hologres版本
默认/推荐参数值
实际消费模式
备注
< 6.0.7
< 2.0
holohub
holohub(默认)
不支持jdbc。
≥ 6.0.7
< 2.0
自定义
holohub(默认)
推荐配置 'sdkmode' = 'jdbc'。
6.0.7 ~ 8.0.4
≥ 2.0
jdbc(自动切换,无需配置)
jdbc(强制)
Hologres 2.0及以上版本下线了holohub服务,自动切换为jdbc,可能存在权限不足问题。权限配置详情请参见权限问题。
≥ 8.0.5
≥ 2.1
jdbc(自动切换,无需配置)
jdbc(强制)
无权限问题。Hologres 2.1.27及以上版本,切换为jdbc_fixed。
≥ 11.1
任何版本
AUTO(默认值)
根据Hologres版本自动选择
2.1.27及以上版本,选择jdbc模式,同时默认启用轻量级连接,即connection.fixed.enabled 参数默认设置为 true。
2.1.0~2.1.26版本,选择jdbc模式。
2.0 及以下版本,选择holohub模式。
重要VVR 11.1及以上版本默认消费Binlog数据,请确认已经开启Binlog,否则可能会导致报错。
结果表
VVR版本
Hologres版本
rpc模式是否受影响
rpc实际消费模式
推荐/默认参数值
备注
6.0.4 ~ 8.0.2
< 2.0
否
rpc
自定义
/
6.0.4 ~ 8.0.2
≥ 2.0
是
jdbc_fixed(自动切换)
自定义
可以通过设置'jdbcWriteBatchSize'='1'防止去重。
≥ 8.0.3
任何版本
是
jdbc_fixed(自动切换)
自定义
如果配置为rpc模式,将自动切换该参数值为jdbc_fixed且设置'jdbcWriteBatchSize'='1'防止去重。
≥ 8.0.5
任何版本
是
jdbc_fixed(自动切换)
自定义
如果配置为rpc模式,将自动切换该参数值为jdbc_fixed且设置'deduplication.enabled'='false'防止去重。
重要由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。
VVR 11.1及以上版本已经取消了rpc模式,推荐采用jdbc模式连接。
维表
VVR版本
Hologres版本
rpc模式是否受影响
rpc实际消费模式
推荐/默认参数值
备注
6.0.4 ~ 8.0.2
< 2.0
否
rpc
自定义
/
6.0.4 ~ 8.0.2
≥ 2.0
是
jdbc_fixed(自动切换)
自定义
如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。
≥ 8.0.3
任何版本
是
jdbc_fixed(自动切换)
自定义
≥ 8.0.5
任何版本
是
jdbc_fixed(自动切换)
自定义
重要VVR 11.1及以上版本已经取消了rpc模式,默认采用jdbc模式连接。
JDBC模式Binlog源表支持读取JSONB类型,需要数据库级别开启GUC。
--db级别开启GUC,仅superuser可以执行,每个db只需要设置一次。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
对于数据更新频繁的表,建议采用行存表开启Binlog;如果该表还同时用于OLAP等分析查询,建议使用行列共存的存储格式。
UPDATE操作会生成两条连续的Binlog记录,记录连续且旧数据(update_before)记录在前,新数据(update_after)记录在后。
不建议对Binlog源表进行TRUNCATE或其他重建表操作,详情请参见常见问题。
请确保Flink与Hologres 的
DECIMAL
类型精度一致,以避免错误,详情请参见常见问题。建议Flink作业并发数和Hologres Table的Shard个数保持一致。
# 您可以在Hologres控制台上,使用以下语句查看Table的Shard数,其中tablename为您的业务表名称。 select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
开启Binlog
未建表
实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.level
和binlog.ttl
参数,示例如下。
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');--创建行存表test_message_src
call set_table_property('test_table', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_table', 'binlog.level', 'replica');--设置表属性开启Binlog功能
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
commit;
已建表
在Hologres控制台上,可以使用以下语句对已有表开启Binlog并设置Binlog TTL时间。table_name
为开启Binlog的表名称。
-- 设置表属性开启Binlog功能
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- 设置表属性,配置Binlog TTL时间,单位秒
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;
WITH参数
从VVR 11开始,为了更好地支持Hologres,With参数有所调整,部分参数可能已经更名或进行移除。请根据您的版本查阅对应的参数说明。
类型映射
Flink与Hologres的数据类型映射请参见Flink与Hologres的数据类型映射。
使用示例
源表示例
Binlog Source表
CDC模式
该模式下Source消费的Binlog数据,将根据hg_binlog_event_type
自动为每行数据设置准确的Flink RowKind类型,无需显式声明它们,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQL或Postgres的CDC功能。源表DDL代码示例如下。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', --读取所有ChangeLog类型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
'retry-count'='10', --读取Binlog数据出错后的重试次数。
'retry-sleep-step-ms'='5000', --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
'source.binlog.batch-size'='512' --批量读取Binlog的数据行数。
);
VVR 8+
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', --读取Binlog数据出错后的重试次数。
'binlogRetryIntervalMs' = '500', --读取Binlog数据出错后的重试时间间隔
'binlogBatchReadSize' = '100' --批量读取Binlog的数据行数。
);
非CDC模式
该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type
类型的数据。源表DDL代码示例如下。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', --所有ChangeLog类型都当作INSERT处理。
'retry-count'='10', --读取Binlog数据出错后的重试次数。
'retry-sleep-step-ms'='5000', --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
'source.binlog.batch-size'='512' --批量读取Binlog的数据行数。
);
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', --读取Binlog数据出错后的重试次数。
'binlogRetryIntervalMs' = '500', --读取Binlog数据出错后的重试时间间隔
'binlogBatchReadSize' = '100' --批量读取Binlog的数据行数。
);
非Binlog Source表
VVR 11.1及以上版本默认消费Binlog数据,请参见Binlog Source表。
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkmode' = 'jdbc'
);
结果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
维表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
特色功能详解
全增量一体化消费
适用场景
仅适用于目标表有主键的场景,推荐在CDC模式下使用的全增量Hologres源表。
Hologres支持按需开启Binlog,可以将已有历史数据的表打开Binlog。
代码示例
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', --先读取历史全量数据,再增量消费Binlog。
'retry-count'='10', --读取Binlog数据出错后的重试次数。
'retry-sleep-step-ms'='5000', --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
'source.binlog.batch-size'='512' --批量读取Binlog的数据行数。
);
source.binlog.startup-mode
设置为INITIAL
,可以先全量消费数据,再读取Binlog开始增量消费。如果设置了
startTime
参数,或者在启动界面选择了启动时间,则binlogStartUpMode
会强制使用timestamp
模式消费,其他消费模式则不生效,startTime
参数优先级更高。
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
'binlogMaxRetryTimes' = '10', --读取Binlog数据出错后的重试次数。
'binlogRetryIntervalMs' = '500', --读取Binlog数据出错后的重试时间间隔
'binlogBatchReadSize' = '100' --批量读取Binlog的数据行数。
);
binlogStartUpMode
设置为initial
,可以先全量消费数据,再读取Binlog开始增量消费。如果设置了
startTime
参数,或者在启动界面选择了启动时间,则binlogStartUpMode
会强制使用timestamp
模式消费,其他消费模式则不生效,startTime
参数优先级更高。
主键冲突处理策略
写入Hologres时若存在重复主键的数据,连接器提供了三种处理策略。
VVR 11+
通过指定sink.on-conflict-action
参数值,来实现不同的处理策略。
sink.on-conflict-action参数值 | 含义 |
INSERT_OR_IGNORE | 保留首次出现的数据,忽略后续重复数据。 |
INSERT_OR_REPLACE | 后续数据整行替换已有数据。 |
INSERT_OR_UPDATE(默认值) | 只更新sink中已提供的字段,其他字段保持不变。 |
VVR 8+
通过指定mutatetype
参数值,来实现不同的处理策略。
mutatetype参数值 | 含义 |
insertorignore(默认值) | 保留首次出现的数据,忽略后续重复数据。 |
insertorreplace | 后续数据整行替换已有数据。 |
insertorupdate | 只更新sink中已提供的字段,其他字段保持不变。 |
假设表有字段 a、b、c、d,其中 a 是主键。若结果表字段仅提供 a 和 b,则配置INSERT_OR_UPDATE
时,仅更新 b 字段,c 和 d 保持不变。
结果表字段数量可以少于Hologres物理表,但缺失字段必须允许为空,否则将会导致写入失败。
分区表写入
默认情况下,Hologres Sink 仅支持向单表导入数据。如需导入至分区表的父表 ,需开启以下配置:
VVR 11+
sink.create-missing-partition
设置为true
,若未创建子分区表,可以自动创建。
VVR 11.1及以上版本默认支持写入分区表,自动将数据路由到对应的子分区表。
tablename
参数填写父表的名称。若未提前创建子表且未设置
sink.create-missing-partition=true
,将导致写入失败。
VVR 8+
partitionRouter
设置为true
,自动将数据路由到对应的子分区表。createparttable
设置为true
,若未创建子分区表,可以自动创建。
tablename
参数填写父表的名称。若未提前创建子表且未设置
createparttable=true
,将导致写入失败。
多流写入的宽表Merge与局部更新
在将多个数据流写入同一张 Hologres 宽表的场景中,系统支持对主键相同的数据进行自动 Merge(合并),并可选择性地仅更新发生变化的部分列,而非整行替换,从而提升写入效率与数据一致性。
使用限制
宽表必须有主键。
每个数据流的数据都必须包含完整的主键字段。
列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary Encoding功能。
使用示例
假设有两个Flink数据流,一个数据流中包含a、b和c字段,另一个数据流中包含a、d和e字段,Hologres宽表WIDE_TABLE包含a、b、c、d和e字段,其中a字段为主键。
VVR 11+
// 已经定义的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 声明a,b,c,d,e五个字段
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- hologres宽表,包含a,b,c,d,e五个字段
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- 根据主键更新数据部分列
'sink.delete-strategy'='IGNORE_DELETE', -- 撤回消息的处理策略,IGNORE_DELETE适用于仅需插入或更新数据,而无需删除数据的场景。
'sink.partial-insert.enabled'='true' -- 开启部分列更新参数,将INSERT语句中定义的字段下推给连接器,从而可以只对声明的字段进行更新或插入
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- 声明只插入a,b,c三个字段
INSERT INTO hologres_sink(a,d,e) select * from source2; -- 声明只插入a,d,e三个字段
END;
VVR 8+
// 已经定义的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 声明a,b,c,d,e五个字段
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- hologres宽表,包含a,b,c,d,e五个字段
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- 根据主键更新数据部分列
'ignoredelete'='true', -- 忽略回撤消息产生的Delete请求
'partial-insert.enabled'='true' -- 开启部分列更新参数,支持仅更新INSERT语句中声明的字段
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- 声明只插入a,b,c三个字段
INSERT INTO hologres_sink(a,d,e) select * from source2; -- 声明只插入a,d,e三个字段
END;
ignoredelete
设置为true
,忽略回撤消息产生Delete请求。实时计算引擎VVR 8.0.8及以上版本推荐使用sink.delete-strategy
配置局部更新时的各种策略。
消费分区表Binlog(公测)
分区表有助于数据归档与查询优化。Hologres Connector支持消费物理分区表和逻辑分区表的Binlog。关于物理分区表和逻辑分区表的区别参见分区表介绍。
消费物理分区表Binlog
Hologres Connector 支持通过单个作业消费分区表 Binlog,并可动态监听新增分区,显著提升实时数据处理效率与易用性。
注意事项
仅实时计算引擎VVR 8.0.11及以上版本,Hologres实例版本大于等于2.1.27,Binlog源表JDBC模式支持消费分区表。
分区名称必须严格由父表名+下划线+分区值组成,即
{parent_table}_{partition_value}
,非此格式的分区可能无法消费到,详情请参见表名生成规则。重要对于DYNAMIC模式,VVR 8.0.11版本不支持带
-
分隔符的分区字段(如YYYY-MM-DD)。自VVR 11.1版本起,可以消费自定义Format格式的分区字段。
写入分区表则不受此格式限制。
Flink中声明Hologres源表时,必须包含Hologres分区表的分区字段。
对于DYNAMIC模式,要求分区表必须开启动态分区管理。并且分区预创建参数
auto_partitioning.num_precreate
必须大于1,否则,在尝试消费最新分区时,作业将会抛出异常。
使用示例
模式类型 | 特点 | 适用场景说明 |
DYNAMIC | 动态分区消费 | 自动监听新增分区,按时间顺序动态推进消费进度。适合实时数据流场景。 |
STATIC | 静态分区消费 | 只消费已存在的分区(或手动指定的分区),不会自动发现新分区。适合固定范围的历史数据处理。 |
DYNAMIC模式
VVR 11+
假设Hologres存在如下的DDL分区表,并且已启用Binlog以及动态分区。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 开启动态分区
auto_partitioning_time_unit = 'DAY', -- 以天为时间单元,自动创建的分区名示例:test_message_src1_20250512,test_message_src1_20250513
auto_partitioning_num_precreate = '2' -- 会提前创建两个分区
);
-- 已经存在的分区表,也可以通过ALTER TABLE方式开启动态分区
在Flink中,使用以下SQL声明对分区表test_message_src1
进行DYNAMIC模式消费。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- hologres分区表的分区字段
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 开启了动态分区的父表,
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 动态监听最新的分区
'source.binlog.startup-mode' = 'initial' -- 先全量消费数据,再读取Binlog开始增量消费。
);
VVR 8.0.11
假设Hologres存在如下的DDL分区表,并且已启用Binlog以及动态分区。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 开启动态分区
auto_partitioning_time_unit = 'DAY', -- 以天为时间单元,自动创建的分区名示例:test_message_src1_20241027,test_message_src1_20241028
auto_partitioning_num_precreate = '2' -- 会提前创建两个分区
);
-- 已经存在的分区表,也可以通过ALTER TABLE方式开启动态分区
在Flink中,使用以下SQL声明对分区表test_message_src1
进行DYNAMIC模式消费。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- hologres分区表的分区字段
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 开启了动态分区的父表,
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- 动态监听最新的分区
'binlogstartUpMode' = 'initial', -- 先全量消费数据,再读取Binlog开始增量消费。
'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免连接数限制
);
STATIC模式
VVR 11+
假设Hologres存在如下的DDL分区表,并且已启用Binlog。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
在Flink中,使用以下SQL声明对分区表test_message_src2
进行STATIC模式消费。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- hologres分区表的分区字段
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- 分区表
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- 消费固定的分区
'source.binlog.partition-values-to-read' = 'red,blue,green', -- 仅消费配置的3个分区,不会消费'black'分区;以后新增分区也不会消费.不设置则消费父表所有分区。
'source.binlog.startup-mode' = 'initial' -- 先全量消费数据,再读取Binlog开始增量消费。
);
VVR 8.0.11
假设Hologres存在如下的DDL分区表,并且已启用Binlog。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
在Flink中,使用以下SQL声明对分区表test_message_src2
进行STATIC模式消费。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- hologres分区表的分区字段
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- 分区表
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- 消费固定的分区
'partition-values-to-read' = 'red,blue,green', -- 仅消费配置的3个分区,不会消费'black'分区;以后新增分区也不会消费。不设置则消费父表所有分区。
'binlogstartUpMode' = 'initial', -- 先全量消费数据,再读取Binlog开始增量消费。
'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免连接数限制
);
消费逻辑分区表Binlog
Hologres Connector 支持消费逻辑分区表Binlog,并可通过参数消费指定分区。
注意事项
仅实时计算引擎VVR 11.0.0及以上版本,Hologres实例版本大于等于V3.1,支持消费逻辑分区表指定分区的Binlog。
消费逻辑分区表全部分区的Binlog,与消费普通Holo表没有区别,方法参考源表示例。
使用示例
参数名 | 说明 | 示例 |
source.binlog.logical-partition-filter-column-names | 逻辑分区表消费Binlog指定分区的分区列名。分区列名必须用双引号包裹,多个分区列用逗号分隔,若列名有双引号前面添加双引号进行转义。 | 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' 该分区分区列有两个,分别是Pt和id。 |
source.binlog.logical-partition-filter-column-values | 逻辑分区表消费Binlog指定分区的分区值。每个分区可以由多个分区列的值指定,分区列之间用逗号分隔,分区列的值用双引号包裹,如果分区列的值包含双引号前面需要添加双引号进行转义。多个分区之间用分号分隔 | 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' 指定消费两个分区。分区列有两个。第一个分区值是(20240910, 0),第二个分区值是(special"value, 9) |
假设Holo中已经建表如下
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);
Flink中消费该表Binlog。
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', --读取所有ChangeLog类型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
'retry-count'='10', --读取Binlog数据出错后的重试次数。
'retry-sleep-step-ms'='5000', --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
'source.binlog.batch-size'='512' --批量读取Binlog的数据行数。
);
DataStream API
通过DataStream的方式读写数据时,需要使用对应的DataStream连接器连接实时计算Flink版,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器。在本地调试时,需要使用相应的Uber JAR,详见本地运行和调试包含连接器的作业。
Hologres源表
Binlog源表
VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。
VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 设置或创建默认slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 构建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
实时计算引擎低于8.0.5版本或Hologres低于2.1版本,请注意是否为SuperUser,或具有Replication Role权限,详情请参见Hologres权限问题。
非Binlog源表
VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。
public class Sample {
public static void main(String[] args) throws Exception {
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}
Maven依赖
Maven中央库中已经放置了Hologres DataStream连接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Hologres结果表
VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。
public class Sample {
public static void main(String[] args) throws Exception {
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}
元数据列
实时计算引擎VVR 8.0.11及以上版本的Binlog源表支持元数据列。从此版本起,建议以元数据列的方式声明hg_binlog_event_type等Binlog字段。元数据列是SQL标准的扩展,通过元数据列可以访问源表的库名和表名,以及数据的变更类型,产生时间等特定信息,您可以基于这些信息自定义处理逻辑,例如过滤变更类型为DELETE的数据等。
字段名 | 字段类型 | 说明 |
db_name | STRING NOT NULL | 包含该行记录的库名。 |
table_name | STRING NOT NULL | 包含该行记录的表名。 |
hg_binlog_lsn | BIGINT NOT NULL | Binlog系统字段,表示Binlog序号,Shard内部单调递增但不连续,不同Shard之间不保证唯一和有序。 |
hg_binlog_timestamp_us | BIGINT NOT NULL | 该行记录在数据库中的变更时间戳,单位为微秒(us)。 |
hg_binlog_event_type | BIGINT NOT NULL | 该行记录的变更类型。参数取值如下:
|
hg_shard_id | INT NOT NULL | 数据所在数据分片Shard。 Shard基本概念详情请参见Table Group和Shard。 |
在DDL中,采用<meta_column_name> <datatype> METADATA VIRTUAL
声明元数据列。示例如下:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL
hg_binlog_event_type bigint METADATA VIRTUAL
hg_binlog_timestamp_us bigint METADATA VIRTUAL
hg_shard_id int METADATA VIRTUAL
db_name string METADATA VIRTUAL
table_name string METADATA VIRTUAL
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);
常见问题
相关文档
Hologres Catalog的创建与使用,详情请参见管理Hologres Catalog。
Hologres与Flink深度集成,能够提供一体化的实时数仓联合解决方案,详情请参见基于Flink+Hologres搭建实时数仓。
Hologres 具备高效的数据更新与修正支持,适用于多流写入场景下的宽表构建,详情请参见基于Flink+MongoDB+Hologres的游戏行业用户行为分析。