This topic describes the data types and parameters that LogHub Reader supports and how to configure it by using the codeless user interface (UI) and the code editor.

Log Service is an all-in-one, real-time logging service that lets you collect, consume, ship, query, and analyze log data at scale. LogHub Reader consumes real-time log data from LogHub by using the Java SDK for Log Service, converts the data to a format that the Data Integration service can process, and sends the converted data to a writer.

How it works

LogHub Reader consumes real-time log data in LogHub by using the following version of the Java SDK for Log Service:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.7</version>
</dependency>
In Log Service, a Logstore is the basic unit for collecting, storing, and querying log data. The logs that are written to a Logstore are stored in its shards. Each Logstore consists of several shards, each of which is defined by a left-closed, right-open MD5 interval. The intervals do not overlap, and together they cover the entire range of MD5 values. For example, a Logstore with two shards might split the range into [00000000000000000000000000000000, 80000000000000000000000000000000) and [80000000000000000000000000000000, ffffffffffffffffffffffffffffffff). Each shard independently provides the following capacities:
  • Write: 5 Mbit/s, 2,000 times/s.
  • Read: 10 Mbit/s, 100 times/s.
LogHub Reader consumes the log data in each shard by using the GetCursor and BatchGetLog API operations, as illustrated in the Java sketch after this list:
  • Obtain a cursor based on the specified time range.
  • Read logs based on the cursor and step parameters, and obtain the next cursor.
  • Keep moving the cursor forward to consume the logs.
  • Split the node into concurrent threads based on shards.
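The following minimal Java sketch illustrates this process for a single shard. It is based on the aliyun-log SDK version shown above; the angle-bracket values are placeholders, the 128-entry batch mirrors the default batchSize, and exact method signatures may vary between SDK versions.
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.response.BatchGetLogResponse;

public class ShardConsumerSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint, credentials, and resource names.
        Client client = new Client("<endpoint>", "<accessId>", "<accessKey>");
        String project = "<project>";
        String logstore = "<logstore>";
        int shardId = 0;            // Each shard is consumed by its own thread.
        int beginTime = 1515605400; // Placeholder Unix time in seconds: interval start.
        int endTime = 1515605410;   // Placeholder Unix time in seconds: interval end.

        // Obtain cursors for the left-closed, right-open time interval.
        String cursor = client.GetCursor(project, logstore, shardId, beginTime).GetCursor();
        String endCursor = client.GetCursor(project, logstore, shardId, endTime).GetCursor();

        // Read logs batch by batch and keep moving the cursor
        // until it reaches the right boundary of the interval.
        while (!cursor.equals(endCursor)) {
            BatchGetLogResponse response =
                    client.BatchGetLog(project, logstore, shardId, 128, cursor, endCursor);
            for (LogGroupData logGroup : response.GetLogGroups()) {
                // Convert the log group to a readable format and send it to the writer (omitted).
            }
            cursor = response.GetNextCursor();
        }
    }
}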

Data types

The following table lists the data type supported by LogHub Reader.
Data Integration data type | LogHub data type
STRING | STRING

Parameters

The following list describes the parameters that LogHub Reader supports.
  • endpoint: The Log Service endpoint, which is the URL that you use to access a project and its log data. The endpoint varies based on the Alibaba Cloud region where the project resides and the project name. For more information about the endpoint of each region, see Service endpoint. Required: Yes. Default value: None.
  • accessId: The AccessKey ID that is used to access Log Service. Required: Yes. Default value: None.
  • accessKey: The AccessKey secret that is used to access Log Service. Required: Yes. Default value: None.
  • project: The name of the project. A project is the basic unit for managing resources in Log Service. You can exercise access control at the project level and isolate resources between projects. Required: Yes. Default value: None.
  • logstore: The name of the Logstore. A Logstore is the basic unit for collecting, storing, and querying log data in Log Service. Required: Yes. Default value: None.
  • batchSize: The number of entries that are queried from Log Service at a time. Required: No. Default value: 128.
  • column: The names of the columns to be synchronized. You can also set this parameter to metadata fields in Log Service, such as the log topic, unique identifier of the host, hostname, path, and log time.
    Note Column names are case-sensitive. For more information about column names in Log Service, see Log Service server group.
    Required: Yes. Default value: None.
  • beginDateTime: The start time of data consumption, that is, the time when log data arrives at LogHub. The parameter defines the left boundary of a left-closed, right-open interval, in the yyyyMMddHHmmss format, for example, 20180111013000. It can work with the scheduling time parameters in DataWorks.
    Note The beginDateTime and endDateTime parameters must be used in pairs.
    Required: You must specify either beginDateTime or beginTimestampMillis, but not both. Default value: None.
  • endDateTime: The end time of data consumption, in the yyyyMMddHHmmss format, for example, 20180111013010. The parameter defines the right boundary of a left-closed, right-open interval and can work with the scheduling time parameters in DataWorks.
    Note Make sure that consecutive intervals leave no gaps. That is, the time specified by the endDateTime parameter of the previous interval must be the same as or later than the time specified by the beginDateTime parameter of the current interval. Otherwise, data in some time ranges may not be pulled.
    Required: You must specify either endDateTime or endTimestampMillis, but not both. Default value: None.
  • beginTimestampMillis: The start time of data consumption, in milliseconds. The parameter defines the left boundary of a left-closed, right-open interval.
    Note The beginTimestampMillis and endTimestampMillis parameters must be used in pairs. The value -1 indicates the position where the cursor starts in Log Service, which is specified by CursorMode.BEGIN. We recommend that you use the beginDateTime parameter instead.
    Required: You must specify either beginTimestampMillis or beginDateTime, but not both. Default value: None.
  • endTimestampMillis: The end time of data consumption, in milliseconds. The parameter defines the right boundary of a left-closed, right-open interval.
    Note The endTimestampMillis and beginTimestampMillis parameters must be used in pairs. The value -1 indicates the position where the cursor ends in Log Service, which is specified by CursorMode.END. We recommend that you use the endDateTime parameter instead.
    Required: You must specify either endTimestampMillis or endDateTime, but not both. Default value: None.
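For example, the following snippet shows how the two date-time parameters define a consumption window in a LogHub Reader configuration. The values are the illustrative ones from the list above; because the interval is left-closed and right-open, logs that arrive at exactly 20180111013010 are not consumed.
"parameter": {
    "beginDateTime": "20180111013000",// Left boundary, inclusive.
    "endDateTime": "20180111013010"// Right boundary, exclusive.
}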

Configure LogHub Reader by using the codeless UI

  1. Configure the connections.
    Configure the source and destination connections for the sync node.
    • Connection: The connection to read data from, which corresponds to the datasource parameter in the preceding parameter description. Select a connection type, and enter the name of a connection that has been configured in DataWorks.
    • Logstore: The name of the Logstore from which data is read.
    • Start Timestamp: The start time of data consumption, that is, the time when log data arrives at LogHub. The parameter defines the left boundary of a left-closed, right-open interval, in the yyyyMMddHHmmss format, for example, 20180111013000. It can work with the scheduling time parameters in DataWorks.
    • End Timestamp: The end time of data consumption, in the yyyyMMddHHmmss format, for example, 20180111013010. The parameter defines the right boundary of a left-closed, right-open interval and can work with the scheduling time parameters in DataWorks.
    • Records per Batch: The number of entries that are queried from Log Service at a time.
  2. Configure field mapping, that is, the column parameter in the preceding parameter description.
    Fields in the source table on the left map one-to-one to fields in the destination table on the right. You can click Add to add a field, or move the pointer over a field and click the Delete icon to delete the field.
    • Map Fields with the Same Name: Click Map Fields with the Same Name to map fields that have the same name. The data types of the mapped fields must match.
    • Map Fields in the Same Line: Click Map Fields in the Same Line to map fields that are in the same row. The data types of the mapped fields must match.
    • Delete All Mappings: Click Delete All Mappings to remove all mappings that have been established.
    • Auto Layout: Click Auto Layout to automatically sort the fields based on specified rules.
    • Change Fields: Click the Change Fields icon. In the Change Fields dialog box that appears, you can manually edit the fields in the source table. Each field occupies a row. The first and last blank rows are included, whereas other blank rows are ignored.
  3. Configure channel control policies.
    • Expected Maximum Concurrency: The maximum number of concurrent threads that the sync node uses to read data from the source and write data to the destination. You can configure the concurrency for the node on the codeless UI.
    • Bandwidth Throttling: Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value.
    • Dirty Data Records Allowed: The maximum number of dirty data records allowed.
    • Resource Group: The resource group used for running the sync node. If a large number of nodes, including this sync node, are deployed on the default resource group, the sync node may need to wait for resources. We recommend that you purchase an exclusive resource group for data integration or add a custom resource group. For more information, see DataWorks exclusive resources and Add a custom resource group.

Configure LogHub Reader by using the code editor

In the following code, a node is configured to read data from a Logstore. For more information about the parameters, see the preceding parameter description.
{
 "type":"job",
 "version":"2.0",// The version number.
 "steps":[
     {
         "stepType":"loghub",// The reader type.
         "parameter":{
             "datasource":"",// The connection name.
             "column":[// The columns to be synchronized.
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "=Topic",// The log topic.
                 "HostName",// The hostname.
                 "Path",// The path.
                 "LogTime"// The log time.
             ],
             "beginDateTime":"",// The start time of data consumption.
             "batchSize":"",// The number of entries that are queried from Log Service at a time.
             "endDateTime":",",// The end time of data consumption.
             "fieldDelimiter":",",// The column delimiter.
             "logstore":"// The name of the target Logstore.
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"// The maximum number of dirty data records allowed.
     },
     "speed":{
         "throttle":false,// Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
         "concurrent":1,// The maximum number of concurrent threads.
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}
Note If a metadata field name in the JSON-formatted logs carries the __tag__: prefix, delete the prefix. For example, change __tag__:__client_ip__ to __client_ip__.
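For example, if Log Service returns the metadata field __tag__:__client_ip__, reference it in the column configuration without the prefix:
"column":[
    "__client_ip__"// Originally __tag__:__client_ip__; the __tag__: prefix is removed.
]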