全部产品
Search
文档中心

实时计算Flink版:实时数仓Hologres

更新时间:May 11, 2023

本文为您介绍如何使用实时数仓Hologres连接器。

背景信息

实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。

Hologres连接器支持的信息如下。

类别

详情

支持类型

源表、维表和结果表

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

  • 源表:

    • numRecordsIn

    • numRecordsInPerSecond

  • 结果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    说明

    指标的含义及如何查看监控指标,请参见查看监控指标

API种类

Datastream和SQL

是否支持更新或删除结果表数据

特色功能

作为源表,支持以下功能特性。

功能

详情

实时消费Hologres

  • 支持作为非Binlog Source读取Hologres全量数据。

  • 支持实时消费Hologres的Binlog数据。

    • 支持非CDC模式消费。

    • 支持CDC模式消费。

    • 支持全增量一体源表消费。

获取更多信息,详情请参见Flink/Blink实时消费Hologres Binlog

作为结果表,支持以下功能特性。

功能

详情

流式语义

支持写入Changelog消息。

宽表Merge和局部更新功能

只更新修改部分的数据,而非整行更新。

作为CTAS和CDAS的目标端

支持实时同步单表、整库的数据以及相应的表结构变更到Hologres表中。

前提条件

已创建完成Hologres表,详情请参见创建Hologres表

使用限制

  • 通用:

    • 仅Flink计算引擎VVR 2.0.0及以上版本支持Hologres连接器。

    • Hologres连接器不支持访问Hologres外部表。关于Hologres外部表详情请参见外部表

    • Hologres 2.0版本起下线RPC(用于维表与结果表)、Holohub模式(用于Binlog源表),全面转为jdbc相关模式(目前包括jdbc、jdbc_fixed、jdbc_copy等)。推荐维表和结果表使用jdbc_fixed模式,该模式不占用Hologres实例连接数,详情请参见WITH参数。实时计算VVR 6.0.6以及之前的版本,如果读写Hologres 2.0版本并使用了上述已经下线的两种模式可能抛出异常。

  • 源表独有:

    • Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。从VVR 3.0.0版本开始,支持实时消费Hologres数据。关于实时消费Hologres,详情请参见Flink实时消费Hologres。从VVR 6.0.3版本开始,Hologres源表在批模式读取时支持filter下推,详见源表独有参数enable_filter_push_down

    • Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见上下游存储

  • 结果表独有:

    无。

  • 维表独有:

    • 创建Hologres维表时建议选择行存模式,列存模式对于点查场景性能开销较大。选择行存模式创建维表时必须设置主键,并且将主键设置为Clustering Key才可以工作。详情请参见建表概述

    • Hologres维表的主键必须是Flink Join On的字段,Flink Join On的字段也必须是维表完整的主键字段,两者必须完全匹配。

    • Flink实时计算引擎为VVR 4.0及以上版本的Hologres Flink连接器支持非主键的维表JOIN,而VVR 4.0以下版本的Hologres Flink连接器仅支持主键的维表JOIN。

语法结构

CREATE TABLE hologres_table (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>'
);

WITH参数

说明

