本文为您介绍如何使用Connector同步DataHub中的数据至Hologres。
前提条件
开通Hologres并连接开发工具,详情请参见PSQL客户端。
开通DataHub,详情请参见开始使用。
背景信息
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。
DataHub提供数据Sink/Source功能(数据同步),支持对Topic中的数据通过Hologres Connector实时同步到Hologres,对数据进行多维分析和实时探索。
DataHub与Hologres的概念映射如下表所示。
DataHub | Hologres |
Project | Database |
Topic | Table |
从Hologres V2.0版本起优化了DataHub数据同步至Hologres的模式,该模式有如下变化:
支持DataHub的4种新数据类型,包括TINYINT、SMALLINT、INTEGER和FLOAT。
无需在Hologres建表时指定同步模式(
datahub_sync_mode
)和同步策略(datahub_upsert_mode
)两个参数,改为在DataHub同步任务中配置。旧模式不占用Hologres实例连接数。新模式下每个同步任务都会占用部分连接数,每个任务占用的连接数等于DataHub Topic的Shard数。
使用限制
暂不支持Hologres实例开启白名单功能。
数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE。
Hologres 2.0版本起支持写入带有Default值的表。
同步介绍
同步DataHub中的数据至Hologres,有两种同步模式和两种同步策略,同步模式与同步策略还可以分别进行组合,实现不同的效果。
以下的两种同步模式和两种策略,不是DataHub的任务级别配置,而是在Hologres建表时的表属性,必须在建Hologres表时指定。
同步DataHub中的数据至Hologres与DataWorks数据集成批量同步至Hologres SDK写入模式冲突,关于SDK写入模式的介绍请参见Hologres Writer。
同步模式
逐条插入
逐条插入是指将DataHub数据逐条写入Hologres,需要在Hologres建表时指定表属性如下。
call set_table_property('<table_name>', 'datahub_sync_mode', 'none');
回放
回放是指回放上游数据库的DML操作,DataHub相当于是一个binlog,若是使用
dts-datahub-hologres
是否启用新的附加列规则,需要在Hologres建表时指定表属性如下。是否启用新的附加列规则选择为是时,需要在Hologres中建表时配置如下表属性。
call set_table_property('<table_name>', 'datahub_sync_mode', 'dts');
是否启用新的附加列规则选择为否时,需要在Hologres中建表时配置如下表属性。
call set_table_property('<table_name>', 'datahub_sync_mode', 'dts_old');
DTS在同步数据到DataHub时,会在数据列的基础上附加如下8列,用于描述回放数据信息(INSERT/UPDATE/DELETE),字段的主要说明如下。
附加列命名方式
旧版数据列名称
新版数据列名称
dts_${原始列名}
new_dts_sync_dts_${原始列名}
附加列说明
旧版附加列名称
新版附加列名称
数据类型
说明
dts_record_id
new_dts_sync_dts_record_id
String
增量日志的记录ID,为该日志唯一标识。
dts_operation_flag
new_dts_sync_dts_operation_flag
String
操作类型,取值:
I:INSERT操作。
D:DELETE操作。
U:UPDATE操作。
dts_instance_id
new_dts_sync_dts_instance_id
String
数据库的server ID。暂不支持显示实际的值,目前固定为NULL。
dts_db_name
new_dts_sync_dts_db_name
String
数据库名称。
dts_table_name
new_dts_sync_dts_table_name
String
表名。
dts_utc_timestamp
new_dts_sync_dts_utc_timestamp
String
操作时间戳,即Binlog的时间戳(UTC时间)。
dts_before_flag
new_dts_sync_dts_before_flag
String
所有列的值是否更新前的值,取值:Y或N。
dts_after_flag
new_dts_sync_dts_after_flag
String
所有列的值是否更新后的值,取值:Y或N。
同步策略(主键冲突策略)
当Hologres表设置主键时,从DataHub写入的数据有如下两种主键冲突策略。
覆盖
覆盖是指当写入发生主键冲突时,新的数据覆盖老数据,这个时候需要在Hologres建表时指定表属性如下。
call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_replace');
忽略
忽略是指写入时发生主键冲突,忽略新数据,即数据不更新,仍然使用老数据,这个时候需要在Hologres建表时指定表属性如下。
call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_ignore');
同步模式与同步策略组合
以上通过DataHub写入Hologres的几种模式,不同组合之间实现的效果不同,具体请参见以下。
插入模式与覆盖策略组合
相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
插入模式与忽略策略组合
相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
回放模式与覆盖策略组合
dts_operation_flag=I,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
dts_operation_flag=D,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
回放模式与忽略策略组合
dts_operation_flag=I,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
dts_operation_flag=D,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。
DELETE FROM target_table where pk=?
dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。
INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
操作步骤
准备DataHub数据源。
创建项目。
登录DataHub控制台,单击左侧导航栏的项目管理。
在项目列表页面单击新建项目。
在新建项目弹框,配置参数后,单击创建。
新建Topic。
成功创建项目后,在项目列表页面单击项目名称,进入项目详情页。
单击项目详情页右上角的新建Topic,进入新建Topic页面,填写配置参数。
参数
描述
创建方式
直接创建:创建新的Topic。
导入MaxCompute表结构:选择MaxCompute中已有的表结构创建Topic。
名称
自定义Topic名称。
类型
Topic类型,分为以下两种:
TUPLE:结构化数据。
BLOB:非结构化数据。
Schema详情
选择TUPLE类型会出现Schema详情,根据自己需求创建字段,允许为NULL代表如果上游没有该字段值自动置为NULL;不允许为NULL则会严格检验,字段类型不匹配写入报错。
Shard数量
Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。
生命周期
Topic中写入数据在系统中可以保存的最长时间,以天为单位,最小值为1,最大值为7。
Shard扩展模式
可选择开启,开启此开关后,Shard支持水平扩展,不再支持“合并”和“分裂”,此后Shard数量只能增加,不可减少。
说明此模式开启后可以使用Kafka方式来消费当前Topic。
启动多Version
可选择开启,开启此开关后,Topic可以同时拥有多个Schema,可以选择其中一个Schema写入,消费端会根据每条数据标记的Version自适应完成解析(如对应的Version的Schema被删除则解析失败)。
● 用户无法再使用appendFields接口。
● 可以对chema进行增删改查。
● 创建Connector使用最新Version的Schema。
描述
Topic的描述信息。
写入数据。
成功创建Topic后,您需要使用工具(例如Blink)或者程序写入数据至Topic中。
Hologres创建数据接收表。
在Hologres中创建一张用于接收数据的表,表的字段类型与DataHub中Topic的字段类型相互映射。
DataHub与Hologres的数据类型映射如下表所示。
DataHub
Hologres
TINYINT
SMALLINT
SMALLINT
SMALLINT
INTERGER
INTERGER
BIGINT
BIGINT
FLOAT
REAL
DOUBLE
DOUBLE PRECISION
DECIMAL
DECIMAL
STRING
TEXT
BOOLEAN
BOOLEAN
TIMESTAMP
TIMESTAMPTZ
示例建表语句如下。
BEGIN; CREATE TABLE lineitem ( L_ORDERKEY BIGINT NOT NULL, L_PARTKEY BIGINT NOT NULL, L_SUPPKEY BIGINT NOT NULL, L_LINENUMBER BIGINT NOT NULL, L_QUANTITY DECIMAL(20,10), L_EXTENDEDPRICE DECIMAL(20,10), L_DISCOUNT DECIMAL(20,10), L_TAX DECIMAL(20,10), L_RETURNFLAG TEXT, L_LINESTATUS TEXT, L_SHIPDATE TIMESTAMPTZ, L_COMMITDATE TIMESTAMPTZ, L_RECEIPTDATE TIMESTAMPTZ, L_SHIPINSTRUCT TEXT, L_SHIPMODE TEXT, L_COMMENT TEXT ); CALL set_table_property('lineitem', 'orientation', 'column'); CALL set_table_property('lineitem', 'datahub_sync_mode', 'none'); CALL set_table_property('lineitem', 'datahub_upsert_mode', 'insert_or_ignore'); COMMIT;
在DataHub中创建Hologres Connector。
单击DataHub中已创建的Topic,进入Topic详情页。
单击Topic详情页右上角的+同步。
在新建Connector界面单击Hologres,在新建Connector页面配置参数后,单击创建。
参数
说明
Instance
Hologres实例的ID。进入Hologres管理控制台,获取实例ID。
Project
Hologres的数据库名称。
Topic
Hologres用于接收数据的表名称。
导入字段
需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。
鉴权模式
默认为AccessKey。
AccessKey ID
访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。
AccessKey Secret
访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
Timestamp Unit
同步时间单位,可选择如下。
MICROSECOND:微秒,为默认值。
MILLISECOND:毫秒。
SECOND:秒。
同步DataHub的数据至Hologres。
成功创建Connector后,您可以在Topic详情页的同步任务中查看实时同步数据的状态。
Hologres查询数据。
您可以连接Hologres实例至开发工具,实时查询同步至Hologres中的数据,详情请参见概述。示例查询语句如下。
SELECT COUNT(*) FROM lineitem;
常见报错
为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。
场景1:查询数据时,出现如下报错。
ErrorMessage [Import field not found in dest schema;
可能原因:未设置datahub_sync_mode参数值为
dts
。解决办法:重新创建Hologres表,并设置表属性datahub_sync_mode为
dts
。场景2:查询数据时,出现如下报错。
ErrorCode=InternalServerError; ErrorMessage =Field already exists
可能原因:Hologres列datahub_sync_mode设置为
dts
,并且建表时包含了8列附加列。解决办法:重新创建Hologres表,设置datahub_sync_mode为
dts
时,字段只需要跟上游保持一致,无需多增加8列附加列。