全部产品
Search
文档中心

实时计算Flink版:实时数仓Hologres

更新时间:Jun 25, 2025

本文为您介绍如何使用实时数仓Hologres连接器。

背景信息

实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

  • 源表:

    • numRecordsIn

    • numRecordsInPerSecond

  • 结果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    说明

    指标含义详情,请参见监控指标说明

API种类

Datastream和SQL

是否支持更新或删除结果表数据

特色功能

功能

详情

实时消费Hologres

支持以非Binlog或Binlog方式读取Hologres数据,兼容CDC与非CDC模式。

全增量一体化消费

支持全量、增量及全增量一体化消费。

主键冲突处理策略

支持忽略新数据、整行替换或仅更新指定字段。

多流写入的宽表Merge与局部更新

只更新修改部分列的数据,而非整行更新。

消费分区表Binlog(公测)

支持消费物理分区表Binlog,单个作业即可监听全部分区及新增分区。支持消费逻辑分区表Binlog。

分区表写入

支持向分区表的父表写入数据,并可自动创建对应的子分区表。

实时同步单表或整库

支持实时同步单表或整库级别数据 ,具备以下关键特性:

  • 自动感知上游表结构变更 :当源数据库的表结构发生变化时,Hologres 能够实时将这些变更同步到目标表中。

  • 自动处理结构变更:在新数据流入Hologres时,Flink会先触发表结构修改操作,再写入数据,整个过程您无需干预 。

详情请参见CREATE TABLE AS(CTAS)语句数据库实时入仓快速入门

前提条件

已创建Hologres表,详情请参见创建Hologres表

使用限制

注意事项

  • Hologres 与 VVR 版本消费模式兼容性及限制表

    源表

    VVR版本

    Hologres版本

    默认/推荐参数值

    实际消费模式

    备注

    < 6.0.7

    < 2.0

    holohub

    holohub(默认)

    不支持jdbc。

    ≥ 6.0.7

    < 2.0

    自定义

    holohub(默认)

    推荐配置 'sdkmode' = 'jdbc'。

    6.0.7 ~ 8.0.4

    ≥ 2.0

    jdbc(自动切换,无需配置)

    jdbc(强制)

    Hologres 2.0及以上版本下线了holohub服务,自动切换为jdbc,可能存在权限不足问题。权限配置详情请参见权限问题

    ≥ 8.0.5

    ≥ 2.1

    jdbc(自动切换,无需配置)

    jdbc(强制)

    无权限问题。Hologres 2.1.27及以上版本,切换为jdbc_fixed。

    ≥ 11.1

    任何版本

    AUTO(默认值)

    根据Hologres版本自动选择

    • 2.1.27及以上版本,选择jdbc模式,同时默认启用轻量级连接,即connection.fixed.enabled 参数默认设置为 true。

    • 2.1.0~2.1.26版本,选择jdbc模式。

    • 2.0 及以下版本,选择holohub模式。

    重要

    VVR 11.1及以上版本默认消费Binlog数据,请确认已经开启Binlog,否则可能会导致报错。

    权限问题

    如果用户不是SuperUser,使用JDBC模式消费Binlog需要进行权限配置。

    user_name为阿里云账号ID或RAM用户,详情请参见账号概述

    -- 专家权限模型下为用户授予CREATE权限,以及赋予用户实例的Replication Role权限
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- 如果Database开启了简单权限模型(SLMP),无法执行GRANT语句,使用spm_grant为用户授予DB的Admin权限,也可以在Holoweb中直接赋权
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    结果表

    VVR版本

    Hologres版本

    rpc模式是否受影响

    rpc实际消费模式

    推荐/默认参数值

    备注

    6.0.4 ~ 8.0.2

    < 2.0

    rpc

    自定义

    /

    6.0.4 ~ 8.0.2

    ≥ 2.0

    jdbc_fixed(自动切换)

    自定义

    可以通过设置'jdbcWriteBatchSize'='1'防止去重。

    ≥ 8.0.3

    任何版本

    jdbc_fixed(自动切换)

    自定义

    如果配置为rpc模式,将自动切换该参数值为jdbc_fixed且设置'jdbcWriteBatchSize'='1'防止去重。

    ≥ 8.0.5

    任何版本

    jdbc_fixed(自动切换)

    自定义

    如果配置为rpc模式,将自动切换该参数值为jdbc_fixed且设置'deduplication.enabled'='false'防止去重。

    重要
    • 由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。

    • VVR 11.1及以上版本已经取消了rpc模式,推荐采用jdbc模式连接。

    维表

    VVR版本

    Hologres版本

    rpc模式是否受影响

    rpc实际消费模式

    推荐/默认参数值

    备注

    6.0.4 ~ 8.0.2

    < 2.0

    rpc

    自定义

    /

    6.0.4 ~ 8.0.2

    ≥ 2.0

    jdbc_fixed(自动切换)

    自定义

    如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。

    ≥ 8.0.3

    任何版本

    jdbc_fixed(自动切换)

    自定义

    ≥ 8.0.5

    任何版本

    jdbc_fixed(自动切换)

    自定义

    重要

    VVR 11.1及以上版本已经取消了rpc模式,默认采用jdbc模式连接。

  • JDBC模式Binlog源表支持读取JSONB类型,需要数据库级别开启GUC。

    --db级别开启GUC,仅superuser可以执行,每个db只需要设置一次。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • 对于数据更新频繁的表,建议采用行存表开启Binlog;如果该表还同时用于OLAP等分析查询,建议使用行列共存的存储格式。

  • UPDATE操作会生成两条连续的Binlog记录,记录连续且旧数据(update_before)记录在前,新数据(update_after)记录在后。

  • 不建议对Binlog源表进行TRUNCATE或其他重建表操作,详情请参见常见问题

  • 请确保Flink与Hologres 的DECIMAL类型精度一致,以避免错误,详情请参见常见问题

  • 建议Flink作业并发数和Hologres Table的Shard个数保持一致。

    # 您可以在Hologres控制台上,使用以下语句查看Table的Shard数,其中tablename为您的业务表名称。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';

