すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:ログを SIEM に転送する

最終更新日:Mar 26, 2026

ログをセキュリティ情報およびイベント管理 (SIEM) システムに転送するには、SLS をご利用の SIEM に接続するアプリケーションをデプロイします。このアプリケーションは、SLS コンシューマーグループを使用してログをプルし、Splunk HEC または Syslog を使用してご利用の SIEM に転送します。これにより、クラウドログをオンプレミスのセキュリティ分析プラットフォームと統合できます。

背景情報

多くの企業では、オンプレミスデータセンターに Splunk や QRadar などのセキュリティ情報およびイベント管理 (SIEM) プラットフォームをデプロイしています。セキュリティを維持するため、これらのプラットフォームは通常、データ受信用のパブリックエンドポイントを公開しません。ビジネスをクラウドに移行する際、統合されたモニタリング、監査、脅威分析のために、クラウドリソースからのログをオンプレミスの SIEM に集約する必要があります。既存システムのセキュリティを損なうことなく、SLS からオンプレミスの SIEM への安全なログ配布パイプラインを確立する必要があります。

仕組み

リアルタイムのデータ転送には、SLS コンシューマーグループを使用します。専用のアプリケーションが SLS からログをプルし、Splunk HTTP Event Collector (HEC) または TCP/TLS 経由の Syslog を使用してご利用の SIEM に転送します。

image

コアロジック

  1. ログのプル:コンシューマーグループをベースにしたアプリケーションが SLS からデータをプルします。このメカニズムは、同時消費とフェールオーバーをサポートします。

    • 同時実行性とスループット

      • より高いスループットを実現するには、コンシューマーアプリケーションの複数のインスタンスを実行します。各コンシューマーインスタンスは同じコンシューマーグループに属し、例えばプロセス ID をサフィックスとして使用するなどして、一意の名前を持つ必要があります。

      • 一度に 1 つのシャードを処理できるコンシューマーは 1 つだけです。したがって、同時コンシューマーの最大数は Logstore のシャード数によって制限されます。例えば、Logstore に 10 個のシャードがある場合、最大 10 個のコンシューマーを並列で実行できます。

      • 理想的なネットワーク条件下では:

        • 単一のコンシューマー (単一 CPU コアの約 20% を使用) は、10 MB/s のレートで生ログを消費できます。

        • 10 個のコンシューマーは、最大 100 MB/s の生ログを処理できます。

    • 高可用性

      • コンシューマーグループは、各コンシューマーの進捗をサーバー上のチェックポイントとして保存します。

      • コンシューマーインスタンスに障害が発生した場合、別の利用可能なインスタンスが自動的に割り当てられたシャードを引き継ぎ、最後に保存されたチェックポイントから処理を再開します。堅牢なフェールオーバーを確保するために、コンシューマーインスタンスを異なるマシンで実行できます。

      • シャード数よりも多くのコンシューマーインスタンスを実行できます。余分なインスタンスは、即時のフェールオーバーのためのスタンバイとして機能します。

  2. データ転送:ログをプルした後、アプリケーションは設定に基づいてログをフォーマットし、オンプレミスの SIEM に転送します。

前提条件

  • RAM ユーザーの作成と権限付与:RAM ユーザーには AliyunLogFullAccess ポリシーが必要です。

  • ネットワーク要件:アプリケーションを実行するマシンは、SLS エンドポイントにアクセスでき、SIEM と同じネットワーク上にある必要があります。

    • エンドポイントの取得方法:

      1. SLS コンソールにログインします。プロジェクトリストで、対象のプロジェクトをクリックします。

      2. プロジェクト名の右側にある image アイコンをクリックして、プロジェクトの概要ページに移動します。

      3. [Endpoint] セクションで、パブリックエンドポイントをコピーします。エンドポイントは https:// + パブリックエンドポイント です。

  • 環境要件:Python 3 実行環境を準備し、SLS Python SDK をインストールします。

    1. SLS Python SDK のインストール:pip install -U aliyun-log-python-sdk

    2. インストールの確認:pip show aliyun-log-python-sdk。インストールが成功すると、以下のような情報が返されます。

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun

