全部产品
Search
文档中心

实时数仓Hologres:实时同步DataHub的数据至Hologres

更新时间:May 24, 2023

本文为您介绍如何使用Connector同步DataHub中的数据至Hologres。

前提条件

  • 开通Hologres并连接开发工具,详情请参见PSQL客户端

  • 开通DataHub,详情请参见开始使用

背景信息

阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。

DataHub提供数据Sink/Source功能(数据同步),支持对Topic中的数据通过Hologres Connector实时同步到Hologres,对数据进行多维分析和实时探索。

DataHub与Hologres的概念映射如下表所示。

DataHub

Hologres

Project

Database

Topic

Table

从Hologres V2.0版本起优化了DataHub数据同步至Hologres的模式,该模式有如下变化:

  • 支持DataHub的4种新数据类型,包括TINYINT、SMALLINT、INTEGER和FLOAT。

  • 无需在Hologres建表时指定同步模式(datahub_sync_mode)和同步策略(datahub_upsert_mode)两个参数,改为在DataHub同步任务中配置。

  • 旧模式不占用Hologres实例连接数。新模式下每个同步任务都会占用部分连接数,每个任务占用的连接数等于DataHub Topic的Shard数。

使用限制

  • 暂不支持Hologres实例开启白名单功能。

  • 数据写入分区表必须先在Hologres中创建分区子表,详情请参见CREATE PARTITION TABLE

  • Hologres 2.0版本起支持写入带有Default值的表。

同步介绍

同步DataHub中的数据至Hologres,有两种同步模式和两种同步策略,同步模式与同步策略还可以分别进行组合,实现不同的效果。

说明
  • 以下的两种同步模式和两种策略,不是DataHub的任务级别配置,而是在Hologres建表时的表属性,必须在建Hologres表时指定。

  • 同步DataHub中的数据至Hologres与DataWorks数据集成批量同步至Hologres SDK写入模式冲突,关于SDK写入模式的介绍请参见Hologres Writer

  • 同步模式

    • 逐条插入

      逐条插入是指将DataHub数据逐条写入Hologres,需要在Hologres建表时指定表属性如下。

      call set_table_property('<table_name>', 'datahub_sync_mode', 'none');
    • 回放

      回放是指回放上游数据库的DML操作,DataHub相当于是一个binlog,若是使用dts-datahub-hologres是否启用新的附加列规则,需要在Hologres建表时指定表属性如下。

      • 是否启用新的附加列规则选择为时,需要在Hologres中建表时配置如下表属性。

        call set_table_property('<table_name>', 'datahub_sync_mode', 'dts');
      • 是否启用新的附加列规则选择为时,需要在Hologres中建表时配置如下表属性。

        call set_table_property('<table_name>', 'datahub_sync_mode', 'dts_old');

      DTS在同步数据到DataHub时,会在数据列的基础上附加如下8列,用于描述回放数据信息(INSERT/UPDATE/DELETE),字段的主要说明如下。

      • 附加列命名方式

        旧版数据列名称

        新版数据列名称

        dts_${原始列名}

        new_dts_sync_dts_${原始列名}

      • 附加列说明

        旧版附加列名称

        新版附加列名称

        数据类型

        说明

        dts_record_id

        new_dts_sync_dts_record_id

        String

        增量日志的记录ID,为该日志唯一标识。

        dts_operation_flag

        new_dts_sync_dts_operation_flag

        String

        操作类型,取值:

        • I:INSERT操作。

        • D:DELETE操作。

        • U:UPDATE操作。

        dts_instance_id

        new_dts_sync_dts_instance_id

        String

        数据库的server ID。暂不支持显示实际的值,目前固定为NULL。

        dts_db_name

        new_dts_sync_dts_db_name

        String

        数据库名称。

        dts_table_name

        new_dts_sync_dts_table_name

        String

        表名。

        dts_utc_timestamp

        new_dts_sync_dts_utc_timestamp

        String

        操作时间戳,即Binlog的时间戳(UTC时间)。

        dts_before_flag

        new_dts_sync_dts_before_flag

        String

        所有列的值是否更新前的值,取值:Y或N。

        dts_after_flag

        new_dts_sync_dts_after_flag

        String

        所有列的值是否更新后的值,取值:Y或N。

  • 同步策略(主键冲突策略)

    当Hologres表设置主键时,从DataHub写入的数据有如下两种主键冲突策略。

    • 覆盖

      覆盖是指当写入发生主键冲突时,新的数据覆盖老数据,这个时候需要在Hologres建表时指定表属性如下。

      call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_replace');
    • 忽略

      忽略是指写入时发生主键冲突,忽略新数据,即数据不更新,仍然使用老数据,这个时候需要在Hologres建表时指定表属性如下。

      call set_table_property('<table_name>', 'datahub_upsert_mode', 'insert_or_ignore');
  • 同步模式与同步策略组合

    以上通过DataHub写入Hologres的几种模式,不同组合之间实现的效果不同,具体请参见以下。

    • 插入模式与覆盖策略组合

      相当于在Hologres中执行以下SQL。

      INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
    • 插入模式与忽略策略组合

      相当于在Hologres中执行以下SQL。

      INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
    • 回放模式与覆盖策略组合

      • dts_operation_flag=I,相当于在Hologres中执行以下SQL。

        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
      • dts_operation_flag=D,相当于在Hologres中执行以下SQL。

        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。

        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。

        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO UPDATE
    • 回放模式与忽略策略组合

      • dts_operation_flag=I,相当于在Hologres中执行以下SQL。

        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING
      • dts_operation_flag=D,相当于在Hologres中执行以下SQL。

        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_before_flag=Y,相当于在Hologres中执行以下SQL。

        DELETE FROM target_table where pk=?
      • dts_operation_flag=U AND dts_after_flag=Y,相当于在Hologres中执行以下SQL。

        INSERT INTO target_table (column0,…,columnN) values (?,…,?) ON CONFLICT(PK) DO NOTHING

