本文为您介绍DataHub Writer支持的数据类型、字段映射和数据源等参数及配置示例。
DataHub是实时数据分发平台、流式数据(Streaming Data)的处理平台,提供对流式数据的发布(Publish)、订阅(Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。
DataHub服务基于阿里云自研的飞天平台,具有高可用、低延迟、高可扩展和高吞吐的特点。它与阿里云流计算引擎StreamCompute无缝连接,您可以轻松使用SQL进行流数据分析。DataHub同时提供分发流式数据至MaxCompute(原ODPS)、OSS等云产品的功能。
重要 STRING字符串仅支持UTF-8编码,单个STRING列最长允许1 MB。
参数配置
通过Channel将Source与Sink连接,在Writer端的Channel要对应Reader端的Channel类型。通常Channel包括Memory-Channel和File-channel两种类型,如下配置即File通道。
"agent.sinks.dataXSinkWrapper.channel": "file"
参数说明
参数 | 描述 | 是否必选 | 默认值 |
---|---|---|---|
accessId | DataHub的accessId。 | 是 | 无 |
accessKey | DataHub的accessKey。 | 是 | 无 |
endPoint | 对DataHub资源的访问请求,需要根据资源所属服务,选择正确的域名。 | 是 | 无 |
maxRetryCount | 任务失败的最多重试次数。 | 否 | 无 |
mode | Value是STRING类型时,写入的模式。 | 是 | 无 |
parseContent | 解析内容。 | 是 | 无 |
project | 项目(Project)是DataHub数据的基本组织单元,一个Project下包含多个Topic。 说明 DataHub的项目空间与MaxCompute的项目相互独立,您在MaxCompute中创建的项目不能复用于DataHub,需要单独创建。 | 是 | 无 |
topic | Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。 | 是 | 无 |
maxCommitSize | 为提高写出效率,DataX会积累Buffer数据,待积累的数据大小达到maxCommitSize 大小(单位Byte)时,批量提交到目的端。默认是1,048,576,即1 MB数据。另外datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条的数据总量来从侧方面进行写入datahub的数据条数控制。 | 否 | 1MB |
向导开发介绍
- 选择数据源。 配置同步任务的数据来源和数据去向。
参数 描述 数据源 在下拉列表中选择您配置的数据源名称。 主题 即上述参数说明中的topic。 一次提交的数据量 一次向DataHub提交的数据量,单位byte。 最大重试次数 即上述参数说明中的maxRetryCount。 - 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应的关系。
参数 描述 同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。 同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。 取消映射 单击取消映射,可以取消建立的映射关系。 自动排版 可以根据相应的规律自动排版。 - 通道控制。
参数 描述 任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。 同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。 错误记录数 错误记录数,表示脏数据的最大容忍条数。 分布式处理能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组和新增和使用独享数据集成资源组。
脚本开发介绍
配置一个从内存中读数据的同步作业,使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。
{
"type": "job",
"version": "2.0",//版本号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",//插件名。
"parameter": {
"datasource": "",//数据源。
"topic": "",//Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。
"maxRetryCount": 500,//任务失败的重试的最多次数。
"maxCommitSize": 1048576//待积累的数据Buffer大小达到maxCommitSize大小(单位Byte)时,批量提交至目的端。
//datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条数据的数据总量来从侧方面进行单次写入datahub的数据条数控制。比如每条数据10 k,那么此参数的设置值要低于10*10000 k。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""//错误记录数。
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":20, //作业并发数。
"mbps":"12"//限流
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}