全部產品
Search
文件中心

Simple Log Service:投遞日誌到SIEM

更新時間:Jan 10, 2026

當需要投遞日誌到 SIEM 時,可通過部署一個能夠串連Log Service與 SIEM 的應用程式,利用 SLS 消費組拉取日誌,並使用 Splunk HEC 或 Syslog 將資料推送到 SIEM,實現雲上日誌與本地安全分析平台的整合。

業務背景

企業通常將安全資訊和事件管理(SIEM)平台(如 Splunk、QRadar)部署在本機資料中心,且不向公網暴露接收埠以保證安全。當業務上雲後,雲上資源產生的日誌資料又需要納入本地 SIEM 進行統一的監控、審計和威脅分析。因此在不降低本地系統安全性的前提下,需要建立一條從Log Service到本地 SIEM 的日誌投遞管道,以實現雲上日誌的投遞。

投遞流程

在資料投遞情境中,建議採用Log Service消費組來實現即時消費,並利用 Splunk 的 API(HTTP事件收集,HEC)或 Syslog over TCP/TLS 將日誌傳輸至 SIEM。

image

核心邏輯

  1. 日誌拉取:基於消費組構建程式,從Log Service拉取資料。此機制支援並發消費和容錯移轉。

    • 並發與吞吐

      • 可通過多次啟動程式來實現並發效果。多個消費者屬於同一消費組,且名稱均不相同(消費者名以進程ID為尾碼)。

      • 一個分區(Shard)只能被一個消費者消費,因此並發上限為Shard數量。例如一個日誌庫有10個分區,那麼最多有10個消費者同時消費。

      • 在理想網路條件下:

        • 單個消費者(約佔用20%單核CPU)可達10 MB/s原始日誌消費速率。

        • 10個消費者可消費100 MB/s原始日誌。

    • 高可用性

      • 消費組將檢測點(Checkpoint)儲存於服務端。

      • 當某一消費者執行個體終止運行,另一個消費者執行個體將自動接管並從斷點繼續消費。因此可在不同機器上啟動消費者,當一台機器故障的情況下,其他機器上的消費者便可以自動從斷點繼續消費。

      • 可在不同機器啟動大於Shard數量的消費者以作備用。

  2. 資料轉寄:程式收到日誌後,根據配置進行格式化,並發送到本地 SIEM。

準備工作

  • 建立RAM使用者及授權:該RAM需要擁有AliyunLogFullAccess的許可權。

  • 網路要求:程式所在機器需要能訪問Log ServiceEndpoint網域名稱,且與SIEM 處於相同網路環境。

    • Endpoint網域名稱擷取方式:

      1. 登入Log Service控制台,在Project列表中,單擊目標Project。

      2. 單擊Project名稱右側的image進入專案概覽頁面。

      3. 在訪問網域名稱中複製公網網域名稱。Endpoint網域名稱為https://+公網網域名稱

  • 環境要求:準備Python 3運行環境,並安裝Python sdk。

    1. 安裝Log ServicePython sdk:pip install -U aliyun-log-python-sdk

    2. 驗證安裝結果:pip show aliyun-log-python-sdk,返回如下資訊表示安裝成功。

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun

實施步驟

步驟一:應用程式準備

Log Service提供Splunk HEC 與 Syslog 兩種投遞方式,請選擇對應程式樣本進行配置。

  • Splunk HEC :HTTP 事件收集器 (HEC) 基於Token,通過 HTTP 以高效安全的方式將多種資料格式的日誌直接發送到 Splunk。

  • Syslog:常見的日誌通道,相容大多數SIEM,支援文字格式設定。

Splunk HEC

