すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:SDK を使用して SPL に基づいてログを消費する

最終更新日:Nov 09, 2025

このトピックでは、Search Processing Language (SPL) とソフトウェア開発キット (SDK) を備えたコンシューマープロセッサーを使用してログを消費する方法の例を示します。

前提条件

  • Resource Access Management (RAM) ユーザーが作成され、必要な権限が付与されています。詳細については、「RAM ユーザーの作成と権限の付与」をご参照ください。

  • ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されています。詳細については、「Linux、macOS、および Windows で環境変数を設定する」をご参照ください。

    重要
    • Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。RAM ユーザーの AccessKey ペアを使用して API 操作を呼び出すか、日常の O&M を実行することをお勧めします。

    • プロジェクトコードに AccessKey ID または AccessKey Secret を含めないでください。どちらかが漏洩すると、アカウント内のすべてのリソースのセキュリティが危険にさらされる可能性があります。

  • コンシューマープロセッサーを作成する

コード例

Java

  1. Simple Log Service SDK をインストールします。Java プロジェクトのルートディレクトリで、pom.xml ファイルを開き、次の Maven 依存関係を追加します。詳細については、「Java SDK のインストール」をご参照ください。

    Simple Log Service SDK for Java のバージョンは 0.6.126 以降である必要があります。
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <!-- Java 用 Simple Log Service SDK をインポートします -->
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>aliyun-log</artifactId>
      <version>0.6.126</version>
    </dependency>
  2. PullLogsWithSPLDemo.java という名前のファイルを作成します。この例では、PullLog 操作を呼び出してログデータを読み取ります。Java SDK を使用して SPL でログデータを消費する方法を示します。

    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.*;
    import com.aliyun.openservices.log.request.PullLogsRequest;
    import com.aliyun.openservices.log.response.ListShardResponse;
    import com.aliyun.openservices.log.response.PullLogsResponse;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class PullLogsWithSPLDemo {
        // Simple Log Service のサービスエンドポイント。この例では、中国 (杭州) リージョンのエンドポイントを使用します。エンドポイントを実際のリージョンのものに置き換えてください。
        private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
        // この例では、環境変数から AccessKey ID と AccessKey Secret を取得します。
        private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // プロジェクトの名前。値を実際のプロジェクト名に置き換えてください。
        private static final String project = "ali-project-test";
        // Logstore の名前。値を実際の Logstore 名に置き換えてください。
        private static final String logStore = "test-logstore";
    
        public static void main(String[] args) throws Exception {
            // processorName パラメーターは、コンシューマープロセッサーの ID を指定します。
            String processorName = "processor-test";
            // Simple Log Service クライアントを作成します。
            Client client = new Client(endpoint, accessKeyId, accessKeySecret);
            // Logstore のシャードをクエリします。
            ListShardResponse resp = client.ListShard(project, logStore);        System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
            Map<Integer, String> cursorMap = new HashMap<>();
            for (Shard shard : resp.GetShards()) {
                int shardId = shard.getShardId();
                // 最初から消費を開始し、カーソルを取得します。最後から消費を開始するには、Consts.CursorMode.END を使用します。
                cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
            }
            try {
                while (true) {
                    // 各シャードからログを取得します。
                    for (Shard shard : resp.GetShards()) {
                        int shardId = shard.getShardId();
                        PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
                        // processorName パラメーターは、コンシューマープロセッサーの ID を指定します。
                        request.setProcessor(processorName);
                        PullLogsResponse response = client.pullLogs(request);
                        // ログはロググループに保存されます。ビジネスロジックに基づいてログを分割できます。
                        List<LogGroupData> logGroups = response.getLogGroups();
                        System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
    
                        // プルしたログを処理した後、カーソルを移動します。
                        cursorMap.put(shardId, response.getNextCursor());
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.GetErrorCode());
                System.out.println("error message :" + e.GetErrorMessage());
                throw e;
            }
        }
    }
  3. main 関数を実行し、出力を表示します。

    Get 41 logGroup from logstore:test-logstore:	Shard:0
    Get 49 logGroup from logstore:test-logstore:	Shard:1
    Get 43 logGroup from logstore:test-logstore:	Shard:0
    Get 39 logGroup from logstore:test-logstore:	Shard:1
    ... ...

Python

  1. Simple Log Service SDK をインストールします。spl_demo という名前のプロジェクトフォルダを作成し、そのフォルダで次のコマンドを実行します。詳細については、「Simple Log Service SDK for Python のインストール」をご参照ください。

    Simple Log Service SDK for Python のバージョンは 0.9.28 以降である必要があります。
    pip install -U aliyun-log-python-sdk
  2. spl_demo フォルダに、main.py という名前のファイルを作成します。このファイルは、使用者グループを作成し、コンシューマースレッドを開始して、指定された Logstore からデータを消費します。

    # encoding: utf-8
    
    import time
    import os
    from aliyun.log import *
    
    def main():
        # Simple Log Service のサービスエンドポイント。この例では、中国 (杭州) リージョンのエンドポイントを使用します。エンドポイントを実際のリージョンのものに置き換えてください。
        endpoint = 'cn-hangzhou.log.aliyuncs.com'
        # この例では、環境変数から AccessKey ID と AccessKey Secret を取得します。
        access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
        # プロジェクトの名前。値を実際のプロジェクト名に置き換えてください。
        project_name = 'ali-project-test'
        # Logstore の名前。値を実際の Logstore 名に置き換えてください。
        logstore_name = 'test-logstore'
        # processor パラメーターは、コンシューマープロセッサーの ID を指定します。
        processor = "processor-test"
        init_cursor = 'end'
        log_group_count = 10
    
        # Simple Log Service クライアントを作成します。
        client = LogClient(endpoint, access_key_id, access_key)
    
        cursor_map = {}
        # Logstore のシャードをリストします。
        res = client.list_shards(project_name, logstore_name)
        res.log_print()
        shards = res.shards
    
        # 初期カーソルを取得します。
        for shard in shards:
            shard_id = shard.get('shardID')
            res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor)
            cursor_map[shard_id] = res.get_cursor()
    
        # ループで各シャードからデータを読み取ります。
        while True:
            for shard in shards:
                shard_id = shard.get('shardID')
                res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id), log_group_count,
                                       processor=processor)
                res.log_print()
                if cursor_map[shard_id] == res.next_cursor:
                    # デバッグ専用
                    time.sleep(3)
                else:
                    cursor_map[shard_id] = res.next_cursor
    
    
    if __name__ == '__main__':
        main()
  3. main 関数を実行し、出力を表示します。

    ListShardResponse:
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'}
    res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}]
    PullLogResponse
    next_cursor MTczNz********c3ODgyMjQ0MQ==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'}
    detail: []
    PullLogResponse
    next_cursor MTczNz********c3OTg5NzE3NA==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'}
    detail: []
    PullLogResponse
    ... ...

