本文介紹Flink SQL用SPL完成行過濾與列裁剪的操作步驟。
背景
在阿里雲Flink配置SLS作為源表時,預設會消費SLS的Logstore資料進行動態表的構建,在消費的過程中,可以指定起始時間點,其消費的資料是指定時間點以後的全量資料。這樣做有兩個問題:
Connector 從源頭拉取了過多不必要的資料行或者資料列,造成了網路的開銷。
這些不必要的資料需要在Flink中進行過濾投影計算,這些清洗工作並不是資料分析的關注重點,造成了計算浪費。
為此,SLS SPL為Flink SLS Connector提供了過濾下推和投影下推的能力。通過配置SLS Connector的query語句或參數,可以實現過濾條件和投影欄位的下推,避免全量資料轉送和計算,提升效率。
方案原理
未配置SPL語句時:Flink會拉取SLS的全量日誌資料(包含所有列、所有行)進行計算,如圖所示。
配置SPL語句時:當SPL語句包含行過濾或列裁剪操作時,Flink拉取的資料是經過這些操作處理後的部分資料,用於後續計算,如圖所示。
準備工作
本文Logstore資料使用SLS的SLB七層日誌類比接入方式產生類比資料,其中包含10多個欄位。類比接入會持續產生隨機的日誌資料,日誌內容樣本如下:
{ "__source__": "127.0.0.1", "__tag__:__receive_time__": "1706531737", "__time__": "1706531727", "__topic__": "slb_layer7", "body_bytes_sent": "3577", "client_ip": "114.137.XXX.XXX", "host": "www.pi.mock.com", "http_host": "www.cwj.mock.com", "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0", "request_length": "1662", "request_method": "GET", "request_time": "31", "request_uri": "/request/path-0/file-3", "scheme": "https", "slbid": "slb-02", "status": "200", "upstream_addr": "42.63.XXX.XXX", "upstream_response_time": "32", "upstream_status": "200", "vip_addr": "223.18.XX.XXX" }Logstore中slbid欄位有三種值,對15分鐘的日誌資料進行slbid統計,可以發現
slb-01與slb-02數量相當。
操作步驟
行過濾:SLS SPL為Flink SLS Connector提供了一種支援過濾下推的能力,通過配置SLS Connector的query語句中的過濾條件,即可實現過濾條件下推。避免全量資料轉送和全量資料過濾計算。
列過濾:SLS SPL為Flink SLS Connector提供了一種支援投影下推的能力,通過配置SLS Connector的query參數,即可實現投影欄位下推。避免全量資料轉送和全量資料過濾計算。
行過濾情境
步驟一:建立SQL作業
登入Realtime Compute控制台,單擊目標工作空間。
在左側導覽列,選擇。
單擊建立,在新增作業草稿對話方塊,選擇,單擊下一步。

拷貝如下建立暫存資料表的SQL到SQL編輯地區。
CREATE TEMPORARY TABLE sls_input( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'logstore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01''' );SQL中的參數說明如下:
參數名
參數含義
樣本值
connector
連接器。更多資訊,請參見支援的連接器。
sls
endpoint
Log Service的私網網域名稱,擷取方式請參見服務存取點。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
使用者身份識別ID,擷取方式,請參見建立AccessKey。
LTAI****************
accessKey
用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey。
yourAccessKeySecret
starttime
指定查詢日誌的起始時間點。
2025-02-19 00:00:00
project
Log Service的Project名。
test-project
logstore
Log Service的Logstore名。
clb-access-log
query
填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。
* | where slbid = ''slb-01''
滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。

步驟二:連續查詢及效果
在作業中輸入如下分析語句,按照slbid進行彙總查詢。
SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid;單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考下圖,建立新的調試叢集。

在調試彈框選擇建立好的調試叢集,然後單擊確定。

在結果地區,可以看到結果中slbid的欄位值,始終是
slb-01。可以看出設定了SPL語句後,sls_input僅包含slbid='slb-01'的資料,其他不合格資料被過濾掉了。
列裁剪情境
步驟一:建立SQL作業
登入Realtime Compute控制台,單擊目標工作空間。
在左側導覽列,選擇。
單擊建立,在新增作業草稿對話方塊,選擇,單擊下一步。

拷貝如下建立暫存資料表的SQL到SQL編輯地區。與行過濾情境不同的是,這裡query參數配置進行了修改,在過濾的基礎上增加了投影語句,使用
|符號(類似Unix管道)將不同指令進行分割,上一條指令的輸出作為下一條指令的輸入,最後指令的輸出表示整個管道的輸出。實現從SLS服務端僅拉取特定欄位的內容。CREATE TEMPORARY TABLE sls_input_project( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'logstore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"' );SQL中的參數說明如下:
參數名
參數含義
樣本值
connector
連接器。更多資訊,請參見支援的連接器。
sls
endpoint
Log Service的私網網域名稱,擷取方式請參見服務存取點。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
使用者身份識別ID,擷取方式,請參見建立AccessKey。
LTAI****************
accessKey
用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey。
yourAccessKeySecret
starttime
指定查詢日誌的起始時間點。
2025-02-19 00:00:00
project
Log Service的Project名。
test-project
logstore
Log Service的Logstore名。
clb-access-log
query
填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。
* | where slbid = ''slb-01''
滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。

步驟二:連續查詢及效果
在作業中輸入如下分析語句,按照slbid進行彙總查詢。
SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid;單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考下圖,建立新的調試叢集。

在調試彈框選擇建立好的調試叢集,然後單擊確定。

在結果地區,可以看到結果與行過濾情境結果類似。
說明注意:這裡與行過濾不同的是,行過濾情境會返回全量的欄位,而當前的語句令SLS Connector只返回特定的欄位,再次減少了資料的網路傳輸。
