All Products
Search
Document Center

Simple Log Service:Ship logs to an on-premises SIEM

Last Updated:May 27, 2026

Deploy a consumer group application to pull logs from SLS and forward them to your on-premises SIEM through Splunk HEC or Syslog.

Background

SIEM platforms such as Splunk and QRadar typically run in on-premises data centers without public-facing endpoints. After you migrate to the cloud, you need a secure pipeline to ship cloud resource logs to your on-premises SIEM for monitoring, auditing, and threat analysis.

How it works

A consumer group application pulls logs from SLS in real time and forwards them to your SIEM through Splunk HEC or Syslog over TCP/TLS.

image

Core logic

  1. Log pulling: A consumer group application pulls data from SLS with built-in concurrency and failover.

    • Concurrency and throughput

      • For higher throughput, run multiple consumer instances in the same group. Each instance must have a unique name, such as a process ID suffix.

      • Each shard is processed by one consumer at a time. Maximum concurrency equals the shard count — for example, 10 shards support up to 10 parallel consumers.

      • Under ideal network conditions:

        • A single consumer (~20% of one CPU core) can consume raw logs at 10 MB/s.

        • Ten consumers can process up to 100 MB/s.

    • High availability

      • The consumer group stores each consumer's progress as a server-side checkpoint.

      • If a consumer fails, another instance takes over its shards and resumes from the last checkpoint. Deploy instances on separate machines for failover.

      • Extra instances beyond the shard count act as standbys for immediate failover.

  2. Data forwarding: The application formats and forwards pulled logs to your on-premises SIEM.

Prerequisites

  • Create a RAM user and grant permissions. The RAM user must have the AliyunLogFullAccess policy.

  • Network: The machine running the application must reach both the SLS endpoint and the SIEM.

    • To obtain the endpoint:

      1. Log on to the SLS console. In the project list, click the target project.

      2. Click the image icon to the right of the project name to go to the project overview page.

      3. In the Endpoint section, copy the public endpoint. The endpoint is https:// + the public endpoint.

  • Python 3 runtime with the SLS Python SDK installed.

    1. Install the SLS Python SDK: pip install -U aliyun-log-python-sdk.

    2. Verify: pip show aliyun-log-python-sdk. Expected output:

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun

Procedure

Step 1: Prepare the application

SLS provides sample scripts for two shipping methods. Select the one that matches your SIEM:

  • Splunk HEC: A token-based HTTP collector for sending data to Splunk.

  • Syslog: A standard logging protocol compatible with most SIEM systems.

Splunk HEC

Configure the sync_data.py script. It has three main parts:

  • main() method: The main program control logic.

  • get_option() method: Defines consumption configuration options.

    • Basic configuration: Connection settings for SLS and the consumer group.

    • Advanced consumer group options: Performance-tuning parameters. Do not modify unless necessary.

    • SIEM (Splunk) parameters and options.

    • Add an SPL query to filter or transform data during shipping for tasks like row filtering, column trimming, or data normalization. Example:

      # SPL query
          query = "* | where instance_id in ('instance-1', 'instance-2')"
      # Create a consumer with the filter rule. The 'query' parameter is added to the configuration.
          option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                cursor_start_time=cursor_start_time,
                                heartbeat_interval=heartbeat_interval,
                                data_fetch_interval=data_fetch_interval,
                                query=query)
  • SyncData(ConsumerProcessorBase) class: Fetches data from SLS and ships it to Splunk. Review the code comments and adjust as needed.

