すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:オンプレミスの SIEM にログを転送する

最終更新日:Sep 21, 2025

Simple Log Service (SLS) から、Splunk や QRadar などのオンプレミスのセキュリティ情報およびイベント管理 (SIEM) プラットフォームにログを転送します。これにより、クラウドログを既存のオンプレミスのセキュリティ分析プラットフォームと統合し、統合されたモニタリング、監査、脅威分析を実現できます。

仕組み

オンプレミスネットワーク内にデプロイされたコンシューマーアプリケーションが、ログデータのフェッチを担当します。SLS 使用者グループを使用してリアルタイムでログをプルし、Splunk HTTP Event Collector (HEC) や TCP/TLS 経由の Syslog などのプロトコルを使用して SIEM に転送します。

このプロセスはプルベースのアーキテクチャ上に構築されており、設計上、以下の主要な利点があります。

  • セキュリティ: プルモデルにより、すべての接続がセキュアなネットワークから開始されることが保証されます。インバウンドファイアウォールポートを開く必要がなく、オンプレミスのセキュリティ体制を維持できます。

  • 高スループットとスケーラビリティ: 水平方向にスケーリングすることで高スループットを実現します。複数のコンシューマーインスタンスを同時に実行でき、使用者グループはすべてのアクティブなインスタンス間でワークロードを自動的に分散します。

  • 信頼性: 使用者グループは、少なくとも 1 回の配信を保証します。コンシューマーインスタンスに障害が発生した場合、処理していたシャードはグループ内の他の正常なインスタンスに自動的に再割り当てされます。消費は最後に保存されたチェックポイントから再開され、データ損失を防ぎます。

前提条件

  • 権限: Resource Access Management (RAM) ユーザーを作成し、AliyunLogFullAccess ポリシーをユーザーにアタッチします。詳細については、「関連ドキュメント」をご参照ください。

  • ネットワーク要件: プログラムを実行するマシンは、SLS エンドポイントにアクセスでき、SIEM と同じネットワーク内にある必要があります。

    • エンドポイントを取得するには:

      1. SLS コンソールにログインします。プロジェクトリストで、ターゲットプロジェクトをクリックします。

      2. プロジェクト名の右側にある image アイコンをクリックして、プロジェクトの概要ページに移動します。

      3. [エンドポイント] セクションで、パブリックエンドポイントを表示します。エンドポイントは https:// + パブリックエンドポイント です。

  • 環境要件: Python 3 ランタイム環境と SLS Python SDK。

    1. SLS Python SDK をインストールします: pip install -U aliyun-log-python-sdk

    2. インストールを確認します: pip show aliyun-log-python-sdk。次の情報が返された場合、インストールは成功です。

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun

手順

ステップ 1: アプリケーションの準備

SLS は 2 つの転送メソッドを提供します:

  • Splunk HEC: HEC はトークンベースのメカニズムで、さまざまなデータ形式のログを HTTP 経由で安全かつ効率的に Splunk に直接送信できます。

  • Syslog: ほとんどの SIEM と互換性があり、テキスト形式をサポートする一般的なログチャネルです。

Splunk HEC

ログデータを Splunk に転送するには、sync_data.py を設定します。コードは主に 3 つの部分で構成されています:

  • main() メソッド: メインプログラムのエントリポイント。

  • get_option() メソッド: 消費設定オプション。

    • 基本設定オプション: SLS の接続設定と使用者グループの設定が含まれます。

    • 使用者グループの高度なオプション: パフォーマンスチューニングパラメーター。必要がない限り変更しないでください。

    • SIEM (Splunk) 関連のパラメーターとオプション。

    • 転送プロセス中にデータクレンジング (行のフィルタリング、列のトリミング、データの正規化など) を実行するには、SPL クエリを使用してルールを追加します。例:

      # SPL 文
          query = "* | where instance_id in ('instance-1', 'instance-2')"
      # ルールに基づいてコンシューマーを作成します。通常の消費と比較して、クエリパラメーターはパラメーターリストの末尾に追加されます。
          option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                cursor_start_time=cursor_start_time,
                                heartbeat_interval=heartbeat_interval,
                                data_fetch_interval=data_fetch_interval,
                                query=query)
  • SyncData(ConsumerProcessorBase): SLS からデータをフェッチして Splunk に転送するロジックが含まれています。コード内のコメントを注意深く確認し、必要に応じて調整してください。

