全部產品
Search
文件中心

Simple Log Service:使用消費組基於SPL消費日誌

更新時間:Jan 06, 2026

通過消費組(ConsumerGroup)消費資料無需關注Log Service的實現細節及消費者之間的負載平衡、容錯移轉(Failover)等,只需要專註於商務邏輯。本文介紹通過Java消費組、Go消費組和Python消費組的方式,設定SPL語句來消費LogStore中的資料。

前提條件

  • 已建立RAM使用者並完成授權。具體操作,請參見建立RAM使用者及授權

  • 已配置環境變數ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變數

    重要
    • 阿里雲帳號的AccessKey擁有所有API的存取權限,建議您使用RAM使用者的AccessKey進行API訪問或日常營運。

    • 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。

  • 建立消費處理器

程式碼範例

Java

  1. 添加Maven依賴。

    com.aliyun.openservices包要求版本號碼不低於0.6.51。
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.51</version>
    </dependency>
  2. 建立消費邏輯,SPLLogHubProcessor.java代碼如下所示。

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 記錄上次持久化Checkpoint的時間。
        private long mLastSaveTime = 0;
    
        // initialize 方法會在 processor 對象初始化時被調用一次
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // 消費資料的主邏輯,消費時的所有異常都需要處理,不能直接拋出。
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // 列印已擷取的資料。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // 每隔30秒,寫一次Checkpoint到服務端。如果30秒內發生Worker異常終止,新啟動的Worker會從上一個Checkpoint擷取消費資料,可能存在少量的重複資料。
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // 參數為true表示立即手動將Checkpoint更新到服務端。此外,預設每60秒會自動將記憶體中緩衝的Checkpoint更新到服務端。
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // 參數為false表示將Checkpoint緩衝在本地,可被自動更新Checkpoint機制更新到服務端。
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // 當Worker退出時,會調用該函數,您可以在此處執行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // 將Checkpoint立即儲存到服務端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. 建立消費者實體,SPLLogHubProcessorFactory.java代碼如下所示。

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 產生一個消費執行個體。注意:每次調用 generatorProcessor 方法,都應該返回一個新的 SPLLogHubProcessor 對象。
            return new SPLLogHubProcessor();
        }
    }
  4. 建立一個消費者並啟動一個消費者線程,該消費者會從指定的LogStore中消費資料。代碼如下所示。

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Log Service的服務存取點,請您根據實際情況填寫。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Log Service專案名稱,請您根據實際情況填寫。請從已建立專案中擷取專案名稱。
        private static String Project = "ali-test-project";
        // 日誌庫名稱,請您根據實際情況填寫。請從已建立日誌庫中擷取日誌庫名稱。
        private static String Logstore = "ali-test-logstore";
        // 消費組名稱,請您根據實際情況填寫。您無需提前建立,該程式運行時會自動建立該消費組。
        private static String ConsumerGroup = "ali-test-consumergroup2";
    
        // 新增內容
        // 消費處理器名稱:請在控制台或者通過消費處理開放API建立
        private static String ConsumeProcessor = "test-consumer-processor";
        // 新增內容結束
        // 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。。
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消費者名稱,同一個消費組下面的消費者名稱必須不同。不同消費者在多台機器上啟動多個進程,均衡消費一個Logstore時,消費者名稱可以使用機器IP地址來區分。
            // maxFetchLogGroupSize用於設定每次從服務端擷取的LogGroup最大數目,使用預設值即可。您可以使用config.setMaxFetchLogGroupSize(100);調整,取值範圍為(0,1000]。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // setProcessor可以設定消費過程中的SLS SPL語句
            config.setProcessor(ConsumeProcessor);
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread運行之後,ClientWorker會自動運行,ClientWorker擴充了Runnable介面。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // 調用Worker的Shutdown函數,退出消費執行個體,關聯的線程也會自動停止。
            worker.shutdown();
            // ClientWorker運行過程中會產生多個非同步任務。Shutdown完成後,請等待還在執行的任務安全退出。建議設定sleep為30秒。
            Thread.sleep(30 * 1000);
        }
    }
  5. 運行Main.java,以類比消費Nginx日誌為例,列印日誌如下。

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.xx.xxx
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log