Complete script:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# Configure a rotating log file for diagnostics and troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    This consumer class pulls data from SLS and sends it to Splunk.
    """
    def __init__(self, splunk_setting=None):
        
        """Initializes the consumer and verifies connectivity to Splunk."""
        super(SyncData, self).__init__()   # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Test the connectivity to Splunk.
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # TODO: Replace this with your own logic to process and send logs.
            # The log is a dictionary. Example for Python 3 (all strings must be Unicode):
            #    {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                raise err

                # Add retry logic or reporting mechanisms as needed.

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # Basic configuration
    ##########################

    # Load SLS parameters and options from environment variables.
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group cannot be empty")

    ##########################
    # Advanced consumer group options
    ##########################

    # It is not recommended to change the consumer name, especially for concurrent consumption.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Consumption starting point. This parameter is only used on the first run.
    # Subsequent runs resume from the last saved checkpoint.
    # Valid values: "begin", "end", or a specific time in ISO 8601 format.
    cursor_start_time = "2018-12-26 0:0:0"

    # Heartbeat interval in seconds. If the server does not receive a heartbeat from a consumer
    # for a specific shard within 2 * heartbeat_interval, it considers the consumer offline
    # and reassigns its tasks. Do not set this to a low value in poor network conditions.
    heartbeat_interval = 20

    # Maximum data-fetching interval. If new data is generated rapidly,
    # you do not need to adjust this parameter.
    data_fetch_interval = 1
    
    # Create a consumer group configuration object.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    To create a consumer with a filter rule, use the following code:
    # Custom SPL query
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    # Create a consumer with the filter rule. The 'query' parameter is added to the configuration.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Splunk options
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # Optional, bool
                'timeout': 120,             # Optional, int
                'ssl_verify': True,         # Optional, bool
                "sourcetype": "",            # Optional, sourcetype
                "index": "",                # Optional, index
                "source": "",               # Optional, source
            }

    return  option, settings

# Main program control logic 
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

Syslog

Syslog supports RFC 5424 and RFC 3164 formats. Use RFC 5424 with TCP or TLS transport for reliable, secure delivery.

Configure the sync_data.py script. It has three main parts:

  • main() method: The main program control logic.

  • get_monitor_option() method: Defines consumption configuration options.

    • Basic configuration: Connection settings for SLS and the consumer group.

    • Advanced consumer group options: Performance-tuning parameters. Do not modify unless necessary.

    • SIEM Syslog server parameters and options.

      • Syslog facility: The program component that generated the log. This example uses syslogclient.FAC_USER as the default.

      • Syslog severity: The log level. Customize based on log content. This example uses syslogclient.SEV_INFO.

      • If your SIEM supports Syslog over TCP or TLS, set the proto parameter to TLS and provide the path to a valid SSL certificate.

  • SyncData(ConsumerProcessorBase) class: Fetches data from SLS and delivers it to a Syslog server. Review the code comments and adjust as needed.

Complete script:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# Configure a rotating log file for diagnostics and troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
   This consumer class pulls data from SLS and sends it to a Syslog server.
    """
    def __init__(self, target_setting=None):
        """
        Initializes the consumer and verifies connectivity to the Syslog server.
        """

        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # Test connectivity. 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # TODO: Place your own logic here to process and send logs.
                    # The log is a dictionary. Examples (all strings must be Unicode):
                    #    Python 2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python 3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
                    # You can modify the format as needed. This example uses key=value pairs
                    # separated by a double vertical bar (||).
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # You can modify the facility or severity as needed.
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # Add error handling logic, such as retries or notifications.
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # Basic configuration
    ##########################

    # Load SLS parameters and options from environment variables.
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group cannot be empty")

    ##########################
    # Advanced consumer group options
    ##########################

    # It is not recommended to change the consumer name, especially for concurrent consumption.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Consumption starting point. This parameter is only used on the first run.
    # Subsequent runs resume from the last saved checkpoint.
    # Valid values: "begin", "end", or a specific time in ISO 8601 format.
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # Heartbeat interval in seconds. If the server does not receive a heartbeat from a consumer
    # for a specific shard within 2 * heartbeat_interval, it considers the consumer offline
    # and reassigns its tasks. Do not set this to a low value in poor network conditions.
    heartbeat_interval = 20

    # Maximum data-fetching interval. If new data is generated rapidly,
    # you do not need to adjust this parameter.
    data_fetch_interval = 1

    # Create a consumer group configuration object.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Syslog server parameters and options
    settings = {
                "host": "1.2.3.4", # Required.
                "port": 514,       # Required. Port number.
                "protocol": "tcp", # Required. Can be TCP, UDP, or TLS (Python 3 only).
                "sep": "||",      # Required. Separator for key=value pairs. Default is '||'.
                "cert_path": None,  # Optional. Path to the TLS certificate file.
                "timeout": 120,   # Optional. Timeout in seconds. Default is 120.
                "facility": syslogclient.FAC_USER,  # Optional. See other syslogclient.FAC_* values.
                "severity": syslogclient.SEV_INFO,  # Optional. See other syslogclient.SEV_* values.
                "hostname": None, # Optional. Hostname. Defaults to the local machine's hostname.
                "tag": None # Optional. Tag. Defaults to a hyphen (-).
    }

    return option, settings

# Main program control logic
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

Step 2: Configure environment variables

Set the following environment variables:

Parameter

Value

Example

SLS_ENDPOINT

  1. Log on to the SLS console and click your target project.

  2. Click the image icon to the right of the project name to go to the project overview page.

  3. In the Endpoint section, copy the public endpoint. The full endpoint is https:// + your public endpoint.

An https:// prefix (for example, https://cn-beijing.log.aliyuncs.com) enables automatic HTTPS. The *.aliyuncs.com certificate is issued by GlobalSign and trusted by most systems. If your system does not trust it, follow the Certificate installation instructions.

https://cn-beijing.log.aliyuncs.com

SLS_PROJECT

The name of your project in the SLS console.

my-sls-project-one

SLS_LOGSTORE

The name of your Logstore in the SLS console.

my-sls-logstore-a1

SLS_AK_ID

AccessKey ID of your RAM user.

Important
  • An Alibaba Cloud account AccessKey pair grants full API access. Use a RAM user's AccessKey pair for better security.

  • Never hard-code your AccessKey ID or AccessKey Secret in application code.

L***ky

SLS_AK_KEY

AccessKey Secret of your RAM user.

x***Xl

SLS_CG

Consumer group name. Auto-created if it does not exist.

sync_data

Step 3: Start and verify

  1. Start multiple consumer processes for concurrent processing. Maximum concurrency equals the shard count of your Logstore.

    # Start the first consumer process
    nohup python3 sync_data.py &
    # Start the second consumer process
    nohup python3 sync_data.py &
  2. View the status of the consumer group in the SLS console.

    1. In the project list, click your target project. Go to the Log Storage > Logstores tab. Click the Expand icon next to your target logstore, and then click the Expand icon next to Data Consumption.

    2. Click your consumer group. On the Consumer Group Status tab, view the consumer client and progress for each shard.

FAQ

ConsumerGroupQuotaExceed error

Each Logstore supports up to 30 consumer groups. Delete unused groups in the SLS console to free quota.