This topic describes the data types and parameters that are supported by LogHub (SLS) Reader and how to configure LogHub (SLS) Reader by using the codeless user interface (UI) and code editor.

Background information

Log Service is an all-in-one, real-time log service that allows you to collect, consume, deliver, query, and analyze log data. It improves your capabilities to process and analyze large amounts of logs. LogHub (SLS) Reader consumes real-time log data in LogHub (SLS) by using Log Service SDK for Java, converts the data to a format that Data Integration can process, and then sends the converted data to a writer.

How it works

LogHub (SLS) Reader consumes real-time log data in LogHub (SLS) by using Log Service SDK for Java. The following Maven dependency specifies the SDK version that is used:
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.7</version>
</dependency>
In Log Service, a Logstore is a basic unit that you can use to collect, store, and query log data. Each Logstore consists of several shards, and the log data that is written to a Logstore is stored in its shards. Each shard is defined by a left-closed, right-open interval of MD5 values. The intervals do not overlap each other, and together they cover the entire range of allowed MD5 values. Each shard independently provides services with the following capabilities:
  • Write: 5 MB/s, 2,000 times/s
  • Read: 10 MB/s, 100 times/s
LogHub (SLS) Reader calls the GetCursor and BatchGetLog API operations to consume log data in shards based on the following process (see the sketch after this list):
  • Obtains a cursor based on a time range.
  • Reads logs based on the cursor and step parameters and returns the next cursor.
  • Keeps moving the cursor to consume logs.
  • Splits the node based on shards and uses parallel threads to run the node.
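
The following Java sketch illustrates this cursor-based consumption process by using the SDK shown above. This is a minimal sketch, not the actual implementation of LogHub (SLS) Reader: the endpoint, AccessKey pair, project, Logstore, shard ID, time range, and batch size are placeholders, and method signatures may differ slightly across SDK versions.
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.response.BatchGetLogResponse;

public class LogHubCursorSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection information.
        Client client = new Client("<endpoint>", "<accessId>", "<accessKey>");
        String project = "<project>";
        String logstore = "<logstore>";
        int shard = 0;

        // Step 1: Obtain cursors for the boundaries of the time range (Unix timestamps in seconds).
        String cursor = client.GetCursor(project, logstore, shard, 1515634200).GetCursor();
        String endCursor = client.GetCursor(project, logstore, shard, 1515634210).GetCursor();

        // Steps 2 and 3: Read logs in batches and keep moving the cursor until the end cursor is reached.
        while (!cursor.equals(endCursor)) {
            BatchGetLogResponse resp = client.BatchGetLog(project, logstore, shard, 128, cursor, endCursor);
            for (LogGroupData logGroup : resp.GetLogGroups()) {
                // Convert each log group to records that the writer can consume.
            }
            cursor = resp.GetNextCursor();
        }
    }
}
In an actual synchronization node, this work is split by shard and the per-shard consumption runs in parallel threads, as described in the last step of the preceding process.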

Data types

LogHub (SLS) Reader supports only the STRING data type. The STRING data type in LogHub (SLS) maps to the STRING data type in Data Integration.

Parameters

  • endPoint: The endpoint of Log Service, which is a URL that you can use to access a project and log data. The endpoint varies based on the Alibaba Cloud region where the project resides and the project name. For more information about the endpoint of Log Service in each region, see Endpoints. Required: yes. No default value.
  • accessId: The AccessKey ID that is used to access Log Service. Required: yes. No default value.
  • accessKey: The AccessKey secret that is used to access Log Service. Required: yes. No default value.
  • project: The name of the project. A project is the basic unit for managing resources in Log Service. Projects are used to isolate resources and control access to the resources. Required: yes. No default value.
  • logstore: The name of the Logstore from which you want to read data. A Logstore is a basic unit that you can use to collect, store, and query log data in Log Service. Required: yes. No default value.
  • batchSize: The number of data entries that are queried from Log Service each time. Required: no. Default value: 128.
  • column: The names of the columns from which you want to read data. You can set this parameter to the metadata in Log Service. Supported metadata includes the log topic, unique identifier of the host, hostname, path, and log time. Required: yes. No default value.
    Note: Column names are case-sensitive. For more information about column names in Log Service, see Introduction.
  • beginDateTime: The start time of data consumption. The value is the time at which log data reaches LogHub (SLS). This parameter defines the left boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, such as 20180111013000 (the sketch after this list shows how such a boundary maps to a Unix timestamp). This parameter can work with the scheduling parameters in DataWorks. For example, if you enter beginDateTime=${yyyymmdd-1} in the Parameters field on the Properties tab, you can set Start Timestamp to ${beginDateTime}000000 on the node configuration tab to consume logs that are generated from 00:00:00 of the data timestamp. Required: yes. No default value.
    Note: The beginDateTime and endDateTime parameters must be used in pairs.
  • endDateTime: The end time of data consumption. This parameter defines the right boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, such as 20180111013010. This parameter can work with the scheduling parameters in DataWorks. For example, if you enter endDateTime=${yyyymmdd} in the Parameters field on the Properties tab, you can set End Timestamp to ${endDateTime}000000 on the node configuration tab to consume logs that are generated until 00:00:00 of the next day of the data timestamp. Required: yes. No default value.
    Note: The time that is specified by the endDateTime parameter of the previous interval cannot be earlier than the time that is specified by the beginDateTime parameter of the current interval. Otherwise, data in some regions may not be read.
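The beginDateTime and endDateTime values are boundaries in the yyyyMMddHHmmss format, whereas the GetCursor operation described earlier works with Unix timestamps. The following Java sketch shows one way such a boundary can be converted to a Unix timestamp in seconds. This is only an illustration: the time zone is an assumption, and the exact conversion that LogHub (SLS) Reader performs is internal to Data Integration.
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BoundaryToTimestamp {
    public static void main(String[] args) {
        // Parse a boundary such as beginDateTime=20180111013000.
        DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        LocalDateTime begin = LocalDateTime.parse("20180111013000", format);
        // Assumed time zone; use the time zone that applies to your Log Service project.
        long fromTime = begin.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond();
        System.out.println(fromTime); // The Unix timestamp that marks the start of the range.
    }
}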

Configure LogHub (SLS) Reader by using the codeless UI

  1. Configure data sources.
    Configure the source and destination for the synchronization node.
    • Connection: The name of the data source from which you want to read data. This parameter is equivalent to the datasource parameter in the code editor.
    • Logstore: The name of the Logstore from which you want to read data.
    • Start Timestamp: The start time of data consumption. The value is the time at which log data reaches LogHub (SLS). This parameter defines the left boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, such as 20180111013000. This parameter can work with the scheduling parameters in DataWorks to synchronize incremental data. For more information, see Overview of scheduling parameters.
    • End Timestamp: The end time of data consumption. This parameter defines the right boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, such as 20180111013010. This parameter can work with the scheduling parameters in DataWorks.
    • Records per Batch: The number of data entries that are queried from Log Service each time.

    If you use a batch synchronization node to synchronize data, you can configure scheduling parameters for the node to specify the range of data to be synchronized. Before the node starts to synchronize data, the configured scheduling parameters are replaced with the actual values based on the expressions of the scheduling parameters. For more information about how to configure scheduling parameters for a batch synchronization node that is used to synchronize incremental data, see Synchronize incremental data and Overview of the batch synchronization feature.

  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon.
    • Map Fields with the Same Name: Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    • Map Fields in the Same Line: Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    • Delete All Mappings: Click Delete All Mappings to remove the mappings that are established.
    • Auto Layout: Click Auto Layout to automatically sort the fields based on specific rules.
    • Change Fields: Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
  3. Configure channel control policies.
    • Expected Maximum Concurrency: The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    • Bandwidth Throttling: Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    • Dirty Data Records Allowed: The maximum number of dirty data records allowed.
    • Distributed Execution: The distributed execution mode allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution, which speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure LogHub (SLS) Reader by using the code editor

For more information about how to configure a synchronization node by using the code editor, see Configure a batch synchronization node by using the code editor.

In the following code, a synchronization node is configured to read data from LogHub (SLS). For more information about the parameters, see the preceding parameter description.
{
 "type":"job",
 "version":"2.0",// The version number. 
 "steps":[
     {
         "stepType":"LogHub",// The reader type. 
         "parameter":{
             "datasource":"", // The name of the data source. 
             "column":[// The names of the columns from which you want to read data. 
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", // The log topic. 
                 "C_HostName", // The hostname. 
                 "C_Path", // The path. 
                 "C_LogTime" // The time when the event occurred. 
             ],
             "beginDateTime":"",// The start time of data consumption. 
             "batchSize":"",// The number of data entries that are queried from Log Service at a time. 
             "endDateTime":"",// The end time of data consumption. 
             "fieldDelimiter":",",// The column delimiter. 
             "logstore":""// The name of the Logstore from which you want to read data. 
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"// The maximum number of dirty data records allowed. 
     },
     "speed":{
         "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1 // The maximum number of parallel threads. 
            "mbps":"12",// The maximum transmission rate.
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}
Note If a metadata field name is prefixed with __tag__:, delete the prefix. For example, change __tag__:__client_ip__ to __client_ip__.