开启Binlog

未建表

实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.levelbinlog.ttl参数,示例如下。

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');--创建行存表test_message_src
call set_table_property('test_table', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_table', 'binlog.level', 'replica');--设置表属性开启Binlog功能
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
commit;

已建表

Hologres控制台上,可以使用以下语句对已有表开启Binlog并设置Binlog TTL时间。table_name为开启Binlog的表名称。

-- 设置表属性开启Binlog功能
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- 设置表属性,配置Binlog TTL时间,单位秒
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

WITH参数

从VVR 11开始,为了更好地支持Hologres,With参数有所调整,部分参数可能已经更名或进行移除。请根据您的版本查阅对应的参数说明。

类型映射

Flink与Hologres的数据类型映射请参见Flink与Hologres的数据类型映射

使用示例

源表示例

Binlog Source表

CDC模式

该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型,无需显式声明它们,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQL或Postgres的CDC功能。源表DDL代码示例如下。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  --读取所有ChangeLog类型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
  'retry-count'='10',                     --读取Binlog数据出错后的重试次数。
  'retry-sleep-step-ms'='5000',           --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
  'source.binlog.batch-size'='512'        --批量读取Binlog的数据行数。
);

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     --读取Binlog数据出错后的重试次数。
  'binlogRetryIntervalMs' = '500',  --读取Binlog数据出错后的重试时间间隔
  'binlogBatchReadSize' = '100'     --批量读取Binlog的数据行数。
);

非CDC模式

该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。源表DDL代码示例如下。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  --所有ChangeLog类型都当作INSERT处理。
  'retry-count'='10',                     --读取Binlog数据出错后的重试次数。
  'retry-sleep-step-ms'='5000',           --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
  'source.binlog.batch-size'='512'        --批量读取Binlog的数据行数。
);

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     --读取Binlog数据出错后的重试次数。
  'binlogRetryIntervalMs' = '500',  --读取Binlog数据出错后的重试时间间隔
  'binlogBatchReadSize' = '100'     --批量读取Binlog的数据行数。
);

非Binlog Source表

重要

VVR 11.1及以上版本默认消费Binlog数据,请参见Binlog Source表。

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkmode' = 'jdbc'
);

结果表示例

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

维表示例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

特色功能详解

全增量一体化消费

适用场景

  • 仅适用于目标表有主键的场景,推荐在CDC模式下使用的全增量Hologres源表。

  • Hologres支持按需开启Binlog,可以将已有历史数据的表打开Binlog。

代码示例

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   --先读取历史全量数据,再增量消费Binlog。
  'retry-count'='10',                         --读取Binlog数据出错后的重试次数。
  'retry-sleep-step-ms'='5000',               --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
  'source.binlog.batch-size'='512'            --批量读取Binlog的数据行数。
  );
