本文介紹Flink SQL基於SPL實現弱結構化分析的操作步驟。
背景
Log ServiceSLS是雲原生觀測與分析平台,為Log、Metric、Trace等資料提供大規模、低成本、即時的平台化服務。基於Log Service的便捷的資料接入能力,可以將系統日誌、業務日誌等接入SLS進行儲存、分析。阿里雲Flink是阿里雲基於Apache Flink構建的巨量資料分析平台,在即時資料分析、風控監測等情境應用廣泛。阿里雲Flink原生支援阿里雲Log ServiceSLS的Connector,使用者可以在阿里雲Flink平台將SLS作為源表或者結果表使用。
阿里雲Flink SLS Connector對於結構化的日誌非常直接,通過配置,SLS的日誌欄位可以與Flink SQL的Table欄位列一一映射。然而仍有大量的業務日誌並非完全的結構化,例如會將所有日誌內容寫入一個欄位中,需要用Regex、分隔字元拆分等手段才可以提取出結構化的欄位,基於這個情境,本文介紹一種使用SLS SPL配置SLS Connector完成資料結構化的方案,覆蓋日誌清洗與格式規整情境。
弱結構化日誌資料
下面是一條日誌樣本,日誌格式較為複雜,既有JSON字串,又有字串與JSON混合的情境。其中:
Payload為JSON字串,其中schedule欄位的內容也是一段JSON結構。
requestURL為一段標準的URL Path路徑。
error欄位的前半部分包含CouldNotExecuteQuery字串,後半部分是一段JSON結構。
__tag__:__path__包含記錄檔的路徑,其中service_a可能是業務名稱。
caller中包含檔案名稱與檔案行數。
{
"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
"TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
"TaskType": "ALERT",
"__source__": "11.199.XXX.XXX",
"__tag__:__hostname__": "iabcde12345.cloud.abc121",
"__tag__:__path__": "/var/log/service_a.LOG",
"caller": "executor/pool.go:64",
"error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
"requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
"ts": "2024-01-29 22:57:13"
}結構化資料處理需求
對於這樣的日誌提取出更有價值的資訊需要進行資料清洗,首先需要提取重要的欄位,然後對這些欄位進行資料分析。本文關注重要欄位的提取,分析仍然可以在Flink中進行。假設提取欄位具體需求如下:
提取error中的httpCode、errorCode、errorMessage和requestID。
提取 __tag__:__path_中的_service_a作為serviceName。
提取caller中的pool.go作為fileName,64作為fileNo
提取Payload中的project,提取Payload下面的schedule中的type為scheduleType。
重新命名 __source__ 為serviceIP。
其餘欄位捨棄,最終需要的欄位列表如下。

解決方案
實現這樣的資料清洗,有很多種方法,這裡列舉幾種基於SLS與Flink的方案,不同方案之間沒有絕對的優劣,需要根據不同的情境選擇不同的方案。
資料加工方案:在SLS控制台建立目標LogStore,通過建立資料加工任務,完成對資料的清洗。
Flink方案:將error和payload指定為源表欄位,通過SQL正則函數、JSON函數對欄位進行解析,將解析後的欄位寫入暫存資料表,然後對暫存資料表進行分析。
SPL方案:在Flink SLS Connector中配置SPL語句,對資料進行清洗,Flink中源表欄位定義為清洗後的資料結構。
從上述三種方案的原理不難看出,在需要資料清洗的情境中,在SLS Connector 中配置SPL,是一種更輕量化的方案。在日誌資料弱結構化的情境中,SPL方案既避免了方案一中建立臨時中間LogStore,也避免了方案二中在Flink中建立暫存資料表,在離資料來源更近的位置進行資料清洗,在計算平台關注商務邏輯,職責分離更加清晰。
在Flink中使用SPL
1. SLS準備資料
將上文日誌片段通過SDK方式寫入到目標LogStore,用於範例類比資料。

在LogStore,編寫SLS SPL管道式文法,預覽SPL效果。

查詢分析語句如下,SLS SPL管道式文法使用
|分隔字元分割不同的指令,每次輸入一個指令可以即時查看結果,然後再增加管道數,漸進式、探索式擷取最終結果。更多資訊,請參考掃描(Scan)查詢文法。* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, '$.type') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project文法解釋如下:
1行:project指令:從原始結果中保留Payload、error、tag:path、caller欄位,捨棄其他欄位,這些欄位用於後續解析。
2行:parse-json指令:將Payload字串展開為JSON,第一層欄位出現在結果中,包括lastNotified、serviceUri、jobID等。
3行:project-away指令:去除原始Payload欄位。
4行:parse-regexp指令:按照error欄位中的內容,解析其中的部分JSON內容,置於errorJson欄位。
5行:parse-json指令:展開errorJson欄位,得到httpCode、errorCode、errorMessage等欄位。
6行:parse-regexp指令:通過Regex解析出__tag__:__path__中的檔案名稱,並命名為serviceName。
7行:parse-regexp指令:通過Regex,解析出caller中的檔案名稱與行數,共置於fileName、fileNo欄位。
8行:project-rename指令:將__tag__:__hostname__欄位重新命名為serviceHost。
9行:extend指令:使用json_extract_scalar函數,提取schedule中的type欄位,並命名為scheduleType。
10行:project指令:保留需要的欄位列表,其中project欄位來自於Payload。
2. 建立SQL作業
登入Realtime Compute控制台,單擊目標工作空間。
在左側導覽列,選擇。
單擊建立,在新增作業草稿對話方塊,選擇,單擊下一步。

在作業草稿中輸入如下建立暫存資料表的語句。
CREATE TEMPORARY TABLE sls_input_complex ( errorCode STRING, errorMessage STRING, fileName STRING, fileNo STRING, httpCode STRING, requestID STRING, scheduleType STRING, serviceHost STRING, project STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-beijing-intranet.log.aliyuncs.com', 'accessId' = '${yourAccessKeyID}', 'accessKey' = '${yourAccessKeySecret}', 'starttime' = '2024-02-01 10:30:00', 'project' ='${project}', 'logstore' ='${logtore}', 'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project' );SQL中的參數說明如下,請根據實際情況替換。
參數名
參數含義
樣本值
connector
連接器。更多資訊,請參見支援的連接器。
sls
endpoint
Log Service的私網網域名稱,擷取方式請參見服務存取點。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
使用者身份識別ID,擷取方式,請參見建立AccessKey。
LTAI****************
accessKey
用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey。
yourAccessKeySecret
starttime
指定查詢日誌的起始時間點。
2025-02-19 00:00:00
project
Log Service的Project名。
test-project
logstore
Log Service的Logstore名。
clb-access-log
query
填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。
* | where slbid = ''slb-01''
滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。

3. 連續查詢及效果
在作業中輸入如下分析語句,按照slbid進行彙總查詢。
SELECT * FROM sls_input_complex;單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考下圖,建立新的調試叢集。

在調試彈框選擇建立好的調試叢集,然後單擊確定。

在結果地區,可以看到TABLE中每一列的值,對應SPL處理後的結果。SPL最終得到的欄位列表與TABLE中欄位對應。
