阿里雲Realtime Compute(Flink)通過建立Log Service源表的方式,可以直接消費Log Service的資料。本文介紹了如何為Realtime Compute建立Log Service源表以及建立過程涉及到的屬性欄位提取。
背景資訊
Flink消費日誌支援的資訊如下:
|
類別 |
詳情 |
|
支援類型 |
源表和結果表。 |
|
運行模式 |
僅支援流模式。 |
|
特有監控指標 |
暫不適用。 |
|
資料格式 |
暫無。 |
|
API種類 |
SQL。 |
|
是否支援更新或刪除結果表資料 |
不支援更新和刪除結果表資料,只支援插入資料。 |
使用Flink消費日誌的入口,請參見Flink SQL作業快速入門。
前提條件
-
如果您使用RAM使用者或RAM角色等身份訪問,需要確認已具有Flink控制台相關許可權,詳情請參見許可權管理。
-
已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
-
已建立Project和LogStore。更多資訊,請參見建立Project和LogStore。
使用限制
僅Realtime Compute引擎VVR 11.1及以上版本支援Log ServiceSLS作為資料攝入YAML的同步資料來源。
SLS連接器僅保證At-Least-Once語義。
強烈建議不要設定Source並發度大於Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果後續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。
建立Log Service源表和結果表
使用Flink消費Log Service資料,必須有一個完整的SQL作業。完整的SQL作業包含:源表、結果表,在經過商務邏輯處理後將源表資料插入到結果表(INSERT INTO語句)。
FlinkSQL作業開發請參見作業開發地圖。
Log Service是即時資料儲存,Realtime Compute能將其作為流式資料輸入。假設有如下日誌內容:
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200
程式碼範例
Flink消費Log Service資料的SQL開發作業代碼如下:
SQL中的表名、列名和保留字衝突時,需要使用反引號'`'括起來。
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值sls。
endPoint
EndPoint地址。
String
是
無
請填寫SLS的私網服務地址,詳情請參見服務存取點。
project
SLS專案名稱。
String
是
無
無。
logStore
SLS LogStore或metricstore名稱。
String
是
無
logStore和metricstore是相同的消費方式。
accessId
阿里雲帳號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?。
重要為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數。
accessKey
阿里雲帳號的AccessKey Secret。
String
是
無
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
enableNewSource
是否啟用實現了FLIP-27介面的新資料來源。
Boolean
否
false
新資料來源可以自動適應Shard變化,同時儘可能保證Shard在所有的source並發上分布均勻。
重要僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。從Realtime Compute引擎VVR 11.1版本開始該參數預設為true。
作業在該配置項發生變化後無法從狀態恢複。可通過先設定配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true後無狀態啟動作業,從而實現從歷史進度繼續消費。
如果SLS中存在唯讀Shard,Flink的某些並發任務在完成對唯讀Shard的消費後會繼續請求讀取其他未完成的Shard。這可能導致部分並發任務被分配到多個Shard,從而造成不同並發任務之間的Shard分配不均衡。這種不均衡會影響整體的消費效率和系統效能。為緩解這一問題,您可以通過調整並發度、最佳化任務調度策略、合并小Shard等方法,以減少Shard數量和任務分配複雜度。
shardDiscoveryIntervalMs
動態檢測shard變化時間間隔,單位為毫秒。
Long
否
60000
設定為負值時可以關閉動態檢測。
說明該參數值不能少於1分鐘(60000毫秒)。
僅當配置項enableNewSource為true時生效。
僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。
startupMode
源表啟動模式。
String
否
timestamp
timestamp(預設):從指定的起始時間開始消費日誌。latest:從最新位點開始消費日誌。earliest:從最早位點開始消費日誌。consumer_group:從消費組記錄位點開始消費日誌。若消費組未記錄某shard消費位點,則會從最早位點開始消費日誌。
重要Realtime Compute引擎VVR 11.1以下版本,不支援取值為consumer_group,需要將
consumeFromCheckpoint設為true,此時會從指定消費組記錄的位點開始消費日誌,此處的啟動模式將不會生效。
startTime
消費日誌的開始時間。
String
否
目前時間
格式為
yyyy-MM-dd hh:mm:ss。僅當
startupMode設為timestamp時生效。說明startTime和stopTime基於SLS中的__receive_time__屬性,而非__timestamp__屬性。
stopTime
消費日誌的結束時間。
String
否
無
格式為
yyyy-MM-dd hh:mm:ss。說明僅用於消費歷史日誌,應設定為過去時間點。若配置為未來時間,可能因暫無新日誌寫入而導致消費提前終止,表現為資料流中斷且無異常提示。
如期望日誌消費到結尾時退出Flink程式,需要同時設定exitAfterFinish=true.
consumerGroup
消費組名稱。
String
否
無
消費組用於記錄消費進度。您可以自訂消費組名,無固定格式。
說明不支援通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設定不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部資料。這是因為在Flink消費SLS的資料時,並不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的訊息,即使消費組是相同的。
consumeFromCheckpoint
是否從指定的消費組中儲存的Checkpoint開始消費日誌。
String
否
false
true:必須同時指定消費組,Flink程式會從消費組中儲存的Checkpoint開始消費日誌,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。false(預設值):不從指定的消費組中儲存的Checkpoint開始消費日誌。
重要Realtime Compute引擎VVR 11.1版本開始不再支援配置該參數。對於VVR 11.1及更高版本,需要將
startupMode配置為consumer_group。maxRetries
讀取SLS失敗後重試次數。
String
否
3
無。
batchGetSize
單次請求讀取logGroup的個數。
String
否
100
batchGetSize設定不能超過1000,否則會報錯。exitAfterFinish
在資料消費完成後,Flink程式是否退出。
String
否
false
true:資料消費完後,Flink程式退出。false(預設):資料消費完後,Flink程式不退出。
query
重要自VVR 11.3起廢棄,後續版本仍相容。
SLS消費預先處理語句。
String
否
無
通過使用query參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。
例如
'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS資料前,先匹配出request_method欄位值等於get的資料。說明query需使用Log ServiceSPL語言,請參見SPL文法。
重要僅Realtime Compute引擎VVR 8.0.1及以上版本支援該參數。
該功能會產生Log ServiceSLS費用,詳情請參見費用說明。
processor
SLS 消費處理器,與query欄位同時存在時,query生效,processor不生效。
String
否
無
通過使用processor參數,您可以在消費SLS資料之前對其進行過濾,以避免將所有資料都消費到Flink中,從而實現節約成本和提高處理速度的目的。推薦使用processor參數而不是query參數。
例如
'processor' = 'test-filter-processor'表示在Flink讀取SLS資料前,先經過SLS消費處理器的過濾。重要僅Realtime Compute引擎VVR 11.3及以上版本支援該參數。
該功能會產生Log ServiceSLS費用,詳情請參見費用說明。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
topicField
指定欄位名,該欄位的值會覆蓋__topic__屬性欄位的值,表示日誌的主題。
String
否
無
該參數值是表中已存在的欄位之一。
timeField
指定欄位名,該欄位的值會覆蓋__timestamp__屬性欄位的值,表示日誌寫入時間。
String
否
目前時間
該參數值是表中已存在的欄位之一,且欄位類型必須為INT。如果未指定,則預設填充目前時間。
sourceField
指定欄位名,該欄位的值會覆蓋__source__屬性欄位的值,表示日誌的來源地,例如產生該日誌機器的IP地址。
String
否
無
該參數值是表中已存在的欄位之一。
partitionField
指定欄位名,資料寫入時會根據該列值計算Hash值,Hash值相同的資料會寫入同一個shard。
String
否
無
如果未指定,則每條資料會隨機寫入當前可用的Shard中。
buckets
當指定partitionField時,根據Hash值重新分組的個數。
String
否
64
該參數的取值範圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大於等於Shard個數,否則會出現部分Shard沒有資料寫入的情況。
flushIntervalMs
觸發資料寫入的時間周期。
String
否
2000
單位為毫秒。
writeNullProperties
是否將null值作為空白字串寫入SLS。
Boolean
否
true
true(預設值):將null值作為空白字串寫入日誌。false:計算結果為null的欄位不會寫入到日誌中。
說明僅Realtime Compute引擎VVR 8.0.6及以上版本支援該參數。
屬性欄位提取
除日誌欄位外,支援提取如下四個屬性欄位,也支援提取其它自訂欄位。
|
欄位名 |
欄位類型 |
欄位說明 |
|
__source__ |
STRING METADATA VIRTUAL |
訊息源。 |
|
__topic__ |
STRING METADATA VIRTUAL |
訊息主題。 |
|
__timestamp__ |
BIGINT METADATA VIRTUAL |
日誌時間。 |
|
__tag__ |
MAP<VARCHAR, VARCHAR> METADATA VIRTUAL |
訊息TAG。 對於屬性 |
屬性欄位的提取需要添加HEADER聲明,樣本如下:
create table sls_stream(
__timestamp__ bigint HEADER,
__receive_time__ bigint HEADER
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
相關文檔
使用Flink DataStream API消費資料,請參見DataStream API。