完全なコードは次のとおりです:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# 診断とトラブルシューティングのために、ローテーションするログファイルを設定します。
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    コンシューマーは Simple Log Service からデータを消費し、Splunk に送信します。
    """
    def __init__(self, splunk_setting=None):
        
        """Splunk への接続性を初期化し、検証します。"""
        super(SyncData, self).__init__()   # base の init を呼び出すことを忘れないでください

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Splunk への接続性をテストします。
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # ここのコードを修正します: ログをリモートエンドに送信するための同期処理コードに置き換えます。
            # ログ形式は辞書です。例 (注: すべての文字列は Unicode でエンコードする必要があります):
            #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                raise err

                # 必要に応じて、いくつかの再試行またはレポートを追加します。

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # 基本設定項目
    ##########################

    # 環境変数から Simple Log Service のパラメーターとオプションをロードします。
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # 使用者グループの高度なオプション
    ##########################

    # 特に同時消費が必要な場合、コンシューマー名を変更することは一般的に推奨されません。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消費の開始点。このパラメーターは、プログラムが初めて実行されるときに有効です。その後の実行では、最後に保存されたチェックポイントから消費が再開されます。
    # "begin"、"end"、または特定の ISO 時刻形式を使用します。
    cursor_start_time = "2018-12-26 0:0:0"

    # ハートビート間隔。サーバーが特定のシャードに対して間隔の 2 倍以内にハートビートレポートを受信しない場合、サーバーは対応するコンシューマーがオフラインであるとみなし、タスクを再割り当てします。
    # ネットワーク環境が悪い場合、短い間隔を設定することは推奨されません。
    heartbeat_interval = 20

    # データ消費の最大間隔。データが迅速に生成される場合、このパラメーターを調整する必要はありません。
    data_fetch_interval = 1
    
    # 使用者グループとコンシューマーを構築します。
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    ルールに基づいてコンシューマーを構築する場合は、次のコードを使用します:
    # カスタム SPL 文
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    # ルールに基づいて消費を構築します。通常の消費と比較して、クエリパラメーターはパラメーターリストの末尾に追加されます。
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Splunk オプション
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # オプション、bool
                'timeout': 120,             # オプション、int
                'ssl_verify': True,         # オプション、bool
                "sourcetype": "",            # オプション、sourcetype
                "index": "",                # オプション、index
                "source": "",               # オプション、source
            }

    return  option, settings

# メインプログラムのエントリポイント  
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

Syslog

Syslog は、主に RFC5424 および RFC3164 に基づくログ形式の仕様を定義しています。RFC 5424 プロトコルの使用を推奨します。TCP と UDP の両方で Syslog を転送できますが、TCP は UDP よりも信頼性の高いデータ転送を提供します。RFC 5424 プロトコルは、TLS を使用したセキュアなトランスポートレイヤーも定義しています。お使いの SIEM が Syslog 用の TCP または TLS チャネルをサポートしている場合は、そのチャネルを使用することをお勧めします。

Syslog を使用して SIEM にログデータを転送するには、sync_data.py を設定します。コードは主に 3 つの部分で構成されています:

  • main() メソッド: メインプログラムのエントリポイント。

  • get_monitor_option() メソッド: 消費設定オプション。

    • 基本設定オプション: SLS の接続設定と使用者グループの設定が含まれます。

    • 使用者グループの高度なオプション: パフォーマンスチューニングパラメーター。必要がない限り変更しないでください。

    • SIEM の Syslog サーバーに関連するパラメーターとオプション。

      • Syslog ファシリティ: プログラムコンポーネント。この例では、デフォルトとして syslogclient.FAC_USER を使用します。

      • Syslog 重大度: ログレベル。必要に応じて、特定のコンテンツのログレベルを設定します。ここでは、syslogclient.SEV_INFO が選択されています。

      • SIEM が TCP または TLS に基づく Syslog チャネルをサポートしている場合は、protoTLS に設定し、正しい SSL 証明書を設定します。

  • SyncData(ConsumerProcessorBase): SLS からデータを取得し、SIEM Syslog サーバーに配信する方法のロジックが含まれています。コード内のコメントを注意深く読み、必要に応じて調整してください。

完全なコードは次のとおりです:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# 診断とトラブルシューティングのために、ローテーションするログファイルを設定します。
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
   コンシューマーは Simple Log Service からデータを消費し、Syslog サーバーに送信します。
    """
    def __init__(self, target_setting=None):
        """
        Syslog サーバーへの接続性を初期化し、検証します。
        """

        super(SyncData, self).__init__()   # base の init を呼び出すことを忘れないでください

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # 接続性をテストします。 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # ログをリモートエンドに送信するための同期コードをここに配置します。
                    # ログ形式は辞書です。例 (注: すべての文字列は Unicode でエンコードする必要があります):
                    #    Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
                    # 必要に応じてフォーマットを変更します。ここでは、Key=Value が転送に使用され、デフォルトの二重縦線 (||) が区切り文字として使用されます。
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # 必要に応じてファシリティまたは重大度を変更します。
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # 再試行や通知など、いくつかのエラー処理コードを追加する必要があります。 
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # 基本設定項目
    ##########################

    # 環境変数から Simple Log Service のパラメーターとオプションをロードします。
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # 使用者グループの高度なオプション
    ##########################

    # 特に同時消費が必要な場合、コンシューマー名を変更することは一般的に推奨されません。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消費の開始点。このパラメーターは、プログラムが初めて実行されるときに有効です。その後の実行では、最後に保存されたチェックポイントから消費が再開されます。
    # "begin"、"end"、または特定の ISO 時刻形式を使用します。
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # ハートビート間隔。サーバーが特定のシャードに対して間隔の 2 倍以内にハートビートレポートを受信しない場合、サーバーは対応するコンシューマーがオフラインであるとみなし、タスクを再割り当てします。
    # ネットワーク環境が悪い場合、短い間隔を設定することは推奨されません。
    heartbeat_interval = 20

    # データ消費の最大間隔。データが迅速に生成される場合、このパラメーターを調整する必要はありません。
    data_fetch_interval = 1

    # 使用者グループとコンシューマーを構築します。
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Syslog サーバーに関連するパラメーターとオプション
    settings = {
                "host": "1.2.3.4", # 必須
                "port": 514,       # 必須、ポート
                "protocol": "tcp", # 必須、TCP、UDP、または TLS (Python 3 のみ)
                "sep": "||",      # 必須、key=value ペアの区切り文字。ここでは、二重縦線 (||) が使用されます。
                "cert_path": None,  # オプション、TLS 証明書の場所
                "timeout": 120,   # オプション、タイムアウト期間、デフォルトは 120 秒
                "facility": syslogclient.FAC_USER,  # オプション、他の syslogclient.FAC_* 値を参照
                "severity": syslogclient.SEV_INFO,  # オプション、他の syslogclient.SEV_* 値を参照
                "hostname": None, # オプション、マシン名、デフォルトではローカルマシン名が使用されます
                "tag": None # オプション、タグ、デフォルトはハイフン (-)
    }

    return option, settings

