すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:コンシューマープロセッサ (SPL) に基づいてログを使用するコンシューマーグループ

最終更新日:Sep 01, 2025

コンシューマーグループを使用してデータを使用する場合、ロードバランシングやフェールオーバーなどの実装の詳細を処理するのではなく、ビジネスロジックに集中できます。 このトピックでは、Java、Python、または Go 用の Simple Log Service SDK を使用して、コンシューマープロセッサで構成されたコンシューマーグループを使用してログストアからログを使用する方法について説明します。

前提条件

  • Resource Access Management (RAM) ユーザーが作成され、必要な権限が付与されます。 詳細については、「RAM ユーザーを作成して権限を付与する」をご参照ください。

  • ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が構成されています。 詳細については、「Linux、macOS、および Windows で環境変数を構成する」をご参照ください。

    重要
    • Alibaba Cloud アカウントの AccessKey ペアには、すべての API 操作に対する権限があります。 API 操作を呼び出したり、日常の O&M を実行したりするには、RAM ユーザーの AccessKey ペアを使用することをお勧めします。

    • プロジェクトコードに AccessKey ID または AccessKey シークレットを含めないでください。 いずれかが漏洩した場合、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。

  • コンシューマープロセッサを作成する

サンプルコード

Java

  1. Maven 依存関係を追加します。

    com.aliyun.openservices パッケージはバージョン 0.6.51 以降である必要があります。
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.51</version>
    </dependency>
  2. データ消費ロジックを作成します。 次のコードは、SPLLogHubProcessor.java の例です。

    // com.aliyun.openservices.log.common.FastLog をインポートします
    // com.aliyun.openservices.log.common.FastLogContent をインポートします
    // com.aliyun.openservices.log.common.FastLogGroup をインポートします
    // com.aliyun.openservices.log.common.FastLogTag をインポートします
    // com.aliyun.openservices.log.common.LogGroupData をインポートします
    // com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker をインポートします
    // com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException をインポートします
    // com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor をインポートします
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // チェックポイントが最後に永続化された時刻。
        private long mLastSaveTime = 0;
    
        // プロセッサオブジェクトが初期化されるときに、initialize メソッドが 1 回呼び出されます。
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // データ消費のメインロジック。 消費中に発生するすべての例外を処理します。 例外を直接スローしないでください。
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // 取得したデータを出力します。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("タグ");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nログ: " + i + ", 時間: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // 30 秒ごとにチェックポイントをサーバーに書き込みます。 ワーカーが 30 秒以内に予期せず終了した場合、新しいワーカーは最後のチェックポイントからデータの消費を開始します。 少量のデータが繰り返し消費される可能性があります。
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // true パラメータは、チェックポイントがサーバーにすぐに更新されることを示します。 デフォルトでは、メモリにキャッシュされたチェックポイントは 60 秒ごとにサーバーに自動的に更新されます。
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // false パラメータは、チェックポイントがローカルにキャッシュされることを示します。 チェックポイントは、自動更新メカニズムによってサーバーに更新できます。
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // この関数は、ワーカーが終了するときに呼び出されます。 ここでクリーンアップタスクを実行できます。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // チェックポイントをサーバーにすぐに保存します。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. コンシューマーエンティティを作成します。 次のコードは、SPLLogHubProcessorFactory.java の例です。

    // com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor をインポートします
    // com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory をインポートします
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // コンシューマーインスタンスを生成します。 注: generatorProcessor メソッドが呼び出されるたびに、新しい SPLLogHubProcessor オブジェクトを返す必要があります。
            return new SPLLogHubProcessor();
        }
    }
  4. コンシューマーを作成し、コンシューマースレッドを開始します。 コンシューマーは、指定されたログストアからデータを使用します。

    // com.aliyun.openservices.loghub.client.ClientWorker をインポートします
    // com.aliyun.openservices.loghub.client.config.LogHubConfig をインポートします
    // com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException をインポートします
    
    public class Main {
        // Simple Log Service エンドポイント。 値を実際のエンドポイントに置き換えます。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Simple Log Service プロジェクトの名前。 値を実際のプロジェクト名に置き換えます。 既存のプロジェクトからプロジェクト名を取得します。
        private static String Project = "ali-test-project";
        // ログストアの名前。 値を実際のログストア名に置き換えます。 既存のログストアからログストア名を取得します。
        private static String Logstore = "ali-test-logstore";
        // コンシューマーグループの名前。 値を実際のコンシューマーグループ名に置き換えます。 コンシューマーグループを事前に作成する必要はありません。 プログラムは実行時に自動的に作成します。
        private static String ConsumerGroup = "ali-test-consumergroup2";
    
        // 新しいコンテンツ
        // コンシューマープロセッサの名前。 コンソールで、またはコンシューマープロセッサ API を呼び出すことによって、プロセッサを作成します。
        private static String ConsumeProcessor = "test-consumer-processor";
        // 新しいコンテンツの終わり
        // この例では、環境変数から AccessKey ID と AccessKey シークレットを取得します。
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 はコンシューマー名です。 同じコンシューマーグループ内のコンシューマーの名前は一意である必要があります。 異なるコンシューマーが異なるマシンで複数のプロセスを開始して、Logstore からバランスの取れた方法でデータを使用する場合、マシンの IP アドレスをコンシューマー名として使用できます。
            // maxFetchLogGroupSize は、サーバーから一度に取得できるロググループの最大数を指定します。 デフォルト値を使用できます。 値を変更するには、config.setMaxFetchLogGroupSize(100); を使用します。 値は (0, 1000] の範囲内である必要があります。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // setProcessor は、データ消費用のコンシューマープロセッサを指定します。
            config.setProcessor(ConsumeProcessor);
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // スレッドが実行された後、ClientWorker が自動的に実行されます。 ClientWorker は Runnable インターフェースを拡張します。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // ワーカーの shutdown 関数を呼び出して、コンシューマーインスタンスを終了します。 関連付けられたスレッドも自動的に停止します。
            worker.shutdown();
            // ClientWorker の実行中に複数の非同期タスクが生成されます。 シャットダウンが完了したら、実行中のタスクが安全に終了するまで待ちます。 スリープ時間を 30 秒に設定することをお勧めします。
            Thread.sleep(30 * 1000);
        }
    }
  5. Main.java を実行します。 この例では、NGINX ログを使用します。 次のログが出力されます。

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.xx.xxx
    upstream_response_time    :    0.02
    --------
    ログ: 158, 時間: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    タグ
        __receive_time__    :    1635629815
    --------
    ログ: 0, 時間: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log

