This topic provides examples of how to use the Simple Log Service SDK to consume logs based on Simple Log Service Processing Language (SPL) statements.
Prerequisites
A Resource Access Management (RAM) user is created and granted the required permissions.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
ImportantThe AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
Do not include your AccessKey ID or AccessKey secret in your project code. If either is leaked, the security of all resources in your account may be compromised.
Sample code
Java
Install the Simple Log Service SDK: In the root directory of your Java project, open the
pom.xml
file and add the following Maven dependencies. For more information, see Install Simple Log Service SDK for Java.<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.120</version> </dependency>
Create the
PullLogsWithSPLDemo.java
file. In this example, call the PullLog operation to read log data and complete the demo of consuming log data based on SPL using the Java SDK. Fill in the query field using SPL statements.import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.*; import com.aliyun.openservices.log.common.Consts; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.request.PullLogsRequest; import com.aliyun.openservices.log.response.ListShardResponse; import com.aliyun.openservices.log.response.PullLogsResponse; import java.util.HashMap; import java.util.List; import java.util.Map; public class PullLogsWithSPLDemo { // Specify a Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the Singapore region is used. Replace this parameter value with your actual endpoint. private static final String endpoint = "ap-southeast-1.log.aliyuncs.com"; // Obtain an AccessKey ID and an AccessKey secret from environment variables. private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); // The name of the project. Replace the value with the actual project name. private static final String project = "ali-project-test"; // The name of the logstore. Replace the value with the actual logstore name. private static final String logStore = "test-logstore"; public static void main(String[] args) throws Exception { // Create a Simple Log Service client. Client client = new Client(endpoint, accessKeyId, accessKeySecret); // Query the shards of the logstore. ListShardResponse resp = client.ListShard(project, logStore); System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size()); Map<Integer, String> cursorMap = new HashMap<>(); for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); // Use the BEGIN cursor or obtain a specific cursor to consume log data. (If you want to consume log data from the end, use Consts.CursorMode.END). cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor()); } try { while (true) { // Obtain log data from each shard. for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId)); request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000"); request.setPullMode("scan_on_stream"); PullLogsResponse response = client.pullLogs(request); // Obtain logs from log groups by logic. Logs are usually stored in log groups. List<LogGroupData> logGroups = response.getLogGroups(); System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId); // Move the cursor after the pulled logs are processed. cursorMap.put(shardId, response.getNextCursor()); } } } catch (LogException e) { System.out.println("error code :" + e.GetErrorCode()); System.out.println("error message :" + e.GetErrorMessage()); throw e; } } }
Run the main function to view the output.
Get 41 logGroup from logstore:test-logstore: Shard:0 Get 49 logGroup from logstore:test-logstore: Shard:1 Get 43 logGroup from logstore:test-logstore: Shard:0 Get 39 logGroup from logstore:test-logstore: Shard:1 ... ...
Go
Install the Simple Log Service SDK: Create the project directory spl_demo and run the following command in the directory. For more information, see Install Simple Log Service SDK for Go.
go get -u github.com/aliyun/aliyun-log-go-sdk
Create the main.go file in the spl_demo directory. Create a consumer group and start a consumer thread to allow consumers in the consumer group to consume data in the specified Logstore. Fill in the query field using SPL statements.
package main import ( "fmt" "time" "os" sls "github.com/aliyun/aliyun-log-go-sdk" ) func main() { client := &sls.Client{ AccessKeyID: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), Endpoint: "ap-southeast-1.log.aliyuncs.com", } project := "ali-project-test" logstore := "test-logstore" initCursor := "end" query := "* | where cast(body_bytes_sent as bigint) > 14000" shards, err := client.ListShards(project, logstore) if err != nil { fmt.Println("ListShards error", err) return } shardCursorMap := map[int]string{} for _, shard := range shards { cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor) if err != nil { fmt.Println("GetCursor error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = cursor } for { for _, shard := range shards { pullLogRequest := &sls.PullLogRequest{ Project: project, Logstore: logstore, ShardID: shard.ShardID, LogGroupMaxCount: 10, Query: query, Cursor: shardCursorMap[shard.ShardID], } lg, nextCursor, err := client.PullLogsV2(pullLogRequest) fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor) if err != nil { fmt.Println("PullLogsV2 error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = nextCursor if len(lg.LogGroups) == 0 { // only for debug time.Sleep(time.Duration(3) * time.Second) } } } }
Run the main function to view the output.
shard: 0 loggroups: 41 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 49 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== shard: 0 loggroups: 43 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 39 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== ... ...
Python
Install the Simple Log Service SDK: Create the project directory spl_demo and run the following command in the directory. For more information, see Install Simple Log Service SDK for Python.
pip install -U aliyun-log-python-sdk
Create the main.py file in the spl_demo directory. Create a consumer group and start a consumer thread to allow consumers in the consumer group to consume data in the specified Logstore. Fill in the query field using SPL statements.
# encoding: utf-8 import time import os from aliyun.log import * def main(): # Specify a Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the Singapore region is used. Replace this parameter value with your actual endpoint. endpoint = 'ap-southeast-1.log.aliyuncs.com' # Obtain an AccessKey ID and an AccessKey secret from environment variables. access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') # The name of the project. Replace the value with the actual project name. project_name = 'ali-project-test' # The name of the logstore. Replace the value with the actual logstore name. logstore_name = 'test-logstore' query = '* | where cast(cdn_in as bigint) > 70' init_cursor = 'end' log_group_count = 10 # Create a Simple Log Service client. client = LogClient(endpoint, access_key_id, access_key) cursor_map = {} # List the shards of the logstore. res = client.list_shards(project_name, logstore_name) res.log_print() shards = res.shards # Obtain the initial cursor. for shard in shards: shard_id = shard.get('shardID') res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor) cursor_map[shard_id] = res.get_cursor() # Loop to read data from each shard. while True: for shard in shards: shard_id = shard.get('shardID') res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id), log_group_count, query=query) res.log_print() if cursor_map[shard_id] == res.next_cursor: # only for debug time.sleep(3) else: cursor_map[shard_id] = res.next_cursor if __name__ == '__main__': main()
Run the main function to view the output.
ListShardResponse: headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'} res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}] PullLogResponse next_cursor MTczNz********c3ODgyMjQ0MQ== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'} detail: [] PullLogResponse next_cursor MTczNz********c3OTg5NzE3NA== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'} detail: [] PullLogResponse ... ...