Hologres Reader实现了从交互式分析(Hologres) 数仓导出数据的功能,您可以根据数据集成标准协议从Hologres表中导出数据至其它数据源。

背景信息

重要 Hologres Reader仅支持使用 新增和使用独享数据集成资源组,不支持使用 使用公共资源组自定义资源组
Hologres Reader通过PSQL读取Hologres表中的数据,根据表的Shard Count发起多个并发,每个Shard对应一个Select并发任务:
  • Hologres在创建表时,在同一个CREATE TABLE事务中,通过CALL set_table_property('table_name', 'shard_count', 'xx')配置表的Shard Count。

    默认情况下,使用数据库默认的Shard Count,具体数值取决于Hologres实例的配置。

  • Select语句通过表的内置列hg_shard_id的Shard筛选数据。

支持的Hologres版本

Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。

支持的字段类型

不支持UUID类型的字段。
字段类型 离线读(Hologres Reader) 离线写(Hologres Writer) 实时写
UUID 不支持 不支持 不支持
CHAR 支持 支持 支持
NCHAR 支持 支持 支持
VARCHAR 支持 支持 支持
LONGVARCHAR 支持 支持 支持
NVARCHAR 支持 支持 支持
LONGNVARCHAR 支持 支持 支持
CLOB 支持 支持 支持
NCLOB 支持 支持 支持
SMALLINT 支持 支持 支持
TINYINT 支持 支持 支持
INTEGER 支持 支持 支持
BIGINT 支持 支持 支持
NUMERIC 支持 支持 支持
DECIMAL 支持 支持 支持
FLOAT 支持 支持 支持
REAL 支持 支持 支持
DOUBLE 支持 支持 支持
TIME 支持 支持 支持
DATE 支持 支持 支持
TIMESTAMP 支持 支持 支持
BINARY 支持 支持 支持
VARBINARY 支持 支持 支持
BLOB 支持 支持 支持
LONGVARBINARY 支持 支持 支持
BOOLEAN 支持 支持 支持
BIT 支持 支持 支持
JSON 支持 支持 支持
JSONB 支持 支持 支持

参数说明

参数 描述 是否必选 默认值
endpoint 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。
endpoint包括经典网络、公网和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的 endpoint类型,否则会出现网络不通或者性能受限的情况:
  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port
  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port
  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例配在同一个地域的同一个可用区,以保证网络端口连通,实现最大性能。

accessId 访问Hologres的accessId
accessKey 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。
database Hologres实例内部数据库的名称。
table Hologres的表名称,如果是分区表,请指定父表的名称。
column 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。
partition 针对分区表,表示分区Column以及对应的Value,格式为column=value
重要
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
  • 请确认该参数和表DDL的分区配置匹配。
  • 请确认对应的子表已经创建,且已经导入数据。
,表示非分区表。
fetchSize 指定使用Select语句一次性读取数据的条数。 1,000

向导开发介绍

  1. 选择数据源。
    配置同步任务的 数据来源数据去向数据源
    参数 描述
    数据源 通常输入您配置的数据源名称。
    即上述参数说明中的table
    数据过滤 您将要同步数据的筛选条件,SQL语法与选择的数据源一致,请勿填写where关键字。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击 添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击 删除图标进行删除 。 字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
    添加一行
    • 可以输入常量,输入的值需要使用英文单引号,例如'abc’'123’等。
    • 可以配合调度参数使用,例如${bizdate}等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

  • 配置非分区表
    • 配置从Hologres非分区表读取数据至内存,如下所示。
      {
          "type":"job",
          "version":"2.0",//版本号。
          "steps":[
              {
                  "stepType":"holo",//插件名。
                  "parameter":{
                      "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "***************", //访问Hologres的accessId。
                      "accessKey": "*******************", //访问Hologres的accessKey。
                      "database": "postgres",
                      "table": "holo_reader_****",
                      "column" : [ //字段。
                          "tag",
                          "id",
                          "title"
                      ]
                  },
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"//错误记录数。
              },
              "speed":{
                  "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1,//作业并发数。 
                        "mbps":"12"//限流
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • 配置分区表
    • 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK模式导入的配置。
      说明 请注意 partition的配置。
      {
          "type":"job",
          "version":"2.0",//版本号。
          "steps":[
              {
                  "stepType":"holo",//插件名。
                  "parameter":{
                      "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "***************", //访问Hologres的accessId。
                      "accessKey": "*******************", //访问Hologres的accessKey。
                      "database": "postgres",
                      "table": "holo_reader_basic_****",
                      "partition": "tag=foo",
                      "column" : [
                          "*"
                      ],
                      "fetchSize": "100"
                  },
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"//错误记录数。
              },
              "speed":{
                  "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1,//作业并发数。
                        "mbps":"12"//限流
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # 确保分区表子表已经创建且导入数据。
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')