投遞日誌資料至Splunk時,可以參考sync_data.py進行配置,代碼主要由三部分內容組成:

  • main()方法:主程式控制邏輯。

  • get_option() 方法:消費配置項。

    • 基本配置項:包括Log Service串連配置和消費組配置。

    • 消費組的進階選項:效能調參,不推薦修改。

    • SIEM(Splunk)相關參數與選項。

    • 若在資料投遞過程中涉及資料清洗(如行過濾、列裁剪和資料規整等)時,可以通過添加SPL語句添加規則,參考如下:

      # SPL 語句
          query = "* | where instance_id in ('instance-1', 'instance-2')"
      # 基於規則建立消費,相比普通消費在參數列表最後增加了參數 query
          option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                cursor_start_time=cursor_start_time,
                                heartbeat_interval=heartbeat_interval,
                                data_fetch_interval=data_fetch_interval,
                                query=query)
  • SyncData(ConsumerProcessorBase):內容包含如何從Log Service擷取資料並投遞到Splunk,請仔細閱讀代碼中相關注釋並根據需求調整。

完整代碼如下:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# 配置程式記錄檔,以便後續測試或者診斷問題
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    這個消費者從Log Service消費資料並發送給Splunk
    """
    def __init__(self, splunk_setting=None):
        
        """初始化並驗證Splunk連通性"""
        super(SyncData, self).__init__()   # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # 測試Splunk連通性
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # 修改此處代碼:替換為實際的日誌發送到遠端的同步處理代碼
            # 日誌格式為字典類型,樣本如下(注意:所有字串必須為Unicode編碼):
            #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                raise err

                #根據需要,添加一些重試或者報告。

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # 基本配置項
    ##########################

    # 從環境變數中載入Log Service參數與選項
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # 消費組的進階選項
    ##########################

    # 一般不建議修改消費者名稱,尤其是需要進行並發消費時。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。
    # 可以使用“begin”、“end”,或者特定的ISO時間格式。
    cursor_start_time = "2018-12-26 0:0:0"

    # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。
    # 當網路環境不佳時,不建議將時間長度設定的比較小。
    heartbeat_interval = 20

    # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數
    data_fetch_interval = 1
    
    # 構建一個消費組和消費者
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    基於規則構建消費者時可使用如下代碼:
    自訂 SPL 語句
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    基於規則構建消費,相比普通消費在參數列表最後增加了參數 query
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Splunk選項
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # 可選, bool
                'timeout': 120,             # 可選, int
                'ssl_verify': True,         # 可選, bool
                "sourcetype": "",            # 可選, sourcetype
                "index": "",                # 可選, index
                "source": "",               # 可選, source
            }

    return  option, settings

#主程式控制邏輯 
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

Syslog

Syslog主要基於RFC5424RFC3164定義相關日誌格式規範,推薦使用RFC5424協議。理論上TCP和UDP都支援Syslog,可以較好的保證資料轉送穩定性,RFC5424協議也定義了TLS的安全傳輸層,當SIEM支援TCP通道或者TLS通道時建議優先使用。

當需要投遞日誌資料至SIEM時可以參考sync_data.py進行配置,代碼主要由三部分內容組成:

  • main()方法:主程式控制邏輯。

  • get_monitor_option() 方法:消費配置項。

    • 基本配置項:包括Log Service串連配置和消費組配置。

    • 消費組的進階選項:效能調參,不推薦修改。

    • SIEM的Syslog server相關參數與選項。

      • Syslog facility:程式組件,此處選擇syslogclient.FAC_USER作為預設組件。

      • Syslog severity:記錄層級,可根據需求設定指定內容的記錄層級。此處選擇syslogclient.SEV_INFO

      • 若SIEM支援基於TCP或TLS的Syslog通道,請配置protoTLS及配置正確的SSL認證。

  • SyncData(ConsumerProcessorBase):內容包含如何從Log Service擷取資料投遞到SIEM Syslog伺服器,請仔細閱讀代碼中相關注釋並根據需求調整。

完整代碼如下:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# 配置程式記錄檔,以便後續測試或者診斷問題
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
   消費者從Log Service消費資料並發送給Syslog server
    """
    def __init__(self, target_setting=None):
        """
        初始化並驗證Syslog server連通性
        """

        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # 測試連通性 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # 將日誌發送到遠端的同步代碼置於此處
                    # 日誌格式為字典類型,樣本如下(注意:所有字串必須為Unicode編碼):
                    #    Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
                    # 可以根據需要修改格式化內容,這裡使用Key=Value傳輸,並使用預設的雙豎線(||)進行分割
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # 可以根據需要修改facility或者severity
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # 需要添加一些錯誤處理的代碼,例如重試或者通知等 
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # 基本配置項
    ##########################

    # 從環境變數中載入Log Service參數與選項
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # 消費組的進階選項
    ##########################

    # 一般不建議修改消費者名稱,尤其是需要進行並發消費時。
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 消費的起點。這個參數在首次運行程式的時候有效,後續再次運行時將從上一次消費的儲存點繼續消費。
    # 可以使用“begin”、“end”,或者特定的ISO時間格式。
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # 心跳時間長度,當伺服器在2倍時間內沒有收到特定Shard的心跳報告時,伺服器會認為對應消費者離線並重新調配任務。
    # 當網路環境不佳時,不建議將時間長度設定的比較小。
    heartbeat_interval = 20

    # 消費資料的最大間隔,如果資料產生的速度很快,不需要調整這個參數
    data_fetch_interval = 1

    # 構建一個消費組和消費者
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Syslog server相關參數與選項
    settings = {
                "host": "1.2.3.4", # 必選
                "port": 514,       # 必選, 連接埠
                "protocol": "tcp", # 必選, TCP、UDP或TLS(僅Python3)
                "sep": "||",      # 必選, key=value索引值對的分隔字元,這裡用雙豎線(||)分隔
                "cert_path": None,  # 可選,TLS的認證位置
                "timeout": 120,   # 可選,逾時時間,預設120秒
                "facility": syslogclient.FAC_USER,  # 可選,可以參考其他syslogclient.FAC_*的值
                "severity": syslogclient.SEV_INFO,  # 可選,可以參考其他syslogclient.SEV_*的值
                "hostname": None, # 可選,機器名,預設選擇本機機器名
                "tag": None # 可選,標籤,預設是短劃線(-)
    }

    return option, settings

