将DataHub数据同步至Hologres适用于需要实时数据分析、实时监控、实时报表和复杂数据结构处理的场景,通过结合DataHub和Hologres的能力,可以提升数据处理和分析的效率和准确性。本文介绍如何创建DataHub数据同步任务以及常见问题。
背景信息
DataHub提供数据Sink/Source功能,即数据同步功能,支持将对应Topic中的数据实时/准实时的同步到第三方阿里云产品中,打通阿里云产品间的数据流通。关于DataHub数据同步功能介绍详情,请参见概述
DataHub与Hologres的映射关系如下表所示。
|
DataHub |
Hologres |
|
Project |
Database |
|
Topic |
Table |
同步场景与策略介绍
同步场景
|
同步场景 |
说明 |
|
逐条插入 |
将源数据库中数据逐条插入到目标数据库中,适用于将DataHub数据全量同步至Hologres的场景。 |
|
回放 |
通过分析并重新执行Binlog数据库操作记录日志,将源数据库变更操作应用到目标数据库中,最终确保数据一致性。适用于DTS同步数据至DataHub,再将DataHub数据同步至Hologres的链路场景,其中DataHub相当于Binlog。 说明
DTS同步数据至DataHub,会在数据列的基础上添加附加列,用于记录数据的操作信息。DTS同步数据至DataHub,附加列的命名存在新旧差异,详情请参见修改数据同步的附加列规则。 |
同步策略
|
同步策略 |
说明 |
|
覆盖(replace) |
数据写入发生主键冲突,新的数据覆盖老数据并写入,确保目标数据库中的数据与源数据保持一致。 |
|
忽略(ignore) |
数据写入发生主键冲突,忽略新数据,即数据不更新,避免重复数据的导入和覆盖,保持目标数据库的数据完整性。 |
注意事项
-
仅支持将DataHub TUPLE类型数据同步到Hologres。
-
数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE。
-
每个同步任务都会占用部分连接数,每个任务占用的连接数等于DataHub Topic的Shard数。
准备工作
-
开通DataHub服务并准备DataHub数据。具体操作,请参见快速入门(同步示例)。
-
购买Hologres实例并建表,本文以表
lineitem为示例。具体操作,请参见购买Hologres实例和通过HoloWeb连接Hologres创建表。DataHub与Hologres的数据类型映射如下表所示。
DataHub
Hologres
TINYINT
SMALLINT
SMALLINT
SMALLINT
INTEGER
INTEGER
BIGINT
BIGINT
FLOAT
REAL
DOUBLE
DOUBLE PRECISION
DECIMAL
DECIMAL
STRING
TEXT
BOOLEAN
BOOLEAN
TIMESTAMP
TIMESTAMPTZ
示例建表语句如下。
BEGIN; CREATE TABLE lineitem ( L_ORDERKEY BIGINT NOT NULL, L_PARTKEY BIGINT NOT NULL, L_SUPPKEY BIGINT NOT NULL, L_LINENUMBER BIGINT NOT NULL, L_QUANTITY DECIMAL(20,10), L_EXTENDEDPRICE DECIMAL(20,10), L_DISCOUNT DECIMAL(20,10), L_TAX DECIMAL(20,10), L_RETURNFLAG TEXT, L_LINESTATUS TEXT, L_SHIPDATE TIMESTAMPTZ, L_COMMITDATE TIMESTAMPTZ, L_RECEIPTDATE TIMESTAMPTZ, L_SHIPINSTRUCT TEXT, L_SHIPMODE TEXT, L_COMMENT TEXT ); caLL set_table_property('lineitem', 'orientation', 'column'); COMMIT;
创建同步任务
-
登录DataHub服务控制台,单击已创建的Topic,进入Topic详情页。
-
单击Topic详情页右上角的+同步。
-
单击Hologres,在新建Connector页面配置参数。
参数
说明
Instance
Hologres实例的ID。进入Hologres管理控制台,获取实例ID。
Database
用于接收DataHub数据的Hologres数据库名称。
Table
用于接收DataHub数据的Hologres表名称
lineitem。主键冲突策略
主键冲突时的数据更新策略。取值说明:
-
replace(默认值):数据写入发生主键冲突时,新数据写入覆盖老数据。
-
ignore:数据写入发生主键冲突时,忽略新数据,即数据不更新,仍然使用老数据。
关于数据同步策略详情,请参见同步策略。
同步场景
数据同步的使用场景。取值说明:
-
default(默认值):逐条插入。
-
dts:通过DTS同步数据至DataHub时,若启用新附加列规则,选择此参数。
-
dts_old:通过DTS同步数据至DataHub时,若未启用新附加列规则,选择此参数。
关于数据同步场景详情,请参见同步场景。
导入字段
需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。
鉴权模式
默认为AccessKey。
AccessKey ID
访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。
AccessKey Secret
访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。
Timestamp Unit
同步时间单位,取值说明。
-
MICROSECOND:微秒,为默认值。
-
MILLISECOND:毫秒。
-
SECOND:秒。
-
-
单击创建,同步DataHub的数据至Hologres。
创建Connector后,您可以在Topic详情页的同步任务中查看实时同步数据的状态。
-
Hologres查询数据。
您可以连接Hologres实例开发工具,实时查询同步至Hologres中的数据。Hologres连接详情,请参见连接Hologres。执行以下示例查询语句。
SELECT COUNT(*) FROM lineitem;
常见问题
为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。
-
问题1
-
报错信息
ErrorMessage: Import field not found in dest schema. -
报错原因
-
Hologres表中未包含同步任务指定的导入字段。
-
任务的同步场景指定为default,但是导入字段包含DTS同步到DataHub产生的附加列。
-
-
解决方法
-
重新创建Hologres表,加上未包含的导入字段;修改同步任务的导入字段,去掉Hologres表中不存在的字段。
-
重新创建同步任务,并指定同步场景为dts或dts_old。
-
-
-
问题2
-
报错信息
ErrorMessage: Column type not match with Holo column. -
报错原因
DataHub Topic中的字段类型与Hologres表中的字段类型不匹配。
-
解决方法
根据数据类型映射,重新创建Hologres表,设置正确的字段类型。
-
-
问题3
-
报错信息
ErrorMessage: Not import column xxx not allow null and no default value. -
报错原因
Hologres表中的部分字段未包含在同步任务的导入字段中,但是这部分字段设置了not null属性,并且未设置default value。
-
解决方法
重新建Hologres表,对未包含在同步任务导入字段中的字段,不设置not null属性或设置default value。
-