The LogHub (SLS) data source lets you read from and write to Simple Log Service (SLS) within DataWorks synchronization tasks.
## Supported capabilities
| Capability | Offline read | Offline write | Real-time read |
|---|---|---|---|
| Cross-region synchronization | ✓ | ✓ | ✓ |
| Cross-account synchronization | ✓ | ✓ | ✓ |
| Same-account synchronization | ✓ | ✓ | ✓ |
| Public cloud to finance cloud | ✓ | ✓ | ✓ |
| Data filtering (query syntax or SPL) | ✓ | — | — |
## Limitations
When Data Integration writes data to LogHub (SLS) in an offline task, reruns after a failover may produce duplicate records. This happens because LogHub (SLS) write operations are not idempotent.
## Supported field types
| Field type | Offline read | Offline write | Real-time read |
|---|---|---|---|
| STRING | ✓ | ✓ | ✓ |
### Type conversions on offline write
The LogHub (SLS) Writer converts all incoming field types to STRING before writing.
| Source type | Written as |
|---|---|
| LONG | STRING |
| DOUBLE | STRING |
| STRING | STRING |
| DATE | STRING |
| BOOLEAN | STRING |
| BYTES | STRING |
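For illustration, here is a minimal sketch of that conversion; the record and its field names are hypothetical, not part of the plugin's contract:

```json
{
  "source_record":     { "id": 1001,   "price": 19.95,   "active": true,   "ts": "2024-01-01 00:00:00" },
  "written_log_entry": { "id": "1001", "price": "19.95", "active": "true", "ts": "2024-01-01 00:00:00" }
}
```

Because everything lands as text, downstream consumers that need typed values must cast them back, for example with SPL's cast shown later in this topic.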
### Metadata fields in real-time read
During real-time reads, SLS automatically includes the following reserved metadata fields alongside your log data. These fields appear as additional columns in the destination table.
| Field | Type | Description |
|---|---|---|
| `__time__` | STRING | Reserved SLS field. Log entry time as a UNIX timestamp in seconds. |
| `__source__` | STRING | Reserved SLS field. Source device that generated the log. |
| `__topic__` | STRING | Reserved SLS field. Topic name associated with the log. |
| `__tag__:__receive_time__` | STRING | Time the server received the log, as a UNIX timestamp in seconds. Included when public IP address recording is enabled. |
| `__tag__:__client_ip__` | STRING | Public IP address of the source device. Included when public IP address recording is enabled. |
| `__tag__:__path__` | STRING | File path of the log collected by Logtail. Automatically appended by Logtail. |
| `__tag__:__hostname__` | STRING | Hostname of the machine from which Logtail collects data. Automatically appended by Logtail. |
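As a hedged illustration, a row produced by a real-time read might combine your own log fields with these reserved columns; everything here except the reserved field names is hypothetical:

```json
{
  "level": "INFO",
  "message": "request handled",
  "__time__": "1700000000",
  "__source__": "192.168.1.10",
  "__topic__": "nginx-access",
  "__tag__:__receive_time__": "1700000003",
  "__tag__:__client_ip__": "203.0.113.7",
  "__tag__:__path__": "/var/log/nginx/access.log",
  "__tag__:__hostname__": "web-01"
}
```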
## Create a data source
Before configuring a synchronization task, create a LogHub data source in DataWorks. For the full procedure, see Data Source Management. Detailed parameter explanations are available in the tooltips on the configuration page.
### Create a cross-account data source
The following example shows how to configure account B to synchronize LogHub data from account A into a MaxCompute instance in account B.
#### Option 1: Use account A's AccessKey
Create a LogHub data source using the AccessKey ID and AccessKey secret of account A. Account B can then synchronize data from all Simple Log Service projects in account A.
#### Option 2: Use a RAM user with scoped permissions
Create a LogHub data source using the AccessKey ID and AccessKey secret of RAM user A1, which belongs to account A.
**Grant system policies (full access to all projects)**
Grant RAM user A1 the AliyunLogFullAccess and AliyunLogReadOnlyAccess system policies. With these policies, RAM user A1 can query all logs in Simple Log Service that belong to account A's primary account.
For instructions, see Create a RAM user and grant permissions.
**Grant a custom policy (restrict access to specific projects)**
To limit account B to specific SLS projects, create a custom permission policy in account A. In the RAM console, go to Permissions > Policies and click Create Permission Policy.
The following policy restricts access to project_name1 and project_name2 only:
```json
{
  "Version": "1",
  "Statement": [
    {
      "Action": [
        "log:Get*",
        "log:List*",
        "log:CreateConsumerGroup",
        "log:UpdateConsumerGroup",
        "log:DeleteConsumerGroup",
        "log:ListConsumerGroup",
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
        "acs:log:*:*:project/project_name1",
        "acs:log:*:*:project/project_name1/*",
        "acs:log:*:*:project/project_name2",
        "acs:log:*:*:project/project_name2/*"
      ],
      "Effect": "Allow"
    }
  ]
}
```
For more about SLS authorization, see Introduction and Overview.
## Configure a synchronization task
When LogHub is the data source, you can filter data using LogHub's query syntax or SLS Processing Language (SPL) statements. For details, see SPL syntax for filtering below.
### Offline single-table synchronization
Configure the task using the codeless UI or the code editor:
- Codeless UI: See Codeless UI configuration. Parameters must follow the format described in Reader parameters and Writer parameters.
- Code editor: See Code editor configuration and the script examples below.
### Real-time single-table synchronization
See Real-time synchronization task configuration (Legacy) or Single-table real-time synchronization task configuration.
### Whole-database synchronization
See Configure a real-time synchronization task for an entire database.
## Reader parameters
The following parameters apply when configuring LogHub as a source in the code editor.
### Example script
```json
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "LogHub",
      "parameter": {
        "datasource": "<data-source-name>",
        "logstore": "<logstore-name>",
        "beginDateTime": "${beginDateTime}000000",
        "endDateTime": "${endDateTime}000000",
        "column": [
          "col0",
          "col1",
          "C_Topic",
          "C_HostName",
          "C_Path",
          "C_LogTime"
        ],
        "batchSize": "128",
        "query": ""
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "throttle": true,
      "concurrent": 1,
      "mbps": "12"
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
```
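To make the scheduling parameters in this script concrete, here is a worked expansion. The business date of 20240101 and the parameter assignment are illustrative, following the beginDateTime example in the table below:

```
Parameters:      beginDateTime=${yyyymmdd-1}
Business date:   20240101
${yyyymmdd-1}    →  20231231
Reader setting:  "beginDateTime": "${beginDateTime}000000"  →  "20231231000000"
```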
### Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| datasource | Yes | — | Name of the LogHub data source configured in DataWorks. |
| project | Yes | — | Name of the source SLS project. A project is the primary resource management unit in SLS. |
| logstore | Yes | — | Name of the source Logstore. A Logstore is the unit for collecting, storing, and querying log data in SLS. |
| endPoint | Yes | — | SLS endpoint URL for the region where the project is located. For endpoint values by region, see Service endpoints. |
| accessId | Yes | — | AccessKey ID used to access SLS. |
| accessKey | Yes | — | AccessKey secret used to access SLS. |
| beginDateTime | Yes | — | Start time of data consumption (inclusive). Format: yyyyMMddHHmmss (for example, 20180111013000). Works with DataWorks scheduling parameters: for example, set Parameters to beginDateTime=${yyyymmdd-1} and Log Start Time to ${beginDateTime}000000 to consume from 00:00:00 the day before the business date. Must be used together with endDateTime. For supported scheduling parameter formats, see Supported formats for scheduling parameters. |
| endDateTime | Yes | — | End time of data consumption (exclusive). Format: yyyyMMddHHmmss (for example, 20180111013010). Must be earlier than 2038-01-19 11:14:07 (UTC+8). The endDateTime of one cycle must be equal to or later than the beginDateTime of the next cycle to avoid gaps. |
| column | Yes | — | List of column names to read. Column names are case-sensitive. You can include SLS metadata fields such as log topic, machine UUID, hostname, path, and log time. For metadata syntax, see Simple Log Service machine group. |
| batchSize | No | 128 | Number of log entries to fetch from SLS per request. |
| query | Yes | — | Filter expression using LogHub query syntax or SPL statements. Pass an empty string to read all data. |
If data is missing after synchronization, check in the LogHub console whether the `__tag__:__receive_time__` metadata field of the affected records falls within the time range configured for the task.
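To tie the query parameter to the SPL syntax covered later in this topic, here is a sketch of a reader parameter block that consumes only matching records. The data source name, Logstore name, columns, and the Type field are placeholders, not values from your project:

```json
{
  "datasource": "<data-source-name>",
  "logstore": "<logstore-name>",
  "beginDateTime": "${beginDateTime}000000",
  "endDateTime": "${endDateTime}000000",
  "column": ["col0", "col1"],
  "batchSize": "128",
  "query": "* | where Type='write'"
}
```

An empty query string, as in the full example script above, disables filtering and reads all data.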
## Writer parameters
The following parameters apply when configuring LogHub as a destination in the code editor.
The LogHub (SLS) Writer receives data from the reader via the Data Integration framework, converts each field to STRING, and pushes records in batches to LogHub using the LogService Java SDK.
### Example script
```json
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "LogHub",
      "parameter": {
        "datasource": "<data-source-name>",
        "logstore": "<logstore-name>",
        "topic": "",
        "column": [
          "col0",
          "col1",
          "col2",
          "col3",
          "col4",
          "col5"
        ],
        "batchSize": "1024"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": ""
    },
    "speed": {
      "throttle": true,
      "concurrent": 3,
      "mbps": "12"
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
```
### Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| datasource | Yes | — | Name of the LogHub data source configured in DataWorks. |
| project | Yes | — | Name of the destination SLS project. |
| logstore | Yes | — | Name of the destination Logstore. |
| endpoint | Yes | — | SLS endpoint URL for the region where the project is located. For endpoint values by region, see Service endpoints. |
| accessKeyId | Yes | — | AccessKey ID used to access SLS. |
| accessKeySecret | Yes | — | AccessKey secret used to access SLS. |
| column | Yes | — | List of column names in each record. |
| topic | No | "" | Topic name to set on written log entries. |
| batchSize | No | 1024 | Number of records per batch. Maximum: 4096. A single batch cannot exceed 5 MB; reduce this value if individual records are large. |
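As a rough sizing sketch for batchSize (the average record sizes are assumptions; the 5 MB cap comes from the table above):

```
average record ≈ 1 KB  →  batchSize 1024 ≈ 1 MB per batch   (comfortably under the 5 MB cap)
average record ≈ 8 KB  →  8 KB × 640 = 5 MB, so set batchSize well below 640 for headroom
```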
## SPL syntax for filtering
When LogHub is the data source, filter data using either the LogHub query syntax or SLS Processing Language (SPL) statements. SPL is the language SLS uses to process logs.
For the full SPL reference, see SPL syntax.
| Use case | LogHub query syntax | SPL statement |
|---|---|---|
| Data filtering | `SELECT * WHERE Type='write'` | `\| where Type='write'` (exact match)<br>`\| where Type like '%write%'` (fuzzy)<br>`\| where regexp_like(server_protocol, '\d+')` (regex) |
| Field selection and renaming | `SELECT "__tag__:node" AS node, path` | `\| project node="__tag__:node", path` (select and rename)<br>`\| project -wildcard "__tag__:*"` (select by pattern)<br>`\| project-rename node="__tag__:node"` (rename without dropping others)<br>`\| project-away -wildcard "__tag__:*"` (exclude by pattern) |
| Data normalization | `SELECT CAST(Status AS BIGINT) AS Status, date_parse(Time, '%Y-%m-%d %H:%i') AS Time` | `\| extend Status=cast(Status as BIGINT)`<br>`\| extend Time=date_parse(Time, '%Y-%m-%d %H:%i')` |
| Field extraction | Extract with regex or from JSON | `\| parse-regexp protocol, '(\w+)/(\d+)' as scheme, version` (regex)<br>`\| parse-json -path='$.0' content` (JSON)<br>`\| parse-csv -delim='^_^' content as ip, time, host` (CSV) |
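SPL operations can be chained with pipes, so a filter and a projection can run in a single statement. A sketch that could be passed in the reader's query parameter; the Type field and the `__tag__:node` tag are hypothetical:

```
* | where Type='write' | project node="__tag__:node", path
```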
## FAQ
- A field with data in LogHub becomes empty after synchronization: see A field with data in LogHub becomes empty after synchronization.
- Data is lost when synchronizing from LogHub: see Data loss when synchronizing from LogHub.
- The fields read from the LogHub field mapping are not as expected: see The fields read from the LogHub field mapping are not as expected.

For more, see Data Integration FAQs.