说明
  • source.binlog.startup-mode设置为INITIAL,可以先全量消费数据,再读取Binlog开始增量消费。

  • 如果设置了startTime参数,或者在启动界面选择了启动时间,则binlogStartUpMode会强制使用timestamp模式消费,其他消费模式则不生效,startTime参数优先级更高。

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
  'binlogMaxRetryTimes' = '10',     --读取Binlog数据出错后的重试次数。
  'binlogRetryIntervalMs' = '500',  --读取Binlog数据出错后的重试时间间隔
  'binlogBatchReadSize' = '100'     --批量读取Binlog的数据行数。
  );
说明
  • binlogStartUpMode设置为initial,可以先全量消费数据,再读取Binlog开始增量消费。

  • 如果设置了startTime参数,或者在启动界面选择了启动时间,则binlogStartUpMode会强制使用timestamp模式消费,其他消费模式则不生效,startTime参数优先级更高。

主键冲突处理策略

写入Hologres时若存在重复主键的数据,连接器提供了三种处理策略。

VVR 11+

通过指定sink.on-conflict-action参数值,来实现不同的处理策略。

sink.on-conflict-action参数值

含义

INSERT_OR_IGNORE

保留首次出现的数据,忽略后续重复数据。

INSERT_OR_REPLACE

后续数据整行替换已有数据。

INSERT_OR_UPDATE(默认值)

只更新sink中已提供的字段,其他字段保持不变。

VVR 8+

通过指定mutatetype参数值,来实现不同的处理策略。

mutatetype参数值

含义

insertorignore(默认值)

保留首次出现的数据,忽略后续重复数据。

insertorreplace

后续数据整行替换已有数据。

insertorupdate

只更新sink中已提供的字段,其他字段保持不变。

假设表有字段 a、b、c、d,其中 a 是主键。若结果表字段仅提供 a 和 b,则配置INSERT_OR_UPDATE时,仅更新 b 字段,c 和 d 保持不变。
说明

结果表字段数量可以少于Hologres物理表,但缺失字段必须允许为空,否则将会导致写入失败。

分区表写入

默认情况下,Hologres Sink 仅支持向单表导入数据。如需导入至分区表的父表 ,需开启以下配置:

VVR 11+

sink.create-missing-partition设置为true,若未创建子分区表,可以自动创建。

说明
  • VVR 11.1及以上版本默认支持写入分区表,自动将数据路由到对应的子分区表。

  • tablename 参数填写父表的名称。

  • 若未提前创建子表且未设置 sink.create-missing-partition=true,将导致写入失败。

VVR 8+

  • partitionRouter设置为true,自动将数据路由到对应的子分区表。

  • createparttable设置为true,若未创建子分区表,可以自动创建。

说明
  • tablename 参数填写父表的名称。

  • 若未提前创建子表且未设置 createparttable=true,将导致写入失败。

多流写入的宽表Merge与局部更新

在将多个数据流写入同一张 Hologres 宽表的场景中,系统支持对主键相同的数据进行自动 Merge(合并),并可选择性地仅更新发生变化的部分列,而非整行替换,从而提升写入效率与数据一致性。

使用限制

  • 宽表必须有主键。

  • 每个数据流的数据都必须包含完整的主键字段。

  • 列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary Encoding功能。

使用示例

假设有两个Flink数据流,一个数据流中包含a、b和c字段,另一个数据流中包含a、d和e字段,Hologres宽表WIDE_TABLE包含a、b、c、d和e字段,其中a字段为主键。

VVR 11+

// 已经定义的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 声明a,b,c,d,e五个字段
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- hologres宽表,包含a,b,c,d,e五个字段
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- 根据主键更新数据部分列
  'sink.delete-strategy'='IGNORE_DELETE',         -- 撤回消息的处理策略,IGNORE_DELETE适用于仅需插入或更新数据,而无需删除数据的场景。
  'sink.partial-insert.enabled'='true'            -- 开启部分列更新参数,将INSERT语句中定义的字段下推给连接器,从而可以只对声明的字段进行更新或插入
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 声明只插入a,b,c三个字段
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 声明只插入a,d,e三个字段
END;

VVR 8+

// 已经定义的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 声明a,b,c,d,e五个字段
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- hologres宽表,包含a,b,c,d,e五个字段
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- 根据主键更新数据部分列
  'ignoredelete'='true',            -- 忽略回撤消息产生的Delete请求
  'partial-insert.enabled'='true'   -- 开启部分列更新参数,支持仅更新INSERT语句中声明的字段
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 声明只插入a,b,c三个字段
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 声明只插入a,d,e三个字段
END;
说明

