This topic uses an example to explain how to implement semi-structured data analysis in Flink SQL based on the Simple Log Service Processing Language (SPL).
Background information
Simple Log Service is a cloud-native platform for observability and analytics, enabling cost-effective, real-time processing of logs, metrics, and traces. It simplifies data access, facilitating the ingestion of system and business logs for storage and analysis.
Realtime Compute for Apache Flink, built on Apache Flink, is a big data analytics platform suited for real-time data analysis and risk monitoring. It natively supports the Simple Log Service connector, which allows Simple Log Service to be used as a source table or a result table.
The connector streamlines the handling of structured logs, allowing direct mapping of Simple Log Service log fields to Flink SQL table fields. For semi-structured logs that contain all content in a single field, methods like regular expressions and delimiters are necessary to extract structured data. This topic discusses a solution using SPL to configure the connector for data structuring, focusing on log cleansing and format normalization.
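For structured logs, this direct mapping means that the source table only needs to declare the log fields by name. The following is a minimal sketch of such a declaration; the field names, endpoint, project, and logstore are placeholders for illustration and are not part of the example in this topic:

CREATE TEMPORARY TABLE sls_input_structured (
  request_method STRING,  -- hypothetical log field, mapped by name
  status STRING,          -- hypothetical log field, mapped by name
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' = '${yourEndpoint}',
  'accessId' = '${yourAccessKeyID}',
  'accessKey' = '${yourAccessKeySecret}',
  'project' = '${yourProject}',
  'logstore' = '${yourLogstore}'
);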
Semi-structured log data
Consider a log example with a complex format that includes JSON strings and mixed content. The log contains the following elements:
- Payload: A JSON string, in which the schedule field is itself in JSON format.
- requestURL: A standard URL path.
- error: Begins with the string CouldNotExecuteQuery, followed by a JSON structure.
- __tag__:__path__: The log file path, where service_a may indicate the service name.
- caller: Contains the file name and line number.
{
"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
"TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
"TaskType": "ALERT",
"__source__": "11.199.XX.XXX",
"__tag__:__hostname__": "iabcde12345.cloud.abc121",
"__tag__:__path__": "/var/log/service_a.LOG",
"caller": "executor/pool.go:64",
"error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
"requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
"ts": "2024-01-29 22:57:13"
}
Requirements for structured data processing
To derive valuable insights from these logs, data cleansing is essential. Key fields must first be extracted for analysis, which is conducted in Flink. The specific requirements for field extraction are as follows:
- Extract httpCode, errorCode, errorMessage, and requestID from the error field.
- Extract service_a from __tag__:__path__ as serviceName.
- Extract pool.go from caller as fileName, and 64 as fileNo.
- Extract project from the Payload field, and extract type from the schedule object within Payload as scheduleType.
- Rename __source__ to serviceIP.
The final list of required fields is as follows: httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, and project.
Solutions
There are several solutions to data cleansing, each suited to specific scenarios.
- Data transformation solution: Create a target logstore in the Simple Log Service console, and create a data transformation task for cleansing.
- Flink solution: Define error and Payload as source table fields, use SQL regular expression functions and JSON functions to parse these fields, insert the parsed data into a temporary table, and perform analysis on that table (see the sketch after this list).
- SPL solution: Configure SPL statements for the Simple Log Service connector in Realtime Compute for Apache Flink to cleanse data, and define the source table fields in Flink according to the cleansed data structure.
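To make the Flink solution concrete, the parsing step might look roughly like the following sketch. It assumes a hypothetical source table sls_input_raw that exposes Payload and caller as plain STRING columns; JSON_VALUE and REGEXP_EXTRACT are built-in Flink SQL functions, but the JSON paths and the regular expression are illustrative only:

CREATE TEMPORARY VIEW parsed_logs AS
SELECT
  JSON_VALUE(Payload, '$.project') AS project,             -- extract project from the Payload JSON string
  JSON_VALUE(Payload, '$.schedule.type') AS scheduleType,  -- extract the nested schedule type
  REGEXP_EXTRACT(caller, '(\w+\.go)', 1) AS fileName       -- extract the file name from caller
FROM sls_input_raw;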
Among these options, the SPL solution provides a more efficient approach to data cleansing. It removes the need for intermediate logstores or temporary tables, particularly for semi-structured log data. By performing cleansing closer to the source, the computing platform can focus on business logic, resulting in a clearer separation of responsibilities.
SPL solution
1. Prepare data in Simple Log Service
Activate the Simple Log Service, and create a project and logstore.
Use the Simple Log Service Java SDK to write the log example into the target logstore as simulated sample data. For SDKs in other languages, refer to the corresponding SDK reference.

In the logstore, write the SPL pipeline statement and preview its effect.

The query statement is as follows. The SPL pipeline syntax uses the | separator to separate instructions. You can see the result immediately after entering each instruction, and then progressively add pipelines to iteratively achieve the final result. For more information, see Syntax of scan-based query.

* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller
| parse-json Payload
| project-away Payload
| parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson
| parse-json errorJson
| parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName
| parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo
| project-rename serviceHost="__tag__:__hostname__"
| extend scheduleType = json_extract_scalar(schedule, '$.type')
| project httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, project
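For example, a first iteration might include only the first two instructions, so that you can confirm the first-level fields expanded from Payload before appending the remaining pipelines:

* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller
| parse-json Payload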
The syntax is explained as follows:
- project: Retain the Payload, error, __tag__:__path__, and caller fields from the original result, discarding the others to facilitate subsequent parsing.
- parse-json: Convert the Payload string into JSON, producing first-level fields such as lastNotified, serverUri, and jobID.
- project-away: Remove the original Payload field.
- parse-regexp: Extract the JSON portion of the error field and store it in errorJson.
- parse-json: Expand the errorJson field to retrieve fields such as httpCode, errorCode, and errorMessage.
- parse-regexp: Use a regular expression to extract the service name from the log file path in __tag__:__path__ and assign it to serviceName.
- parse-regexp: Extract the file name and line number from caller, placing them in the fileName and fileNo fields, respectively.
- project-rename: Rename the __tag__:__hostname__ field to serviceHost.
- extend: Use the json_extract_scalar function to extract the type field from schedule and name it scheduleType.
- project: Retain the required field list, including the project field from Payload.
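For the sample log above, this pipeline produces a structured record along the following lines (an illustration derived from the sample, not actual console output):

httpCode: 404
errorCode: LogStoreNotExist
errorMessage: logstore k8s-event does not exist
requestID: 65B7C10AB43D9895A8C3DB6A
fileName: pool.go
fileNo: 64
serviceHost: iabcde12345.cloud.abc121
scheduleType: FixedRate
project: test-sls-project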
2. Create a SQL job
Log on to the Realtime Compute for Apache Flink console and click the target workspace.
Important: The target workspace and the Simple Log Service project must be in the same region.
In the left-side navigation pane, choose .
Click New. In the New Draft dialog box, select and click Next.

Enter a name and click Create. Copy the following SQL to create a temporary table in the draft.
CREATE TEMPORARY TABLE sls_input_complex (
  errorCode STRING,
  errorMessage STRING,
  fileName STRING,
  fileNo STRING,
  httpCode STRING,
  requestID STRING,
  scheduleType STRING,
  serviceHost STRING,
  project STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' = 'ap-southeast-1-intranet.log.aliyuncs.com',
  'accessId' = '${yourAccessKeyID}',
  'accessKey' = '${yourAccessKeySecret}',
  'starttime' = '2024-02-01 10:30:00',
  'project' = '${project}',
  'logstore' = '${logstore}',
  'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode, errorMessage, requestID, fileName, fileNo, serviceHost, scheduleType, project'
);

The parameters in the SQL statement are described below. Replace them as needed.
- connector: See Supported connectors. Example: sls.
- endpoint: The internal endpoint used to access your Simple Log Service project. For information about how to obtain it, see View endpoints. Example: ap-southeast-1-intranet.log.aliyuncs.com.
- accessId: The AccessKey ID used to identify a user. For more information, see Create an AccessKey pair. Example: LTAI****************.
- accessKey: The AccessKey secret used to verify the identity of the user. For more information, see Create an AccessKey pair. Example: yourAccessKeySecret.
- starttime: The start time for querying logs. Example: 2025-02-19 00:00:00.
- project: The name of the Simple Log Service project. Example: test-project.
- logstore: The name of the Simple Log Service logstore. Example: clb-access-log.
- query: The SPL statement. Note that strings within the statement must be escaped using single quotes. Example: * | where slbid = ''slb-01''
Note: Here, '' represents an embedded single quote within the string.

Select the SQL statement, right-click it, and choose Run to connect to Simple Log Service.

3. Perform a query and view the result
Copy the following analysis statement into the draft to query the structured data.

SELECT * FROM sls_input_complex;

Click Debug in the upper-right corner. In the debug dialog box, select Create new session cluster from the Session Cluster drop-down list, and refer to the figure below to create a debug cluster.

In the debug dialog box, select the created debug cluster, and then click OK.

In the Results area, view the column values of the table, which are the results processed by SPL. The final list of fields generated by SPL matches the fields defined in the table.
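With the fields already structured by SPL, further analysis in Flink SQL remains straightforward. The following is a hypothetical follow-up query, not part of the original walkthrough, that aggregates the logs by errorCode:

SELECT errorCode, COUNT(*) AS errorCount
FROM sls_input_complex
GROUP BY errorCode;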
