Hologres Reader实现了从交互式分析(Hologres) 数仓导出数据的功能,您可以根据数据集成标准协议从Hologres表中导出数据至其它数据源。
背景信息
Hologres Reader通过PSQL读取Hologres表中的数据,根据表的Shard Count发起多个并发,每个Shard对应一个Select并发任务:
- Hologres在创建表时,在同一个
CREATE TABLE
事务中,通过CALL set_table_property('table_name', 'shard_count', 'xx')
配置表的Shard Count。默认情况下,使用数据库默认的Shard Count,具体数值取决于Hologres实例的配置。
- Select语句通过表的内置列hg_shard_id的Shard筛选数据。
支持的Hologres版本
Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。
支持的字段类型
不支持UUID类型的字段。字段类型 | 离线读(Hologres Reader) | 离线写(Hologres Writer) | 实时写 |
---|---|---|---|
UUID | 不支持 | 不支持 | 不支持 |
CHAR | 支持 | 支持 | 支持 |
NCHAR | 支持 | 支持 | 支持 |
VARCHAR | 支持 | 支持 | 支持 |
LONGVARCHAR | 支持 | 支持 | 支持 |
NVARCHAR | 支持 | 支持 | 支持 |
LONGNVARCHAR | 支持 | 支持 | 支持 |
CLOB | 支持 | 支持 | 支持 |
NCLOB | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
TINYINT | 支持 | 支持 | 支持 |
INTEGER | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
NUMERIC | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
REAL | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
TIME | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
VARBINARY | 支持 | 支持 | 支持 |
BLOB | 支持 | 支持 | 支持 |
LONGVARBINARY | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
BIT | 支持 | 支持 | 支持 |
JSON | 支持 | 支持 | 支持 |
JSONB | 支持 | 支持 | 支持 |
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
endpoint | 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port 。您可以从交互式分析实例的管理页面获取。
endpoint包括经典网络、公网和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的
endpoint类型,否则会出现网络不通或者性能受限的情况:
通常建议数据集成资源组和Hologres实例配在同一个地域的同一个可用区,以保证网络端口连通,实现最大性能。 |
是 | 无 |
accessId | 访问Hologres的accessId。 | 是 | 无 |
accessKey | 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。 | 是 | 无 |
database | Hologres实例内部数据库的名称。 | 是 | 无 |
table | Hologres的表名称,如果是分区表,请指定父表的名称。 | 是 | 无 |
column | 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"] 表示全部列。 |
是 | 无 |
partition | 针对分区表,表示分区Column以及对应的Value,格式为column=value 。
重要
|
否 | 空,表示非分区表。 |
fetchSize | 指定使用Select语句一次性读取数据的条数。 | 否 | 1,000 |
向导开发介绍
- 选择数据源。
配置同步任务的 数据来源和 数据去向。
参数 描述 数据源 通常输入您配置的数据源名称。 表 即上述参数说明中的table。 数据过滤 您将要同步数据的筛选条件,SQL语法与选择的数据源一致,请勿填写where关键字。 - 字段映射,即上述参数说明中的column。
左侧的源头表字段和右侧的目标表字段为一一对应关系。单击 添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击 删除图标进行删除 。
参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 自动排版 可以根据相应的规律自动排版。 手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。 添加一行 - 可以输入常量,输入的值需要使用英文单引号,例如'abc’、'123’等。
- 可以配合调度参数使用,例如${bizdate}等。
- 如果您输入的值无法解析,则类型显示为未识别。
- 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
- 配置非分区表
- 配置从Hologres非分区表读取数据至内存,如下所示。
{ "type":"job", "version":"2.0",//版本号。 "steps":[ { "stepType":"holo",//插件名。 "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "***************", //访问Hologres的accessId。 "accessKey": "*******************", //访问Hologres的accessKey。 "database": "postgres", "table": "holo_reader_****", "column" : [ //字段。 "tag", "id", "title" ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//错误记录数。 }, "speed":{ "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent":1,//作业并发数。 "mbps":"12"//限流 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
- Hologres表的DDL语句,如下所示。
begin; drop table if exists holo_reader_basic_src; create table holo_reader_basic_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)); call set_table_property('holo_reader_basic_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_src', 'shard_count', '3'); commit;
- 配置从Hologres非分区表读取数据至内存,如下所示。
- 配置分区表
- 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK模式导入的配置。
说明 请注意 partition的配置。
{ "type":"job", "version":"2.0",//版本号。 "steps":[ { "stepType":"holo",//插件名。 "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "***************", //访问Hologres的accessId。 "accessKey": "*******************", //访问Hologres的accessKey。 "database": "postgres", "table": "holo_reader_basic_****", "partition": "tag=foo", "column" : [ "*" ], "fetchSize": "100" }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//错误记录数。 }, "speed":{ "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent":1,//作业并发数。 "mbps":"12"//限流 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
- Hologres表的DDL语句,如下所示。
begin; drop table if exists holo_reader_basic_part_src; create table holo_reader_basic_part_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('holo_reader_basic_part_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_part_src', 'shard_count', '3'); commit; create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo'); # 确保分区表子表已经创建且导入数据。 postgres=# \d+ holo_reader_basic_part_src Table "public.holo_reader_basic_part_src" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------+---------+-----------+----------+---------+----------+--------------+------------- tag | text | | not null | | extended | | id | integer | | not null | | plain | | title | text | | not null | | extended | | body | text | | | | extended | | Partition key: LIST (tag) Indexes: "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id) Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
- 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK模式导入的配置。