本文向您介紹使用SDK基於消費處理器(SPL)消費日誌的樣本。
前提條件
已建立RAM使用者並完成授權。具體操作,請參見建立RAM使用者及授權。
已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變數。
重要阿里雲帳號的AccessKey擁有所有API的存取權限,建議您使用RAM使用者的AccessKey進行API訪問或日常營運。
強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
程式碼範例
Java
安裝Log ServiceSDK:在Java專案的根目錄下,開啟
pom.xml檔案,添加以下Maven依賴。更多資訊,請參見安裝Java SDK。阿里雲Log ServiceSDK版本號碼不低於0.6.126。
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <!-- 引入阿里雲Log ServiceSDK --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.126</version> </dependency>建立
PullLogsWithSPLDemo.java檔案,在本樣本中調用PullLog介面讀取日誌資料,完成使用Java SDK基於SPL消費日誌資料的示範。import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.*; import com.aliyun.openservices.log.request.PullLogsRequest; import com.aliyun.openservices.log.response.ListShardResponse; import com.aliyun.openservices.log.response.PullLogsResponse; import java.util.HashMap; import java.util.List; import java.util.Map; public class PullLogsWithSPLDemo { // Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。 private static final String endpoint = "cn-hangzhou.log.aliyuncs.com"; // 本樣本從環境變數中擷取 AccessKey ID 和 AccessKey Secret。 private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); // Project 名稱(需替換為實際Project名)。 private static final String project = "ali-project-test"; // LogStore 名稱(需替換為實際LogStore名)。 private static final String logStore = "test-logstore"; public static void main(String[] args) throws Exception { // processorName表示消費處理器的標識 String processorName = "processor-test"; // 建立Log Service Client。 Client client = new Client(endpoint, accessKeyId, accessKeySecret); // 查詢 LogStore 的 Shard。 ListShardResponse resp = client.ListShard(project, logStore); System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size()); Map<Integer, String> cursorMap = new HashMap<>(); for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); // 從頭開始消費,擷取遊標。(如果是從尾部開始消費,使用 Consts.CursorMode.END)。 cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor()); } try { while (true) { // 從每個Shard中擷取日誌。 for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId)); // processorName表示消費處理器的標識 request.setProcessor(processorName); PullLogsResponse response = client.pullLogs(request); // 日誌都在日誌組(LogGroup)中,按照邏輯拆分即可。 List<LogGroupData> logGroups = response.getLogGroups(); System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId); // 完成處理拉取的日誌後,移動遊標。 cursorMap.put(shardId, response.getNextCursor()); } } } catch (LogException e) { System.out.println("error code :" + e.GetErrorCode()); System.out.println("error message :" + e.GetErrorMessage()); throw e; } } }運行Main函數,查看輸出結果。
Get 41 logGroup from logstore:test-logstore: Shard:0 Get 49 logGroup from logstore:test-logstore: Shard:1 Get 43 logGroup from logstore:test-logstore: Shard:0 Get 39 logGroup from logstore:test-logstore: Shard:1 ... ...
Python
安裝Log ServiceSDK:建立專案目錄spl_demo,在目錄下執行如下命令,更多資訊,請參見安裝Log ServicePython SDK。
阿里雲Log ServiceSDK版本號碼不低於0.9.28。
pip install -U aliyun-log-python-sdk在spl_demo目錄下建立main.py檔案。建立一個消費組並啟動一個消費者線程,該消費者會從指定的LogStore中消費資料。
# encoding: utf-8 import time import os from aliyun.log import * def main(): # Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。 endpoint = 'cn-hangzhou.log.aliyuncs.com' # 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。 access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') # Project名稱(需替換為實際Project名)。 project_name = 'ali-project-test' # LogStore名稱(需替換為實際LogStore名)。 logstore_name = 'test-logstore' # processor表示消費處理器的標識 processor = "processor-test" init_cursor = 'end' log_group_count = 10 # 建立Log ServiceClient。 client = LogClient(endpoint, access_key_id, access_key) cursor_map = {} # 列舉logstore的shards res = client.list_shards(project_name, logstore_name) res.log_print() shards = res.shards # 擷取初始cursor for shard in shards: shard_id = shard.get('shardID') res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor) cursor_map[shard_id] = res.get_cursor() # 迴圈讀取每個shard的資料 while True: for shard in shards: shard_id = shard.get('shardID') res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id), log_group_count, processor=processor) res.log_print() if cursor_map[shard_id] == res.next_cursor: # only for debug time.sleep(3) else: cursor_map[shard_id] = res.next_cursor if __name__ == '__main__': main()運行main函數,查看輸出結果。
ListShardResponse: headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'} res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}] PullLogResponse next_cursor MTczNz********c3ODgyMjQ0MQ== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'} detail: [] PullLogResponse next_cursor MTczNz********c3OTg5NzE3NA== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'} detail: [] PullLogResponse ... ...
Go
安裝Log ServiceSDK:建立專案目錄spl_demo,在目錄下執行如下命令,更多資訊,請參見安裝Go SDK。
阿里雲Log ServiceSDK版本號碼不低於v0.1.107。
go get -u github.com/aliyun/aliyun-log-go-sdk在spl_demo目錄下建立main.go檔案。建立一個消費組並啟動一個消費者線程,該消費者會從指定的LogStore中消費資料。
package main import ( "fmt" "os" "time" sls "github.com/aliyun/aliyun-log-go-sdk" ) func main() { client := &sls.Client{ AccessKeyID: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), Endpoint: "cn-chengdu.log.aliyuncs.com", } project := "ali-project-test" logstore := "test-logstore" initCursor := "end" // consumeProcessor表示消費處理器的標識 consumeProcessor := "ali-test-consume-processor" shards, err := client.ListShards(project, logstore) if err != nil { fmt.Println("ListShards error", err) return } shardCursorMap := map[int]string{} for _, shard := range shards { cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor) if err != nil { fmt.Println("GetCursor error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = cursor } for { for _, shard := range shards { pullLogRequest := &sls.PullLogRequest{ Project: project, Logstore: logstore, ShardID: shard.ShardID, LogGroupMaxCount: 10, Processor: consumeProcessor, Cursor: shardCursorMap[shard.ShardID], } lg, nextCursor, err := client.PullLogsV2(pullLogRequest) fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor) if err != nil { fmt.Println("PullLogsV2 error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = nextCursor if len(lg.LogGroups) == 0 { // only for debug time.Sleep(time.Duration(3) * time.Second) } } } }運行main函數,查看輸出結果
shard: 0 loggroups: 41 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 49 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== shard: 0 loggroups: 43 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 39 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== ... ...