# メインプログラムのエントリポイント
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

ステップ 2: 環境変数の設定

アプリケーションを設定した後、次の表で説明するようにシステム環境変数を設定します。

環境変数

SLS_ENDPOINT

  1. Simple Log Service コンソールにログインします。プロジェクトリストで、ターゲットプロジェクトをクリックします。

  2. プロジェクト名の右側にある image アイコンをクリックして、プロジェクトの概要ページに移動します。

  3. [エンドポイント] セクションで、パブリックエンドポイントをコピーし、https:// + パブリックエンドポイント として連結します。

エンドポイントに https:// のプレフィックスが付いている場合、たとえば https://cn-beijing.log.aliyuncs.com のように、プログラムは自動的に HTTPS を使用して SLS との接続を暗号化します。サーバー証明書 *.aliyuncs.com は GlobalSign によって発行され、デフォルトでほとんどのシステムで信頼されています。お使いのシステムがこの証明書を信頼しない場合は、証明書のインストールを通じてダウンロードしてインストールしてください。

https://cn-beijing.log.aliyuncs.com

SLS_PROJECT

SLS コンソールで、ターゲットプロジェクト名をコピーします。

my-sls-project-one

SLS_LOGSTORE

SLS コンソールで、ターゲット Logstore 名をコピーします。

my-sls-logstore-a1

SLS_AK_ID

RAM ユーザーの AccessKey ID を使用します。

重要
  • Alibaba Cloud アカウントの AccessKey ペアは、完全な API アクセスを許可します。セキュリティを向上させるため、API 呼び出しや日常の O&M には常に RAM ユーザーの AccessKey ペアを使用してください。

  • 認証情報の漏洩を防ぐため、コードに AccessKey ID と AccessKey Secret をハードコーディングしないでください。

L***ky

SLS_AK_KEY

RAM ユーザーの AccessKey シークレットを使用します。

x***Xl

SLS_CG

使用者グループ名。指定したグループが存在しない場合、アプリケーションは自動的に作成します。

syc_data

ステップ 3: 開始と検証

  1. 同時消費のために複数のコンシューマーを開始します。コンシューマーの最大数は、シャードの総数と同じです。

    # 最初のコンシューマープロセスを開始
    nohup python3 sync_data.py &
    # 2 番目のコンシューマープロセスを開始
    nohup python3 sync_data.py &
  2. SLS コンソールで使用者グループのステータスを確認します。

    1. プロジェクトリストで、目的のプロジェクトをクリックします。 [ログストレージ] > [Logstore] タブで、目的の Logstore の横にある Expand アイコンをクリックし、次に [データ消費] の横にある Expand アイコンをクリックします。

    2. 使用者グループリストで、目的のグループをクリックします。[使用者グループのステータス] ページで、各シャードのデータ消費のクライアントと時刻を表示します。

よくある質問

ConsumerGroupQuotaExceed エラーが発生する

このエラーは、制限を超えたことを示します。単一の Logstore には最大 30 の使用者グループを持つことができます。SLS コンソールで未使用の使用者グループを削除してください。