
DataWorks:LogHub (SLS) data source

Last Updated: Oct 15, 2025

The LogHub (SLS) data source lets you read data from and write data to Simple Log Service (SLS). This topic describes how DataWorks supports data synchronization for LogHub (SLS).

Limits

When Data Integration performs an offline write to LogHub (SLS), data duplication may occur if a task is rerun after a failover. This is because LogHub (SLS) is not idempotent.

Supported field types

Data Integration supports reading and writing the following LogHub (SLS) field types.

Field type | Offline read (LogHub (SLS) Reader) | Offline write (LogHub (SLS) Writer) | Real-time read
-----------|------------------------------------|-------------------------------------|---------------
STRING     | Supported                          | Supported                           | Supported

Details:

  • When writing data to LogHub (SLS) in offline mode

    All supported data types are converted to the STRING type before being written to LogHub (SLS). The following table lists the data type conversions for the LogHub (SLS) writer.

    Supported Data Integration internal type | Data type when writing to LogHub (SLS)
    -----------------------------------------|----------------------------------------
    LONG                                     | STRING
    DOUBLE                                   | STRING
    STRING                                   | STRING
    DATE                                     | STRING
    BOOLEAN                                  | STRING
    BYTES                                    | STRING

  • When reading data from LogHub (SLS) in real-time mode

    The following metadata fields are included by default.

    • __time__ (STRING): SLS reserved field __time__. The log time specified when the log data is written, as a UNIX timestamp in seconds.

    • __source__ (STRING): SLS reserved field __source__. The source device of the log.

    • __topic__ (STRING): SLS reserved field __topic__. The topic name.

    • __tag__:__receive_time__ (STRING): The time when the log arrives at the server, as a UNIX timestamp in seconds. If you enable the feature to record public IP addresses, the server appends this field to the raw log upon receipt.

    • __tag__:__client_ip__ (STRING): The public IP address of the log source device. If you enable the feature to record public IP addresses, the server appends this field to the raw log upon receipt.

    • __tag__:__path__ (STRING): The path of the log file collected by Logtail. Logtail automatically appends this field to the log.

    • __tag__:__hostname__ (STRING): The hostname of the machine from which Logtail collects data. Logtail automatically appends this field to the log.
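
    For illustration, a record produced by real-time synchronization might carry these metadata fields alongside the business fields of the raw log. All field values below are hypothetical.

    {
        "__time__": "1718000000",// Log time, UNIX timestamp in seconds, synchronized as STRING.
        "__source__": "192.168.1.10",// Source device of the log.
        "__topic__": "nginx_access",// Topic name.
        "__tag__:__receive_time__": "1718000002",// Server receive time, UNIX timestamp in seconds.
        "__tag__:__client_ip__": "203.0.113.5",// Public IP address of the source device.
        "__tag__:__path__": "/var/log/nginx/access.log",// Path of the file collected by Logtail.
        "__tag__:__hostname__": "web-server-01",// Hostname of the machine running Logtail.
        "status": "200"// A business field from the raw log.
    }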

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.

Develop a data synchronization task

For information about the entry point and the procedure for configuring a synchronization task, see the following configuration guides.

Note

When LogHub is used as the source, you can filter data using LogHub query syntax or Structured Process Language (SPL) statements. For more information about the syntax, see Appendix 2: SPL syntax for filtering.

  • Configure a single-table offline synchronization task

  • Configure a single-table real-time synchronization task

    For more information, see Configure a real-time synchronization task in DataStudio and Configure a real-time synchronization task in Data Integration.

  • Configure a whole-database real-time synchronization task

    For more information, see Configure a whole-database real-time synchronization task.

FAQ

For more Data Integration FAQs, see Data Integration FAQ.

Appendix 1: Script examples and parameter descriptions

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script example

{
 "type":"job",
 "version":"2.0",// Version number.
 "steps":[
     {
         "stepType":"LogHub",// Plug-in name.
         "parameter":{
             "datasource":"",// Data source.
             "column":[// Fields.
                 "col0",
                 "col1",
                 "col2",
                 "col3",
                 "col4",
                 "C_Category",
                 "C_Source",
                 "C_Topic",
                 "C_MachineUUID", // Topic.
                 "C_HostName", // Hostname.
                 "C_Path", // Path.
                 "C_LogTime" // Event time.
             ],
             "beginDateTime":"",// The start time for data consumption.
             "batchSize":"",// The number of data entries to query from Simple Log Service at a time.
             "endDateTime":"",// The end time for data consumption.
             "fieldDelimiter":",",// Column delimiter.
             "logstore":""// The name of the destination Logstore.
         },
         "name":"Reader",
         "category":"reader"
     },
     { 
         "stepType":"stream",
         "parameter":{},
         "name":"Writer",
         "category":"writer"
     }
 ],
 "setting":{
     "errorLimit":{
         "record":"0"// The number of error records.
     },
     "speed":{
         "throttle":true,// If throttle is set to false, the mbps parameter does not take effect, and the data rate is not limited. If throttle is set to true, the data rate is limited.
            "concurrent":1, // The number of concurrent jobs.
            "mbps":"12"// The maximum data rate. 1 mbps = 1 MB/s.
     }
 },
 "order":{
     "hops":[
         {
             "from":"Reader",
             "to":"Writer"
         }
     ]
 }
}

Reader script parameters

  • endPoint (required, no default value): The endpoint of Simple Log Service. The endpoint is the URL used to access a project and its log data, and is determined by the Alibaba Cloud region where the project is located and the project name. For the endpoints of each region, see Endpoints.

  • accessId (required, no default value): The AccessKey ID used to access Simple Log Service. It identifies the user.

  • accessKey (required, no default value): The AccessKey secret used to access Simple Log Service. It authenticates the user.

  • project (required, no default value): The name of the Simple Log Service project to read from. A project is a resource management unit in Simple Log Service used to isolate and control resources.

  • logstore (required, no default value): The name of the Logstore to read data from. A Logstore is the unit for log data collection, storage, and query in Simple Log Service.

  • batchSize (optional, default value: 128): The number of data entries to query from Simple Log Service at a time.

  • column (required, no default value): The column names in each data entry. You can configure metadata from Simple Log Service as synchronization columns. Simple Log Service supports metadata such as topics, unique machine group identifiers, hostnames, paths, and log times.

    Note: Column names are case-sensitive. For information about how to write metadata, see Simple Log Service machine groups.

  • beginDateTime (required, no default value): The start offset for data consumption, which is the time when the log data arrives at LogHub (SLS). This parameter specifies the start of the time range, which is inclusive. The time must be a string in the yyyyMMddHHmmss format, such as 20180111013000. You can use this parameter with DataWorks scheduling parameters, as shown in the snippet after this list.

    For example, on the Scheduling Configuration tab of the node editing page, set Parameters to beginDateTime=${yyyymmdd-1} and set Log Start Time to ${beginDateTime}000000. This sets the log start time to 00:00:00 on the data timestamp. For more information, see Supported formats for scheduling parameters.

    Note:
      • The beginDateTime and endDateTime parameters must be used together.
      • To synchronize all data, set beginDateTime to the start time of the data and endDateTime to the current day. This may consume a large amount of resources if the data volume is large. Adjust the resource group specifications as needed.

  • endDateTime (required, no default value): The end offset for data consumption. This parameter specifies the end of the time range, which is exclusive. The time must be a string in the yyyyMMddHHmmss format, such as 20180111013010. You can use this parameter with DataWorks scheduling parameters, as shown in the snippet after this list.

    For example, on the Scheduling Configuration tab of the node editing page, set Parameters to endDateTime=${yyyymmdd} and set Log End Time to ${endDateTime}000000. This sets the log end time to 00:00:00 on the day after the data timestamp. For more information, see Supported formats for scheduling parameters.

    Important:
      • The time set for endDateTime must be earlier than 2038-01-19 11:14:07 +08:00. Otherwise, data may fail to be pulled.
      • The endDateTime of the previous synchronization interval must be the same as or later than the beginDateTime of the next interval. Otherwise, some data may fail to be pulled.

  • query (required, no default value): Filters data in LogHub using the LogHub query syntax or SPL statements. SPL (Structured Process Language) is the syntax used by SLS to process logs. For the syntax, see Appendix 2: SPL syntax for filtering.
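
The following snippet is a minimal sketch of a reader parameter block that references the scheduling parameters described above. The data source name, Logstore name, and columns are placeholders; adjust them to your own task.

{
    "stepType":"LogHub",
    "parameter":{
        "datasource":"loghub_source",// Placeholder data source name.
        "logstore":"access_log",// Placeholder Logstore name.
        "column":[
            "col0",
            "C_LogTime" // Event time.
        ],
        "beginDateTime":"${beginDateTime}000000",// With beginDateTime=${yyyymmdd-1} set in Parameters, this resolves to 00:00:00 on the data timestamp.
        "endDateTime":"${endDateTime}000000",// With endDateTime=${yyyymmdd} set in Parameters, this resolves to 00:00:00 on the day after the data timestamp.
        "batchSize":"128"
    },
    "name":"Reader",
    "category":"reader"
}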

Note

If data is missing after it is read from LogHub, go to the LogHub console and check whether the __tag__:__receive_time__ metadata field of the logs falls within the time range configured for the task.

Writer script example

{
    "type": "job",
    "version": "2.0",// Version number.
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "LogHub",// Plug-in name.
            "parameter": {
                "datasource": "",// Data source.
                "column": [// Fields.
                    "col0",
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5"
                ],
                "topic": "",// Select a topic.
                "batchSize": "1024",// The number of records in a batch submission.
                "logstore": ""// The name of the destination Simple Log Service Logstore.
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The number of error records.
        },
        "speed": {
            "throttle":true,// If throttle is set to false, the mbps parameter does not take effect, and the data rate is not limited. If throttle is set to true, the data rate is limited.
            "concurrent":3, // The number of concurrent jobs.
            "mbps":"12"// The maximum data rate. 1 mbps = 1 MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer script parameters

Note

The LogHub (SLS) writer retrieves data from the reader through the Data Integration framework. The writer then converts the supported Data Integration data types to the STRING type. When the number of records reaches the specified batchSize, the data is pushed to LogHub (SLS) in a single batch using the Simple Log Service Java SDK.

  • endpoint (required, no default value): The endpoint of Simple Log Service. The endpoint is the URL used to access a project and its log data, and is determined by the Alibaba Cloud region where the project is located and the project name. For the endpoints of each region, see Endpoints.

  • accessKeyId (required, no default value): The AccessKey ID used to access Simple Log Service.

  • accessKeySecret (required, no default value): The AccessKey secret used to access Simple Log Service.

  • project (required, no default value): The name of the destination Simple Log Service project.

  • logstore (required, no default value): The name of the destination Logstore. A Logstore is the unit for log data collection, storage, and query in Simple Log Service.

  • topic (optional, default value: empty string): The topic name of the logs written to the destination Simple Log Service Logstore.

  • batchSize (optional, default value: 1,024): The number of data entries to synchronize to LogHub (SLS) at a time. The maximum value is 4,096.

    Note: The size of data synchronized to LogHub (SLS) in a single batch cannot exceed 5 MB. Adjust the number of entries to push at a time based on the size of a single data entry, as illustrated in the sketch after this list.

  • column (required, no default value): The column names in each data entry.
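
As a rough illustration of the 5 MB per-batch limit: if each record is about 10 KB, the default batchSize of 1,024 would produce batches of roughly 10 MB, so a smaller value is needed. The following writer parameter block is a minimal sketch under that assumption; the data source, Logstore, topic, and column names are placeholders.

{
    "stepType": "LogHub",
    "parameter": {
        "datasource": "loghub_target",// Placeholder data source name.
        "logstore": "sync_result",// Placeholder Logstore name.
        "topic": "offline_sync",// Placeholder topic.
        "batchSize": "256",// 256 records x ~10 KB each is about 2.5 MB per batch, which stays below the 5 MB limit.
        "column": [
            "col0",
            "col1"
        ]
    },
    "name": "Writer",
    "category": "writer"
}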

Appendix 2: SPL syntax for filtering

When LogHub is used as the source, you can filter data from LogHub using the LogHub query syntax or Structured Process Language (SPL) statements. The following table describes the syntax.

Note

For more information about SPL, see SPL syntax.

Data filtering

  SQL statement:

    SELECT * WHERE Type='write'

  SPL statement:

    • Conditional filtering.

      | where Type='write'
    • Fuzzy query.

      | where Type like '%write%'
    • Regular expression.

      | where regexp_like(server_protocol, '\d+')
    • More (SQL expression).

      | where <sql-expr>

Field processing and filtering

  SQL statement (select specific fields and rename them):

    SELECT "__tag__:node" AS node, path

  SPL statement:

    • Select specific fields and rename them.

      | project node="__tag__:node", path
    • Select fields by pattern.

      | project -wildcard "__tag__:*"
    • Rename some fields without affecting others.

      | project-rename node="__tag__:node"
    • Exclude fields by pattern.

      | project-away -wildcard "__tag__:*"

Data cleansing (calling SQL functions)

  SQL statement (convert data types, parse time, and so on):

    SELECT 
      CAST(Status AS BIGINT) AS Status, 
      date_parse(Time, '%Y-%m-%d %H:%i') AS Time

  SPL statement (convert data types, parse time, and so on):

    | extend Status=cast(Status as BIGINT), Time=date_parse(Time, '%Y-%m-%d %H:%i')

Field extraction

  SPL statement:

    • Regular expression extraction: one-time match.

      | parse-regexp protocol, '(\w+)/(\d+)' as scheme, version
    • JSON extraction: expand all.

      | parse-json -path='$.0' content
    • CSV extraction.

      | parse-csv -delim='^_^' content as ip, time, host
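
To apply one of these statements in a batch synchronization task, set it in the query parameter of the LogHub (SLS) reader. The following is a minimal sketch; the data source name, Logstore name, columns, and the filter condition itself are placeholder values.

{
    "stepType":"LogHub",
    "parameter":{
        "datasource":"loghub_source",// Placeholder data source name.
        "logstore":"access_log",// Placeholder Logstore name.
        "query":"* | where Type='write'",// Placeholder filter: a search condition piped to an SPL statement.
        "column":[
            "Type",
            "C_LogTime" // Event time.
        ],
        "beginDateTime":"20180111013000",
        "endDateTime":"20180111013010",
        "batchSize":"128"
    },
    "name":"Reader",
    "category":"reader"
}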