When you use consumer groups to consume data, you can focus on your business logic instead of implementation details such as load balancing and failover. This topic describes how to use the Simple Log Service SDK for Java, Python, or Go to consume logs from a logstore by using a consumer group that is configured with a consumer processor.
Prerequisites
A Resource Access Management (RAM) user is created and granted the required permissions. For more information, see Create a RAM user and grant permissions.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
Important: The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
Do not include your AccessKey ID or AccessKey secret in your project code. If either is leaked, the security of all resources in your account may be compromised.
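Before you run the samples in this topic, you can verify that the credential environment variables are set. The following Python snippet is a minimal sketch for this check; the variable names match the ones that the sample code reads.

```python
import os

# The two environment variables that the sample code reads credentials from.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")

def missing_credentials(env=os.environ):
    """Return the names of required credential variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("Credentials are configured.")
```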
Sample code
Java
Add Maven dependencies.
The com.aliyun.openservices package must be version 0.6.51 or later.
```xml
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.51</version>
</dependency>
```

Create the data consumption logic. The following code is an example of
SPLLogHubProcessor.java:

```java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

import java.util.List;

public class SPLLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // The time when the checkpoint was last persisted.
    private long mLastSaveTime = 0;

    // The initialize method is called once when a processor object is initialized.
    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // The main logic for data consumption. Handle all exceptions that occur during consumption. Do not throw exceptions directly.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        // Print the retrieved data.
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // Write a checkpoint to the server every 30 seconds. If a worker terminates unexpectedly within 30 seconds, the new worker starts to consume data from the last checkpoint. A small amount of data may be repeatedly consumed.
        try {
            if (curTime - mLastSaveTime > 30 * 1000) {
                // The true parameter indicates that the checkpoint is immediately updated to the server. By default, the checkpoint cached in memory is automatically updated to the server every 60 seconds.
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                // The false parameter indicates that the checkpoint is cached locally. The checkpoint can be updated to the server by the automatic update mechanism.
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        return null;
    }

    // This method is called when the worker exits. You can perform cleanup tasks here.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        // Immediately save the checkpoint to the server.
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
```

Create a consumer entity. The following code is an example of
SPLLogHubProcessorFactory.java:

```java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // Generate a consumer instance. Note: Each time the generatorProcessor method is called, a new SPLLogHubProcessor object must be returned.
        return new SPLLogHubProcessor();
    }
}
```

Create a consumer and start a consumer thread. The consumer consumes data from the specified logstore.
```java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

public class Main {
    // The Simple Log Service endpoint. Replace the value with the actual endpoint.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    // The name of the Simple Log Service project. Replace the value with the name of an existing project.
    private static String Project = "ali-test-project";
    // The name of the logstore. Replace the value with the name of an existing logstore.
    private static String Logstore = "ali-test-logstore";
    // The name of the consumer group. You do not need to create the consumer group in advance. The program automatically creates it at runtime.
    private static String ConsumerGroup = "ali-test-consumergroup2";
    // The name of the consumer processor. Create the processor in the console or by calling the consumer processor API.
    private static String ConsumeProcessor = "test-consumer-processor";
    // This example obtains the AccessKey ID and AccessKey secret from environment variables.
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // consumer_1 is the consumer name. The names of consumers in the same consumer group must be unique. If multiple consumers run on different machines to consume data from a logstore in a balanced manner, you can use the machine IP addresses as consumer names.
        // maxFetchLogGroupSize specifies the maximum number of log groups that can be retrieved from the server at a time. You can use the default value. To change the value, call config.setMaxFetchLogGroupSize(100);. The value must be in the range of (0, 1000].
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        // setProcessor specifies the consumer processor for data consumption.
        config.setProcessor(ConsumeProcessor);
        ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        // After the thread starts, the ClientWorker runs automatically. ClientWorker implements the Runnable interface.
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the shutdown method of the worker to exit the consumer instance. The associated thread also stops automatically.
        worker.shutdown();
        // Multiple asynchronous tasks are generated while the ClientWorker is running. After the shutdown, wait for the running tasks to exit safely. We recommend that you wait 30 seconds.
        Thread.sleep(30 * 1000);
    }
}
```

Run
Main.java. This example consumes NGINX logs. The following output is printed:

```
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.xx.xxx
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
```
Python
Install the Simple Log Service SDK for Python. Create a project folder named spl_consumer_demo and run the following command in the folder. For more information, see Install the Simple Log Service SDK for Python.
The Simple Log Service SDK for Python must be version 0.9.28 or later.
```shell
pip install -U aliyun-log-python-sdk
```

In the spl_consumer_demo folder, create a main.py file. Create a consumer group and start a consumer thread. The consumer consumes data from the specified logstore.
```python
import os
import time

from aliyun.log.consumer import *
from aliyun.log import *


class SPLConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items
            print(log_items)

        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()
        # None means successful processing.
        # To roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return
    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def spl_consumer_group():
    # The Simple Log Service endpoint. This example uses the endpoint of the China (Hangzhou) region. Replace the value with the actual endpoint.
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    # This example obtains the AccessKey ID and AccessKey secret from environment variables.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    project = 'your_project'
    logstore = 'your_logstore'
    # The name of the consumer group. You do not need to create the consumer group in advance. The SDK automatically creates it.
    consumer_group = 'consumer-group'
    consumer_name = "consumer-group-name"
    # Create a consumer in the consumer group to consume data.
    option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore, consumer_group, consumer_name,
                          processor="test-consume-processor",
                          cursor_position=CursorPosition.BEGIN_CURSOR,
                          heartbeat_interval=6, data_fetch_interval=1)

    print("*** start to consume data...")
    client_worker = ConsumerWorker(SPLConsumer, consumer_option=option)
    client_worker.start()
    time.sleep(10000)


if __name__ == '__main__':
    spl_consumer_group()
```

Run main.py in the spl_consumer_demo folder and view the result.
```shell
python main.py
```
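As noted above, the SDK must be version 0.9.28 or later. The following is a small helper sketch that checks the installed version by using only the Python standard library; it assumes purely numeric dotted version strings, and the package name aliyun-log-python-sdk matches the pip command above.

```python
from importlib.metadata import version, PackageNotFoundError

def version_tuple(v):
    """Convert a dotted version string such as '0.9.28' into a comparable tuple."""
    return tuple(int(part) for part in v.split("."))

def meets_minimum(package, minimum):
    """Return True if the installed version of the package is at least the minimum."""
    try:
        installed = version(package)
    except PackageNotFoundError:
        return False
    return version_tuple(installed) >= version_tuple(minimum)

if __name__ == "__main__":
    print(meets_minimum("aliyun-log-python-sdk", "0.9.28"))
```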
Go
Install the Go SDK. Create a project folder named spl_demo and run the following command in the folder. For more information, see Install the Go SDK.
The Simple Log Service SDK for Go must be version v0.1.107 or later.
```shell
go get -u github.com/aliyun/aliyun-log-go-sdk
```

In the spl_demo folder, create a main.go file. Create a consumer group and start a consumer thread. The consumer consumes data from the specified logstore.
```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
	"github.com/go-kit/kit/log/level"
)

// README:
// This is a simple example that pulls data from your logstore and prints it, including pre-handling of logs.
func main() {
	// The Simple Log Service endpoint. This example uses the endpoint of the China (Hangzhou) region. Replace the value with the actual endpoint.
	option := consumerLibrary.LogHubConfig{
		Endpoint:          "cn-hangzhou.log.aliyuncs.com",
		AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
		AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
		Project:           "your_project",
		Logstore:          "your_logstore",
		ConsumerGroupName: "test-spl-cg",
		ConsumerName:      "test-spl-consumer",
		// This option is used only for initialization and is ignored after the consumer group is created and each shard starts to be consumed.
		// Valid values: "begin", "end", or a specific timestamp. The value is based on the log receiving time.
		CursorPosition: consumerLibrary.END_CURSOR,
		// Processor specifies log pre-handling before the logs are returned to the client. For more information, see https://www.alibabacloud.com/help/en/sls/user-guide/rule-based-consumption
		Processor: "test-consume-processor",
	}

	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	consumerWorker.Start()
	if _, ok := <-ch; ok {
		level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
		consumerWorker.StopAndWait()
	}
}

// Fill in your consumption logic here. Do not change the parameters or the return values of the function. Otherwise, errors are reported.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
	checkpointTracker.SaveCheckPoint(false)
	return "", nil
}
```

Run the following commands in the spl_demo folder to install dependencies.
```shell
go mod tidy
go mod vendor
```

Run the main function and view the output.
```shell
go run main.go
```