Python

  1. 安裝Log ServiceSDK:建立專案目錄spl_consumer_demo,在目錄下執行如下命令,更多資訊,請參見安裝Log ServicePython SDK

    阿里雲Log ServiceSDK版本號碼不低於0.9.28。
    pip install -U aliyun-log-python-sdk
  2. 在spl_consumer_demo目錄下建立main.py檔案。建立一個消費組並啟動一個消費者線程,該消費者會從指定的LogStore中消費資料。

    import os
    import time
    
    from aliyun.log.consumer import *
    from aliyun.log import *
    
    
    class SPLConsumer(ConsumerProcessorBase):
        shard_id = -1
        last_check_time = 0
    
        def initialize(self, shard):
            self.shard_id = shard
    
        def process(self, log_groups, check_point_tracker):
            for log_group in log_groups.LogGroups:
                items = []
                for log in log_group.Logs:
                    item = dict()
                    item['time'] = log.Time
                    for content in log.Contents:
                        item[content.Key] = content.Value
                    items.append(item)
                log_items = dict()
                log_items['topic'] = log_group.Topic
                log_items['source'] = log_group.Source
                log_items['logs'] = items
    
                print(log_items)
    
            current_time = time.time()
            if current_time - self.last_check_time > 3:
                try:
                    self.last_check_time = current_time
                    check_point_tracker.save_check_point(True)
                except Exception:
                    import traceback
                    traceback.print_exc()
            else:
                try:
                    check_point_tracker.save_check_point(False)
                except Exception:
                    import traceback
                    traceback.print_exc()
    
            # None means succesful process
            # if need to roll-back to previous checkpoint,return check_point_tracker.get_check_point()
            return None
    
        def shutdown(self, check_point_tracker):
            try:
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
    
    
    def sleep_until(seconds, exit_condition=None, expect_error=False):
        if not exit_condition:
            time.sleep(seconds)
            return
    
        s = time.time()
        while time.time() - s < seconds:
            try:
                if exit_condition():
                    break
            except Exception:
                if expect_error:
                    continue
            time.sleep(1)
    
    def spl_consumer_group():
        # Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
        endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    
        # 本樣本從環境變數中擷取AccessKey ID和AccessKey Secret。
        access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    
        project = 'your_project'
        logstore = 'your_logstore'
    
        # 消費組名稱。您無需提前建立,SDK會自動建立該消費組。
        consumer_group = 'consumer-group'
        consumer_name = "consumer-group-name"
    
        # 在消費組中建立2個消費者消費資料。
        option = LogHubConfig(endpoint,
                              access_key_id,
                              access_key,
                              project,
                              logstore,
                              consumer_group,
                              consumer_name,
                              processor="test-consume-processor",
                              cursor_position=CursorPosition.BEGIN_CURSOR,
                              heartbeat_interval=6,
                              data_fetch_interval=1)
    
        print("*** start to consume data...")
        client_worker = ConsumerWorker(SPLConsumer, consumer_option=option)
        client_worker.start()
    
        time.sleep(10000)
    
    
    if __name__ == '__main__':
        spl_consumer_group()
  3. 在spl_consumer_demo目錄下運行main.py,查看結果。

    python main.py

Go

  1. 安裝Log ServiceSDK:建立專案目錄spl_demo,在目錄下執行如下命令,更多資訊,請參見安裝Go SDK

    阿里雲Log ServiceSDK版本號碼不低於v0.1.107。
    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. 在spl_demo目錄下建立main.go檔案。建立一個消費組並啟動一個消費者線程,該消費者會從指定的LogStore中消費資料。

    package main
    
    import (
      "fmt"
      "os"
      "os/signal"
      "syscall"
    
      sls "github.com/aliyun/aliyun-log-go-sdk"
      consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
      "github.com/go-kit/kit/log/level"
    )
    
    // README :
    // This is a very simple example of pulling data from your logstore and printing it for consumption, including pre-handling for logs.
    
    func main() {
      // Log Service的服務存取點。此處以杭州為例,其它地區請根據實際情況填寫。
    option := consumerLibrary.LogHubConfig{
        Endpoint:          "cn-hangzhou.log.aliyuncs.com",
        AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
        Project:           "your_project",
        Logstore:          "your_logstore",
        ConsumerGroupName: "test-spl-cg",
        ConsumerName:      "test-spl-consumer",
        // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
        // Could be "begin", "end", "specific time format in time stamp", it's log receiving time.
        CursorPosition: consumerLibrary.END_CURSOR,
        // Processor is for log pre-handling before return to client, more info refer to https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
        Processor: "test-consume-processor",
      }
    
      consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
      consumerWorker.Start()
      if _, ok := <-ch; ok {
        level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
        consumerWorker.StopAndWait()
      }
    }
    
    // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value,
    // otherwise you will report errors.
    func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
      fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
      checkpointTracker.SaveCheckPoint(false)
      return "", nil
    }
  3. 在spl_demo目錄下執行命令,安裝依賴。

    go mod tidy
    go mod vendor
  4. 運行main函數,查看輸出結果。

    go run main.go