操作步骤

  1. 准备DataHub数据源。

    1. 创建项目。

      1. 登录DataHub控制台,单击左侧导航栏的项目管理

      2. 项目列表页面单击新建项目

      3. 新建项目弹框,配置参数后,单击创建

      a
    2. 新建Topic。

      1. 成功创建项目后,在项目列表页面单击项目名称,进入项目详情页。

      2. 单击项目详情页右上角的新建Topic,进入新建Topic页面,填写配置参数。

      吧

      参数

      描述

      创建方式

      • 直接创建:创建新的Topic。

      • 导入MaxCompute表结构:选择MaxCompute中已有的表结构创建Topic。

      名称

      自定义Topic名称。

      类型

      Topic类型,分为以下两种:

      • TUPLE:结构化数据。

      • BLOB:非结构化数据。

      Schema详情

      选择TUPLE类型会出现Schema详情,根据自己需求创建字段,允许为NULL代表如果上游没有该字段值自动置为NULL;不允许为NULL则会严格检验,字段类型不匹配写入报错。

      Shard数量

      Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态:Opening - 启动中,Active - 启动完成可服务。每个Shard启用以后会占用一定的服务端资源,建议按需申请Shard数量。

      生命周期

      Topic中写入数据在系统中可以保存的最长时间,以天为单位,最小值为1,最大值为7。

      Shard扩展模式

      可选择开启,开启此开关后,Shard支持水平扩展,不再支持“合并”和“分裂”,此后Shard数量只能增加,不可减少。

      说明

      此模式开启后可以使用Kafka方式来消费当前Topic。

      启动多Version

      可选择开启,开启此开关后,Topic可以同时拥有多个Schema,可以选择其中一个Schema写入,消费端会根据每条数据标记的Version自适应完成解析(如对应的Version的Schema被删除则解析失败)。

      ● 用户无法再使用appendFields接口。

      ● 可以对chema进行增删改查。

      ● 创建Connector使用最新Version的Schema。

      描述

      Topic的描述信息。

    3. 写入数据。

      成功创建Topic后,您需要使用工具(例如Blink)或者程序写入数据至Topic中。

  2. Hologres创建数据接收表。

    在Hologres中创建一张用于接收数据的表,表的字段类型与DataHub中Topic的字段类型相互映射。

    DataHub与Hologres的数据类型映射如下表所示。

    DataHub

    Hologres

    TINYINT

    SMALLINT

    SMALLINT

    SMALLINT

    INTERGER

    INTERGER

    BIGINT

    BIGINT

    FLOAT

    REAL

    DOUBLE

    DOUBLE PRECISION

    DECIMAL

    DECIMAL

    STRING

    TEXT

    BOOLEAN

    BOOLEAN

    TIMESTAMP

    TIMESTAMPTZ

    示例建表语句如下。

    BEGIN;
    CREATE TABLE lineitem ( 
    L_ORDERKEY BIGINT NOT NULL,
    L_PARTKEY BIGINT NOT NULL,
    L_SUPPKEY BIGINT NOT NULL,
    L_LINENUMBER BIGINT NOT NULL,
    L_QUANTITY DECIMAL(20,10),
    L_EXTENDEDPRICE DECIMAL(20,10),
    L_DISCOUNT DECIMAL(20,10),
    L_TAX DECIMAL(20,10),
    L_RETURNFLAG TEXT,
    L_LINESTATUS TEXT,
    L_SHIPDATE TIMESTAMPTZ,
    L_COMMITDATE TIMESTAMPTZ,
    L_RECEIPTDATE TIMESTAMPTZ,
    L_SHIPINSTRUCT TEXT,
    L_SHIPMODE TEXT,
    L_COMMENT TEXT
    );
    
    CALL set_table_property('lineitem', 'orientation', 'column');
    CALL set_table_property('lineitem', 'datahub_sync_mode', 'none');
    CALL set_table_property('lineitem', 'datahub_upsert_mode', 'insert_or_ignore');
    
    COMMIT;
  3. 在DataHub中创建Hologres Connector。

    1. 单击DataHub中已创建的Topic,进入Topic详情页。

    2. 单击Topic详情页右上角的+同步

    3. 新建Connector界面单击Hologres,在新建Connector页面配置参数后,单击创建新建connector

      参数

      说明

      Instance

      Hologres实例的ID。进入Hologres管理控制台,获取实例ID

      Project

      Hologres的数据库名称。

      Topic

      Hologres用于接收数据的表名称。

      导入字段

      需要导入Hologres的字段。可以根据实际业务需求选择导入部分或全部字段。

      鉴权模式

      默认为AccessKey。

      AccessKey ID

      访问Hologres实例的AccessKey ID。您可以单击AccessKey 管理,获取用户的AccessKey ID。

      AccessKey Secret

      访问Hologres实例的AccessKey Secret。您可以单击AccessKey 管理,获取AccessKey Secret。

      Timestamp Unit

      同步时间单位,可选择如下。

      • MICROSECOND:微秒,为默认值。

      • MILLISECOND:毫秒。

      • SECOND:秒。

  4. 同步DataHub的数据至Hologres。

    成功创建Connector后,您可以在Topic详情页的同步任务中查看实时同步数据的状态。

  5. Hologres查询数据。

    您可以连接Hologres实例至开发工具,实时查询同步至Hologres中的数据,详情请参见概述。示例查询语句如下。

    SELECT COUNT(*) FROM lineitem;

常见报错

为您介绍在使用Hologres过程中的常见报错,以便于您能自行排查并解决问题。

  • 场景1:查询数据时,出现如下报错。

    ErrorMessage [Import field not found in dest schema;

    可能原因:未设置datahub_sync_mode参数值为dts

    解决办法:重新创建Hologres表,并设置表属性datahub_sync_modedts

  • 场景2:查询数据时,出现如下报错。

    ErrorCode=InternalServerError; ErrorMessage =Field already exists 

    可能原因:Hologres列datahub_sync_mode设置为dts,并且建表时包含了8列附加列。

    解决办法:重新创建Hologres表,设置datahub_sync_modedts时,字段只需要跟上游保持一致,无需多增加8列附加列。