阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,为您提供发布(Publish)、订阅(Subscribe)和分发流式数据的功能,让您可以轻松构建基于流式数据的分析和应用。

DataHub Reader通过DataHub的Java SDK读取DataHub中的数据,具体使用的Java SDK版本,如下所示。
<dependency>
    <groupId>com.aliyun.DataHub</groupId>
    <artifactId>aliyun-sdk-DataHub</artifactId>
    <version>2.9.1</version>
</dependency>

参数说明

参数描述是否必选
endpointDataHub的endpoint
accessId访问DataHub的用户accessId
accessKey 访问DataHub的用户accessKey
project目标DataHub的项目名称。project是DataHub中的资源管理单元,用于资源隔离和控制。
topic目标DataHub的topic名称。
batchSize一次读取的数据量,默认为1,024条。
beginDateTime数据消费的开始时间位点。该参数是时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。
说明 beginDateTimeendDateTime需要互相组合配套使用。
endDateTime数据消费的结束时间位点。该参数是时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。
说明 beginDateTimeendDateTime需要互相组合配套使用。

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数描述
    数据源在下拉列表中选择您配置的数据源名称。
    主题即上述参数说明中的topic
    消费开始时间即上述参数说明中的beginDateTime
    消费结束时间即上述参数说明中的endDateTime
    批量条数即上述参数说明中的batchSize
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段。鼠标放至需要删除的字段上,即可单击删除图标进行删除 。字段映射
    参数描述
    同名映射单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射单击取消映射,可以取消建立的映射关系。
    自动排版可以根据相应的规律自动排版。
    手动编辑源表字段请手动编辑字段,一行表示一个字段,首尾空行会被采用,其它空行会被忽略。
    添加一行添加一行的功能如下所示:
    • 可以输入常量,输入的值需要使用英文单引号。例如,'abc''123'等。
    • 可以配合调度参数使用。例如,${bizdate}等。
    • 可以输入关系数据库支持的函数。例如,now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别
  3. 通道控制。通道配置
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    说明 MySQL Reader插件在进行分库分表等多表同步时,若要对单表进行切分,则需要满足任务并发数大于表个数这一条件,否则切分的Task数目等于表的个数。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

配置一个从DataHub读取数据的作业,使用脚本开发的详情请参见通过脚本模式配置离线同步任务
{
    "type":"job",
    "version":"2.0",//版本号
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx" //DataHub的endpoint。
                        "accessId": "xxx", //访问DataHub的用户accessId。
                        "accessKey": "xxx", //访问DataHub的用户accessKey。
                        "project": "xxx", //目标DataHub的项目名称。
                        "topic": "xxx" //目标DataHub的topic名称。
                        "batchSize": 1000, //一次读取的数据量。
                        "beginDateTime": "20180910111214", //数据消费的开始时间位点。
                        "endDateTime": "20180910111614", //数据消费的结束时间位点。
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//并发数
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}