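Consuming data through a consumer group (ConsumerGroup) lets you focus on business logic. You do not need to handle the implementation details of Log Service, or load balancing and failover among consumers. This topic describes how to use Java, Go, and Python consumer groups with an SPL statement to consume data from a LogStore.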
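Prerequisites
A RAM user is created and granted the required permissions. For details, see Create a RAM user and grant permissions.
The environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. For details, see Configure environment variables on Linux, macOS, and Windows.
Important: The AccessKey of an Alibaba Cloud account has access to all APIs. We recommend that you use the AccessKey of a RAM user for API access and routine O&M.
We strongly recommend that you do not store the AccessKey ID and AccessKey Secret in project code. Otherwise the AccessKey may be leaked, which threatens the security of all resources in your account.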
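The following is a minimal sketch of reading the two environment variables named above and failing early when either one is unset, instead of passing empty credentials to the SDK. The helper name load_credentials and the optional env parameter are assumptions made for illustration; they are not part of any Log Service SDK.

```python
import os

# Environment variable names used throughout this topic.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def load_credentials(env=None):
    """Return (access_key_id, access_key_secret), or raise if either is unset.

    Hypothetical helper for illustration; not an SDK function.
    `env` defaults to os.environ and can be replaced by a dict for testing.
    """
    env = os.environ if env is None else env
    # Treat an empty string the same as a missing variable.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in REQUIRED_VARS)
```

A caller could run `access_key_id, access_key_secret = load_credentials()` once at startup and pass the values to the consumer configuration, so that a misconfigured environment is reported before any request is sent.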
Sample code
Java
Add the Maven dependencies.
The com.aliyun.openservices package must be version 0.6.51 or later.
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>loghub-client-lib</artifactId>
  <version>0.6.51</version>
</dependency>

Create the consumption logic. The code of SPLLogHubProcessor.java is as follows.

import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

import java.util.List;

public class SPLLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // Time when the checkpoint was last persisted.
    private long mLastSaveTime = 0;

    // Called once when the processor object is initialized.
    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // Main consumption logic. Handle all exceptions here; do not throw them.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        // Print the fetched data.
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // Write a checkpoint to the server every 30 seconds. If the worker terminates
        // abnormally within those 30 seconds, a newly started worker resumes from the
        // last checkpoint, so a small amount of data may be consumed twice.
        try {
            if (curTime - mLastSaveTime > 30 * 1000) {
                // true: update the checkpoint on the server immediately. In addition, the
                // checkpoint cached in memory is flushed to the server every 60 seconds by default.
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                // false: cache the checkpoint locally; the automatic update mechanism
                // can flush it to the server later.
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        return null;
    }

    // Called when the worker exits. Perform cleanup here.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        // Save the checkpoint to the server immediately.
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

Create the consumer entity. The code of SPLLogHubProcessorFactory.java is as follows.

import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // Create a consumer instance. Note: every call to generatorProcessor
        // must return a new SPLLogHubProcessor object.
        return new SPLLogHubProcessor();
    }
}

Create a consumer and start a consumer thread. The consumer consumes data from the specified LogStore. The code is as follows.

import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

public class Main {
    // Log Service endpoint. Replace with your actual value.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    // Log Service project name. Use the name of an existing project.
    private static String Project = "ali-test-project";
    // LogStore name. Use the name of an existing LogStore.
    private static String Logstore = "ali-test-logstore";
    // Consumer group name. You do not need to create it in advance; the program creates it at run time.
    private static String ConsumerGroup = "ali-test-consumergroup2";
    // Added for SPL consumption
    // Consume processor name. Create the consume processor in the console or through the consume processor API.
    private static String ConsumeProcessor = "test-consumer-processor";
    // End of added content
    // This example reads the AccessKey ID and AccessKey Secret from environment variables.
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // consumer_1 is the consumer name. Consumer names within one consumer group must be unique.
        // When consumers run as multiple processes on multiple machines to share one Logstore,
        // you can use the machine IP address to tell them apart.
        // maxFetchLogGroupSize sets the maximum number of LogGroups fetched from the server per request.
        // The default value is sufficient. To adjust it, call config.setMaxFetchLogGroupSize(100);
        // the value range is (0,1000].
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        // setProcessor sets the consume processor, which carries the SLS SPL statement applied during consumption.
        config.setProcessor(ConsumeProcessor);
        ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        // After the thread starts, ClientWorker runs automatically. ClientWorker implements the Runnable interface.
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the worker's shutdown function to exit the consumer instance. The associated thread stops automatically.
        worker.shutdown();
        // ClientWorker spawns multiple asynchronous tasks while running. After shutdown,
        // wait for the running tasks to exit safely. A sleep of 30 seconds is recommended.
        Thread.sleep(30 * 1000);
    }
}

Run Main.java. Taking the consumption of simulated Nginx logs as an example, the printed logs are as follows.

: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.xx.xxx
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
Python
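Install the Log Service SDK: create the project directory spl_consumer_demo and run the following command in it. For more information, see Install the Log Service Python SDK.
The Log Service SDK for Python must be version 0.9.28 or later.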
pip install -U aliyun-log-python-sdk

Create the main.py file in the spl_consumer_demo directory. The code creates a consumer group and starts a consumer thread. The consumer consumes data from the specified LogStore.

import os
import time
import traceback

from aliyun.log.consumer import *
from aliyun.log import *


class SPLConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        # Print the fetched data, one dict per log group.
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items
            print(log_items)

        # Persist the checkpoint to the server at most once every 3 seconds;
        # otherwise only cache it locally.
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                traceback.print_exc()

        # None means the batch was processed successfully.
        # To roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        # Save the checkpoint to the server immediately on exit.
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            traceback.print_exc()


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return

    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def spl_consumer_group():
    # Log Service endpoint. Hangzhou is used as an example; replace it for other regions.
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

    # This example reads the AccessKey ID and AccessKey Secret from environment variables.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

    project = 'your_project'
    logstore = 'your_logstore'
    # Consumer group name. You do not need to create it in advance; the SDK creates it automatically.
    consumer_group = 'consumer-group'
    consumer_name = "consumer-group-name"

    # Create a consumer in the consumer group to consume data.
    option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore,
                          consumer_group, consumer_name,
                          processor="test-consume-processor",
                          cursor_position=CursorPosition.BEGIN_CURSOR,
                          heartbeat_interval=6,
                          data_fetch_interval=1)

    print("*** start to consume data...")
    client_worker = ConsumerWorker(SPLConsumer, consumer_option=option)
    client_worker.start()
    time.sleep(10000)


if __name__ == '__main__':
    spl_consumer_group()

Run main.py in the spl_consumer_demo directory and view the result.
python main.py
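The checkpoint handling in the processors above follows one pattern: persist the checkpoint to the server only when a minimum interval has elapsed (30 seconds in the Java sample, 3 seconds in main.py), and otherwise cache it locally. A minimal sketch of that pattern in isolation is shown below. ThrottledCheckpointer and RecordingTracker are hypothetical names introduced for illustration. The only assumption about the real tracker is that it exposes save_check_point(persistent), as used in main.py.

```python
import time


class ThrottledCheckpointer:
    """Persist a checkpoint to the server at most once per interval.

    Hypothetical helper for illustration; not part of the SDK.
    """

    def __init__(self, interval_seconds=30.0, clock=time.time):
        self.interval_seconds = interval_seconds
        self.clock = clock          # injectable so the logic can be tested
        self.last_flush = 0.0

    def save(self, tracker):
        now = self.clock()
        flush = now - self.last_flush > self.interval_seconds
        if flush:
            self.last_flush = now
        # True: write to the server now. False: only cache locally.
        tracker.save_check_point(flush)
        return flush


class RecordingTracker:
    """Stand-in for the SDK tracker so the sketch runs without a LogStore."""

    def __init__(self):
        self.calls = []

    def save_check_point(self, persistent):
        self.calls.append(persistent)


if __name__ == "__main__":
    checkpointer = ThrottledCheckpointer(interval_seconds=30.0)
    tracker = RecordingTracker()
    checkpointer.save(tracker)   # first call persists
    checkpointer.save(tracker)   # immediate second call only caches
    print(tracker.calls)
```

A shorter interval reduces the amount of data that may be consumed twice after an abnormal exit, at the cost of more checkpoint writes to the server.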
Go
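Install the Log Service SDK: create the project directory spl_demo and run the following command in it. For more information, see Install the Go SDK.
The Log Service SDK for Go must be version v0.1.107 or later.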
go get -u github.com/aliyun/aliyun-log-go-sdk

Create the main.go file in the spl_demo directory. The code creates a consumer group and starts a consumer thread. The consumer consumes data from the specified LogStore.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
	"github.com/go-kit/kit/log/level"
)

// README:
// This is a very simple example of pulling data from your logstore and printing it,
// including pre-handling of logs.

func main() {
	// Log Service endpoint. Hangzhou is used as an example; replace it for other regions.
	option := consumerLibrary.LogHubConfig{
		Endpoint:          "cn-hangzhou.log.aliyuncs.com",
		AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
		AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
		Project:           "your_project",
		Logstore:          "your_logstore",
		ConsumerGroupName: "test-spl-cg",
		ConsumerName:      "test-spl-consumer",
		// This option is used only for initialization. It is ignored once the consumer group
		// has been created and consumption of each shard has started.
		// It can be "begin", "end", or a specific time given as a timestamp (log receiving time).
		CursorPosition: consumerLibrary.END_CURSOR,
		// Processor pre-handles logs before they are returned to the client. For more information, see
		// https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
		Processor: "test-consume-processor",
	}

	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	consumerWorker.Start()
	// Block until a stop signal arrives, then stop the worker and wait for it to finish.
	if _, ok := <-ch; ok {
		level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
		consumerWorker.StopAndWait()
	}
}

// Fill in your consumption logic here. Do not change the parameters or the return values
// of the function; otherwise errors are reported.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
	checkpointTracker.SaveCheckPoint(false)
	return "", nil
}

Run the following commands in the spl_demo directory to install the dependencies.

go mod tidy
go mod vendor

Run the main function and view the output.
go run main.go