本文为您介绍Hologres维表DDL定义、WITH参数、CACHE参数和代码示例。

说明 Hologres Connector不支持访问Hologres外部表。Hologres外部表详情请参见外部表

什么是Hologres

Hologres兼容PostgreSQL协议,与大数据生态紧密连接,支持高并发、低延时实时分析处理PB级数据,让您轻松使用现有BI(Business Intelligence)工具对数据进行多维分析和业务探索。

前提条件

已创建Hologres表,详情请参见

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持Hologres Connector。
  • 创建Hologres维表时建议选择行存模式,列存模式对于点查场景性能开销较大。
    选择行存模式创建维表时必须设置主键,并且将主键设置为clustering key才可以工作。示例语句如下:
    begin;
    create table test(a int primary key, b text, c text, d float8, e int8);
    call set_table_property('test', 'orientation', 'row');
    call set_table_property('test', 'clustering_key', 'a');
    commit;
  • Hologres维表的主键必须是Flink Join On的字段,Flink Join On的字段也必须是维表完整的主键字段,两者必须完全匹配。
  • Flink计算引擎为VVR 4.0及以上版本的Hologres Flink Connector支持非主键的维表JOIN,而VVR 4.0以下版本的Hologres Flink Connector仅支持主键的维表JOIN。

DDL定义

CREATE TABLE hologres_dim(
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourUsername>',
  'password'='<yourPassword>',
  'endpoint'='<yourEndpoint>'
);

WITH参数

说明 仅实时计算引擎Flink 1.13-VVR 4.0.11及以上版本支持所有jdbc开头的参数。
参数 说明 是否必填 备注
connector 维表类型。 固定值为hologres(小写)。
endPoint Hologres服务地址。 详情请参见访问域名
tablename 表名称。
说明 如果Schema不为Public时,则tableName需要填写为schema.tableName
无。
dbname 数据库名称。 无。
username 用户名,请填写阿里云账号的AccessKey ID。 无。
password 密码,请填写阿里云账号的AccessKey Secret。 无。
useRpcMode 是否通过RPC方式使用Hologres Connector。 参数取值如下:
  • true:使用RPC方式使用Hologres Connector。

    通过RPC方式会降低SQL连接数。

  • false(默认值):使用JDBC方式使用Hologres Connector。

    通过JDBC方式会占用SQL连接,导致JDBC链接数增加。

connectionSize 单个Flink维表任务所创建的JDBC连接池大小。

如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。

默认值为3,单位为个。
connectionPoolName 连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。 每个表默认使用自己的连接池。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectorSize都需要相同。
说明 仅实时计算Flink 1.13-VVR 4.0.12及以上版本支持该参数。
jdbcReadBatchSize 点查Hologres维表时,攒批处理的最大条数。 默认值为128。
jdbcReadBatchQueueSize 维表点查请求缓冲队列大小。 默认值为256。
jdbcReadTimeoutMs 维表点查的超时时间。 默认值为0,表示不会超时。
说明 仅Flink 1.13-vvr-4.0.15及以上版本,Flink 1.15-vvr-6.0.2及以上版本支持该参数。
jdbcScanFetchSize 在一对多join(即没有使用完整主键)时使用scan接口,scan攒批处理数据的条数。 默认值为256。
jdbcScanTimeoutSeconds scan操作的超时时间。 默认值为60秒。
jdbcRetryCount 当连接故障时,写入和查询的重试次数。 默认值为10。
jdbcRetrySleepInitMs 每次重试的固定等待时间。

实际重试的等待时间的计算公式为retrySleepInitMs+retry*retrySleepStepMs

默认值为1000毫秒。
jdbcRetrySleepStepMs 每次重试的累加等待时间。

实际重试的等待时间的计算公式为retrySleepInitMs+retry*retrySleepStepMs

默认值为5000毫秒。
jdbcConnectionMaxIdleMs JDBC连接的空闲时间。

超过这个空闲时间,连接就会断开释放掉。

默认值为60000毫秒。
jdbcMetaCacheTTL 本地缓存TableSchema信息的过期时间。 默认值为60000毫秒。
jdbcMetaAutoRefreshFactor 如果Cache的剩余时间小于触发时间(jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor),则系统会自动刷新Cache。 默认值为4。

Cache的剩余时间=Cache的过期时间 - Cache已经存活的时间。

Cache自动刷新后,则从0开始重新计算Cache的存活时间。

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略。 Hologres仅支持以下两种缓存策略:
  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

  • None:无缓存。
说明 cache参数默认值情况如下:
  • VVR 4.x版本及以上版本:None。
  • VVR 4.x版本以下版本:LRU。
cacheSize 缓存大小。 选择LRU缓存策略后,可以设置缓存大小,默认值为10000行。
cacheTTLMs 缓存更新时间间隔。 cacheTTLMs配置和cache有关:
  • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
  • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
async 是否异步同步数据。 取值如下:
  • true:表示异步同步数据。
  • false(默认值):表示不进行异步同步数据。
说明 异步同步数据默认是无序的。

类型映射

Hologres字段类型 Flink字段类型
INT INT
INT[] ARRAY<INT>
BIGINT BIGINT
BIGINT[] ARRAY<BIGINT>
REAL FLOAT
REAL[] ARRAY<REAL>
DOUBLE PRECISION DOUBLE
DOUBLE PRECISION[] ARRAY<DOUBLE>
BOOLEAN BOOLEAN
BOOLEAN[] ARRAY<BOOLEAN>
TEXT VARCHAR
TEXT[] ARRAY<TEXT>
NUMERIC DECIMAL
DATE DATE
TIMESTAMP WITH TIMEZONE TIMESTAMP
说明 上表中未列出的Hologres类型,Connector还未支持转换。

代码示例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

insert into blackhole_sink select T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;