このトピックでは、Search Processing Language (SPL) とソフトウェア開発キット (SDK) を備えたコンシューマープロセッサーを使用してログを消費する方法の例を示します。
前提条件
Resource Access Management (RAM) ユーザーが作成され、必要な権限が付与されています。詳細については、「RAM ユーザーの作成と権限の付与」をご参照ください。
ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されています。詳細については、「Linux、macOS、および Windows で環境変数を設定する」をご参照ください。
重要Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。RAM ユーザーの AccessKey ペアを使用して API 操作を呼び出すか、日常の O&M を実行することをお勧めします。
プロジェクトコードに AccessKey ID または AccessKey Secret を含めないでください。どちらかが漏洩すると、アカウント内のすべてのリソースのセキュリティが危険にさらされる可能性があります。
コード例
Java
Simple Log Service SDK をインストールします。Java プロジェクトのルートディレクトリで、
pom.xmlファイルを開き、次の Maven 依存関係を追加します。詳細については、「Java SDK のインストール」をご参照ください。Simple Log Service SDK for Java のバージョンは 0.6.126 以降である必要があります。
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <!-- Java 用 Simple Log Service SDK をインポートします --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.126</version> </dependency>PullLogsWithSPLDemo.javaという名前のファイルを作成します。この例では、PullLog 操作を呼び出してログデータを読み取ります。Java SDK を使用して SPL でログデータを消費する方法を示します。import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.*; import com.aliyun.openservices.log.request.PullLogsRequest; import com.aliyun.openservices.log.response.ListShardResponse; import com.aliyun.openservices.log.response.PullLogsResponse; import java.util.HashMap; import java.util.List; import java.util.Map; public class PullLogsWithSPLDemo { // Simple Log Service のサービスエンドポイント。この例では、中国 (杭州) リージョンのエンドポイントを使用します。エンドポイントを実際のリージョンのものに置き換えてください。 private static final String endpoint = "cn-hangzhou.log.aliyuncs.com"; // この例では、環境変数から AccessKey ID と AccessKey Secret を取得します。 private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); // プロジェクトの名前。値を実際のプロジェクト名に置き換えてください。 private static final String project = "ali-project-test"; // Logstore の名前。値を実際の Logstore 名に置き換えてください。 private static final String logStore = "test-logstore"; public static void main(String[] args) throws Exception { // processorName パラメーターは、コンシューマープロセッサーの ID を指定します。 String processorName = "processor-test"; // Simple Log Service クライアントを作成します。 Client client = new Client(endpoint, accessKeyId, accessKeySecret); // Logstore のシャードをクエリします。 ListShardResponse resp = client.ListShard(project, logStore); System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size()); Map<Integer, String> cursorMap = new HashMap<>(); for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); // 最初から消費を開始し、カーソルを取得します。最後から消費を開始するには、Consts.CursorMode.END を使用します。 cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor()); } try { while (true) { // 各シャードからログを取得します。 for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId)); // processorName パラメーターは、コンシューマープロセッサーの ID を指定します。 request.setProcessor(processorName); PullLogsResponse response = client.pullLogs(request); // ログはロググループに保存されます。ビジネスロジックに基づいてログを分割できます。 List<LogGroupData> logGroups = response.getLogGroups(); System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId); // プルしたログを処理した後、カーソルを移動します。 cursorMap.put(shardId, response.getNextCursor()); } } } catch (LogException e) { System.out.println("error code :" + e.GetErrorCode()); System.out.println("error message :" + e.GetErrorMessage()); throw e; } } }main 関数を実行し、出力を表示します。
Get 41 logGroup from logstore:test-logstore: Shard:0 Get 49 logGroup from logstore:test-logstore: Shard:1 Get 43 logGroup from logstore:test-logstore: Shard:0 Get 39 logGroup from logstore:test-logstore: Shard:1 ... ...
Python
Simple Log Service SDK をインストールします。spl_demo という名前のプロジェクトフォルダを作成し、そのフォルダで次のコマンドを実行します。詳細については、「Simple Log Service SDK for Python のインストール」をご参照ください。
Simple Log Service SDK for Python のバージョンは 0.9.28 以降である必要があります。
pip install -U aliyun-log-python-sdkspl_demo フォルダに、main.py という名前のファイルを作成します。このファイルは、使用者グループを作成し、コンシューマースレッドを開始して、指定された Logstore からデータを消費します。
# encoding: utf-8 import time import os from aliyun.log import * def main(): # Simple Log Service のサービスエンドポイント。この例では、中国 (杭州) リージョンのエンドポイントを使用します。エンドポイントを実際のリージョンのものに置き換えてください。 endpoint = 'cn-hangzhou.log.aliyuncs.com' # この例では、環境変数から AccessKey ID と AccessKey Secret を取得します。 access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') # プロジェクトの名前。値を実際のプロジェクト名に置き換えてください。 project_name = 'ali-project-test' # Logstore の名前。値を実際の Logstore 名に置き換えてください。 logstore_name = 'test-logstore' # processor パラメーターは、コンシューマープロセッサーの ID を指定します。 processor = "processor-test" init_cursor = 'end' log_group_count = 10 # Simple Log Service クライアントを作成します。 client = LogClient(endpoint, access_key_id, access_key) cursor_map = {} # Logstore のシャードをリストします。 res = client.list_shards(project_name, logstore_name) res.log_print() shards = res.shards # 初期カーソルを取得します。 for shard in shards: shard_id = shard.get('shardID') res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor) cursor_map[shard_id] = res.get_cursor() # ループで各シャードからデータを読み取ります。 while True: for shard in shards: shard_id = shard.get('shardID') res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id), log_group_count, processor=processor) res.log_print() if cursor_map[shard_id] == res.next_cursor: # デバッグ専用 time.sleep(3) else: cursor_map[shard_id] = res.next_cursor if __name__ == '__main__': main()main 関数を実行し、出力を表示します。
ListShardResponse: headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'} res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}] PullLogResponse next_cursor MTczNz********c3ODgyMjQ0MQ== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'} detail: [] PullLogResponse next_cursor MTczNz********c3OTg5NzE3NA== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'} detail: [] PullLogResponse ... ...
Go
Simple Log Service SDK をインストールします。spl_demo という名前のプロジェクトフォルダを作成し、そのフォルダで次のコマンドを実行します。詳細については、「Go SDK のインストール」をご参照ください。
Simple Log Service SDK for Go のバージョンは v0.1.107 以降である必要があります。
go get -u github.com/aliyun/aliyun-log-go-sdkspl_demo フォルダに、main.go という名前のファイルを作成します。このファイルは、使用者グループを作成し、コンシューマースレッドを開始して、指定された Logstore からデータを消費します。
package main import ( "fmt" "os" "time" sls "github.com/aliyun/aliyun-log-go-sdk" ) func main() { client := &sls.Client{ AccessKeyID: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), Endpoint: "cn-chengdu.log.aliyuncs.com", } project := "ali-project-test" logstore := "test-logstore" initCursor := "end" // consumeProcessor パラメーターは、コンシューマープロセッサーの ID を指定します。 consumeProcessor := "ali-test-consume-processor" shards, err := client.ListShards(project, logstore) if err != nil { fmt.Println("ListShards error", err) return } shardCursorMap := map[int]string{} for _, shard := range shards { cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor) if err != nil { fmt.Println("GetCursor error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = cursor } for { for _, shard := range shards { pullLogRequest := &sls.PullLogRequest{ Project: project, Logstore: logstore, ShardID: shard.ShardID, LogGroupMaxCount: 10, Processor: consumeProcessor, Cursor: shardCursorMap[shard.ShardID], } lg, nextCursor, err := client.PullLogsV2(pullLogRequest) fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor) if err != nil { fmt.Println("PullLogsV2 error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = nextCursor if len(lg.LogGroups) == 0 { // デバッグ専用 time.Sleep(time.Duration(3) * time.Second) } } } }main 関数を実行し、出力を表示します。
shard: 0 loggroups: 41 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 49 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== shard: 0 loggroups: 43 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 39 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== ... ...