Hologres Writer实现了导入数据至交互式分析(Hologres)的功能,您可以把多种数据源的数据导入Hologres进行实时分析。
使用限制
Hologres Writer不支持写入数据至Hologres的外部表。
实现原理
Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据writeMode的配置选择不同的写入方式,并根据conflictMode的配置决定写入数据时的冲突解决策略:
- writeMode为SDK模式时,通过Hologres的Holohub来进行同步导入服务写入数据,为您提供最优的性能。
- writeMode为SQL模式时,通过PSQL的
INSERT INTO
命令(JDBC方式)写入数据,建议您采用此写入模式同步。
您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:
- conflictMode为Replace模式时,新数据覆盖旧数据。
- conflictMode为Ignore模式时,忽略新数据。
在不同的writeMode下,conflictMode的实现方式也不同。writeMode为SDK模式时,通过设置Hologres Table属性来改变此次导入的冲突解决模式。
注意 conflictMode仅适用于有主键的表。
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
endpoint | 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port 。您可以从交互式分析实例的管理页面获取。
endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。 |
是 | 无 |
accessId | 访问Hologres的accessId。 | 是 | 无 |
accessKey | 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。 | 是 | 无 |
database | Hologres实例内部数据库的名称。 | 是 | 无 |
table | Hologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name 。
|
是 | 无 |
writeMode | writeMode包括SDK和SQL(INSERT INTO),详情请参见实现原理。
在脚本模式中,SDK的可选配置如下:
|
是 | 无 |
conflictMode | conflictMode包括Replace和Ignore,详情请参见实现原理。 | 是 | 无 |
column | 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"] 表示全部列。
|
是 | 无 |
partition | 针对分区表,表示分区Column以及对应的Value,格式为column=value 。
说明
|
否 | 空,表示非分区表 |
向导开发介绍
- 选择数据源。
配置同步任务的数据来源和数据去向。
参数 描述 数据源 通常输入您配置的数据源名称。 表 即上述参数说明中的table。 写入模式 即上述参数说明中的writeMode。 写入冲突策略 即上述参数说明中的conflictMode。 - 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应关系。
参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 自动排版 可以根据相应的规律自动排版。 - 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
使用脚本模式开发的详情请参见通过脚本模式配置任务。配置非分区表和分区表的示例如下:
- 配置非分区表
- 配置从内存产生的数据导入至Hologres普通表,示例为通过SDK(极速写入)模式导入的配置。
{ "type":"job", "version":"2.0",//版本号。 "steps":[ { "stepType":"stream", "parameter":{}, "name":"Reader", "category":"reader" }, { "stepType":"holo", "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "<yourAccessKeyId>", //访问Hologres的accessId。 "accessKey": "<yourAccessKeySecret>", //访问Hologres的accessKey。 "database": "postgres", "table": "<yourTableName>", "writeMode": "sdk", "conflictMode": "replace", "column" : [ "tag", "id", "title" ], "maxCommitSize": 1048576, "maxRetryCount": 500 }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//错误记录数。 }, "speed":{ "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent":1, //作业并发数。 "mbps":"12"//限流 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
- Hologres表的DDL语句,如下所示。
begin; drop table if exists test_holowriter_sdk_replace; create table test_holowriter_sdk_replace( tag text not null, id int not null, body text not null primary key (tag, id)); call set_table_property('test_holowriter_sdk_replace', 'orientation', 'column'); call set_table_property('test_holowriter_sdk_replace', 'shard_count', '3'); commit;
- 配置从内存产生的数据导入至Hologres普通表,示例为通过SDK(极速写入)模式导入的配置。
- 配置分区表
- 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK(极速写入)模式导入的配置。
说明 请注意partition的配置。
{ "type":"job", "version":"2.0",//版本号。 "steps":[ { "stepType":"stream", "parameter":{}, "name":"Reader", "category":"reader" }, { "stepType":"holo", "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "<yourAccessKeyId>", //访问Hologres的accessId。 "accessKey": "<yourAccessKeySecret>", //访问Hologres的accessKey。 "database": "postgres", "table": "<yourTableName>", "writeMode": "sdk", "conflictMode": "ignore", "column" : [ "*" ], "partition": "tag=foo", "maxCommitSize": 1048576, "maxRetryCount": 500 }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//错误记录数。 }, "speed":{ "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。 "concurrent":1, //作业并发数。 "mbps":"12"//限流 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
- Hologres表的DDL语句,如下所示。
begin; drop table if exists test_holowriter_part_table_sdk_ignore; create table test_holowriter_part_table_sdk_ignore( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('test_holowriter_part_table_sdk_ignore', 'orientation', 'column'); call set_table_property('test_holowriter_part_table_sdk_ignore', 'shard_count', '3'); commit;
- 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK(极速写入)模式导入的配置。