#主程式控制邏輯
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

步驟二:配置環境變數

程式配置完成後,進行表格中系統內容變數配置

環境變數名

取值

樣本

SLS_ENDPOINT

  1. 登入Log Service控制台,在Project列表中,單擊目標Project。

  2. 單擊Project名稱右側的image進入專案概覽頁面。

  3. 在訪問網域名稱中複製公網網域名稱,拼接為https://+公網網域名稱

若Endpoint首碼配置為https://,如https://cn-beijing.log.aliyuncs.com,則程式自動使用HTTPS加密與Log Service串連。伺服器憑證*.aliyuncs.com由GlobalSign簽發,一般機器會自動信任此認證。若機器不信任此認證,通過Certificate installation下載並安裝。

https://cn-beijing.log.aliyuncs.com

SLS_PROJECT

在Log Service控制台,複製目標Project名稱。

my-sls-project-one

SLS_LOGSTORE

在Log Service控制台,複製目標LogStore名稱。

my-sls-logstore-a1

SLS_AK_ID

建議使用RAM使用者的AccessKey ID。

重要
  • 阿里雲帳號的AccessKey擁有所有API的存取權限,建議使用RAM使用者的AccessKey進行API訪問或日常營運。

  • 強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼中,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。

L***ky

SLS_AK_KEY

建議使用RAM使用者的AccessKey Secret。

x***Xl

SLS_CG

消費組名,可以簡單命名為"syc_data",若消費組不存在,程式會自動建立。

syc_data

步驟三:啟動並驗證

  1. 啟動多消費者進行並發消費,支援的最大並發數等於總 Shard 數。

    # 啟動第一個消費者進程
    nohup python3 sync_data.py &
    # 啟動第二個消費者進程
    nohup python3 sync_data.py &
  2. Log Service控制台查看消費組狀態。

    1. 在Project列表地區,單擊目標Project。在日誌儲存 > 日誌庫頁簽中,單擊目標LogStore的展開節點表徵圖,然後單擊資料消費展開節點表徵圖。

    2. 在消費組列表中,單擊目標消費組,在Consumer Group狀態頁面,查看每個Shard消費資料的用戶端和時間。

常見問題

出現ConsumerGroupQuotaExceed錯誤

此錯誤表示超出限制,單個日誌庫(LogStore)配置消費組上限為30個,請在Log Service控制台刪除無用消費組。