操作手順

ステップ 1:アプリケーションの準備

SLS は、Splunk HEC と Syslog の 2 つの転送方法のサンプルスクリプトを提供しています。ご利用の SIEM に一致する方法を選択し、対応するスクリプトを設定します。

  • Splunk HEC: HTTP イベントコレクター (HEC) は、HTTP 経由で Splunk にさまざまなフォーマットのデータを安全かつ効率的に送信できるトークンベースの仕組みです。

  • Syslog:ほとんどの SIEM システムと互換性があり、プレーンテキスト形式をサポートする一般的なロギングプロトコルです。

Splunk HEC

ログデータを Splunk に転送するには、提供されている sync_data.py スクリプトを設定します。スクリプトは主に 3 つの部分で構成されています:

  • main() メソッド:メインプログラムの制御ロジック。

  • get_option() メソッド:消費設定オプションを定義します。

    • 基本設定:SLS とコンシューマーグループの接続設定が含まれます。

    • 高度なコンシューマーグループオプション:パフォーマンスチューニングパラメーターが含まれます。必要な場合以外は変更しないでください。

    • SIEM (Splunk) のパラメーターとオプション。

    • 行フィルタリング、列のトリミング、データ正規化などのタスクのために、転送中にデータをフィルタリングまたは変換するための SPL クエリを追加します。例:

      # SPL クエリ
          query = "* | where instance_id in ('instance-1', 'instance-2')"
      # フィルター ルールを持つコンシューマーを作成します。「query」パラメーターが設定に追加されます。
          option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                cursor_start_time=cursor_start_time,
                                heartbeat_interval=heartbeat_interval,
                                data_fetch_interval=data_fetch_interval,
                                query=query)
  • SyncData(ConsumerProcessorBase) クラス:SLS からデータをフェッチし、Splunk に転送するロジックが含まれています。コード内のコメントを確認し、必要に応じてロジックを調整してください。