仅Flink实时计算引擎VVR 4.0.11及以上版本支持所有jdbc开头的参数。

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为hologres

    dbname

    数据库名称。

    String

    无。

    tablename

    表名称。

    String

    如果Schema不为Public时,则tablename需要填写为schema.tableName

    username

    用户名,请填写阿里云账号的AccessKey ID。

    String

    您可以登录AccessKey 管理,获取AccessKey ID。

    password

    密码,请填写阿里云账号的AccessKey Secret。

    String

    您可以登录AccessKey 管理,获取AccessKey Secret。

    endpoint

    Hologres服务地址。

    String

    详情请参见访问域名

    jdbcRetryCount

    当连接故障时,写入和查询的重试次数。

    Integer

    10

    无。

    jdbcRetrySleepInitMs

    每次重试的固定等待时间。

    Long

    1000

    实际重试的等待时间的计算公式为jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。单位为毫秒。

    jdbcRetrySleepStepMs

    每次重试的累加等待时间。

    Long

    5000

    实际重试的等待时间的计算公式为jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。单位为毫秒。

    jdbcConnectionMaxIdleMs

    JDBC连接的空闲时间。

    Long

    60000

    超过这个空闲时间,连接就会断开释放掉。单位为毫秒。

    jdbcMetaCacheTTL

    本地缓存TableSchema信息的过期时间。

    Long

    60000

    单位为毫秒。

    jdbcMetaAutoRefreshFactor

    如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。

    Integer

    4

    缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。

    触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    field_delimiter

    导出数据时,不同行之间使用的分隔符。

    String

    "\u0002"

    无。

    binlog

    是否消费Binlog数据。

    Boolean

    false

    参数取值如下:

    • true:消费Binlog数据。

    • false(默认值):不消费Binlog数据。

    sdkMode

    SDK模式。

    String

    holohub

    参数取值如下:

    • holohub(默认值):使用holohub模式的binlog源表。

    • jdbc:使用JDBC模式的binlog源表。

    详情请参见Binlog Source表

    说明
    • 仅实时计算引擎VVR 6.0.3及以上版本支持该参数。

    • 推荐使用jdbc模式,因为Hologres 2.0版本将下线holohub。

    jdbcBinlogSlotName

    JDBC模式的binlog源表的Slot名称。创建方法见下方JDBC模式Binlog源表

    String

    说明

    仅实时计算引擎VVR 6.0.3及以上版本支持该参数。

    binlogMaxRetryTimes

    读取Binlog数据出错后的重试次数。

    Integer

    60

    无。

    binlogRetryIntervalMs

    读取Binlog数据出错后的重试时间间隔。

    Long

    2000

    单位为毫秒。

    binlogBatchReadSize

    批量读取Binlog的数据行数。

    Integer

    16

    无。

    cdcMode

    是否采用CDC模式读取Binlog数据。

    Boolean

    false

    参数取值如下:

    • true:CDC模式读取Binlog数据。

    • false(默认值):非CDC模式读取Binlog数据。

    upsertSource

    源表是否使用upsert类型的Changelog。

    Boolean

    false

    仅在CDC模式下生效。参数取值如下:

    • true:仅支持Upsert类型,包括INSERT、DELETE、和UPDATE_AFTER。

    • false(默认值):支持所有类型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

    说明

    如果下游包含回撤算子(例如使用ROW_NUMBER OVER WINDOW去重),则需要设置upsertSource为true,此时源表会以Upsert方式从Hologres中读取数据。

    binlogStartupMode

    Binlog数据消费模式。

    String

    earliestOffset

    参数取值如下:

    • initial:先全量消费数据,再读取Binlog开始增量消费。

    • earliestOffset(默认值):从最早的Binlog开始消费。

    • timestamp:从设置的startTime开始消费Binlog。

    说明

    如果设置了startTime或者在启动界面选择了启动时间,则binlogStartupMode强制使用timestamp模式,其他消费模式不生效,即startTime参数优先级更高。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本,Hologres 0.10及以上版本支持该参数。

    startTime

    启动位点的时间。

    String

    格式为yyyy-MM-dd hh:mm:ss。如果没有设置该参数,且作业没有从State恢复,则从最早的Binlog开始消费Hologres数据。

    jdbcScanFetchSize

    扫描时攒批大小。

    Integer

    256

    无。

    jdbcScanTimeoutSeconds

    扫描操作超时时间。

    Integer

    60

    单位为秒。

    jdbcScanTransactionSessionTimeoutSeconds

    扫描操作所在事务的超时时间。

    Integer

    600秒(0表示不超时)

    对应Hologres的GUC参数idle_in_transaction_session_timeout,详情请参见GUC参数

    说明

    仅实时计算引擎Flink1.13-vvr-4.0.15及以上版本支持该参数。

    enable_filter_push_down

    全量读取阶段是否进行filter下推。

    Boolean

    false

    参数取值如下:

    • false(默认值):不进行filter下推。

    • true:读取全量数据时,将支持的过滤条件下推到Hologres执行。包括非Binlog Source全量读取以及Binlog Source使用全增量一体化消费模式时的全量阶段。

      重要

      实时计算引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5默认会进行filter下推,但如果作业使用了hologres维表,且写入的DML中包含有对维表非主键字段的过滤条件时,维表的filter会被错误的下推,可能导致维表join出现错误结果。因此推荐使用实时计算引擎Flink1.15-vvr-6.0.6及以上版本,并在源表增加此参数来开启过滤条件下推功能。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sdkMode

    SDK模式。

    String

    jdbc

    参数取值如下:

    • jdbc:默认值,表示使用jdbc模式进行写入。

    • jdbc_copy:是否使用fixed copy方式写入。fixed copy是hologres1.3新增的能力,相比通过insert方法(jdbc模式)进行写入,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批)。但此模式暂不支持delete数据。

    • rpc:表示使用rpc模式进行写入,与useRpcMode参数一致。

    • jdbc_fixed:(beta功能,需要hologres引擎版本大于等于1.3)表示使用fixed jdbc方式进行写入,与jdbc模式的区别在于不占用连接数。

    说明
    • 仅实时计算VVR 6.0.3及以上版本支持该参数。

    • Hologres 2.0版本将下线rpc模式,推荐使用jdbc或者jdbc_fixed模式。

    field_delimiter

    Hologres Sink支持将一个STRING字段按照field_delimiter切分成数组导入Hologres。

    String

    "\u0002"

    无。

    useRpcMode

    是否通过RPC方式使用Hologres连接器。

    Boolean

    false

    参数取值如下:

    • true:使用RPC方式使用Hologres连接器。

      通过RPC方式会降低SQL连接数。

    • false(默认值):使用JDBC方式使用Hologres连接器。

      通过JDBC方式会占用SQL连接,导致JDBC链接数增加。

    说明

    Hologres 2.0版本将下线rpc模式,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。

    mutatetype

    数据写入模式。

    String

    insertorignore

    详情请参见流式语义

    partitionrouter

    是否写入分区表。

    Boolean

    false

    无。

    createparttable

    当写入分区表时,是否根据分区值自动创建不存在的分区表。

    Boolean

    false

    如果分区值中存在短划线(-),暂不支持自动创建分区表。

    说明

    请确保分区值不会出现脏数据,否则会创建错误的分区表导致Failover,建议慎用该参数。

    ignoredelete

    是否忽略撤回消息。

    Boolean

    true

    说明

    仅在使用流式语义时生效。

    connectionSize

    单个Flink结果表任务所创建的JDBC连接池大小。

    Integer

    3

    如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。

    jdbcWriteBatchSize

    JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。

    Integer

    256

    单位为数据行数。

    说明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

    jdbcWriteBatchByteSize

    JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。

    Long

    2097152(2*1024*1024)字节,即2 MB

    说明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

    jdbcWriteFlushInterval

    JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。

    Long

    10000

    单位为毫秒。

    说明

    jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。

    ignoreNullWhenUpdate

    mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。

    Boolean

    false

    取值说明如下:

    • false(默认值):将Null值写到Hologres结果表里。

    • true:忽略更新写入数据中的Null值。

    说明

    仅Flink计算引擎VVR 4.0及以上版本支持该参数。

    connectionPoolName

    连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。

    String

    每个表默认使用自己的连接池。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectorSize都需要相同。

    说明

    仅实时计算VVR 4.0.12及以上版本支持该参数。

    jdbcEnableDefaultForNotNullColumn

    Hologres表中Not Null且没有设置默认值的字段,是否允许写入Null值。

    Boolean

    true

    参数取值如下:

    • true(默认值):允许写入Null值。

      如果将Null值写入Hologres表中Not Null且无默认值的字段,则按照下列类型写入默认值:

      • 如果字段是String类型,则默认写为空(“”)。

      • 如果字段是Number类型,则默认写为0。

      • 如果字段是以下任意时间类型:

        • Date

        • timestamp

        • timestamptz

        则默认写为1970-01-01 00:00:00

    • false:不允许写入Null值。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sdkMode

    SDK模式。

    String

    jdbc

    参数取值如下:

    • jdbc(默认值):表示使用jdbc模式进行写入。

    • rpc:表示使用rpc模式进行写入,与useRpcMode参数一致。

    • jdbc_fixed:(beta功能,需要hologres引擎版本大于等于1.3)表示使用fixed jdbc方式进行写入,与jdbc模式的区别在于不占用连接数。

    说明
    • 仅实时计算VVR 6.0.3及以上版本支持该参数。

    • Hologres 2.0版本将下线rpc模式,推荐使用jdbc或者jdbc_fixed模式。

    • 实时计算VVR 6.0.6版本,jdbc模式维表字符串类型的查询结果中包含null值时,可能抛出空指针异常。此时推荐使用rpc模式绕过。

    useRpcMode

    是否通过RPC方式使用Hologres连接器。

    Boolean

    false

    参数取值如下:

    • true:使用RPC方式使用Hologres连接器。

      通过RPC方式会降低SQL连接数。

    • false(默认值):使用JDBC方式使用Hologres连接器。

      通过JDBC方式会占用SQL连接,导致JDBC链接数增加。

    connectionSize

    单个Flink维表任务所创建的JDBC连接池大小。

    Integer

    3

    如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。

    connectionPoolName

    连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。

    String

    每个表默认使用自己的连接池。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectorSize都需要相同。

    说明

    仅实时计算VVR 4.0.12及以上版本支持该参数。

    jdbcReadBatchSize

    点查Hologres维表时,攒批处理的最大条数。

    Integer

    128

    无。

    jdbcReadBatchQueueSize

    维表点查请求缓冲队列大小。

    Integer

    256

    无。

    jdbcReadTimeoutMs

    维表点查的超时时间。

    Long

    默认值为0,表示不会超时

    仅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支持该参数。

    jdbcReadRetryCount

    维表点查超时时的重试次数。

    Interger

    1

    • 仅VVR 6.0.3以上版本支持该参数。

    • 本参数与jdbcRetryCount不同,后者是指连接发生异常时的重试次数。

    • 只有设置了jdbcReadTimeoutMs时本参数才会生效。

    jdbcScanFetchSize

    在一对多join(即没有使用完整主键)时使用scan接口,scan攒批处理数据的条数。

    Integer

    256

    无。

    jdbcScanTimeoutSeconds

    scan操作的超时时间。

    Integer

    60

    单位为秒。

    cache

    缓存策略。

    String

    • VVR 4.x版本及以上版本:None

    • VVR 4.x版本以下版本:LRU

    Hologres仅支持以下两种缓存策略:

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

      需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    • None:无缓存。

    cacheSize

    缓存大小。

    Integer

    10000

    选择LRU缓存策略后,可以设置缓存大小。单位为条。

    cacheTTLMs

    缓存更新时间间隔。

    Long

    cacheTTLMs默认值和cache的配置有关:

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    无。

    cacheEmpty

    是否缓存join结果为空的数据。

    Boolean

    true

    • true(默认值):表示缓存join结果为空的数据。

    • false:表示不缓存join结果为空的数据。

    async

    是否异步同步数据。

    Boolean

    false

    • true:表示异步同步数据。

    • false(默认值):表示不进行异步同步数据。

    说明

    异步同步数据默认是无序的。