Python

  1. Python 用の Simple Log Service SDK をインストールします。 spl_consumer_demo という名前のプロジェクトフォルダを作成し、そのフォルダで次のコマンドを実行します。 詳細については、「Python 用の Simple Log Service SDK をインストールする」をご参照ください。

    Python 用の Simple Log Service SDK は、バージョン 0.9.28 以降である必要があります。
    pip install -U aliyun-log-python-sdk
  2. spl_consumer_demo フォルダに、main.py ファイルを作成します。 コンシューマーグループを作成し、コンシューマースレッドを開始します。 コンシューマーは、指定されたログストアからデータを使用します。

    # os をインポートします
    # time をインポートします
    
    # aliyun.log.consumer から * をインポートします
    # aliyun.log から * をインポートします
    
    
    class SPLConsumer(ConsumerProcessorBase):
        shard_id = -1
        last_check_time = 0
    
        def initialize(self, shard):
            self.shard_id = shard
    
        def process(self, log_groups, check_point_tracker):
            for log_group in log_groups.LogGroups:
                items = []
                for log in log_group.Logs:
                    item = dict()
                    item['time'] = log.Time
                    for content in log.Contents:
                        item[content.Key] = content.Value
                    items.append(item)
                log_items = dict()
                log_items['topic'] = log_group.Topic
                log_items['source'] = log_group.Source
                log_items['logs'] = items
    
                print(log_items)
    
            current_time = time.time()
            if current_time - self.last_check_time > 3:
                try:
                    self.last_check_time = current_time
                    check_point_tracker.save_check_point(True)
                except Exception:
                    import traceback
                    traceback.print_exc()
            else:
                try:
                    check_point_tracker.save_check_point(False)
                except Exception:
                    import traceback
                    traceback.print_exc()
    
            # None は正常なプロセスを意味します
            # 前のチェックポイントにロールバックする必要がある場合は、check_point_tracker.get_check_point() を返します
            return None
    
        def shutdown(self, check_point_tracker):
            try:
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
    
    
    def sleep_until(seconds, exit_condition=None, expect_error=False):
        if not exit_condition:
            time.sleep(seconds)
            return
    
        s = time.time()
        while time.time() - s < seconds:
            try:
                if exit_condition():
                    break
            except Exception:
                if expect_error:
                    continue
            time.sleep(1)
    
    def spl_consumer_group():
        # Simple Log Service エンドポイント。 この例では、中国 (杭州) リージョンのエンドポイントを使用しています。 値を実際のエンドポイントに置き換えます。
        endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    
        # この例では、環境変数から AccessKey ID と AccessKey シークレットを取得します。
        access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    
        project = 'your_project'  # プロジェクト名
        logstore = 'your_logstore' # ログストア名
    
        # コンシューマーグループの名前。 コンシューマーグループを事前に作成する必要はありません。 SDK は自動的に作成します。
        consumer_group = 'consumer-group'
        consumer_name = "consumer-group-name"
    
        # コンシューマーグループに 2 つのコンシューマーを作成してデータを使用します。
        option = LogHubConfig(endpoint,
                              access_key_id,
                              access_key,
                              project,
                              logstore,
                              consumer_group,
                              consumer_name,
                              processor="test-consume-processor",
                              cursor_position=CursorPosition.BEGIN_CURSOR,
                              heartbeat_interval=6,
                              data_fetch_interval=1)
    
        print("*** データの消費を開始しています...")
        client_worker = ConsumerWorker(SPLConsumer, consumer_option=option)
        client_worker.start()
    
        time.sleep(10000)
    
    
    if __name__ == '__main__':
        spl_consumer_group()
  3. spl_consumer_demo フォルダで main.py を実行し、結果を表示します。

    python main.py