ignoredelete设置为true,忽略回撤消息产生Delete请求。实时计算引擎VVR 8.0.8及以上版本推荐使用sink.delete-strategy配置局部更新时的各种策略。

消费分区表Binlog(公测)

分区表有助于数据归档与查询优化。Hologres Connector支持消费物理分区表和逻辑分区表的Binlog。关于物理分区表和逻辑分区表的区别参见分区表介绍

消费物理分区表Binlog

Hologres Connector 支持通过单个作业消费分区表 Binlog,并可动态监听新增分区,显著提升实时数据处理效率与易用性。

注意事项

  • 仅实时计算引擎VVR 8.0.11及以上版本,Hologres实例版本大于等于2.1.27,Binlog源表JDBC模式支持消费分区表。

  • 分区名称必须严格由父表名+下划线+分区值组成,即{parent_table}_{partition_value},非此格式的分区可能无法消费到,详情请参见表名生成规则

    重要
    • 对于DYNAMIC模式,VVR 8.0.11版本不支持带-分隔符的分区字段(如YYYY-MM-DD)。

    • 自VVR 11.1版本起,可以消费自定义Format格式的分区字段。

    • 写入分区表则不受此格式限制。

  • Flink中声明Hologres源表时,必须包含Hologres分区表的分区字段。

  • 对于DYNAMIC模式,要求分区表必须开启动态分区管理。并且分区预创建参数auto_partitioning.num_precreate必须大于1,否则,在尝试消费最新分区时,作业将会抛出异常。

使用示例

模式类型

特点

适用场景说明

DYNAMIC

动态分区消费

自动监听新增分区,按时间顺序动态推进消费进度。适合实时数据流场景。

STATIC

静态分区消费

只消费已存在的分区(或手动指定的分区),不会自动发现新分区。适合固定范围的历史数据处理。

DYNAMIC模式

VVR 11+

假设Hologres存在如下的DDL分区表,并且已启用Binlog以及动态分区。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 开启动态分区
    auto_partitioning_time_unit = 'DAY',  -- 以天为时间单元,自动创建的分区名示例:test_message_src1_20250512,test_message_src1_20250513
    auto_partitioning_num_precreate = '2' -- 会提前创建两个分区
);
-- 已经存在的分区表,也可以通过ALTER TABLE方式开启动态分区

在Flink中,使用以下SQL声明对分区表test_message_src1进行DYNAMIC模式消费。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- hologres分区表的分区字段
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 开启了动态分区的父表,
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 动态监听最新的分区
  'source.binlog.startup-mode' = 'initial'           -- 先全量消费数据,再读取Binlog开始增量消费。
);

VVR 8.0.11

假设Hologres存在如下的DDL分区表,并且已启用Binlog以及动态分区。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 开启动态分区
    auto_partitioning_time_unit = 'DAY',  -- 以天为时间单元,自动创建的分区名示例:test_message_src1_20241027,test_message_src1_20241028
    auto_partitioning_num_precreate = '2' -- 会提前创建两个分区
);

-- 已经存在的分区表,也可以通过ALTER TABLE方式开启动态分区

在Flink中,使用以下SQL声明对分区表test_message_src1进行DYNAMIC模式消费。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- hologres分区表的分区字段
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 开启了动态分区的父表,
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- 动态监听最新的分区
  'binlogstartUpMode' = 'initial',      -- 先全量消费数据,再读取Binlog开始增量消费。
  'sdkMode' = 'jdbc_fixed'              -- 使用此模式,避免连接数限制
);

STATIC模式

VVR 11+

假设Hologres存在如下的DDL分区表,并且已启用Binlog。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

在Flink中,使用以下SQL声明对分区表test_message_src2进行STATIC模式消费。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- hologres分区表的分区字段
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- 分区表
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- 消费固定的分区
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- 仅消费配置的3个分区,不会消费'black'分区;以后新增分区也不会消费.不设置则消费父表所有分区。
  'source.binlog.startup-mode' = 'initial'  -- 先全量消费数据,再读取Binlog开始增量消费。
);

VVR 8.0.11

假设Hologres存在如下的DDL分区表,并且已启用Binlog。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