类型映射

Flink与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射

使用示例

源表示例

  • 非Binlog Source表

    CREATE TEMPORARY TABLE hologres_source (
      name varchar, 
      age BIGINT,
      birthday BIGINT
    ) WITH (
      'connector'='hologres',
      'dbname'='<yourDbname>',
      'tablename'='<yourTablename>',
      'username'='<yourAccessID>',
      'password'='<yourAccessSecret>',
      'endpoint'='<yourEndpoint>',
      'field_delimiter'='|' --该参数可选。
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      name varchar,
      age BIGINT,
      birthday BIGINT 
    ) WITH (
      'connector'='blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT 
       name, age, birthday
    from hologres_source;
  • Binlog Source表

    Hologres连接器支持实时消费Hologres。实时消费Hologres,即实时消费Hologres的Binlog数据。以下为您介绍Flink实时消费Hologres的详情:

    • 使用限制

      • Hologres 0.10及以下版本,已存在的表无法修改表属性开启Binlog,需要重新建表。Hologres V1.1及以上版本,可以根据业务需要选择开启或关闭Binlog能力,同时支持配置TTL满足不同业务场景对Binlog保留时间的诉求,详情请参见订阅Hologres Binlog

      • 不支持开启分区表父表的Binlog,请使用非分区表。

      • 暂不支持实时消费TIMESTAMP类型的数据,因此创建Hologres表时,请使用TIMESTAMPTZ类型。

      • 开启Binlog后,理论上列存表的开销要大于行存表的开销。因此,对于数据更新频繁的场景,建议使用存储格式为行存的表开启Binlog。

      • 默认的Binlog源表不支持数组类型,仅支持以下数据类型:

        • INTEGER

        • BIGINT

        • TEXT

        • REAL

        • DOUBLE PRECISION

        • BOOLEAN

        • NUMERIC(38,8)

        • TIMESTAMPTZ

        说明

        对不支持的数据类型(例如SMALLINT),即使不消费此字段,仍然可能导致作业无法上线。

      • 实时计算引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表。相比原有Holohub模式,支持更完善的数据类型,如SMALLINT,数组类型等,同时也支持了自定义用户(非RAM用户)。详见下方JDBC模式Binlog源表

      • Hologres 1.3.41版本开始,JDBC模式Binlog源表新支持读取JSONB类型,但需要数据库级别开启GUC,开启GUC的命令如下。

        --db级别开启GUC,仅superuser可以执行,每个db只需要设置一次。
        alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
      • Hologres 2.0版本起下线Holohub模式,全面转为jdbc模式。

    • 开启Binlog

      实时消费功能默认关闭,因此在Hologres控制台上创建表的DDL时,需要设置binlog.levelbinlog.ttl参数,示例如下。

      begin;
      CREATE TABLE test_message_src(
       id int primary key, 
       title text not null, 
       body text
      );
      call set_table_property('test_message_src', 'orientation', 'row');
      call set_table_property('test_message_src', 'clustering_key', 'id');
      call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表后开启Binlog。
      call set_table_property('test_message_src', 'binlog.ttl', '86400'); 
      commit;

      其中,binlog.level设置为replica即代表开启Binlog,binlog.ttl为Binlog的TTL,单位为秒。

    • 注意事项

      • UPDATE操作会产生两条Binlog记录,分别为更新操作前和操作后的数据记录,因此您会消费到两条数据。但是,Hologres Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。

      • 建议Flink作业并发数和Hologres Table的Shard个数保持一致。

        您可以在Hologres控制台上,使用以下语句查看Table的Shard数,其中tablename为您的业务表名称。

        select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
    • 消费模式

      • 非CDC模式

        该模式下Source消费的Binlog数据是作为普通的Flink数据传递给下游节点的,即所有数据都是INSERT类型的数据,您可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。源表DDL代码示例如下。

        CREATE TABLE test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) WITH (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
      • CDC模式

        该模式下Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER类型,这样就能完成表的数据的镜像同步,类似MySQL或Postgres的CDC功能。源表DDL代码示例如下。

        CREATE TABLE test_message_src_binlog_table(
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) WITH (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
        );
    • 全增量一体源表消费:

      • 背景信息

        在源表Join维表时,由于Binlog的TTL等原因,会导致无法使用源表的所有数据。原解决方案是为Binlog表设置一个很大的TTL,但这样会有以下问题:

        • 历史Binlog数据会被长时间保存,导致占用较多的存储资源。

        • 因为Binlog包含数据更新记录,使用Binlog进行全量消费会消费一些不必要的数据,导致占用较多的计算资源,且无法让用户只关注最新的数据。

        从VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支持全增量一体的消费,这种方式会先读取数据库的历史全量数据,并平滑切换到Binlog读取增量数据。采用这种方式,可以解决上述问题。

      • 适用场景

        • 适用于历史数据不包含Binlog,但又希望消费所有数据的场景。

        • 仅适用于目标表有主键的场景,推荐在CDC模式下使用的全增量Hologres源表。

        • Hologres1.1版本之后,支持按需开启Binlog,可以将已有历史数据的表打开Binlog。

      • 代码示例

        CREATE TABLE test_message_src_binlog_table(
          hg_binlog_lsn BIGINT,
          hg_binlog_event_type BIGINT,
          hg_binlog_timestamp_us BIGINT,
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) WITH (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'binlogStartUpMode' = 'initial', --先读取历史全量数据,再增量消费Binlog。
          'binlogMaxRetryTimes' = '10',
          'binlogRetryIntervalMs' = '500',
          'binlogBatchReadSize' = '100'
          );
    • JDBC模式Binlog源表

      • 实时计算引擎VVR 6.0.3版本开始,binlog源表新增JDBC模式(不同于CDC等消费模式,此处的JDBC模式是指底层获取binlog的SDK基于JDBC)。相比原有Holohub模式,支持更多的数据类型,包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大于1.3.41且开启相应GUC,详见上方Binlog Source源表使用限制)。同时,JDBC模式的Binlog源表支持Hologres的自定义用户(非RAM用户)。

      • 使用方式与普通的binlog源表类似,但需要设置sdkMode为jdbc,示例如下:

        create TEMPORARY table test_message_src_binlog_table(
          id INTEGER,
          title VARCHAR,
          body VARCHAR
        ) with (
          'connector'='hologres',
          'dbname'='<yourDbname>',
          'tablename'='<yourTablename>',
          'username'='<yourAccessID>',
          'password'='<yourAccessSecret>',
          'endpoint'='<yourEndpoint>',
          'binlog' = 'true',
          'cdcMode' = 'true',
          'sdkMode'='jdbc', --使用jdbc模式的binlog源表
          'jdbcBinlogSlotName'='replication_slot_name' --jdbc模式的binlog源表需要设置slot name
        );
        
      • 实时计算引擎VVR 6.0.3到VVR 6.0.5版本使用JDBC模式消费binlog需要一些准备工作,包括创建publication、replication_slot、源表的DDL中必须设置jdbcBinlogSlotName参数。相关概念以及执行所需权限的详情可以参见通过JDBC消费Hologres Binlog。具体的SQL如下,需要在hologres中执行。

        --创建extension, 为db级别开启,仅superuser可以执行,每个db只需要设置一次,Hologres 2.0版本开始默认开启。
        create extension if not exists hg_binlog;
        
        --创建publication,对应具体的表。进行此操作需要目标表的Owner权限。
        create publication <publication_name> for table <table_name>;
        
        --创建logical_replication_slot,对应具体的publication。进行此操作需要实例的Superuser或Replication Role权限。
        call hg_create_logical_replication_slot('<replication_slot_name>', 'hgoutput', '<publication_name>');
      • 实时计算引擎VVR 6.0.6版本开始,可以选择不设置jdbcBinlogSlotName,Hologres连接器会创建默认的slot并使用。但默认创建slot需要一定的前提条件,要求用户为实例的Superuser,或用户同时拥有目标表的Owner权限和实例的Replication Role权限。相关概念以及授权操作的详情可以参见通过JDBC消费Hologres Binlog

    • Hologres Binlog实现原理

      一条Binlog的字段由Binlog系统字段和用户Table字段组成,字段定义如下:

      字段名

      字段类型

      说明

      hg_binlog_lsn

      BIGINT

      Binlog系统字段,表示Binlog序号,Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。

      hg_binlog_event_type

      BIGINT

      Binlog系统字段,表示当前记录所表示的修改类型,参数取值如下:

      • INSERT=5:表示当前Binlog为插入一条新的记录。

      • DELETE=2:表示当前Binlog为删除一条已有的记录。

      • BEFORE_UPDATE=3:表示当前Binlog为更新操作前的记录。

      • AFTER_UPDATE=7:表示当前Binlog为更新操作后的记录。

      hg_binlog_timestamp_us

      BIGINT

      Binlog系统字段,系统时间戳,单位为微秒。

      user_table_column_1

      用户定义

      用户的表字段。

      ...

      ...

      用户的表字段。

      user_table_column_n

      用户定义

      用户的表字段。

