當需要投遞日誌到 SIEM 時,可通過部署一個能夠串連Log Service與 SIEM 的應用程式,利用 SLS 消費組拉取日誌,並使用 Splunk HEC 或 Syslog 將資料推送到 SIEM,實現雲上日誌與本地安全分析平台的整合。
業務背景
企業通常將安全資訊和事件管理(SIEM)平台(如 Splunk、QRadar)部署在本機資料中心,且不向公網暴露接收埠以保證安全。當業務上雲後,雲上資源產生的日誌資料又需要納入本地 SIEM 進行統一的監控、審計和威脅分析。因此在不降低本地系統安全性的前提下,需要建立一條從Log Service到本地 SIEM 的日誌投遞管道,以實現雲上日誌的投遞。
投遞流程
在資料投遞情境中,建議採用Log Service消費組來實現即時消費,並利用 Splunk 的 API(HTTP事件收集,HEC)或 Syslog over TCP/TLS 將日誌傳輸至 SIEM。
核心邏輯
日誌拉取:基於消費組構建程式,從Log Service拉取資料。此機制支援並發消費和容錯移轉。
並發與吞吐
可通過多次啟動程式來實現並發效果。多個消費者屬於同一消費組,且名稱均不相同(消費者名以進程ID為尾碼)。
一個分區(Shard)只能被一個消費者消費,因此並發上限為Shard數量。例如一個日誌庫有10個分區,那麼最多有10個消費者同時消費。
在理想網路條件下:
單個消費者(約佔用20%單核CPU)可達10 MB/s原始日誌消費速率。
10個消費者可消費100 MB/s原始日誌。
高可用性
消費組將檢測點(Checkpoint)儲存於服務端。
當某一消費者執行個體終止運行,另一個消費者執行個體將自動接管並從斷點繼續消費。因此可在不同機器上啟動消費者,當一台機器故障的情況下,其他機器上的消費者便可以自動從斷點繼續消費。
可在不同機器啟動大於Shard數量的消費者以作備用。
資料轉寄:程式收到日誌後,根據配置進行格式化,並發送到本地 SIEM。
準備工作
建立RAM使用者及授權:該RAM需要擁有AliyunLogFullAccess的許可權。
網路要求:程式所在機器需要能訪問Log ServiceEndpoint網域名稱,且與SIEM 處於相同網路環境。
Endpoint網域名稱擷取方式:
登入Log Service控制台,在Project列表中,單擊目標Project。
單擊Project名稱右側的
進入專案概覽頁面。在訪問網域名稱中複製公網網域名稱。Endpoint網域名稱為
https://+公網網域名稱。
環境要求:準備Python 3運行環境,並安裝Python sdk。
安裝Log ServicePython sdk:
pip install -U aliyun-log-python-sdk。驗證安裝結果:
pip show aliyun-log-python-sdk,返回如下資訊表示安裝成功。Name: aliyun-log-python-sdk Version: 0.9.12 Summary: Aliyun log service Python client SDK Home-page: https://github.com/aliyun/aliyun-log-python-sdk Author: Aliyun
實施步驟
步驟一:應用程式準備
Log Service提供Splunk HEC 與 Syslog 兩種投遞方式,請選擇對應程式樣本進行配置。
Splunk HEC :HTTP 事件收集器 (HEC) 基於Token,通過 HTTP 以高效安全的方式將多種資料格式的日誌直接發送到 Splunk。
Syslog:常見的日誌通道,相容大多數SIEM,支援文字格式設定。
Splunk HEC
投遞日誌資料至Splunk時,可以參考sync_data.py進行配置,代碼主要由三部分內容組成:
main()方法:主程式控制邏輯。
get_option() 方法:消費配置項。
基本配置項:包括Log Service串連配置和消費組配置。
消費組的進階選項:效能調參,不推薦修改。
SIEM(Splunk)相關參數與選項。
若在資料投遞過程中涉及資料清洗(如行過濾、列裁剪和資料規整等)時,可以通過添加SPL語句添加規則,參考如下:
# SPL 語句 query = "* | where instance_id in ('instance-1', 'instance-2')" # 基於規則建立消費,相比普通消費在參數列表最後增加了參數 query option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, heartbeat_interval=heartbeat_interval, data_fetch_interval=data_fetch_interval, query=query)
SyncData(ConsumerProcessorBase):內容包含如何從Log Service擷取資料並投遞到Splunk,請仔細閱讀代碼中相關注釋並根據需求調整。
完整代碼如下:
Syslog
Syslog主要基於RFC5424和RFC3164定義相關日誌格式規範,推薦使用RFC5424協議。理論上TCP和UDP都支援Syslog,可以較好的保證資料轉送穩定性,RFC5424協議也定義了TLS的安全傳輸層,當SIEM支援TCP通道或者TLS通道時建議優先使用。
當需要投遞日誌資料至SIEM時可以參考sync_data.py進行配置,代碼主要由三部分內容組成:
main()方法:主程式控制邏輯。
get_monitor_option() 方法:消費配置項。
SyncData(ConsumerProcessorBase):內容包含如何從Log Service擷取資料投遞到SIEM Syslog伺服器,請仔細閱讀代碼中相關注釋並根據需求調整。
完整代碼如下:
步驟二:配置環境變數
程式配置完成後,進行表格中系統內容變數配置。
環境變數名 | 取值 | 樣本 |
SLS_ENDPOINT |
若Endpoint首碼配置為 |
|
SLS_PROJECT | 在Log Service控制台,複製目標Project名稱。 | my-sls-project-one |
SLS_LOGSTORE | 在Log Service控制台,複製目標LogStore名稱。 | my-sls-logstore-a1 |
SLS_AK_ID | 建議使用RAM使用者的AccessKey ID。 重要
| L***ky |
SLS_AK_KEY | 建議使用RAM使用者的AccessKey Secret。 | x***Xl |
SLS_CG | 消費組名,可以簡單命名為"syc_data",若消費組不存在,程式會自動建立。 | syc_data |
步驟三:啟動並驗證
啟動多消費者進行並發消費,支援的最大並發數等於總 Shard 數。
# 啟動第一個消費者進程 nohup python3 sync_data.py & # 啟動第二個消費者進程 nohup python3 sync_data.py &在Log Service控制台查看消費組狀態。
在Project列表地區,單擊目標Project。在頁簽中,單擊目標LogStore的
表徵圖,然後單擊資料消費的
表徵圖。在消費組列表中,單擊目標消費組,在Consumer Group狀態頁面,查看每個Shard消費資料的用戶端和時間。
常見問題
出現ConsumerGroupQuotaExceed錯誤
此錯誤表示超出限制,單個日誌庫(LogStore)配置消費組上限為30個,請在Log Service控制台刪除無用消費組。