All Products
Search
Document Center

Simple Log Service:Use a consumer group to consume logs based on a consumer processor (SPL)

Last Updated:Aug 31, 2025

When using consumer groups to consume data, you can concentrate on your business logic rather than handling implementation details like load balancing and failover. This topic explains how to use the Simple Log Service SDK for Java, Python, or Go to consume logs from a logstore with a consumer group configured with a consumer processor.

Prerequisites

  • A Resource Access Management (RAM) user is created and granted the required permissions. For more information, see Create a RAM user and grant permissions.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.

    • Do not include your AccessKey ID or AccessKey secret in your project code. If either is leaked, the security of all resources in your account may be compromised.

  • A consumer processor is created. For more information, see Create a consumer processor.

Sample code

Java

  1. Add Maven dependencies.

    The com.aliyun.openservices package must be version 0.6.51 or later.
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.51</version>
    </dependency>
  2. Create the data consumption logic. The following code is an example of SPLLogHubProcessor.java.

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time when the checkpoint was last persisted.
        private long mLastSaveTime = 0;
    
        // The initialize method is called once when a processor object is initialized.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic for data consumption. Handle all exceptions that occur during consumption. Do not throw exceptions directly.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Print the retrieved data.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Write a checkpoint to the server every 30 seconds. If a worker terminates unexpectedly within 30 seconds, the new worker starts to consume data from the last checkpoint. A small amount of data may be repeatedly consumed.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // The true parameter indicates that the checkpoint is immediately updated to the server. By default, the checkpoint cached in memory is automatically updated to the server every 60 seconds.
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // The false parameter indicates that the checkpoint is cached locally. The checkpoint can be updated to the server by the automatic update mechanism.
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // This function is called when the worker exits. You can perform cleanup tasks here.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Immediately save the checkpoint to the server.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. Create a consumer entity. The following code is an example of SPLLogHubProcessorFactory.java.

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumer instance. Note: Each time the generatorProcessor method is called, a new SPLLogHubProcessor object must be returned.
            return new SPLLogHubProcessor();
        }
    }
  4. Create a consumer and start a consumer thread. The consumer consumes data from the specified logstore.

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The Simple Log Service endpoint. Replace the value with the actual endpoint.
        private static final String ENDPOINT = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Simple Log Service project. Replace with an existing project name.
        private static final String PROJECT = "ali-test-project";
        // The name of the logstore. Replace with an existing logstore name.
        private static final String LOGSTORE = "ali-test-logstore";
        // The name of the consumer group. You do not need to create it in advance;
        // the program automatically creates it at runtime.
        private static final String CONSUMER_GROUP = "ali-test-consumergroup2";
        // The name of the consumer processor. Create the processor in the console
        // or by calling the consumer processor API.
        private static final String CONSUME_PROCESSOR = "test-consumer-processor";
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // "consumer_1" is the consumer name. Consumer names within the same consumer
            // group must be unique. When multiple processes on different machines consume
            // one logstore in a balanced manner, the machine IP address is a convenient
            // unique consumer name.
            // The last argument (1000) is maxFetchLogGroupSize: the maximum number of log
            // groups retrieved from the server at a time, in the range (0, 1000]. It can
            // also be changed later via config.setMaxFetchLogGroupSize(...).
            LogHubConfig config = new LogHubConfig(CONSUMER_GROUP, "consumer_1", ENDPOINT, PROJECT, LOGSTORE, ACCESS_KEY_ID, ACCESS_KEY_SECRET, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // setProcessor specifies the consumer processor for data consumption.
            config.setProcessor(CONSUME_PROCESSOR);
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            // ClientWorker implements Runnable; starting the thread starts consumption.
            Thread thread = new Thread(worker);
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Call shutdown to exit the consumer instance; its thread then stops automatically.
            worker.shutdown();
            // ClientWorker spawns multiple asynchronous tasks while running. After shutdown,
            // wait (about 30 seconds is recommended) so the running tasks can exit safely.
            Thread.sleep(30 * 1000);
        }
    }
  5. Run Main.java. This example consumes NGINX logs. The following log is printed.

    request_method    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.xx.xxx
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log