完全なスクリプトは以下の通りです:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# 診断とトラブルシューティングのために、ローテーションするログファイルを設定します。
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    このコンシューマークラスは、SLS からデータをプルし、Splunk に送信します。
    """
    def __init__(self, splunk_setting=None):
        
        """コンシューマーを初期化し、Splunk への接続性を検証します。"""
        super(SyncData, self).__init__()   # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Splunk への接続性をテストします。
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # TODO: ログを処理して送信するための独自のロジックに置き換えてください。
            # ログは辞書です。Python 3 の例 (すべての文字列は Unicode である必要があります):
            #    {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                raise err

                # 必要に応じて、リトライロジックやレポートメカニズムを追加します。

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # 基本設定
    ##########################

    # 環境変数から SLS のパラメーターとオプションをロードします。
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group cannot be empty")

    ##########################
    # 高度なコンシューマーグループオプション
    ##########################

    # 特に同時消費の場合、コンシューマー名を変更することは推奨されません。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消費開始点。このパラメーターは初回実行時にのみ使用されます。
    # 2回目以降の実行は、最後に保存されたチェックポイントから再開されます。
    # 有効な値: "begin"、"end"、または ISO 8601 形式の特定の時刻。
    cursor_start_time = "2018-12-26 0:0:0"

    # ハートビート間隔 (秒)。サーバーが特定のシャードのコンシューマーから 2 * heartbeat_interval 以内にハートビートを受信しない場合、そのコンシューマーをオフラインとみなし、タスクを再割り当てします。ネットワーク状態が悪い場合は、この値を低く設定しないでください。
    heartbeat_interval = 20

    # 最大データフェッチ間隔。新しいデータが急速に生成される場合、このパラメーターを調整する必要はありません。
    data_fetch_interval = 1
    
    # コンシューマーグループ設定オブジェクトを作成します。
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    フィルター ルールを持つコンシューマーを作成するには、次のコードを使用します:
    # カスタム SPL クエリ
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    # フィルター ルールを持つコンシューマーを作成します。「query」パラメーターが設定に追加されます。
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Splunk オプション
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # オプション、bool
                'timeout': 120,             # オプション、int
                'ssl_verify': True,         # オプション、bool
                "sourcetype": "",            # オプション、sourcetype
                "index": "",                # オプション、index
                "source": "",               # オプション、source
            }

    return  option, settings

# メインプログラムの制御ロジック 
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

Syslog

Syslog は、RFC 5424RFC 3164 などのプロトコルに基づいたログフォーマット仕様を定義しています。RFC 5424 の使用を推奨します。Syslog は UDP と TCP の両方で転送できますが、TCP の方が信頼性の高いデータ伝送を提供します。RFC 5424 は、TLS を使用した安全なトランスポート層も定義しています。ご利用の SIEM が TCP または TLS チャンネル経由の Syslog をサポートしている場合は、それを使用してください。

Syslog を使用してログデータを SIEM に転送するには、提供されている sync_data.py スクリプトを設定します。スクリプトは主に 3 つの部分で構成されています:

  • main() メソッド:メインプログラムの制御ロジック。

  • get_monitor_option() メソッド:消費設定オプションを定義します。

    • 基本設定:SLS とコンシューマーグループの接続設定が含まれます。

    • 高度なコンシューマーグループオプション:パフォーマンスチューニングパラメーターが含まれます。必要な場合以外は変更しないでください。

    • SIEM Syslog サーバーのパラメーターとオプション。

      • Syslog ファシリティ:ログを生成したプログラムコンポーネント。この例では、デフォルトとして syslogclient.FAC_USER を使用します。

      • Syslog 重大度:メッセージのログレベル。ログの内容に基づいてカスタマイズできます。この例では syslogclient.SEV_INFO を使用します。

      • ご利用の SIEM が TCP または TLS 経由の Syslog をサポートしている場合は、proto パラメーターを TLS に設定し、有効な SSL 証明書へのパスを指定します。

  • SyncData(ConsumerProcessorBase) クラス:SLS からデータをフェッチし、Syslog サーバーに配信するロジックが含まれています。コード内のコメントを確認し、必要に応じてロジックを調整してください。

完全なスクリプトは以下の通りです:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# 診断とトラブルシューティングのために、ローテーションするログファイルを設定します。
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
   このコンシューマークラスは、SLS からデータをプルし、Syslog サーバーに送信します。
    """
    def __init__(self, target_setting=None):
        """
        コンシューマーを初期化し、Syslog サーバーへの接続性を検証します。
        """

        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # 接続性をテストします。 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # TODO: ログを処理して送信するための独自のロジックをここに配置してください。
                    # ログは辞書です。例 (すべての文字列は Unicode である必要があります):
                    #    Python 2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python 3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
                    # 必要に応じてフォーマットを変更できます。この例では、二重縦棒 (||) で区切られた key=value ペアを使用します。
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # 必要に応じてファシリティや重大度を変更できます。
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # リトライや通知などのエラー処理ロジックを追加します。
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # 基本設定
    ##########################

    # 環境変数から SLS のパラメーターとオプションをロードします。
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group cannot be empty")

    ##########################
    # 高度なコンシューマーグループオプション
    ##########################

    # 特に同時消費の場合、コンシューマー名を変更することは推奨されません。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消費開始点。このパラメーターは初回実行時にのみ使用されます。
    # 2回目以降の実行は、最後に保存されたチェックポイントから再開されます。
    # 有効な値: "begin"、"end"、または ISO 8601 形式の特定の時刻。
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # ハートビート間隔 (秒)。サーバーが特定のシャードのコンシューマーから 2 * heartbeat_interval 以内にハートビートを受信しない場合、そのコンシューマーをオフラインとみなし、タスクを再割り当てします。ネットワーク状態が悪い場合は、この値を低く設定しないでください。
    heartbeat_interval = 20

    # 最大データフェッチ間隔。新しいデータが急速に生成される場合、このパラメーターを調整する必要はありません。
    data_fetch_interval = 1

    # コンシューマーグループ設定オブジェクトを作成します。
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Syslog サーバーのパラメーターとオプション
    settings = {
                "host": "1.2.3.4", # 必須。
                "port": 514,       # 必須。ポート番号。
                "protocol": "tcp", # 必須。TCP、UDP、または TLS (Python 3 のみ) を指定できます。
                "sep": "||",      # 必須。key=value ペアの区切り文字。デフォルトは '||' です。
                "cert_path": None,  # オプション。TLS 証明書ファイルへのパス。
                "timeout": 120,   # オプション。タイムアウト (秒)。デフォルトは 120 です。
                "facility": syslogclient.FAC_USER,  # オプション。他の syslogclient.FAC_* 値を参照してください。
                "severity": syslogclient.SEV_INFO,  # オプション。他の syslogclient.SEV_* 値を参照してください。
                "hostname": None, # オプション。ホスト名。デフォルトはローカルマシンのホスト名です。
                "tag": None # オプション。タグ。デフォルトはハイフン (-) です。
    }

    return option, settings

