全部產品
Search
文件中心

Simple Log Service:Flink SQL基於SPL實現弱結構化分析

更新時間:Jan 06, 2026

本文介紹Flink SQL基於SPL實現弱結構化分析的操作步驟。

背景

Log ServiceSLS是雲原生觀測與分析平台,為Log、Metric、Trace等資料提供大規模、低成本、即時的平台化服務。基於Log Service的便捷的資料接入能力,可以將系統日誌、業務日誌等接入SLS進行儲存、分析。阿里雲Flink是阿里雲基於Apache Flink構建的巨量資料分析平台,在即時資料分析、風控監測等情境應用廣泛。阿里雲Flink原生支援阿里雲Log ServiceSLS的Connector,使用者可以在阿里雲Flink平台將SLS作為源表或者結果表使用。

阿里雲Flink SLS Connector對於結構化的日誌非常直接,通過配置,SLS的日誌欄位可以與Flink SQL的Table欄位列一一映射。然而仍有大量的業務日誌並非完全的結構化,例如會將所有日誌內容寫入一個欄位中,需要用Regex、分隔字元拆分等手段才可以提取出結構化的欄位,基於這個情境,本文介紹一種使用SLS SPL配置SLS Connector完成資料結構化的方案,覆蓋日誌清洗與格式規整情境。

弱結構化日誌資料

下面是一條日誌樣本,日誌格式較為複雜,既有JSON字串,又有字串與JSON混合的情境。其中:

  • Payload為JSON字串,其中schedule欄位的內容也是一段JSON結構。

  • requestURL為一段標準的URL Path路徑。

  • error欄位的前半部分包含CouldNotExecuteQuery字串,後半部分是一段JSON結構。

  • __tag__:__path__包含記錄檔的路徑,其中service_a可能是業務名稱。

  • caller中包含檔案名稱與檔案行數。