结果表示例

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>'
);

INSERT INTO hologres_sink SELECT * from datagen_source;

维表示例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

特色功能详解

流式语义

流处理,也称为流数据或流事件处理,即对一系列无界数据或事件连续处理。执行流数据或流事件处理的系统通常允许您指定一种可靠性模式或处理语义,保证整个系统处理数据的准确性,因为网络或设备故障等可能会导致数据丢失。

根据Hologres Sink的配置和Hologres表的属性,流式语义分为以下两种:

  • Exactly-once(仅一次):即使在发生各种故障的情况下,系统只处理一次数据或事件。

  • At-least-once(至少一次):如果在系统完全处理之前丢失了数据或事件,则从源头重新传输,因此可以多次处理数据或事件。如果第一次重试成功,则不必进行后续重试。

在Hologres结果表中使用流式语义,您需要注意以下几点:

  • 如果Hologres物理表未设置主键,则Hologres Sink使用At-least-once语义。

  • 如果Hologres物理表已设置主键,则Hologres Sink通过主键确保Exactly-once语义。当同主键数据出现多次时,您需要设置mutatetype参数确定更新结果表的方式,mutatetype取值如下:

    • insertorignore(默认值):保留首次出现的数据,忽略后续所有数据。

    • insertorreplace:后出现的数据整行替换已有数据。

    • insertorupdate:更新已有数据的部分列。例如一张表有a、b、c和d四个字段,a是PK(Primary Key),写入Hologres时只写入a和b两个字段,在PK重复的情况下,系统只会更新b字段,c和d保持不变。

    说明
    • mutatetype设置为insertorreplaceinsertorupdate时,系统根据主键更新数据。

    • Flink定义的结果表中的数据列数不一定要和Hologres物理表的列数一致,您需要保证缺失的列没有非空约束,即列值可以为Null,否则会报错。

  • 默认情况下,Hologres Sink只能向一张表导入数据。如果导入数据至分区表的父表,即使导入成功,也会查询数据失败。您可以设置参数partitionRouter为true,开启自动将数据路由到对应分区表的功能。注意事项如下:

    • tablename参数需要填写为父表的表名。

    • 如果没有提前创建分区表,需要在WITH参数中启用createparttable=true,从而支持自动创建分区表,否则会导入失败。