Python

  1. Install the Simple Log Service SDK for Python. Create a project folder named spl_consumer_demo and run the following command in the folder. For more information, see Install the Simple Log Service SDK for Python.

    The Simple Log Service SDK for Python must be version 0.9.28 or later.
    pip install -U aliyun-log-python-sdk
  2. In the spl_consumer_demo folder, create a main.py file. Create a consumer group and start a consumer thread. The consumer consumes data from the specified logstore.

    import os
    import time
    
    from aliyun.log.consumer import *
    from aliyun.log import *
    
    
    class SPLConsumer(ConsumerProcessorBase):
        # Interval (seconds) between forced checkpoint persists to the server.
        CHECKPOINT_INTERVAL_SECONDS = 3

        # The ID of the shard assigned to this processor instance.
        shard_id = -1
        # The time when the checkpoint was last forced to the server.
        last_check_time = 0

        def initialize(self, shard):
            # Called once when this processor is assigned a shard.
            self.shard_id = shard

        def _save_checkpoint(self, check_point_tracker, to_server):
            # Persist the checkpoint (to the server when to_server is True, else to
            # the local cache). Checkpoint errors must never break consumption, so
            # they are printed and swallowed.
            try:
                check_point_tracker.save_check_point(to_server)
            except Exception:
                import traceback
                traceback.print_exc()

        def process(self, log_groups, check_point_tracker):
            # Print every log in the fetched log groups.
            for log_group in log_groups.LogGroups:
                items = []
                for log in log_group.Logs:
                    item = dict()
                    item['time'] = log.Time
                    for content in log.Contents:
                        item[content.Key] = content.Value
                    items.append(item)
                log_items = dict()
                log_items['topic'] = log_group.Topic
                log_items['source'] = log_group.Source
                log_items['logs'] = items

                print(log_items)

            current_time = time.time()
            if current_time - self.last_check_time > self.CHECKPOINT_INTERVAL_SECONDS:
                # Force the checkpoint to the server every few seconds.
                self.last_check_time = current_time
                self._save_checkpoint(check_point_tracker, True)
            else:
                # Cache the checkpoint locally; the SDK flushes it periodically.
                self._save_checkpoint(check_point_tracker, False)

            # None means successful processing.
            # To roll back to the previous checkpoint, return
            # check_point_tracker.get_check_point() instead.
            return None

        def shutdown(self, check_point_tracker):
            # Persist the checkpoint to the server immediately when the worker exits.
            self._save_checkpoint(check_point_tracker, True)
    
    
    def sleep_until(seconds, exit_condition=None, expect_error=False):
        # Sleep for up to `seconds`. If `exit_condition` is given, poll it about
        # once per second and return early once it returns a truthy value.
        # Exceptions raised by `exit_condition` are swallowed (best-effort wait);
        # `expect_error=True` marks that such errors are anticipated while polling.
        if not exit_condition:
            time.sleep(seconds)
            return

        start = time.time()
        while time.time() - start < seconds:
            try:
                if exit_condition():
                    break
            except Exception:
                if not expect_error:
                    # NOTE(review): unexpected errors are silently ignored here,
                    # matching the original behavior -- confirm whether they
                    # should propagate instead.
                    pass
            # Always pause between polls. The original code `continue`d on an
            # expected error, skipping this sleep and busy-looping at full speed
            # while the condition kept raising.
            time.sleep(1)
    
    def spl_consumer_group():
        # Build a consumer-group configuration with an SPL consumer processor,
        # start a consumer worker, and keep the main thread alive while it runs.

        # The Simple Log Service endpoint; the China (Hangzhou) region endpoint is
        # the default. Replace the value with the actual endpoint.
        sls_endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

        # The AccessKey pair is obtained from environment variables.
        key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

        # Replace with an existing project and logstore.
        project_name = 'your_project'
        logstore_name = 'your_logstore'

        # The consumer group does not need to exist in advance; the SDK creates it
        # automatically.
        group_name = 'consumer-group'
        member_name = "consumer-group-name"

        # The `processor` argument names the SPL-based consumer processor applied
        # before logs are returned to the client.
        worker_option = LogHubConfig(sls_endpoint,
                                     key_id,
                                     key_secret,
                                     project_name,
                                     logstore_name,
                                     group_name,
                                     member_name,
                                     processor="test-consume-processor",
                                     cursor_position=CursorPosition.BEGIN_CURSOR,
                                     heartbeat_interval=6,
                                     data_fetch_interval=1)

        print("*** start to consume data...")
        worker = ConsumerWorker(SPLConsumer, consumer_option=worker_option)
        worker.start()

        # Keep the main thread alive while the consumer threads do the work.
        time.sleep(10000)
    
    
    # Start the sample consumer when this script is run directly.
    if __name__ == '__main__':
        spl_consumer_group()
  3. Run main.py in the spl_consumer_demo folder and view the result.

    python main.py

Go

  1. Install the Go SDK. Create a project folder named spl_demo and run the following command in the folder. For more information, see Install the Go SDK.

    The Simple Log Service SDK for Go must be version v0.1.107 or later.
    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. In the spl_demo folder, create a main.go file. Create a consumer group and start a consumer thread. The consumer consumes data from the specified logstore.

    package main
    
    import (
      "fmt"
      "os"
      "os/signal"
      "syscall"
    
      sls "github.com/aliyun/aliyun-log-go-sdk"
      consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
      "github.com/go-kit/kit/log/level"
    )
    
    // README :
    // This is a very simple example of pulling data from your logstore and printing it for consumption, including pre-handling for logs.
    
    func main() {
      // Configure the consumer. The endpoint below is for the China (Hangzhou)
      // region; replace the value with the actual endpoint.
      option := consumerLibrary.LogHubConfig{
        Endpoint: "cn-hangzhou.log.aliyuncs.com",
        // This example obtains the AccessKey pair from environment variables.
        AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
        Project:           "your_project",
        Logstore:          "your_logstore",
        ConsumerGroupName: "test-spl-cg",
        ConsumerName:      "test-spl-consumer",
        // This option is only used for initialization; it is ignored once the
        // consumer group is created and each shard has started to be consumed.
        // Valid values: "begin", "end", or a timestamp (log receiving time).
        CursorPosition: consumerLibrary.END_CURSOR,
        // Processor pre-handles logs on the server side before they are returned
        // to the client. For more information, see
        // https://www.alibabacloud.com/help/en/sls/user-guide/rule-based-consumption
        Processor: "test-consume-processor",
      }

      consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
      // Stop the worker gracefully when SIGINT or SIGTERM is received.
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
      consumerWorker.Start()
      if _, ok := <-ch; ok {
        level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
        consumerWorker.StopAndWait()
      }
    }
    
    // process holds the consumption logic. Do not change the parameters or the
    // return values of this function: the consumer worker calls it for every
    // fetched batch of log groups and interprets the (string, error) result.
    func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
      fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
      // Report checkpoint-save failures to the worker instead of discarding them;
      // the original code ignored this error.
      if err := checkpointTracker.SaveCheckPoint(false); err != nil {
        return "", err
      }
      return "", nil
    }
  3. Run the following commands in the spl_demo folder to install dependencies.

    go mod tidy
    go mod vendor
  4. Run the main function and view the output.

    go run main.go