Simple Log Service:Ship logs to SIEM

Last Updated:Mar 25, 2026

To ship logs to a Security Information and Event Management (SIEM) system, you can deploy an application that connects SLS to your SIEM. This application uses an SLS consumer group to pull logs and then forwards them to your SIEM using Splunk HEC or Syslog. This integrates your cloud logs with your on-premises security analytics platform.

Background

Enterprises often deploy Security Information and Event Management (SIEM) platforms, such as Splunk or QRadar, in on-premises data centers. To maintain security, these platforms typically do not expose public endpoints for receiving data. When you migrate your business to the cloud, you need to consolidate logs from cloud resources into your on-premises SIEM for unified monitoring, auditing, and threat analysis. You must establish a secure log shipping pipeline from SLS to your on-premises SIEM without compromising the security of your existing systems.

How it works

For real-time data shipping, use an SLS consumer group. A dedicated application pulls logs from SLS and forwards them to your SIEM using the Splunk HTTP Event Collector (HEC) or Syslog over TCP/TLS.


Core logic

  1. Log pulling: An application based on a consumer group pulls data from SLS. This mechanism supports concurrent consumption and failover.

    • Concurrency and throughput

      • To achieve higher throughput, run multiple instances of the consumer application. Each consumer instance must belong to the same consumer group and have a unique name, for example, by using a process ID as a suffix.

      • Only one consumer can process a shard at a time. Therefore, the maximum number of concurrent consumers is limited by the number of shards in the logstore. For example, if a logstore has 10 shards, you can run up to 10 consumers in parallel.

      • Under ideal network conditions:

        • A single consumer (using about 20% of a single CPU core) can consume raw logs at a rate of 10 MB/s.

        • Ten consumers can process up to 100 MB/s of raw logs.

    • High availability

      • The consumer group stores each consumer's progress as a checkpoint on the server.

      • If a consumer instance fails, another available instance automatically takes over its assigned shards and resumes processing from the last saved checkpoint. To ensure robust failover, you can run consumer instances on different machines.

      • You can run more consumer instances than the number of shards. The extra instances act as standbys for immediate failover.

  2. Data forwarding: After pulling the logs, the application formats and forwards them to your on-premises SIEM based on your configuration.
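The naming rule above can be sketched in a few lines. This is the same pattern the sample scripts below use: every instance shares one consumer group (here, the example name "sync_data"), and each derives a unique consumer name from its process ID.

```python
from multiprocessing import current_process

# All instances share one consumer group; each instance gets a unique
# consumer name by appending its own process ID as a suffix.
consumer_group = "sync_data"
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
print(consumer_name)  # e.g. sync_data-12345
```

Two processes started from the same script therefore register under the same group but never collide on the consumer name.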

Prerequisites

  • Create a RAM user and grant permissions: The RAM user must have the AliyunLogFullAccess policy.

  • Network requirements: The machine running the application must be able to access the SLS endpoint and be on the same network as the SIEM.

    • To obtain the endpoint:

      1. Log on to the SLS console. In the project list, click the target project.

      2. Click the icon to the right of the project name to go to the project overview page.

      3. In the Endpoint section, copy the public endpoint. The endpoint is https:// + the public endpoint.

  • Environment requirements: Prepare a Python 3 runtime environment and install the SLS Python SDK.

    1. Install the SLS Python SDK: pip install -U aliyun-log-python-sdk.

    2. Verify the installation: pip show aliyun-log-python-sdk. A successful installation returns information similar to the following.

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun

Procedure

Step 1: Prepare the application

SLS provides sample scripts for two shipping methods: Splunk HEC and Syslog. Select the method that matches your SIEM and configure the corresponding script.

  • Splunk HEC: The HTTP Event Collector (HEC) is a token-based mechanism that lets you send data in various formats securely and efficiently to Splunk over HTTP.

  • Syslog: A common logging protocol that is compatible with most SIEM systems and supports plain text format.
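To make the HEC format concrete, the following sketch builds the JSON payload that the Splunk script below sends: the log content goes into the `event` field, while `time`, `sourcetype`, and `index` are optional HEC metadata fields. The helper name `build_hec_event` is illustrative, not part of any SDK.

```python
import json

def build_hec_event(log, sourcetype=None, index=None):
    """Wrap one flattened SLS log (a dict) into a Splunk HEC payload.

    HEC expects a JSON object whose 'event' field holds the log body;
    'time', 'sourcetype', and 'index' are optional metadata fields.
    """
    event = {}
    if sourcetype:
        event["sourcetype"] = sourcetype
    if index:
        event["index"] = index
    event["time"] = log.pop("__time__", None)  # SLS stores the log time here
    event["event"] = json.dumps(log)
    return json.dumps(event, sort_keys=True)

payload = build_hec_event({"__time__": "1546300800", "__topic__": "audit", "action": "login"})
print(payload)
```

The resulting string is what gets POSTed to `/services/collector` with the `Authorization: Splunk <token>` header, as shown in the full script.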

Splunk HEC

To ship log data to Splunk, configure the provided sync_data.py script. The script consists of three main parts:

  • main() method: The main program control logic.

  • get_option() method: Defines consumption configuration options.

    • Basic configuration: Includes connection settings for SLS and the consumer group.

    • Advanced consumer group options: Includes performance-tuning parameters. Do not modify these unless necessary.

    • SIEM (Splunk) parameters and options.

    • Add an SPL query to filter or transform data during shipping for tasks like row filtering, column trimming, or data normalization. Example:

      # SPL query
      query = "* | where instance_id in ('instance-1', 'instance-2')"
      # Create a consumer with the filter rule. The 'query' parameter is added to the configuration.
      option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                            cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                            cursor_start_time=cursor_start_time,
                            heartbeat_interval=heartbeat_interval,
                            data_fetch_interval=data_fetch_interval,
                            query=query)
  • SyncData(ConsumerProcessorBase) class: Contains the logic for fetching data from SLS and shipping it to Splunk. Review the comments in the code and adjust the logic as needed.

The complete script is provided below:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# Configure a rotating log file for diagnostics and troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    This consumer class pulls data from SLS and sends it to Splunk.
    """
    def __init__(self, splunk_setting=None):
        """Initializes the consumer and verifies connectivity to Splunk."""
        super(SyncData, self).__init__()   # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Test the connectivity to Splunk.
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # TODO: Replace this with your own logic to process and send logs.
            # The log is a dictionary. Example for Python 3 (all strings must be Unicode):
            #    {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                # Add retry logic or reporting mechanisms as needed.
                raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # Basic configuration
    ##########################

    # Load SLS parameters and options from environment variables.
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group cannot be empty")

    ##########################
    # Advanced consumer group options
    ##########################

    # It is not recommended to change the consumer name, especially for concurrent consumption.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Consumption starting point. This parameter is only used on the first run.
    # Subsequent runs resume from the last saved checkpoint.
    # Valid values: "begin", "end", or a specific time in ISO 8601 format.
    cursor_start_time = "2018-12-26 0:0:0"

    # Heartbeat interval in seconds. If the server does not receive a heartbeat from a consumer
    # for a specific shard within 2 * heartbeat_interval, it considers the consumer offline
    # and reassigns its tasks. Do not set this to a low value in poor network conditions.
    heartbeat_interval = 20

    # Maximum data-fetching interval. If new data is generated rapidly,
    # you do not need to adjust this parameter.
    data_fetch_interval = 1
    
    # Create a consumer group configuration object.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    To create a consumer with a filter rule, use the following code:
    # Custom SPL query
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    # Create a consumer with the filter rule. The 'query' parameter is added to the configuration.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Splunk options
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # Optional, bool
                'timeout': 120,             # Optional, int
                'ssl_verify': True,         # Optional, bool
                "sourcetype": "",            # Optional, sourcetype
                "index": "",                # Optional, index
                "source": "",               # Optional, source
            }

    return option, settings

# Main program control logic 
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

Syslog

Syslog defines log format specifications based on protocols such as RFC 5424 and RFC 3164. We recommend using RFC 5424. While Syslog can be transported over both UDP and TCP, TCP provides more reliable data transmission. RFC 5424 also defines a secure transport layer using TLS. If your SIEM supports Syslog over a TCP or TLS channel, use it.
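To show what travels on the wire, the following sketch formats a minimal RFC 5424 line by hand. The actual script below delegates this to `aliyun.log.ext.syslogclient`; this fragment only illustrates the header layout and the PRI calculation (facility * 8 + severity, per RFC 5424 section 6.2.1). The function name `rfc5424_line` is illustrative.

```python
from datetime import datetime, timezone

def rfc5424_line(hostname, app_name, msg, facility=1, severity=6):
    """Format a minimal RFC 5424 syslog line.

    PRI = facility * 8 + severity. Facility 1 is 'user' and
    severity 6 is 'informational', so the default PRI is 14.
    """
    pri = facility * 8 + severity
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # VERSION is 1; MSGID and STRUCTURED-DATA use the nil value '-'.
    return "<{0}>1 {1} {2} {3} - - - {4}".format(pri, ts, hostname, app_name, msg)

line = rfc5424_line("host-1", "sync_data", "status=ok||user=alice")
print(line)
```

A receiving SIEM parses the `<14>1 ...` header and treats everything after the structured-data field as the message body, which is where the key=value pairs produced by the script end up.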

To ship log data to a SIEM by using Syslog, you can configure the provided sync_data.py script. The script consists of three main parts:

  • main() method: The main program control logic.

  • get_monitor_option() method: Defines consumption configuration options.

    • Basic configuration: Includes connection settings for SLS and the consumer group.

    • Advanced consumer group options: Includes performance-tuning parameters. Do not modify these unless necessary.

    • SIEM Syslog server parameters and options.

      • Syslog facility: The program component that generated the log. This example uses syslogclient.FAC_USER as the default.

      • Syslog severity: The log level of the message. You can customize this based on the log content. This example uses syslogclient.SEV_INFO.

      • If your SIEM supports Syslog over TCP or TLS, set the proto parameter to TLS and provide the path to a valid SSL certificate.

  • SyncData(ConsumerProcessorBase) class: Contains the logic for fetching data from SLS and delivering it to a Syslog server. Review the comments in the code and adjust the logic as needed.

The complete script is provided below:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# Configure a rotating log file for diagnostics and troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    This consumer class pulls data from SLS and sends it to a Syslog server.
    """
    def __init__(self, target_setting=None):
        """
        Initializes the consumer and verifies connectivity to the Syslog server.
        """

        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path = self.option.get('cert_path', None)

        # Test connectivity. 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # TODO: Place your own logic here to process and send logs.
                    # The log is a dictionary. Examples (all strings must be Unicode):
                    #    Python 2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python 3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    # You can modify the format as needed. This example uses key=value pairs
                    # separated by a double vertical bar (||).
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # You can modify the facility or severity as needed.
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # Add error handling logic, such as retries or notifications.
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # Basic configuration
    ##########################

    # Load SLS parameters and options from environment variables.
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group cannot be empty")

    ##########################
    # Advanced consumer group options
    ##########################

    # It is not recommended to change the consumer name, especially for concurrent consumption.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Consumption starting point. This parameter is only used on the first run.
    # Subsequent runs resume from the last saved checkpoint.
    # Valid values: "begin", "end", or a specific time in ISO 8601 format.
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # Heartbeat interval in seconds. If the server does not receive a heartbeat from a consumer
    # for a specific shard within 2 * heartbeat_interval, it considers the consumer offline
    # and reassigns its tasks. Do not set this to a low value in poor network conditions.
    heartbeat_interval = 20

    # Maximum data-fetching interval. If new data is generated rapidly,
    # you do not need to adjust this parameter.
    data_fetch_interval = 1

    # Create a consumer group configuration object.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Syslog server parameters and options
    settings = {
                "host": "1.2.3.4", # Required.
                "port": 514,       # Required. Port number.
                "protocol": "tcp", # Required. Can be TCP, UDP, or TLS (Python 3 only).
                "sep": "||",      # Required. Separator for key=value pairs. Default is '||'.
                "cert_path": None,  # Optional. Path to the TLS certificate file.
                "timeout": 120,   # Optional. Timeout in seconds. Default is 120.
                "facility": syslogclient.FAC_USER,  # Optional. See other syslogclient.FAC_* values.
                "severity": syslogclient.SEV_INFO,  # Optional. See other syslogclient.SEV_* values.
                "hostname": None, # Optional. Hostname. Defaults to the local machine's hostname.
                "tag": None # Optional. Tag. Defaults to a hyphen (-).
    }

    return option, settings

# Main program control logic
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

Step 2: Configure environment variables

After configuring the script, set the following system environment variables.

  • SLS_ENDPOINT: The endpoint of your SLS project. Example: https://cn-beijing.log.aliyuncs.com

    To obtain the endpoint:

      1. Log on to the SLS console and click your target project.

      2. Click the icon to the right of the project name to go to the project overview page.

      3. In the Endpoint section, copy the public endpoint. The full endpoint is https:// + your public endpoint.

    If the endpoint is prefixed with https://, for example https://cn-beijing.log.aliyuncs.com, the application automatically uses HTTPS to connect to SLS. The server certificate *.aliyuncs.com is issued by GlobalSign and is trusted by most systems. If your system does not trust this certificate, download the certificate and follow the certificate installation instructions.

  • SLS_PROJECT: The name of your target project in the SLS console. Example: my-sls-project-one

  • SLS_LOGSTORE: The name of your target logstore in the SLS console. Example: my-sls-logstore-a1

  • SLS_AK_ID: The AccessKey ID of your RAM user. Example: L***ky

    Important
      • An Alibaba Cloud account's AccessKey pair grants full API access. For better security, use a RAM user's AccessKey pair for API calls or daily operations.

      • To prevent security risks from credential leaks, never hard-code your AccessKey ID and AccessKey Secret in your application code.

  • SLS_AK_KEY: The AccessKey Secret of your RAM user. Example: x***Xl

  • SLS_CG: The name of the consumer group. You can use a simple name such as "sync_data". If the specified group does not exist, the application creates it automatically. Example: sync_data
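Because the sample scripts abort with an assertion error when any variable is empty, a quick preflight check like the following sketch can make a misconfiguration obvious before you start the consumers. The variable names are the ones listed above.

```python
import os

# Preflight check: report which of the required environment variables
# from the list above are still unset.
required = ["SLS_ENDPOINT", "SLS_PROJECT", "SLS_LOGSTORE",
            "SLS_AK_ID", "SLS_AK_KEY", "SLS_CG"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    print("Missing environment variables: {0}".format(", ".join(missing)))
else:
    print("All required environment variables are set.")
```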

Step 3: Start and verify

  1. Start multiple consumer processes to enable concurrent processing. The maximum number of concurrent processes equals the number of shards in your logstore.

    # Start the first consumer process
    nohup python3 sync_data.py &
    # Start the second consumer process
    nohup python3 sync_data.py &
  2. View the status of the consumer group in the SLS console.

    1. In the project list, click your target project. Go to the Log Storage > Logstores tab. Click the expand icon next to your target logstore, and then click the expand icon next to Data Consumption.

    2. In the consumer group list, click your target consumer group. On the Consumer Group Status tab, view the consumer client and progress for each shard.

FAQ

ConsumerGroupQuotaExceed error

This error indicates that you have exceeded the quota for consumer groups. A single logstore can have a maximum of 30 consumer groups. To resolve this issue, delete any unused consumer groups in the SLS console.