# メインプログラムの制御ロジック
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

ステップ 2:環境変数の設定

プログラムを設定した後、表のシステム環境変数の設定を実行します。

パラメーター

SLS_ENDPOINT

  1. SLS コンソールにログインし、対象のプロジェクトをクリックします。

  2. プロジェクト名の右側にある image アイコンをクリックして、プロジェクトの概要ページに移動します。

  3. [Endpoint] セクションで、パブリックエンドポイントをコピーします。完全なエンドポイントは https:// + ご利用のパブリックエンドポイント です。

エンドポイントのプレフィックスが https:// の場合、例えば https://cn-beijing.log.aliyuncs.com のように、アプリケーションは自動的に HTTPS を使用して SLS に接続します。サーバー証明書 *.aliyuncs.com は GlobalSign によって発行されており、ほとんどのシステムで信頼されています。ご利用のシステムがこの証明書を信頼しない場合は、証明書をダウンロードし、証明書のインストールの指示に従ってください。

https://cn-beijing.log.aliyuncs.com

SLS_PROJECT

SLS コンソール内の対象プロジェクトの名前。

my-sls-project-one

SLS_LOGSTORE

SLS コンソール内の対象 Logstore の名前。

my-sls-logstore-a1

SLS_AK_ID

RAM ユーザーの AccessKey ID。

重要
  • Alibaba Cloud アカウントの AccessKey ペアは、完全な API アクセス権を付与します。セキュリティを向上させるため、API 呼び出しや日常の操作には RAM ユーザーの AccessKey ペアを使用してください。

  • 認証情報の漏洩によるセキュリティリスクを防ぐため、AccessKey ID と AccessKey Secret をアプリケーションコードにハードコーディングしないでください。

L***ky

SLS_AK_KEY

RAM ユーザーの AccessKey Secret。

x***Xl

SLS_CG

コンシューマーグループの名前。「sync_data」のような単純な名前を使用できます。指定したグループが存在しない場合、アプリケーションは自動的に作成します。

sync_data

ステップ 3:起動と検証

  1. 複数のコンシューマープロセスを開始して、並列処理を有効にします。同時プロセスの最大数は、Logstore のシャード数と同じです。

    # 最初のコンシューマープロセスを開始
    nohup python3 sync_data.py &
    # 2番目のコンシューマープロセスを開始
    nohup python3 sync_data.py &
  2. SLS コンソールでコンシューマーグループのステータスを表示します。

    1. プロジェクト一覧で、対象のプロジェクトをクリックします。「[ログストレージ] > [Logstore]」タブに移動します。対象の Logstore の横にある 展开节点 アイコンをクリックし、次に [データ消費] の横にある 展开节点 アイコンをクリックします。

    2. コンシューマーグループリストで、対象のコンシューマーグループをクリックします。[コンシューマーグループのステータス] タブで、各シャードのコンシューマークライアントと進捗状況を表示します。

よくある質問

ConsumerGroupQuotaExceed エラー

このエラーは、コンシューマーグループのクォータを超えたことを示します。1 つの Logstore には最大 30 のコンシューマーグループしか設定できません。この問題を解決するには、SLS コンソールで未使用のコンシューマーグループを削除してください。