Go

  1. Go SDK をインストールします。 spl_demo という名前のプロジェクトフォルダを作成し、そのフォルダで次のコマンドを実行します。 詳細については、「Go SDK をインストールする」をご参照ください。

    Go 用の Simple Log Service SDK は、バージョン v0.1.107 以降である必要があります。
    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. spl_demo フォルダに、main.go ファイルを作成します。 コンシューマーグループを作成し、コンシューマースレッドを開始します。 コンシューマーは、指定されたログストアからデータを使用します。

    package main
    
    import (
    	"fmt"
    	"os"
    	"os/signal"
    	"syscall"
    
    	sls "github.com/aliyun/aliyun-log-go-sdk"
    	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
    	"github.com/go-kit/kit/log/level"
    )
    
    // README:
    // これは、ログストアからデータをプルして出力して使用するための非常に簡単な例であり、ログの前処理を含みます。
    
    func main() {
    	// Simple Log Service エンドポイント。 この例では、中国 (杭州) リージョンのエンドポイントを使用しています。 値を実際のエンドポイントに置き換えます。
    	option := consumerLibrary.LogHubConfig{
    		Endpoint:          "cn-hangzhou.log.aliyuncs.com",
    		AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
    		AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
    		Project:           "your_project", // プロジェクト名
    		Logstore:          "your_logstore", // ログストア名
    		ConsumerGroupName: "test-spl-cg",
    		ConsumerName:      "test-spl-consumer",
    		// このオプションは初期化に使用され、コンシューマーグループが作成されて各シャードの消費が開始されると無視されます。
    		// "begin"、"end"、"タイムスタンプ形式の特定の時間" のいずれかになります。ログの受信時間です。
    		CursorPosition: consumerLibrary.END_CURSOR,
    		// プロセッサは、ログがクライアントに返される前にログを前処理するためのものです。 詳細については、https://www.alibabacloud.com/help/en/sls/user-guide/rule-based-consumption をご参照ください。
    		Processor: "test-consume-processor",
    	}
    
    	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
    	ch := make(chan os.Signal, 1)
    	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    	consumerWorker.Start()
    	if _, ok := <-ch; ok {
    		level.Info(consumerWorker.Logger).Log("msg", "停止シグナルを受信しました。コンシューマーワーカーの停止を開始します", "consumer worker name", option.ConsumerName)
    		consumerWorker.StopAndWait()
    	}
    }
    
    // ここに消費ロジックを入力し、関数のパラメータと戻り値を変更しないように注意してください。
    // そうしないと、エラーが報告されます。
    func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
    	fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
    	checkpointTracker.SaveCheckPoint(false)
    	return "", nil
    }
  3. spl_demo フォルダで次のコマンドを実行して、依存関係をインストールします。

    go mod tidy
    go mod vendor
  4. main 関数を実行し、出力を表示します。

    go run main.go