全部產品
Search
文件中心

Simple Log Service:Flink SQL基於SPL實現行過濾與列裁剪

更新時間:Apr 23, 2025

本文介紹Flink SQL用SPL完成行過濾與列裁剪的操作步驟。

背景

在阿里雲Flink配置SLS作為源表時,預設會消費SLS的Logstore資料進行動態表的構建,在消費的過程中,可以指定起始時間點,其消費的資料是指定時間點以後的全量資料。這樣做有兩個問題:

  1. Connector 從源頭拉取了過多不必要的資料行或者資料列,造成了網路的開銷。

  2. 這些不必要的資料需要在Flink中進行過濾投影計算,這些清洗工作並不是資料分析的關注重點,造成了計算浪費。

為此,SLS SPL為Flink SLS Connector提供了過濾下推和投影下推的能力。通過配置SLS Connector的query語句或參數,可以實現過濾條件和投影欄位的下推,避免全量資料轉送和計算,提升效率。

方案原理

  • 未配置SPL語句時:Flink會拉取SLS的全量日誌資料(包含所有列、所有行)進行計算,如圖所示。

  • 配置SPL語句時:當SPL語句包含行過濾或列裁剪操作時,Flink拉取的資料是經過這些操作處理後的部分資料,用於後續計算,如圖所示。

準備工作

  • 開通Log Service,已建立Project和Logstore

  • 本文Logstore資料使用SLS的SLB七層日誌類比接入方式產生類比資料,其中包含10多個欄位。類比接入會持續產生隨機的日誌資料,日誌內容樣本如下:

    {
      "__source__": "127.0.0.1",
      "__tag__:__receive_time__": "1706531737",
      "__time__": "1706531727",
      "__topic__": "slb_layer7",
      "body_bytes_sent": "3577",
      "client_ip": "114.137.XXX.XXX",
      "host": "www.pi.mock.com",
      "http_host": "www.cwj.mock.com",
      "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
      "request_length": "1662",
      "request_method": "GET",
      "request_time": "31",
      "request_uri": "/request/path-0/file-3",
      "scheme": "https",
      "slbid": "slb-02",
      "status": "200",
      "upstream_addr": "42.63.XXX.XXX",
      "upstream_response_time": "32",
      "upstream_status": "200",
      "vip_addr": "223.18.XX.XXX"
    }
  • Logstore中slbid欄位有三種值,對15分鐘的日誌資料進行slbid統計,可以發現slb-01slb-02數量相當。

    image

操作步驟

行過濾:SLS SPL為Flink SLS Connector提供了一種支援過濾下推的能力,通過配置SLS Connector的query語句中的過濾條件,即可實現過濾條件下推。避免全量資料轉送和全量資料過濾計算。

列過濾:SLS SPL為Flink SLS Connector提供了一種支援投影下推的能力,通過配置SLS Connector的query參數,即可實現投影欄位下推。避免全量資料轉送和全量資料過濾計算。

行過濾情境

步驟一:建立SQL作業

  1. 登入Realtime Compute控制台,單擊目標工作空間。

  2. 在左側導覽列,選擇資料開發 > ETL

  3. 單擊建立,在新增作業草稿對話方塊,選擇SQL基礎模板 > 空白的流作業草稿,單擊下一步

    image

  4. 拷貝如下建立暫存資料表的SQL到SQL編輯地區。

    CREATE TEMPORARY TABLE sls_input(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'''
    );

    SQL中的參數說明如下:

    參數名

    參數含義

    樣本值

    connector

    連接器。更多資訊,請參見支援的連接器

    sls

    endpoint

    Log Service的私網網域名稱,擷取方式請參見服務存取點

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    使用者身份識別ID,擷取方式,請參見建立AccessKey

    LTAI****************

    accessKey

    用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey

    yourAccessKeySecret

    starttime

    指定查詢日誌的起始時間點。

    2025-02-19 00:00:00

    project

    Log Service的Project名。

    test-project

    logstore

    Log Service的Logstore名。

    clb-access-log

    query

    填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。

    * | where slbid = ''slb-01''

  5. 滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。

    image

步驟二:連續查詢及效果

  1. 在作業中輸入如下分析語句,按照slbid進行彙總查詢。

    SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;
  2. 單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考下圖,建立新的調試叢集。

    image

  3. 在調試彈框選擇建立好的調試叢集,然後單擊確定

    image

  4. 在結果地區,可以看到結果中slbid的欄位值,始終是slb-01。可以看出設定了SPL語句後,sls_input僅包含slbid='slb-01'的資料,其他不合格資料被過濾掉了。

    image

列裁剪情境

步驟一:建立SQL作業

  1. 登入Realtime Compute控制台,單擊目標工作空間。

  2. 在左側導覽列,選擇資料開發 > ETL

  3. 單擊建立,在新增作業草稿對話方塊,選擇SQL基礎模板 > 空白的流作業草稿,單擊下一步

    image

  4. 拷貝如下建立暫存資料表的SQL到SQL編輯地區。與行過濾情境不同的是,這裡query參數配置進行了修改,在過濾的基礎上增加了投影語句,使用|符號(類似Unix管道)將不同指令進行分割,上一條指令的輸出作為下一條指令的輸入,最後指令的輸出表示整個管道的輸出。實現從SLS服務端僅拉取特定欄位的內容。

    CREATE TEMPORARY TABLE sls_input_project(
      request_uri STRING,
      scheme STRING,
      slbid STRING,
      status STRING,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = 'yourAccessKeyID',
      'accessKey' = 'yourAccessKeySecret',
      'starttime' = '2025-02-19 00:00:00',
      'project' ='test-project',
      'logstore' ='clb-access-log',
      'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
    );

    SQL中的參數說明如下:

    參數名

    參數含義

    樣本值

    connector

    連接器。更多資訊,請參見支援的連接器

    sls

    endpoint

    Log Service的私網網域名稱,擷取方式請參見服務存取點

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    使用者身份識別ID,擷取方式,請參見建立AccessKey

    LTAI****************

    accessKey

    用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey

    yourAccessKeySecret

    starttime

    指定查詢日誌的起始時間點。

    2025-02-19 00:00:00

    project

    Log Service的Project名。

    test-project

    logstore

    Log Service的Logstore名。

    clb-access-log

    query

    填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。

    * | where slbid = ''slb-01''

  5. 滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。

    image

步驟二:連續查詢及效果

  1. 在作業中輸入如下分析語句,按照slbid進行彙總查詢。

    SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;
  2. 單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考下圖,建立新的調試叢集。

    image

  3. 在調試彈框選擇建立好的調試叢集,然後單擊確定

    image

  4. 在結果地區,可以看到結果與行過濾情境結果類似。

    說明

    注意:這裡與行過濾不同的是,行過濾情境會返回全量的欄位,而當前的語句令SLS Connector只返回特定的欄位,再次減少了資料的網路傳輸。

    image