This topic describes the data types and parameters that LogHub Reader supports and how to configure it by using the codeless user interface (UI) and code editor.

Background information

As an all-in-one, real-time data logging service, Log Service provides features to collect, consume, deliver, query, and analyze log data, which helps you process and analyze large volumes of logs. LogHub Reader consumes real-time log data in LogHub by using Log Service SDK for Java, converts the data to a format that is readable by Data Integration, and then sends the converted data to a writer.

How it works

LogHub Reader consumes real-time log data in LogHub by using the following version of Log Service SDK for Java:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.7</version>
</dependency>
In Log Service, a Logstore is the basic unit for collecting, storing, and querying log data. The log data in a Logstore is stored in shards. Each Logstore consists of several shards. Each shard is defined by a left-closed, right-open interval of MD5 values; the intervals do not overlap, and together they cover the entire range of allowed MD5 values. Each shard independently provides a portion of the service capacity:
  • Write: 5 Mbit/s, 2,000 times per second.
  • Read: 10 Mbit/s, 100 times per second.
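The shard routing described above can be illustrated with a small sketch. The two-shard layout below is hypothetical (real shard intervals are managed by Log Service); it only shows how a 128-bit MD5 value selects the shard whose interval contains it.

```python
import hashlib

# Hypothetical two-shard layout: each shard owns a left-closed, right-open
# interval of 128-bit MD5 values, and the intervals together cover the
# whole MD5 range without overlapping.
SHARDS = [
    {"id": 0, "begin": 0x00000000000000000000000000000000,
     "end": 0x80000000000000000000000000000000},
    {"id": 1, "begin": 0x80000000000000000000000000000000,
     "end": 0x100000000000000000000000000000000},
]

def shard_for_key(partition_key: str) -> int:
    """Return the ID of the shard whose [begin, end) interval contains
    md5(partition_key)."""
    digest = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    for shard in SHARDS:
        if shard["begin"] <= digest < shard["end"]:
            return shard["id"]
    raise ValueError("shard intervals must cover the whole MD5 range")
```

Because the intervals are left-closed and right-open, every MD5 value matches exactly one shard.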
LogHub Reader consumes the log data in each shard by using the GetCursor and BatchGetLog API operations in the following process:
  • Obtain a cursor based on the specified time range.
  • Read log entries based on the cursor and the step size, and obtain the next cursor.
  • Repeatedly move the cursor forward to consume the log entries.
  • Split the sync node into concurrent threads based on the shards.
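A minimal sketch of this per-shard cursor loop, using a hypothetical in-memory client in place of the real Log Service SDK. The actual SDK returns opaque string cursors; integer indexes are used here only to keep the illustration runnable.

```python
class FakeLogHubClient:
    """In-memory stand-in for the Log Service client (illustration only)."""

    def __init__(self, logs):
        self.logs = logs  # list of (timestamp_seconds, payload), sorted by time

    def get_cursor(self, shard_id, ts):
        # Stands in for GetCursor: index of the first entry received at or after ts.
        return sum(1 for t, _ in self.logs if t < ts)

    def batch_get_log(self, shard_id, cursor, count):
        # Stands in for BatchGetLog: up to `count` entries from `cursor`,
        # plus the next cursor.
        batch = self.logs[cursor:cursor + count]
        return [payload for _, payload in batch], cursor + len(batch)


def consume_shard(client, shard_id, begin_ts, end_ts, batch_size=128):
    """Consume the [begin_ts, end_ts) interval of one shard by repeatedly
    moving a cursor forward, as described in the process above."""
    cursor = client.get_cursor(shard_id, begin_ts)
    end_cursor = client.get_cursor(shard_id, end_ts)
    out = []
    while cursor < end_cursor:
        logs, cursor = client.batch_get_log(
            shard_id, cursor, min(batch_size, end_cursor - cursor))
        out.extend(logs)
    return out
```

In the real reader, one such loop runs per shard, which is what allows the sync node to be split into concurrent threads.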

Data types

LogHub Reader supports only one data type: the STRING type in LogHub maps to the STRING type in Data Integration.

Parameters

  • endpoint: The Log Service endpoint, which is a URL that you can use to access the project and its log data. The endpoint varies based on the Alibaba Cloud region where the project resides and the project name. For more information, see Endpoints. Required: yes. Default value: none.
  • accessId: The AccessKey ID that is used to access Log Service. Required: yes. Default value: none.
  • accessKey: The AccessKey secret that is used to access Log Service. Required: yes. Default value: none.
  • project: The name of the project. A project is the basic unit for managing resources in Log Service. You can exercise access control at the project level and isolate resources among different projects. Required: yes. Default value: none.
  • logstore: The name of the Logstore from which data is read. A Logstore is the basic unit for collecting, storing, and querying log data in Log Service. Required: yes. Default value: none.
  • batchSize: The number of entries that are queried from Log Service at a time. Required: no. Default value: 128.
  • column: The names of the columns to be synchronized. You can set this parameter to metadata columns in Log Service, such as the log topic, the unique identifier of the host, the hostname, the path, and the log time.
    Note: Column names are case-sensitive. For more information about column names in Log Service, see Introduction.
    Required: yes. Default value: none.
  • beginDateTime: The start time of data consumption, which is the time when log data reaches LogHub. This parameter defines the left boundary of a left-closed, right-open interval, in the yyyyMMddHHmmss format, for example, 20180111013000. The parameter can work with the scheduling time parameters in DataWorks. For example, if you enter beginDateTime=${yyyymmdd-1} in the Arguments field on the Properties tab, you can set Start Timestamp to ${beginDateTime}000000 on the node editing page to consume logs that are generated from 00:00:00 of the data timestamp.
    Note: The beginDateTime and endDateTime parameters must be used in pairs. You can specify either beginDateTime or beginTimestampMillis, but not both.
    Default value: none.
  • endDateTime: The end time of data consumption, in the yyyyMMddHHmmss format, for example, 20180111013010. This parameter defines the right boundary of a left-closed, right-open interval and can work with the scheduling time parameters in DataWorks. For example, if you enter endDateTime=${yyyymmdd} in the Arguments field on the Properties tab, you can set End Timestamp to ${endDateTime}000000 on the node editing page to consume logs that are generated until 00:00:00 of the day after the data timestamp.
    Note: The time that is specified by the endDateTime parameter of the previous interval cannot be earlier than the time that is specified by the beginDateTime parameter of the current interval. Otherwise, some data may not be pulled. You can specify either endDateTime or endTimestampMillis, but not both.
    Default value: none.
  • beginTimestampMillis: The start time of data consumption, in milliseconds. This parameter defines the left boundary of a left-closed, right-open interval and is more precise than beginDateTime. The value -1 indicates the position where the cursor starts in Log Service, which is specified by CursorMode.BEGIN. We recommend that you use beginDateTime instead.
    Note: The beginTimestampMillis and endTimestampMillis parameters must be used in pairs. You can specify either beginTimestampMillis or beginDateTime, but not both.
    Default value: none.
  • endTimestampMillis: The end time of data consumption, in milliseconds. This parameter defines the right boundary of a left-closed, right-open interval and is more precise than endDateTime. The value -1 indicates the position where the cursor ends in Log Service, which is specified by CursorMode.END. We recommend that you use endDateTime instead.
    Note: The endTimestampMillis and beginTimestampMillis parameters must be used in pairs. You can specify either endTimestampMillis or endDateTime, but not both.
    Default value: none.
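The relationship between the two time formats can be checked with a short sketch. The conversion below assumes UTC purely for illustration; the time zone that Log Service actually applies depends on your project's region.

```python
from datetime import datetime, timezone

def to_timestamp_millis(ts: str) -> int:
    """Convert a yyyyMMddHHmmss string to epoch milliseconds (UTC assumed)."""
    dt = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

def to_datetime_string(millis: int) -> str:
    """Convert epoch milliseconds back to a yyyyMMddHHmmss string (UTC)."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).strftime("%Y%m%d%H%M%S")
```

This also shows why the millisecond parameters are more precise: a yyyyMMddHHmmss value can only express whole seconds.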

Configure LogHub Reader by using the codeless UI

  1. Configure the connections.
    Configure the connections to the source and destination data stores for the sync node in the Connections section.
    Parameter Description
    Connection The connection to the source data store. This parameter is equivalent to the datasource parameter in the preceding parameter description. Select a connection type, and then select the name of a connection that you have configured in DataWorks.
    Logstore The name of the Logstore from which data is read.
    Start Timestamp The start time of data consumption. The value is the time when log data reaches LogHub. This parameter defines the left boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, for example, 20180111013000. The parameter can work with the scheduling time parameter in DataWorks.
    End Timestamp The end time of data consumption in the format of yyyyMMddHHmmss, such as 20180111013010. This parameter defines the right boundary of a left-closed, right-open interval and can work with the scheduling time parameter in DataWorks.
    Records per Batch The number of entries that are queried from Log Service at a time.
  2. Configure field mapping, that is, the column parameter in the preceding parameter description.
    In the Mappings section, fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field. To delete a field, move the pointer over the field and click the Delete icon.
    GUI element Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish a mapping between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish a mapping between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove mappings that have been established.
    Auto Layout Click Auto Layout to sort the fields based on specified rules.
    Change Fields Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
  3. Configure channel control policies in the Channel section.
    Parameter Description
    Expected Maximum Concurrency The maximum number of concurrent threads that the sync node uses to read data from or write data to data stores. You can configure the concurrency for the node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and set a maximum transmission rate to avoid heavy read workload of the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to a proper value.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.

Configure LogHub Reader by using the code editor

You can configure LogHub Reader by using the code editor. For more information, see Create a sync node by using the code editor.

The following example shows how to configure a sync node to read data from a Logstore. For more information about the parameters, see the preceding parameter description.
{
 "type":"job",
 "version":"2.0",// The version number.
 "steps":[
     {
         "stepType":"loghub",// The reader type.
         "parameter":{
             "datasource":"",// The connection name.
             "column":[// The columns to be synchronized from the source table.
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", // The log topic.
                 "C_hostname", // The host name.
                 "C_path", // The path.
                 "C_logtime" // The log time.
             ],
             "beginDateTime":"",// The start time of data consumption.
             "batchSize":"",// The number of entries that are queried from Log Service at a time.
             "endDateTime":",",// The end time of data consumption.
             "fieldDelimiter":",",// The column delimiter.
             "logstore":"// The name of the Logstore from which data is read.
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"// The maximum number of dirty data records allowed.
     },
     "speed":{
         "throttle":false,// Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
         "concurrent":1 // The maximum number of concurrent threads.
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}
Note If a metadata column name in the JSON configuration has the __tag__: prefix, delete the prefix. For example, change __tag__:__client_ip__ to __client_ip__.
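A small sketch of the prefix cleanup that this note describes; the prefix string is taken from the example above.

```python
def strip_tag_prefix(column_name: str) -> str:
    """Remove the leading __tag__: prefix from a metadata column name,
    if present; other names are returned unchanged."""
    prefix = "__tag__:"
    if column_name.startswith(prefix):
        return column_name[len(prefix):]
    return column_name
```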