{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.XXX.XXX",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

結構化資料處理需求

對於這樣的日誌提取出更有價值的資訊需要進行資料清洗,首先需要提取重要的欄位,然後對這些欄位進行資料分析。本文關注重要欄位的提取,分析仍然可以在Flink中進行。假設提取欄位具體需求如下:

  • 提取error中的httpCodeerrorCodeerrorMessagerequestID

  • 提取 __tag__:__path_中的_service_a作為serviceName

  • 提取caller中的pool.go作為fileName,64作為fileNo

  • 提取Payload中的project,提取Payload下面的schedule中的typescheduleType

  • 重新命名 __source__ serviceIP

其餘欄位捨棄,最終需要的欄位列表如下。

image

解決方案

實現這樣的資料清洗,有很多種方法,這裡列舉幾種基於SLS與Flink的方案,不同方案之間沒有絕對的優劣,需要根據不同的情境選擇不同的方案。

  • 資料加工方案:在SLS控制台建立目標LogStore,通過建立資料加工任務,完成對資料的清洗。

  • Flink方案:將error和payload指定為源表欄位,通過SQL正則函數、JSON函數對欄位進行解析,將解析後的欄位寫入暫存資料表,然後對暫存資料表進行分析。

  • SPL方案:在Flink SLS Connector中配置SPL語句,對資料進行清洗,Flink中源表欄位定義為清洗後的資料結構。

從上述三種方案的原理不難看出,在需要資料清洗的情境中,在SLS Connector 中配置SPL,是一種更輕量化的方案。在日誌資料弱結構化的情境中,SPL方案既避免了方案一中建立臨時中間LogStore,也避免了方案二中在Flink中建立暫存資料表,在離資料來源更近的位置進行資料清洗,在計算平台關注商務邏輯,職責分離更加清晰。

在Flink中使用SPL

1. SLS準備資料

  1. 開通Log Service,已建立Project和LogStore

  2. 將上文日誌片段通過SDK方式寫入到目標LogStore,用於範例類比資料。

    image

  3. 在LogStore,編寫SLS SPL管道式文法,預覽SPL效果。

    image

    查詢分析語句如下,SLS SPL管道式文法使用|分隔字元分割不同的指令,每次輸入一個指令可以即時查看結果,然後再增加管道數,漸進式、探索式擷取最終結果。更多資訊,請參考掃描(Scan)查詢文法

    * | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
     | parse-json Payload 
     | project-away Payload 
     | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
     | parse-json errorJson 
     | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
     | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
     | project-rename serviceHost="__tag__:__hostname__" 
     | extend scheduleType = json_extract_scalar(schedule, '$.type') 
     | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project

    文法解釋如下:

    • 1行:project指令:從原始結果中保留Payload、error、tag:path、caller欄位,捨棄其他欄位,這些欄位用於後續解析。

    • 2行:parse-json指令:將Payload字串展開為JSON,第一層欄位出現在結果中,包括lastNotified、serviceUri、jobID等。

    • 3行:project-away指令:去除原始Payload欄位。

    • 4行:parse-regexp指令:按照error欄位中的內容,解析其中的部分JSON內容,置於errorJson欄位。

    • 5行:parse-json指令:展開errorJson欄位,得到httpCode、errorCode、errorMessage等欄位。

    • 6行:parse-regexp指令:通過Regex解析出__tag__:__path__中的檔案名稱,並命名為serviceName。

    • 7行:parse-regexp指令:通過Regex,解析出caller中的檔案名稱與行數,共置於fileName、fileNo欄位。

    • 8行:project-rename指令:將__tag__:__hostname__欄位重新命名為serviceHost。

    • 9行:extend指令:使用json_extract_scalar函數,提取schedule中的type欄位,並命名為scheduleType。

    • 10行:project指令:保留需要的欄位列表,其中project欄位來自於Payload。

2. 建立SQL作業

  1. 登入Realtime Compute控制台,單擊目標工作空間。

  2. 在左側導覽列,選擇資料開發 > ETL

  3. 單擊建立,在新增作業草稿對話方塊,選擇SQL基礎模板 > 空白的流作業草稿,單擊下一步

    image

  4. 在作業草稿中輸入如下建立暫存資料表的語句。

    CREATE TEMPORARY TABLE sls_input_complex (
      errorCode STRING,
      errorMessage STRING,
      fileName STRING,
      fileNo STRING,
      httpCode STRING,
      requestID STRING,
      scheduleType STRING,
      serviceHost STRING,
      project STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
      'accessId' = '${yourAccessKeyID}',
      'accessKey' = '${yourAccessKeySecret}',
      'starttime' = '2024-02-01 10:30:00',
      'project' ='${project}',
      'logstore' ='${logtore}',
      'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
      );

    SQL中的參數說明如下,請根據實際情況替換。

    參數名

    參數含義

    樣本值

    connector

    連接器。更多資訊,請參見支援的連接器

    sls

    endpoint

    Log Service的私網網域名稱,擷取方式請參見服務存取點

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    使用者身份識別ID,擷取方式,請參見建立AccessKey

    LTAI****************

    accessKey

    用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey

    yourAccessKeySecret

    starttime

    指定查詢日誌的起始時間點。

    2025-02-19 00:00:00

    project

    Log Service的Project名。

    test-project

    logstore

    Log Service的Logstore名。

    clb-access-log

    query

    填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。

    * | where slbid = ''slb-01''

  5. 滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。

    image

3. 連續查詢及效果

  1. 在作業中輸入如下分析語句,按照slbid進行彙總查詢。

    SELECT * FROM sls_input_complex;
  2. 單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考下圖,建立新的調試叢集。

    image

  3. 在調試彈框選擇建立好的調試叢集,然後單擊確定

    image

  4. 在結果地區,可以看到TABLE中每一列的值,對應SPL處理後的結果。SPL最終得到的欄位列表與TABLE中欄位對應。

    image