本文为您介绍LogHub Reader支持的数据类型、字段映射和数据源等参数及配置示例。

背景信息

日志服务(Log Service)是针对实时数据的一站式服务,为您提供日志类数据采集、消费、投递及查询分析功能,全面提升海量日志的处理、分析能力。LogHub Reader是使用日志服务的Java SDK消费LogHub中的实时日志数据,并将日志数据转换为数据集成传输协议传递给Writer。

实现原理

LogHub Reader通过日志服务Java SDK消费LogHub中的实时日志数据,具体使用的日志服务Java SDK版本,如下所示。
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.7</version>
</dependency>
日志库(Logstore)是日志服务中日志数据的采集、存储和查询单元,Logstore读写日志必定保存在某一个分区(Shard)上。每个日志库分若干个分区,每个分区由MD5左闭右开区间组成,每个区间范围不会相互覆盖,并且所有的区间的范围是MD5整个取值范围,每个分区可以提供一定的服务能力:
  • 写入:5MB/s,2000次/s。
  • 读取:10MB/s,100次/s。
LogHub Reader消费Shard中的日志,具体消费过程(GetCursor、BatchGetLog相关API)如下所示:
  • 根据时间区间范围获得游标。
  • 通过游标、步长参数读取日志,同时返回下一个位置游标。
  • 不断移动游标进行日志消费。
  • 根据Shard进行任务的切分并发执行。

类型转换列表

LogHub Reader针对LogHub类型的转换列表,如下所示。
数据集成内部类型 LogHub数据类型
STRING STRING

参数说明

参数 描述 是否必选 默认值
endpoint 日志服务入口endpoint是访问一个项目(Project)及其内部日志数据的URL。它和Project所在的阿里云地域(Region)及Project名称相关。各地域的服务入口请参见服务入口
accessId 访问日志服务的访问密钥,用于标识用户。
accessKey 访问日志服务的访问密钥,用来验证用户的密钥。
project 目标日志服务的项目名称,是日志服务中的资源管理单元,用于隔离和控制资源。
logstore 目标日志库的名称,logstore是日志服务中日志数据的采集、存储和查询单元。
batchSize 一次从日志服务查询的数据条数。 128
column 每条数据中的列名,此处可以配置日志服务中的元数据作为同步列。日志服务支持日志主题、采集机器唯一标识、主机名、路径和日志时间等元数据。
说明 列名区分大小写。元数据的写法请参见日志服务机器组
beginDateTime 数据消费的开始时间位点,即日志数据到达Loghub的时间。该参数为时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。
说明 beginDateTimeendDateTime需要互相组合配套使用。
beginTimestampMillis选择一种
endDateTime 数据消费的结束时间位点,为时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。
说明 请尽量保证周期之间重合。即上周期的endDateTime时间和下周期的beginDateTime时间一致,或比下周期的beginDateTime时间晚。否则,可能无法拉取部分区域的数据。
endTimestampMillis选择一种
beginTimestampMillis 数据消费的开始时间位点。该参数为时间范围(左闭右开)的左边界,单位为毫秒。
说明 beginTimestampMillisendTimestampMillis组合配套使用。

-1表示日志服务游标的最开始CursorMode.BEGIN。推荐使用beginDateTime模式。

beginDateTime选择一种
endTimestampMillis 数据消费的结束时间位点,为时间范围(左闭右开)的右边界,单位为毫秒。
说明 endTimestampMillis和beginTimestampMillis组合配套使用。

-1表示日志服务游标的最后位置CursorMode.END。推荐使用endDateTime模式。

endDateTime选择一种

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。
    Logstore 目标日志库的名称。
    日志开始时间 数据消费的开始时间位点,即日志数据到达Loghub的时间。时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013000),可以和DataWorks的调度时间参数配合使用。
    日志结束时间 数据消费的结束时间位点,时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串(例如20180111013010),可以和DataWorks的调度时间参数配合使用。
    批量条数 一次从日志服务查询的数据条数。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除。删除
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
  3. 通道控制。通道控制台
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务

脚本配置样例如下所示,具体参数填写请参见参数说明。
{
 "type":"job",
 "version":"2.0",//版本号。
 "steps":[
     {
         "stepType":"loghub",//插件名。
         "parameter":{
             "datasource":"",//数据源。
             "column":[//字段。
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", //日志主题。
                 "C_HostName", //主机名。
                 "C_Path", //路径。
                 "C_LogTime" //日志时间。
             ],
             "beginDateTime":"",//数据消费的开始时间位点。
             "batchSize":"",//一次从日志服务查询的数据条数。
             "endDateTime":"",//数据消费的结束时间位点。
             "fieldDelimiter":",",//列分隔符。
             "logstore":""//:目标日志库的名字。
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"//错误记录数。
     },
     "speed":{
         "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
         "concurrent":1 //作业并发数。
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}
说明 如果元数据配置JSON中有tag前缀,需要删除tag前缀。例如,__tag__:__client_ip__需要修改为__client_ip__