通過配置Log ServiceSLS觸發器,您可以實現Log ServiceSLS與Function Compute的整合。SLS觸發器能夠在新日誌產生時自動觸發函數執行,從而增量消費Log ServiceLogstore的資料,並完成自訂加工任務。
使用情境
資料清洗、加工情境
通過Log Service,快速完成日誌採集、加工、查詢、分析。
資料投遞情境
為資料的目的端落地提供支撐,構建雲上巨量資料產品間的資料管道。
資料加工函數
函數類型
模板函數
更多資訊,請參見aliyun-log-fc-functions。
自訂函數
函數配置的格式與函數的具體實現有關。更多資訊,請參見ETL函數開發指南。
Function Compute觸發機制
Log ServiceETL Job對應於Function Compute的一個觸發器,當建立Log ServiceETL Job後,Log Service會根據該ETL Job的配置啟動定時器,定時器輪詢Logstore中的Shard資訊,當發現有新的資料寫入時,產生<shard_id,begin_cursor,end_cursor >三元組作為函數Event,並觸發函數執行。
當儲存系統升級時,即使沒有新資料寫入,也可能發生Cursor變化,在這種情況下,每個Shard會額外空觸發一次。此時,您可以在函數內通過Cursor嘗試擷取Shard的資料,如果擷取不到資料說明是一次空觸發,可以在函數內做忽略處理。更多資訊,請參見自訂函數開發指南。
Log Service的ETL任務觸發機制是時間觸發。例如:您設定的ETL Job觸發間隔為60秒,Logstore的Shard0一直有資料寫入,那麼Shard每60秒就會觸發一次函數執行(如果Shard沒有新的資料寫入則不會觸發函數執行),函數執行的輸入為最近60秒的Cursor區間。在函數內,可以根據Cursor讀取Shard0資料進行下一步處理。
使用限制
單個記錄項目(Project)關聯的SLS觸發器數量最大不得超過該Project下已有的Logstore數量的5倍。
建議每個Logstore配置的SLS觸發器數量不超過5個,否則可能會影響資料投遞到Function Compute的效率。
樣本情境
您可以配置一個SLS觸發器,該觸發器將定時擷取更新的資料並觸發函數執行,增量消費Log ServiceLogstore中的資料。在函數裡完成自訂加工任務(例如資料清洗和加工)以及將資料投遞給第三方服務。本樣本中只示範如何擷取日誌資料並列印。
用於資料加工的函數可以是Log Service提供的模板,也可以是您的自訂函數。
前提條件
Function Compute
Log ServiceSLS
您需要建立一個Project和兩個Logstore。一個Logstore用於儲存採集到的日誌(Function Compute基於增量日誌觸發,必須確保可以持續採集到日誌),另一個Logstore用於儲存SLS觸發器產生的日誌。
記錄項目(Project)所在地區和Function Compute服務所在地區必須一致。
入口參數說明
eventSLS觸發器觸發後將事件數目據傳遞給運行時,運行時將事件轉換為一個JSON對象,並將該對象傳遞給函數的入口參數
event。其格式如下所示:{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "fc-test-project", "logstoreName": "fc-test-logstore", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }參數說明如下所示:
參數
說明
parameter
您配置觸發器時填寫的調用參數的值。
source
設定函數讀取的日誌塊資訊。
endpoint:Log ServiceProject所屬的阿里雲地區。
projectName:Log ServiceProject名稱。
logstoreName:Function Compute要消費的Logstore名稱,當前觸發器會定時從該日誌庫中訂閱資料到函數服務進行自訂加工。
shardId:Logstore中一個確定的Shard。
beginCursor:開始消費資料的位置。
endCursor:停止消費資料的位置。
說明在函數調試的時候,您可以調用GetCursor - 通過時間查詢Cursor介面擷取beginCursor和endCursor,並按上述樣本構建一個函數Event用於測試。
jobName
Log ServiceETL Job名字,函數配置的SLS觸發器對應一個Log Service的ETL Job。
此參數由Function Compute自動產生,使用者無需配置。
taskId
對於ETL Job而言,taskId是一個確定性函數調用標識。
此參數由Function Compute自動產生,使用者無需配置。
cursorTime
最後一條日誌到達Log Service端的Unix時間戳記,單位:秒。
context當Function Compute運行您的函數時,會將內容物件傳遞函數的入口參數
context。該對象包含有關調用、服務、函數和執行環境等資訊。本文通過
context.credentials擷取密鑰資訊,如需瞭解更多欄位資訊,請參見上下文。
步驟一:建立SLS觸發器
登入Function Compute控制台,在左側導覽列,選擇。
在頂部功能表列,選擇地區,然後在函數列表頁面,單擊目標函數。
在函數詳情頁面,選擇觸發器頁簽,單擊建立觸發器,在建立觸發程序面板,觸發器類型選擇Log Service SLS,設定其他配置項,然後單擊確定。
配置項
操作
樣本
名稱
填寫自訂的觸發器名稱。如果不填,Function Compute將自動產生觸發器名稱。
log_trigger
版本或別名
預設值為LATEST,如果您需要建立其他版本或別名的觸發器,需先在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見版本管理和別名管理。
LATEST
記錄項目
選擇需要消費的Log ServiceProject。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****
日誌庫
選擇需要消費的Logstore。當前觸發器會定時從該日誌庫中訂閱資料到函數服務進行自訂加工。
function-log
觸發間隔
填寫Log Service觸發函數啟動並執行時間間隔。
取值範圍:[3,600],單位:秒。預設值:60。
60
重試次數
填寫單次觸發允許的最大重試次數。
取值範圍:[0,100],預設值:3。
說明執行成功的情況為status=200並且header中參數
X-Fc-Error-Type的值不是UnhandledInvocationError和HandledInvocationError的錯誤。其他情況表示執行失敗,會觸發重試。關於參數X-Fc-Error-Type請參見返回資料。如果函數執行失敗,會一直重試當前請求,直到函數執行成功。首先會按照配置的重試次數進行重試,超過最大重試次數仍然無法成功的,會增加時間間隔進入退避重試。
3
觸發器日誌
選擇已建立的日誌庫,Log Service觸發函數執行過程的日誌會記錄到該日誌庫中。
function-log2
調用參數
如果您想傳入自訂參數,可以在此處配置。該參數將作為event的parameter參數傳入函數。該參數取值必須是JSON格式的字串。
預設值為空白。
無
角色名稱
選擇AliyunLogETLRole。
說明如果您第一次建立該類型的觸發器,則需要在單擊確定後,在彈出的對話方塊中選擇立即授權。
AliyunLogETLRole
建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置許可權
在函數詳情頁面,選擇配置頁簽,在進階配置地區,單擊編輯,然後在進階配置面板,選擇函數角色。
您可以選擇使用預設角色AliyunFCServerlessDevsRole,此角色預設具有Log Service唯讀許可權。
您也可以自訂一個RAM角色,自訂RAM角色必須滿足以下兩點要求:
建立RAM角色時,信任主體需選擇雲端服務,且受信服務選擇Function Compute。具體操作,請參見建立可信實體為阿里雲服務的RAM角色。
請根據函數中具體需求為RAM角色授予必要Log Service許可權。更多資訊,請參見RAM自訂授權樣本。
完成後單擊部署。
步驟三:部署函數並查看列印的日誌
在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼。
本樣本部署一個Python函數,實現以下功能。
從
event中擷取endpoint、projectName、logstoreName和beginCursor等SLS事件觸發相關資訊,從
context中擷取授權資訊accessKeyId、accessKey和securityToken。根據以上擷取的資訊,初始化SLS用戶端。
從源Logstore擷取指定遊標(Cursor)位置的日誌資料。
說明以下範例程式碼可以作為提取大部分邏輯日誌的模板。
""" 本代碼範例主要實現以下功能: * 從 event 中解析出 SLS 事件觸發相關資訊 * 根據以上擷取的資訊,初始化 SLS 用戶端 * 從源 log store 擷取即時日誌資料 This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # 可以通過 context.credentials 擷取密鑰資訊 # Access keys can be fetched through context.credentials print("The content in context entity is: ", context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # 解析 event 參數至 object 格式 # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: ", event_obj) # 從 event.source 中擷取記錄項目名稱、日誌倉庫名稱、Log Service訪問 endpoint、日誌起始遊標、日誌終點遊標以及分區 id # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # 初始化 sls 用戶端 # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # 基於日誌的遊標從源日誌庫中讀取日誌,本樣本中的遊標範圍包含了觸發本次執行的所有日誌內容 # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'在函數詳情頁面,選擇,查看函數運行時,擷取最新資料。如果當前函數尚未啟用日誌功能,請單擊一鍵啟用。
至此,您已完成SLS觸發器的配置。如果您需在控制台調試代碼,請繼續完成下面的步驟。
(可選)步驟四:使用類比事件測試函數
在函數詳情頁面的代碼頁簽,單擊測試函數右側的
表徵圖,從下拉式清單中,選擇配置測試參數。在配置測試參數面板,選擇建立新測試事件或編輯已有測試事件,填寫事件名稱和事件內容,然後單擊確定。如果是建立新測試事件,建議事件模板選擇Log Service。測試資料的具體配置請參見event。
虛擬事件配置完成後,單擊測試函數。
執行完成後,您可以在函數代碼頁簽的上方查看執行結果。
常見問題
在有新日誌產生時,SLS觸發器未觸發函數執行,如何解決?
您可以從以下兩個方面排查。
確認Function Compute觸發器任務配置的Logstore是否有資料增量修改,當Shard資料有變化時會觸發函數執行。
查看觸發器日誌、函數作業記錄查看是否有異常。
為什麼SLS觸發器觸發函數執行的頻次有時高於預期?
每個Shard是單獨觸發的,您看到的可能是一個Logstore整體觸發次數很多,但每個Shard即時觸發時間是符合間隔的。
單個Shard的觸發間隔和每次處理的資料範圍相同(時間區間)。觸發間隔在函數執行時分如下兩種情況,假設觸發間隔為60秒。
觸發沒有延遲:按照設定周期觸發,每60秒觸發一次,處理的資料範圍為
[now -60s, now)。說明函數觸發是分Shard獨立進行的, 假設Logstore有10個Shard,在即時處理資料時(觸發無延遲),每60秒對應10次函數觸發執行。
觸發發生延遲(當前處理到的Log ServiceShard位置落後於最新寫入資料超過10秒):觸發器會進行追趕,可能縮短到2秒觸發一次,每次處理的資料範圍仍是60秒視窗。
denied by sts or ram, action: log:GetCursorOrData, resource: ****
如果函數日誌中出現以上錯誤,可能是因為函數未配置許可權,或權限原則設定不正確引起。請參見步驟二:配置許可權。