本文为您介绍如何使用实时数仓Hologres连接器。
背景信息
实时数仓Hologres是一站式实时数据仓库引擎,支持海量数据实时写入、实时更新、实时分析,支持标准SQL(兼容PostgreSQL协议),支持PB级数据多维分析(OLAP)与即席分析(Ad Hoc),支持高并发低延迟的在线数据服务(Serving),与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。Hologres连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
|
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
源表
功能 | 详情 |
实时消费Hologres |
获取更多信息,详情请参见Flink/Blink实时消费Hologres Binlog。 |
结果表
功能 | 详情 |
支持写入Changelog消息。 | |
只更新修改部分的数据,而非整行更新。 | |
支持实时同步单表、整库的数据以及相应的表结构变更到Hologres表中。 | |
插入部分列 说明 仅实时计算引擎VVR 6.0.7及以上版本支持。 | 支持将Flink INSERT DML中指定的列名下推给连接器,从而仅更新指定的列。 |
前提条件
已创建Hologres表,详情请参见创建Hologres表。
使用限制
通用:
仅Flink计算引擎VVR 2.0.0及以上版本支持Hologres连接器。
Hologres连接器不支持访问Hologres外部表。关于Hologres外部表详情请参见创建Hologres外部表(映射到MaxCompute)。
连接器目前的已知缺陷以及版本功能发布记录详见Hologres Connector Release Note。
源表独有:
Flink默认以批模式读取Hologres源表数据,即只扫描一次Hologres全表,扫描结束,消费结束,新到Hologres源表的数据不会被读取。从VVR 3.0.0版本开始,支持实时消费Hologres数据,详情请参见实时计算Flink版实时消费Hologres。从VVR 6.0.3版本开始,Hologres源表在批模式读取时支持filter下推,详见源表独有参数
enable_filter_push_down
。实时计算引擎8.0以下版本Hologres CDC模式暂不支持定义Watermark。如果您需要进行窗口聚合,您可以采用非窗口聚合的方式,详情请参见MySQL/Hologres CDC源表不支持窗口函数,如何实现类似每分钟聚合统计的需求?。
结果表独有:无。
维表独有:
维表建议使用主键作为Join条件,对于此类主键点查的维表,创建Hologres表时建议选择行存模式,列存模式对于点查场景性能开销较大。选择行存模式创建维表时必须设置主键,并且将主键设置为Clustering Key才可以工作。详情请参见CREATE TABLE。
如果业务需要,无法使用主键作为Join条件,对于此类非主键点查的维表(即一对多的查询),创建Hologres表时建议选择列存模式,并合理设置分布键Distribution Key以及Event Time Column(Segment Key)以优化查询性能,详情请参见表存储格式:列存、行存、行列共存。
VVR 4.0以下版本仅支持对维表主键点查的维表Join,VVR 4.0及以上版本,jdbc模式支持维表的非主键点查。
注意事项
使用了rpc模式时,VVR版本升级注意事项:
Hologres 2.0版本下线了rpc(用于维表与结果表)模式,全面转为jdbc相关模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重,或者升级到VVR 8.0.5版本配置deduplication.enabled参数为false。
如果您作业中存在使用了rpc模式读写Hologres的情况,此时如果您需要将VVR 4.x升级到VVR 6.x或VVR 8.x,请按照以下情况进行处理:
升级到VVR 6.0.4~6.0.6版本,可能会抛出异常。推荐维表和结果表使用jdbc_fixed或jdbc模式。
升级到VVR 6.0.7及以上版本,无需您做任何处理,Flink系统会自动替换rpc为jdbc相关模式。
使用binlog源表且未指定jdbc模式时,VVR版本升级注意事项:
Hologres 2.0版本开始有限支持holohub(用于Binlog源表)模式,Hologres 2.1版本彻底下线了holohub模式,全面转为jdbc模式。
如果您作业中存在消费binlog源表的情况,而且binlog源表未通过sdkmode='jdbc'指定jdbc模式,默认会使用holohub模式。此时如果您需要将VVR 4.x升级到VVR 6.x或VVR 8.x,请按照以下情况进行处理:
如果Hologres版本是2.0。
升级到VVR 6.0.7~VVR 8.0.3版本,仍然可以继续使用holohub模式。
升级到VVR 8.0.4及以上版本,可能抛出权限不足的异常,需要特别配置用户权限,详情见实时计算Flink版实时消费Hologres。
如果Hologres版本是2.1。
升级到VVR 6.0.7~VVR 8.0.4版本,可能无法正常消费Binlog,建议升级到VVR 8.0.5。
升级到VVR 8.0.5及以上版本,无需您做任何处理,Flink系统会自动替换holohub模式为jdbc模式。
语法结构
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'sdkmode' = 'jdbc'
);
WITH参数
仅Flink实时计算引擎VVR 4.0.11及以上版本支持所有jdbc开头的参数。
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
hologres
。dbname
数据库名称。
String
是
无
Hologres V2.0版本推出了全新的弹性高可用实例形态,将计算资源分解为不同的计算组(Virtual Warehouse),更好的服务于高可用部署。不同的计算组使用相同的Endpoint,您可以通过在dbname参数后添加特定的后缀来指定连接某个计算组。例如某张维表希望连接特定的计算组read_warehouse,可以通过
'dbname' = 'db_test@read_warehouse'
方式指定。说明仅JDBC相关模式支持使用计算组,详见源表、维表和结果表WITH参数中的sdkMode参数。
tablename
表名称。
String
是
无
如果Schema不为Public时,则tablename需要填写为
schema.tableName
。username
用户名,请填写阿里云账号的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理。
password
密码,请填写阿里云账号的AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理。
endpoint
Hologres服务地址。
String
是
无
详情请参见访问域名。
connection.ssl.mode
是否启用SSL(Secure Sockets Layer)传输加密,以及启用采用何种模式。
String
否
disable
参数取值如下:
disable(默认值):不启用传输加密。
require:启用SSL,只对数据链路加密。
verify-ca:启用SSL,加密数据链路,同时使用CA证书验证Hologres服务端的真实性。
verify-full:启用SSL,加密数据链路,使用CA证书验证Hologres服务端的真实性,同时比对证书内的CN或DNS与连接时配置的Hologres连接地址是否一致。
说明VVR 8.0.5及以上版本开始支持此参数。
Hologres自1.1版本起支持SSL传输加密的require模式,2.1版本起新增支持verify-ca和verify-full模式。详见传输加密。
当配置为verify-ca或者verify-full时,需要同时配置connection.ssl.root-cert.location参数。
connection.ssl.root-cert.location
当传输加密模式需要证书时,配置证书的路径。
String
否
无
当connection.ssl.mode配置为verify-ca或者verify-full时,需要同时配置CA证书的路径。证书可以使用实时计算控制台的资源上传功能上传至平台,上传后文件存放在/flink/usrlib目录下。例如,需要使用的CA证书文件名为certificate.crt,则上传后参数取值应该为
'/flink/usrlib/certificate.crt'
。说明VVR 8.0.5及以上版本开始支持此参数。
CA证书获取方式见传输加密-下载CA证书。
jdbcRetryCount
当连接故障时,写入和查询的重试次数。
Integer
否
10
无。
jdbcRetrySleepInitMs
每次重试的固定等待时间。
Long
否
1000
实际重试的等待时间的计算公式为
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。单位为毫秒。jdbcRetrySleepStepMs
每次重试的累加等待时间。
Long
否
5000
实际重试的等待时间的计算公式为
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。单位为毫秒。jdbcConnectionMaxIdleMs
JDBC连接的空闲时间。
Long
否
60000
超过这个空闲时间,连接就会断开释放掉。单位为毫秒。
jdbcMetaCacheTTL
本地缓存TableSchema信息的过期时间。
Long
否
60000
单位为毫秒。
jdbcMetaAutoRefreshFactor
如果缓存的剩余时间小于触发时间,则系统会自动刷新缓存。
Integer
否
4
缓存的剩余时间计算方法:缓存的剩余时间=缓存的过期时间 - 缓存已经存活的时间。缓存自动刷新后,则从0开始重新计算缓存的存活时间。
触发时间计算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor两个参数的比值。
type-mapping.timestamp-converting.legacy
Flink和Hologres之间是否进行时间类型的相互转换。
Boolean
否
true
参数取值如下:
true:不进行相互转换。时区转换将采用运行环境中的JVM时区。
false(推荐):进行相互转换。时区转换将使用Flink所配置的时区。
说明仅实时计算引擎VVR 8.0.6及以上版本支持该参数。
Flink和Hologres的时区详情,请参见Flink与Hologres时区说明。
property-version=0时,默认值为true;property-version=1时,默认值为false。
property-version
Connector参数版本。
Integer
否
0
可填的值为0和1,默认值为0。
说明仅VVR 8.0.6及以上版本支持配置该参数。
在不同参数版本里,可用的参数集合和参数的默认值可能不同。如果存在区别,区别详情会在参数的说明部分描述。
推荐使用参数版本1。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
field_delimiter
导出数据时,不同行之间使用的分隔符。
String
否
"\u0002"
无。
binlog
是否消费Binlog数据。
Boolean
否
false
参数取值如下:
true:消费Binlog数据。
false(默认值):不消费Binlog数据。
说明实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。
property-version=0时,默认值为false。
property-version=1时,默认值为true。
sdkMode
SDK模式。
String
否
holohub
参数取值如下:
holohub(默认值):使用holohub模式消费binlog。
jdbc:使用jdbc模式消费binlog。
jdbc_fixed: 使用fixed jdbc模式消费binlog,与jdbc模式的区别在于不受连接数限制。目前此模式暂不支持消费开起数据脱敏功能的database的binlog,详情请参见数据脱敏。
详情请参见Binlog Source表。
说明VVR 6.0.3及以下版本:不支持配置该参数。
VVR 6.0.4~6.0.6版本:推荐使用默认配置,即holohub。
VVR 6.0.7及以上版本:推荐配置为jdbc。
Hologres实例为2.0以下版本,Flink系统采用您配置的参数值。
Hologres实例为2.0及以上版本,由于Hologres 2.0版本下线了holohub服务,此时如果您配置了holohub,Flink系统自动切换为jdbc。但是如果您配置为jdbc,Flink系统采用jdbc。
VVR 8.0.4~8.0.5:
Hologres实例为2.0版本,Flink系统自动切换为jdbc。可能存在权限不足等问题,参考实时计算Flink版实时消费Hologres文档进行处理。
Hologres实例为2.1及以上版本,Flink系统自动切换为jdbc。
VVR 8.0.6及以上版本:
Hologres实例为2.1.27及以上版本,Flink系统自动切换为jdbc_fixed模式。
Hologres实例为2.1.0~2.1.26版本,Flink系统自动切换为jdbc模式。
Hologres实例为2.0版本,Flink系统会尝试使用jdbc模式,如果存在权限不足等异常,会自动选择holohub模式启动。
jdbcBinlogSlotName
JDBC模式的binlog源表的Slot名称。创建方法请参见JDBC模式Binlog源表。
String
否
无
仅在sdkMode配置为jdbc时生效,如果用户未配置,连接器会默认创建一个Slot来使用。详见JDBC模式Binlog源表。
说明Hologres实例2.1版本起,不再需要配置此参数,连接器也不会尝试自动创建。
binlogMaxRetryTimes
读取Binlog数据出错后的重试次数。
Integer
否
60
无。
binlogRetryIntervalMs
读取Binlog数据出错后的重试时间间隔。
Long
否
2000
单位为毫秒。
binlogBatchReadSize
批量读取Binlog的数据行数。
Integer
否
100
无。
cdcMode
是否采用CDC模式读取Binlog数据。
Boolean
否
false
参数取值如下:
true:CDC模式读取Binlog数据。
false(默认值):非CDC模式读取Binlog数据。
说明实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。
property-version=0时,默认值为false。
property-version=1时,默认值为true。
upsertSource
源表是否使用upsert类型的Changelog。
Boolean
否
false
仅在CDC模式下生效。参数取值如下:
true:仅支持Upsert类型,包括INSERT、DELETE和UPDATE_AFTER。
false(默认值):支持所有类型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。
说明如果下游包含回撤算子(例如使用ROW_NUMBER OVER WINDOW去重),则需要设置upsertSource为true,此时源表会以Upsert方式从Hologres中读取数据。
binlogStartupMode
Binlog数据消费模式。
String
否
earliestOffset
参数取值如下:
initial:先全量消费数据,再读取Binlog开始增量消费。
earliestOffset(默认值):从最早的Binlog开始消费。
timestamp:从设置的startTime开始消费Binlog。
说明如果设置了startTime或者在启动界面选择了启动时间,则binlogStartupMode强制使用timestamp模式,其他消费模式不生效,即startTime参数优先级更高。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。
property-version=0时,默认值为false。
property-version=1时,默认值为true。
startTime
启动位点的时间。
String
否
无
格式为yyyy-MM-dd hh:mm:ss。如果没有设置该参数,且作业没有从State恢复,则从最早的Binlog开始消费Hologres数据。
jdbcScanFetchSize
扫描时攒批大小。
Integer
否
256
无。
jdbcScanTimeoutSeconds
扫描操作超时时间。
Integer
否
60
单位为秒。
jdbcScanTransactionSessionTimeoutSeconds
扫描操作所在事务的超时时间。
Integer
否
600秒(0表示不超时)
对应Hologres的GUC参数idle_in_transaction_session_timeout,详情请参见GUC参数。
说明仅实时计算引擎Flink1.13-vvr-4.0.15及以上版本支持该参数。
enable_filter_push_down
全量读取阶段是否进行filter下推。
Boolean
否
false
参数取值如下:
false(默认值):不进行filter下推。
true:读取全量数据时,将支持的过滤条件下推到Hologres执行。包括非Binlog Source全量读取以及Binlog Source使用全增量一体化消费模式时的全量阶段。
重要实时计算引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5默认会进行filter下推,但如果作业使用了hologres维表,且写入的DML中包含有对维表非主键字段的过滤条件时,维表的filter会被错误的下推,可能导致维表join出现错误结果。因此推荐使用实时计算引擎Flink1.15-vvr-6.0.6及以上版本,并在源表增加此参数来开启过滤条件下推功能。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sdkMode
SDK模式。
String
否
jdbc
参数取值如下:
jdbc:默认值,表示使用jdbc模式进行写入。
jdbc_copy:是否使用fixed copy方式写入。
fixed copy是hologres1.3新增的能力,相比通过insert方法(jdbc模式)进行写入,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批)。但此模式暂不支持delete数据,也不支持写入分区父表,不支持ignoreNullWhenUpdate参数。
rpc:表示使用rpc模式进行写入,与useRpcMode参数一致,与jdbc模式的区别在于不占用连接数,并且不支持写入Hologres的JSONB、RoarinBitmap类型。
jdbc_fixed(beta功能):表示使用fixed jdbc方式进行写入,
需要Hologres引擎版本大于等于1.3,与jdbc模式的区别在于不占用连接数,不支持写入Hologres的Jsonb,RoarinBitmap类型。目前此模式暂不支持写入开起数据脱敏功能的database,详情请参见数据脱敏。
说明VVR 6.0.3以下版本:不支持配置该参数。
VVR 6.0.4~VVR 6.0.6版本:推荐配置为jdbc。
VVR 6.0.7~VVR 8.0.1版本:推荐配置为jdbc。
如果Hologres实例为2.0以下版本,Flink系统采用您配置的参数值。
如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为rpc,Flink系统将自动切换该参数值为jdbc_fixed,但是如果您配置为其他值,Flink系统将采用您配置的参数值。
rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据,切换为jdbc模式后,可以通过设置'jdbcWriteBatchSize'='1'防止去重。
VVR 8.0.3及以上版本:推荐配置为jdbc。
自此版本开始,无论Hologres实例版本,都不再支持rpc模式,如果选择rpc模式,将自动切换该参数值为jdbc_fixed且设置'jdbcWriteBatchSize'='1'防止去重。
VVR 8.0.5及以上版本:推荐配置为jdbc。
如果选择rpc模式,将自动切换该参数值为jdbc_fixed且设置deduplication.enabled参数为false防止去重。
bulkload
是否采用bulkload写入。
Boolean
否
false
仅在sdkMode设置为jdbc_copy时生效。bulkload写入目前仅适用于无主键表或者主键保证不重复的有主键表(主键重复会抛出异常),相比默认的jdbc_copy,写入使用更少的Hologres资源。
说明仅实时计算引擎VVR 8.0.5及以上版本且Hologres实例为2.1及以上版本支持该参数。
useRpcMode
是否通过RPC方式使用Hologres连接器。
Boolean
否
false
参数取值如下:
true:使用RPC方式使用Hologres连接器。
与sdkMode参数设置为rpc效果相同,通过RPC方式会降低SQL连接数。
false(默认值):使用JDBC方式使用Hologres连接器。
通过JDBC方式会占用SQL连接,导致JDBC连接数增加。
说明Hologres 2.0版本下线了rpc模式,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。
实时计算引擎VVR 6.0.7及VVR 8.0.1版本在检测到Hologres实例是2.0及以上版本时,会自动切换rpc模式为jdbc_fixed模式。
实时计算引擎VVR 8.0.3及以上版本会自动切换rpc模式为jdbc_fixed模式。
rpc模式不会对同一批次中相同主键的数据做去重,如果业务场景需要保留完整的数据变化记录,推荐使用实时计算引擎VVR 8.0.5及以上版本,jdbc模式可以配置deduplication.enabled参数为false不进行去重。
property-version=1时,该参数下线。
mutatetype
数据写入模式。
String
否
insertorignore
详情请参见流式语义。
说明实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。
property-version=0时,默认值为insertorignore。
property-version=1时,默认值为insertorupdate。
partitionrouter
是否写入分区表。
Boolean
否
false
无。
createparttable
当写入分区表时,是否根据分区值自动创建不存在的分区表。
Boolean
否
false
RPC模式下,如果分区值中存在短划线(-),暂不支持自动创建分区表。
说明Hologres实例1.3.22及以上版本开始支持使用Date类型做分区键。实时计算引擎VVR 8.0.3及以上版本,支持使用Date类型做分区键时自动创建分区表。
请确保分区值不会出现脏数据,否则会创建错误的分区表导致Failover,建议慎用该参数。
当sdk_mode设置为jdbc_copy时,不支持写入分区父表。
ignoredelete
是否忽略撤回消息。
Boolean
否
true
说明仅在使用流式语义时生效。
实时计算引擎VVR 8.0.8及以上版本推荐sink.delete-strategy。
实时计算引擎VVR 8.0.6及以上版本支持多版本默认值。
property-version=0时,默认值为true。
property-version=1时,默认值为false。
sink.delete-strategy
撤回消息的处理方式。
String
否
无
参数取值如下:
IGNORE_DELETE:忽略Update Before和Delete消息。适用于仅需插入或更新数据,而无需删除数据的场景。
NON_PK_FIELD_TO_NULL:忽略Update Before消息,并将Delete消息执行为将非主键字段更新为null。适用于希望在局部更新操作中执行删除操作而不影响其他列的场景。
DELETE_ROW_ON_PK:忽略Update Before消息,并将Delete消息执行为根据主键删除整行。适用于在局部更新过程中,希望执行删除整行操作,从而影响其他列的场景。
CHANGELOG_STANDARD:Flink框架按照 Flink SQL Changelog的工作原理运行,不忽略删除操作,并通过先删除数据再插入的方式执行更新操作,以确保数据准确性。适用于不涉及局部更新的场景。
说明仅实时计算引擎VVR 8.0.8及以上版本支持该参数。
启用NON_PK_FIELD_TO_NULL选项可能会导致记录中只有主键,其他所有列都为null。
connectionSize
单个Flink结果表任务所创建的JDBC连接池大小。
Integer
否
3
如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。
jdbcWriteBatchSize
JDBC模式,Hologres Sink节点数据攒批条数(不是来一条数据处理一条,而是攒一批再处理)的最大值。
Integer
否
256
单位为数据行数。
说明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。
jdbcWriteBatchByteSize
JDBC模式,Hologres Sink节点数据攒批字节数(不是来一条数据处理一条,而是攒一批再处理)的最大值。
Long
否
2097152(2*1024*1024)字节,即2 MB
说明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。
jdbcWriteFlushInterval
JDBC模式,Hologres Sink节点数据攒批写入Hologres的最长等待时间。
Long
否
10000
单位为毫秒。
说明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之间为或的关系。如果同时设置了这三个参数,则满足其中一个,就进行写入结果数据。
ignoreNullWhenUpdate
当mutatetype='insertOrUpdate'时,是否忽略更新写入数据中的Null值。
Boolean
否
false
取值说明如下:
false(默认值):将Null值写到Hologres结果表里。
true:忽略更新写入数据中的Null值。
说明仅Flink计算引擎VVR 4.0及以上版本支持该参数。
当sdk_mode设置为jdbc_copy时,不支持此参数。
connectionPoolName
连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。
String
否
无
取值为非
'default'
的任意字符串。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectionSize参数也需要相同。说明VVR 4.0.12以下版本:不支持配置该参数。
VVR 4.0.12~VVR 8.0.3版本:默认不共享,每个表使用自己的连接池。
VVR 8.0.4以上版本:同一个作业中endpoint相同的表会默认共享连接池。作业中表数量较多时连接数可能相对不足影响性能,这种情况下推荐为不同的表设置不同的connectionPoolName。
此参数可以按需配置,比如作业中有维表A,B以及结果表C,D,E五张hologres表,可以A表和B表使用'pool1',C表和D表使用'pool2',E表流量较大,单独使用'pool3'。
jdbcEnableDefaultForNotNullColumn
如果将Null值写入Hologres表中Not Null且无默认值的字段,是否允许连接器帮助填充一个默认值。
Boolean
否
true
参数取值如下:
true(默认值):允许连接器填充默认值并写入,规则如下。
如果字段是String类型,则默认写为空("")。
如果字段是Number类型,则默认写为0。
如果是Date、timestamp或timestamptz时间类型字段,则默认写为1970-01-01 00:00:00。
false:不填充默认值,写Null到Not Null字段时,会抛出异常。
remove-u0000-in-text.enabled
如果写入时字符串类型包含\u0000非法字符,是否允许连接器帮助去除。
Boolean
否
false
参数取值如下:
false(默认值):连接器不对数据进行操作,但碰到脏数据时写入可能抛出如下异常,
ERROR: invalid byte sequence for encoding "UTF8": 0x00
此时需要在源表提前处理脏数据,或者在SQL中定义脏数据处理逻辑。
true:连接器会帮助去除字符串类型中的\u0000,防止写入抛出异常。
说明仅实时计算引擎VVR 8.0.1及以上版本支持该参数。
partial-insert.enabled
是否只插入INSERT语句中定义的字段。
Boolean
否
false
参数取值如下:
false(默认值):无论INSERT语句中声明了哪些字段,都会更新结果表DDL中定义的所有字段,对于未在INSERT语句中声明的字段,会被更新为null。
true:将INSERT语句中定义的字段下推给连接器,从而可以只对声明的字段进行更新或插入。
说明仅实时计算引擎VVR 6.0.7及以上版本支持该参数。
此参数仅在mutatetype参数配置为InsertOrUpdate时生效。
deduplication.enabled
jdbc及jdbc_fixed模式写入攒批过程中,是否进行去重。
Boolean
否
true
参数取值如下:
true(默认值):如果一批数据中有主键相同的数据,默认进行去重,只保留最后一条到达的数据。以两个字段,其中第一个字段为主键的数据举例:
INSERT (1,'a')
和INSERT (1,'b')
两条记录先后到达,去重之后只保留后到达的(1,'b')
写入Hologres结果表中。Hologres结果表中已经存在记录
(1,'a')
,此时DELETE (1,'a')
和INSERT (1,'b')
两条记录先后到达,只保留后到达的(1,'b')
写入hologres中,表现为直接更新,而不是先删除再插入。
false:在攒批过程中不进行去重,如果发现新到的数据和目前攒批的数据中存在主键相同的情况,先将攒批数据写入,写入完成之后再继续写入新到的数据。
说明仅实时计算引擎VVR 8.0.5及以上版本支持该参数。
不允许攒批去重时,极端情况下(例如所有数据的主键都相同)写入会退化为不攒批的单条写入,对性能有一定影响。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
sdkMode
SDK模式。
String
否
jdbc
参数取值如下:
jdbc(默认值):表示使用jdbc模式进行查询,支持主键点查和非主键的查询,但是非主键的查询对性能影响较大,查询较慢。
rpc:表示使用rpc模式进行点查,与useRpcMode参数一致,仅支持主键点查,即维表的主键字段必须与Flink Join On的字段完全匹配,与jdbc模式的区别在于不占用连接数,不支持读取Hologres的Jsonb,RoarinBitmap类型。
jdbc_fixed:(beta功能,需要hologres引擎版本大于等于1.3)表示使用fixed jdbc方式进行点查,与jdbc模式的区别在于不占用连接数,且不支持读取Hologres的Jsonb,RoarinBitmap类型。仅支持主键点查,即维表的主键字段必须与Flink Join On的字段完全匹配。目前此模式暂不支持查询开起数据脱敏功能的database,详情请参见数据脱敏。
说明VVR 6.0.3以下版本:不支持配置该参数。
VVR 6.0.4~VVR 6.0.6版本:推荐配置为jdbc。
在VVR 6.0.6版本,SDK模式选择为jdbc时,如果维表字符串类型的查询结果中包含null值,可能抛出空指针异常,此时推荐您使用rpc模式绕过。
VVR 6.0.7及VVR 8.0.1:推荐配置为jdbc。
如果Hologres实例为2.0以下版本,Flink系统将采用您配置的参数值。
如果Hologres实例为2.0及以上版本,由于Hologres 2.0及以上版本下线了rpc服务,此时如果您将该参数配置为了rpc,Flink系统自动将参数值切换为jdbc_fixed。但是如果您配置为其他值,Flink系统将采用您配置的参数值。
VVR 8.0.3及以上版本:推荐配置为jdbc。
自此版本开始,无论Hologres实例版本,都不再支持rpc模式,如果选择rpc模式,将自动切换该参数值为jdbc_fixed。
useRpcMode
是否通过RPC方式使用Hologres连接器。
Boolean
否
false
参数取值如下:
true:使用RPC方式使用Hologres连接器。与sdkMode参数设置为rpc效果相同。通过RPC方式会降低SQL连接数。
false(默认值):使用JDBC方式使用Hologres连接器。
通过JDBC方式会占用SQL连接,导致JDBC链接数增加。
说明Hologres 2.0版本下线了rpc了服务,推荐使用sdkMode参数来选择jdbc或者jdbc_fixed模式。
实时计算引擎VVR 6.0.7及VVR 8.0.1版本在检测到Hologres实例是2.0及以上版本时,会自动切换rpc模式为jdbc_fixed模式。
实时计算引擎VVR 8.0.3及以上版本会自动切换rpc模式为jdbc_fixed模式。
connectionSize
单个Flink维表任务所创建的JDBC连接池大小。
Integer
否
3
如果作业性能不足,建议您增加连接池大小。连接池大小和数据吞吐成正比。
connectionPoolName
连接池名称。同一个TaskManager中,配置相同名称的连接池的表可以共享连接池。
String
否
无
取值为非
'default'
的任意字符串。如果多个表设置相同的连接池,则这些使用相同连接池的表的connectionSize参数也需要相同。您可以按需配置此参数,例如作业中有维表A,B以及结果表C,D,E五张hologres表,可以A表和B表使用pool1,C表和D表使用pool2,E表流量较大,单独使用pool3。
说明VVR 4.0.12以下版本:不支持配置该参数。
VVR 4.0.12~VVR 8.0.3版本:默认不共享,每个表使用自己的连接池。
VVR 8.0.4以上版本:同一个作业中Endpoint相同的表会默认共享连接池。作业中表数量较多时连接数可能相对不足影响性能,这种情况下推荐为不同的表设置不同的connectionPoolName。
jdbcReadBatchSize
点查Hologres维表时,攒批处理的最大条数。
Integer
否
128
无。
jdbcReadBatchQueueSize
维表点查请求缓冲队列大小。
Integer
否
256
无。
jdbcReadTimeoutMs
维表点查的超时时间。
Long
否
默认值为0,表示不会超时
仅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支持该参数。
jdbcReadRetryCount
维表点查超时时的重试次数。
Interger
否
VVR 8.0.5以下版本:1
VVR 8.0.5及以上版本:10
仅VVR 6.0.3以上版本支持该参数。
本参数与jdbcRetryCount不同,后者是指连接发生异常时的重试次数。
jdbcScanFetchSize
在一对多join(即没有使用完整主键)时使用scan接口,scan攒批处理数据的条数。
Integer
否
256
无。
jdbcScanTimeoutSeconds
scan操作的超时时间。
Integer
否
60
单位为秒。
cache
缓存策略。
String
否
见备注列。
Hologres仅支持None和LRU两种缓存策略,取值详情请参见背景信息。
说明Cache默认值和VVR版本有关:
VVR 4.x版本及以上版本,默认值为None。
VVR 4.x版本以下版本,默认值为LRU。
cacheSize
缓存大小。
Integer
否
10000
选择LRU缓存策略后,可以设置缓存大小。单位为条。
cacheTTLMs
缓存更新时间间隔。
Long
否
见备注列。
单位为毫秒。cacheTTLMs默认值和cache的配置有关:
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
cacheEmpty
是否缓存join结果为空的数据。
Boolean
否
true
true(默认值):表示缓存join结果为空的数据。
false:表示不缓存join结果为空的数据。
async
是否异步返回数据。
Boolean
否
false
true:表示异步返回数据。
false(默认值):表示不进行异步返回数据。
说明异步返回数据是无序的。
类型映射
Flink与Hologres的数据类型映射请参见Blink/Flink与Hologres的数据类型映射。
使用示例
源表示例
非Binlog Source表
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'field_delimiter'='|' --该参数可选。
'sdkmode' = 'jdbc'
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Binlog Source表
Hologres连接器支持实时消费Hologres,即实时消费Hologres的Binlog数据。Flink实时消费Hologres详情请参见实时计算Flink版实时消费Hologres。
结果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
维表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
特色功能详解
流式语义
宽表Merge和局部更新功能
作为CTAS和CDAS的目标端
DataStream API
通过DataStream的方式读写数据时,需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。Maven中央库中已经放置了Hologres DataStream连接器。VVR 6.0.7请使用其中的1.15-vvr-6.0.7-1版本的依赖。VVR 8.0.7的依赖请通过ververica-connector-hologres-1.17-vvr-8.0.7.jar下载,在本地调试时,需要使用相应的Uber JAR,详见本地运行和调试包含连接器的作业,VVR 8.0.7对应的Uber JAR为ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar。
Hologres源表
VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。
VVR 4.0.15
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// 构建HologresBulkreadInputFormat读取表数据。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7&VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
Hologres Binlog源表
VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。
VVR 4.0.15
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 设置或创建默认slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 构建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 8.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 构建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 构建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet());
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。
Hologres Binlog注意事项和实现原理等详情,请参见Binlog Source表。
Hologres结果表
VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。
VVR 4.0.15
// 初始化读取的表的Schema。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7&VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相关参数。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 构建Hologres Writer,以RowData的方式写入数据。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 构建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。
Flink与Hologres时区说明
时间类型
产品 | 类型 | 说明 |
Flink | 表示没有时区信息的日期和时间,描述年、 月、日、小时、分钟、秒和小数秒对应的时间戳。可以通过一个字符串来指定,例如 | |
用于描述时间线上的绝对时间点,使用long保存从epoch至今的毫秒数,使用int保存毫秒中的纳秒数。epoch时间是从Java的标准epoch时间开始计算。在计算和可视化时, 每个TIMESTAMP_LTZ类型的数据都使用Session (会话)中配置的时区。可以用于跨时区的计算,因为它是一个基于epoch的绝对时间点,代表的就是不同时区的同一个绝对时间点。 相同的TIMESTAMP_LTZ值,在不同的时区可能会反映出不同的本地TIMESTAMP,例如:如果一个TIMESTAMP_LTZ值为 | ||
Hologres | TIMESTAMP | 类似于Flink的 |
TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | 类似于Flink的 例如北京(UTC+8)时区的时间戳 |
时间类型映射
实时计算引擎VVR 8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=false
时,支持所有时间类型间的相互转换。Flink
Hologres
详情
TIMESTAMP
TIMESTAMP
之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。
TIMESTAMP LTZ
TIMESTAMPTZ
TIMESTAMP
TIMESTAMPTZ
之间的转换涉及时区转换。为了在转换中保持准确性,需要通过配置项参数
table.local-time-zone
设置Flink时区,配置项参数设置方法请参见如何配置作业运行参数?。例如当设置
'table.local-time-zone': 'Asia/Shanghai'
时,表示Flink时区为上海(+8时区)时,Flink TIMESTAMP类型的数据为2022-01-01 01:01:01.123456,写入Hologres TIMESTAMP TZ的数值为2022-01-01 01:01:01.123456+8。TIMESTAMP LTZ
TIMESTAMP
实时计算引擎VVR8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=true
时或者VVR 8.0.5及以下版本,除TIMESTAMP间转化,其他类型相互转化可能会出现数据偏差问题。Flink
Hologres
备注
TIMESTAMP
TIMESTAMP
之间相互转换是直接的,不涉及时区转换。因此推荐采用该数据映射。
TIMESTAMP LTZ
TIMESTAMPTZ
读写Hologres数据时都当作无时区时间进行处理,可能会存在数据偏差。
例如,Flink TIMESTAMP_LTZ类型的数值为2024-03-19T04:00:00Z,在上海(+8时区)对应的实际无时区时间为2024-03-19T12:00:00,但是写入时将2024-03-19T04:00:00当作无时区时间,写入Hologres TIMESTAMPTZ的数值为2024-03-19T04:00:00+08,数值偏差8小时。
TIMESTAMP
TIMESTAMPTZ
时区转换默认采用的是运行环境的JVM时区,而不是Flink时区,这与Flink内部计算的时区转换格式不同。当Flink时区与机器的JVM时区不一致时,会导致数据存在偏差,建议采用Flink时区进行Hologres数据的读写。
TIMESTAMP LTZ
TIMESTAMP