Go

  1. Simple Log Service SDK をインストールします。spl_demo という名前のプロジェクトフォルダを作成し、そのフォルダで次のコマンドを実行します。詳細については、「Go SDK のインストール」をご参照ください。

    Simple Log Service SDK for Go のバージョンは v0.1.107 以降である必要があります。
    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. spl_demo フォルダに、main.go という名前のファイルを作成します。このファイルは、使用者グループを作成し、コンシューマースレッドを開始して、指定された Logstore からデータを消費します。

    package main
    
    import (
    	"fmt"
    	"os"
    	"time"
    
    	sls "github.com/aliyun/aliyun-log-go-sdk"
    )
    
    func main() {
    	client := &sls.Client{
    		AccessKeyID:     os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
    		AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
    		Endpoint:        "cn-chengdu.log.aliyuncs.com",
    	}
    
    	project := "ali-project-test"
    	logstore := "test-logstore"
    	initCursor := "end"
        // consumeProcessor パラメーターは、コンシューマープロセッサーの ID を指定します。
    	consumeProcessor := "ali-test-consume-processor"
    	shards, err := client.ListShards(project, logstore)
    	if err != nil {
    		fmt.Println("ListShards error", err)
    		return
    	}
    
    	shardCursorMap := map[int]string{}
    	for _, shard := range shards {
    		cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor)
    		if err != nil {
    			fmt.Println("GetCursor error", shard.ShardID, err)
    			return
    		}
    		shardCursorMap[shard.ShardID] = cursor
    	}
    
    	for {
    		for _, shard := range shards {
    			pullLogRequest := &sls.PullLogRequest{
    				Project:          project,
    				Logstore:         logstore,
    				ShardID:          shard.ShardID,
    				LogGroupMaxCount: 10,
    				Processor:        consumeProcessor,
    				Cursor:           shardCursorMap[shard.ShardID],
    			  }
    			lg, nextCursor, err := client.PullLogsV2(pullLogRequest)
    			fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor)
    			if err != nil {
    				fmt.Println("PullLogsV2 error", shard.ShardID, err)
    				return
    			}
    			shardCursorMap[shard.ShardID] = nextCursor
    			if len(lg.LogGroups) == 0 {
    				// デバッグ専用
    				time.Sleep(time.Duration(3) * time.Second)
    			}
    		}
    	}
    }
  3. main 関数を実行し、出力を表示します。

    shard:  0 loggroups:  41 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  49 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    shard:  0 loggroups:  43 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  39 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    ... ...