本文介紹Flink SQL用SPL完成行過濾與列裁剪的操作步驟。
背景
在阿里雲Flink配置SLS作為源表時,預設會消費SLS的LogStore資料進行動態表的構建,在消費的過程中,可以指定起始時間點,其消費的資料是指定時間點以後的全量資料。這樣做有兩個問題:
-
Connector 從源頭拉取了過多不必要的資料行或者資料列,造成了網路的開銷。
-
這些不必要的資料需要在Flink中進行過濾投影計算,這些清洗工作並不是資料分析的關注重點,造成了計算浪費。
為此,SLS SPL為Flink SLS Connector提供了過濾下推和投影下推的能力。通過配置SLS Connector的query語句或參數,可以實現過濾條件和投影欄位的下推,避免全量資料轉送和計算,提升效率。
方案原理
-
未配置SPL語句時:Flink會拉取SLS的全量日誌資料(包含所有列、所有行)進行計算,如圖所示。
-
配置SPL語句時:當SPL語句包含行過濾或列裁剪操作時,Flink拉取的資料是經過這些操作處理後的部分資料,用於後續計算,如圖所示。
準備工作
-
本文LogStore資料使用SLS的SLB七層日誌類比接入方式產生類比資料,其中包含10多個欄位。類比接入會持續產生隨機的日誌資料,日誌內容樣本如下:
{ "__source__": "127.0.0.1", "__tag__:__receive_time__": "1706531737", "__time__": "1706531727", "__topic__": "slb_layer7", "body_bytes_sent": "3577", "client_ip": "114.137.XXX.XXX", "host": "www.pi.mock.com", "http_host": "www.cwj.mock.com", "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0", "request_length": "1662", "request_method": "GET", "request_time": "31", "request_uri": "/request/path-0/file-3", "scheme": "https", "slbid": "slb-02", "status": "200", "upstream_addr": "42.63.XXX.XXX", "upstream_response_time": "32", "upstream_status": "200", "vip_addr": "223.18.XX.XXX" } -
LogStore中slbid欄位有三種值,對15分鐘的日誌資料進行slbid統計,可以發現
slb-01與slb-02數量相當。在查詢欄輸入
* | select slbid, count(1) cnt group by slbid,查詢結果表格顯示:slb-02 對應 2001 條,slb-01 對應 1986 條,lb-uf6qfcvsrouodhvw39oog 對應 4144 條。
操作步驟
行過濾:SLS SPL為Flink SLS Connector提供了一種支援過濾下推的能力,通過配置SLS Connector的query語句中的過濾條件,即可實現過濾條件下推。避免全量資料轉送和全量資料過濾計算。
列過濾:SLS SPL為Flink SLS Connector提供了一種支援投影下推的能力,通過配置SLS Connector的query參數,即可實現投影欄位下推。避免全量資料轉送和全量資料過濾計算。
行過濾情境
步驟一:建立SQL作業
-
登入Realtime Compute控制台,單擊目標工作空間。
-
在左側導覽列,選擇。
-
單擊建立,在新增作業草稿對話方塊,選擇,單擊下一步。
-
拷貝如下建立暫存資料表的SQL到SQL編輯地區。
CREATE TEMPORARY TABLE sls_input( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'LogStore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01''' );SQL中的參數說明如下:
參數名
參數含義
樣本值
connector
連接器。更多資訊,請參見支援的連接器。
sls
endpoint
Log Service的私網網域名稱,擷取方式請參見服務存取點。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
使用者身份識別ID,擷取方式,請參見建立AccessKey。
LTAI****************
accessKey
用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey。
yourAccessKeySecret
starttime
指定查詢日誌的起始時間點。
2025-02-19 00:00:00
project
Log Service的Project名。
test-project
LogStore
Log Service的LogStore名。
clb-access-log
query
填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。
* | where slbid = ''slb-01''
-
滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。
CREATE TEMPORARY TABLE sls_input( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' = 'cn-haxxx', 'accessId' = 'xxx', 'accessKey' = 'xxx', 'starttime' = '202xxx', 'project' = 'test-pxxx', 'logstore' = 'clb-axxx', 'query' = '*|whexxx' );
步驟二:連續查詢及效果
-
在作業中輸入如下分析語句,按照slbid進行彙總查詢。
SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid; -
單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考以下配置,建立新的調試叢集。
在建立 Session 叢集頁面,設定 名稱 為
demo-test,部署目標 為default-queue,狀態 為 RUNNING,引擎版本 為vvr-8.0.11-flink-1.17,然後單擊 建立 Session 叢集。 -
在調試彈框選擇建立好的調試叢集,然後單擊確定。
注意:若作業源表的 Connectors 使用了 SLS,選擇線上資料調試會修改消費組的資料消費位點,部署後啟動時將從修改後的位點重新啟動,請謹慎使用。
-
在結果地區,可以看到結果中slbid的欄位值,始終是
slb-01。可以看出設定了SPL語句後,sls_input僅包含slbid='slb-01'的資料,其他不合格資料被過濾掉了。查詢結果表格中同時顯示 slb_cnt 欄位值為
185。
列裁剪情境
步驟一:建立SQL作業
-
登入Realtime Compute控制台,單擊目標工作空間。
-
在左側導覽列,選擇。
-
單擊建立,在新增作業草稿對話方塊,選擇,單擊下一步。
-
拷貝如下建立暫存資料表的SQL到SQL編輯地區。與行過濾情境不同的是,這裡query參數配置進行了修改,在過濾的基礎上增加了投影語句,使用
|符號(類似Unix管道)將不同指令進行分割,上一條指令的輸出作為下一條指令的輸入,最後指令的輸出表示整個管道的輸出。實現從SLS服務端僅拉取特定欄位的內容。CREATE TEMPORARY TABLE sls_input_project( request_uri STRING, scheme STRING, slbid STRING, status STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com', 'accessId' = 'yourAccessKeyID', 'accessKey' = 'yourAccessKeySecret', 'starttime' = '2025-02-19 00:00:00', 'project' ='test-project', 'LogStore' ='clb-access-log', 'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"' );SQL中的參數說明如下:
參數名
參數含義
樣本值
connector
連接器。更多資訊,請參見支援的連接器。
sls
endpoint
Log Service的私網網域名稱,擷取方式請參見服務存取點。
cn-hangzhou-intranet.log.aliyuncs.com
accessId
使用者身份識別ID,擷取方式,請參見建立AccessKey。
LTAI****************
accessKey
用於驗證您擁有該AccessKey ID的密碼。擷取方式,請參見建立AccessKey。
yourAccessKeySecret
starttime
指定查詢日誌的起始時間點。
2025-02-19 00:00:00
project
Log Service的Project名。
test-project
LogStore
Log Service的LogStore名。
clb-access-log
query
填寫SLS的SPL語句,注意在阿里雲Flink的SQL作業開發中,字串需要使用英文單引號進行轉義。
* | where slbid = ''slb-01''
-
滑鼠選中SQL,滑鼠右擊,單擊運行,串連SLS。
步驟二:連續查詢及效果
-
在作業中輸入如下分析語句,按照slbid進行彙總查詢。
SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid; -
單擊右上方調試按鈕,在調試彈框,單擊選擇調試叢集下拉框中的建立新的叢集,參考以下配置,建立新的調試叢集。
在建立 Session 叢集頁面,設定 名稱 為
demo-test,部署目標 為default-queue,狀態 為 RUNNING,引擎版本 為vvr-8.0.11-flink-1.17,然後單擊 建立 Session 叢集。 -
在調試彈框選擇建立好的調試叢集,然後單擊確定。
注意:若作業源表的 Connectors 使用了 SLS,選擇線上資料調試會修改消費組的資料消費位點,部署後啟動時將從修改後的位點重新啟動,請謹慎使用。
-
在結果地區,可以看到結果與行過濾情境結果類似。
說明注意:這裡與行過濾不同的是,行過濾情境會返回全量的欄位,而當前的語句令SLS Connector只返回特定的欄位,再次減少了資料的網路傳輸。
查詢結果表格中同時顯示 slb_cnt 欄位值為
185。