在Flink中,使用以下SQL声明对分区表test_message_src2进行STATIC模式消费。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- hologres分区表的分区字段
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- 分区表
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- 消费固定的分区
  'partition-values-to-read' = 'red,blue,green',  -- 仅消费配置的3个分区,不会消费'black'分区;以后新增分区也不会消费。不设置则消费父表所有分区。
  'binlogstartUpMode' = 'initial',  -- 先全量消费数据,再读取Binlog开始增量消费。
  'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免连接数限制
);

消费逻辑分区表Binlog

Hologres Connector 支持消费逻辑分区表Binlog,并可通过参数消费指定分区。

注意事项

  • 仅实时计算引擎VVR 11.0.0及以上版本,Hologres实例版本大于等于V3.1,支持消费逻辑分区表指定分区的Binlog。

  • 消费逻辑分区表全部分区的Binlog,与消费普通Holo表没有区别,方法参考源表示例

使用示例

参数名

说明

示例

source.binlog.logical-partition-filter-column-names

逻辑分区表消费Binlog指定分区的分区列名。分区列名必须用双引号包裹,多个分区列用逗号分隔,若列名有双引号前面添加双引号进行转义。

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

该分区分区列有两个,分别是Pt和id。

source.binlog.logical-partition-filter-column-values

逻辑分区表消费Binlog指定分区的分区值。每个分区可以由多个分区列的值指定,分区列之间用逗号分隔,分区列的值用双引号包裹,如果分区列的值包含双引号前面需要添加双引号进行转义。多个分区之间用分号分隔

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

指定消费两个分区。分区列有两个。第一个分区值是(20240910, 0),第二个分区值是(special"value, 9)

假设Holo中已经建表如下

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Flink中消费该表Binlog。

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  --读取所有ChangeLog类型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
  'retry-count'='10',                     --读取Binlog数据出错后的重试次数。
  'retry-sleep-step-ms'='5000',           --重试累加等待时间。第一次重试等待5秒,第二次等待10秒,依此类推。。
  'source.binlog.batch-size'='512'        --批量读取Binlog的数据行数。
);

DataStream API

重要

通过DataStream的方式读写数据时,需要使用对应的DataStream连接器连接实时计算Flink版,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器。在本地调试时,需要使用相应的Uber JAR,详见本地运行和调试包含连接器的作业

Hologres源表

Binlog源表

VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres的相关参数。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // 构建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 构建Hologres Binlog Source。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres的相关参数。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // 构建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 构建Hologres Binlog Source。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Hologres的相关参数。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // 构建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 设置或创建默认slotname
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // 构建Binlog Record Converter。
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // 构建Hologres Binlog Source。
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
重要

实时计算引擎低于8.0.5版本或Hologres低于2.1版本,请注意是否为SuperUser,或具有Replication Role权限,详情请参见Hologres权限问题

非Binlog源表

VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。

public class Sample {
    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres的相关参数。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // 构建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Maven依赖

Maven中央库中已经放置了Hologres DataStream连接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres结果表

VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。

public class Sample {
    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Hologres的相关参数。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // 构建Hologres Writer,以RowData的方式写入数据。
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // 构建Hologres Sink。
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

元数据列

实时计算引擎VVR 8.0.11及以上版本的Binlog源表支持元数据列。从此版本起,建议以元数据列的方式声明hg_binlog_event_type等Binlog字段。元数据列是SQL标准的扩展,通过元数据列可以访问源表的库名和表名,以及数据的变更类型,产生时间等特定信息,您可以基于这些信息自定义处理逻辑,例如过滤变更类型为DELETE的数据等。

字段名

字段类型

说明

db_name

STRING NOT NULL

包含该行记录的库名。

table_name

STRING NOT NULL

包含该行记录的表名。

hg_binlog_lsn

BIGINT NOT NULL

Binlog系统字段,表示Binlog序号,Shard内部单调递增但不连续,不同Shard之间不保证唯一和有序。

hg_binlog_timestamp_us

BIGINT NOT NULL

该行记录在数据库中的变更时间戳,单位为微秒(us)。

hg_binlog_event_type

BIGINT NOT NULL

该行记录的变更类型。参数取值如下:

  • 5:表示INSERT消息。

  • 2:表示DELETE消息。

  • 3:表示UPDATE_BEFORE消息。

  • 7:表示UPDATE_AFTER消息。

hg_shard_id

INT NOT NULL

数据所在数据分片Shard。 Shard基本概念详情请参见Table Group和Shard

在DDL中,采用<meta_column_name> <datatype> METADATA VIRTUAL声明元数据列。示例如下:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

常见问题

相关文档