宽表Merge和局部更新功能

在把多个流的数据写到一张Hologres宽表的场景中,会涉及到宽表Merge和数据的局部更新。下面通过示例来介绍如何设置。

假设有两个Flink数据流,一个数据流中包含A、B和C字段,另一个数据流中包含A、D和E字段,Hologres宽表WIDE_TABLE包含A、B、C、D和E字段,其中A字段为主键。具体操作如下:

  1. 使用Flink SQL创建两张Hologres结果表,其中一张表只声明A、B和C字段,另一张表只声明A、D和E字段。这两张表都映射至宽表WIDE_TABLE。

  2. 两张结果表的属性设置:

    • mutatetype设置为insertorupdate,可以根据主键更新数据。

    • ignoredelete设置为true,防止回撤消息产生Delete请求。

  3. 将两个Flink数据流的数据分别INSERT至对应的结果表中。

说明

在上述场景中,有如下限制:

  • 宽表必须有主键。

  • 每个数据流的数据都必须包含完整的主键字段。

  • 列存模式的宽表Merge场景在高RPS的情况下,CPU使用率会偏高,建议关闭表中字段的Dictionary Encoding功能。

作为CTAS和CDAS的目标端

Hologres支持实时同步单表或整库级别的数据,在同步过程之中如果上游的表结构发生了变更也会实时同步到Hologres表中。

当上游需要同步的表发生了表结构变更,随着新数据流到Hologres表时,Flink会先触发Hologres修改相应的表结构,然后再将数据写入到相应的表中。以上过程,全部由Flink自动完成,您不需要关心实现细节。详情请参见CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句数据库实时入仓快速入门

DataStream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器

  • Hologres源表

    VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。

    // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
    TableSchema schema = TableSchema.builder()
        .field("a", DataTypes.INT())
        .build();
    // Hologres的相关参数。
    Configuration config = new Configuration();
    config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
    config.setString(HologresConfigs.USERNAME, "yourUserName");
    config.setString(HologresConfigs.PASSWORD, "yourPassword");
    config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
    config.setString(HologresConfigs.TABLE, "yourTableName");
    // 构建JDBC Options。
    JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
    String query = JDBCUtils.getSimpleSelectFromStatement(
        jdbcOptions.getTable(), schema.getFieldNames());
    
    // 构建HologresBulkreadInputFormat读取表数据。
    HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
    env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
        .print();
    env.execute();
  • Hologres Binlog源表

    VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。

    // 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
    TableSchema schema = TableSchema.builder()
        .field("a", DataTypes.INT())
        .build();
    
    // Hologres的相关参数。
    Configuration config = new Configuration();
    config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
    config.setString(HologresConfigs.USERNAME, "yourUserName");
    config.setString(HologresConfigs.PASSWORD, "yourPassword");
    config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
    config.setString(HologresConfigs.TABLE, "yourTableName");
    config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
    // 构建JDBC Options。
    JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
    jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
    RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
    // 构建Hologres Binlog Source。
    long startTimeMs = 0;
    HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
        schema,
        config,
        jdbcOptions,
        recordConverter,
        startTimeMs);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
    env.execute();
    说明
    • 方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。

    • Hologres Binlog注意事项和实现原理等详情,请参见Binlog Source表

  • Hologres结果表

    VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。

    // 初始化读取的表的Schema。
    TableSchema schema = TableSchema.builder()
        .field("a", DataTypes.INT())
      .field("b", DataTypes.STRING())
        .build();
    // Hologres的相关参数。
    Configuration config = new Configuration();
    config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
    config.setString(HologresConfigs.USERNAME, "yourUserName");
    config.setString(HologresConfigs.PASSWORD, "yourPassword");
    config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
    config.setString(HologresConfigs.TABLE, "yourTableName");
    config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
    HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
    // 构建Hologres Writer,以RowData的方式写入数据。
    AbstractHologresWriter<RowData> hologresWriter =
        buildHologresWriter(schema, config, hologresConnectionParam);
    // 构建Hologres Sink。
    HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
    int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
    env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
        .addSink(sinkFunction);
    env.execute();
    说明

    方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。