Simple Log Service:Ship logs to an on-premises SIEM

Last Updated: Sep 20, 2025

Ship logs from Simple Log Service (SLS) to an on-premises Security Information and Event Management (SIEM) platform, such as Splunk or QRadar. This lets you consolidate cloud logs with your existing on-premises security analytics platform for unified monitoring, auditing, and threat analysis.

How it works

A consumer application, deployed within your on-premises network, is responsible for fetching log data. It uses an SLS consumer group to pull logs in real time and then forwards them to your SIEM using protocols such as the Splunk HTTP Event Collector (HEC) or Syslog over TCP/TLS.


The process is built on a pull-based architecture, which provides the following key benefits by design:

  • Security: The pull model ensures all connections are initiated from your secure network. You do not need to open any inbound firewall ports, which preserves your on-premises security posture.

  • High throughput and scalability: High throughput is achieved by scaling horizontally. You can run multiple consumer instances concurrently, and the consumer group automatically balances the workload across all active instances.

  • Reliability: The consumer group provides at-least-once delivery guarantees. If a consumer instance fails, the shards it was processing are automatically reassigned to other healthy instances in the group. Consumption resumes from the last saved checkpoint, preventing data loss.
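
In code, this pull model reduces to a processor class that handles batches of logs plus a worker that manages shard assignment and checkpoints. The following is a minimal sketch only, using the environment variables described later in this topic; CursorPosition.END_CURSOR starts from the newest data, whereas the complete, SIEM-specific versions of sync_data.py in the Procedure section use a timestamp-based cursor and add logging, connectivity checks, and SIEM-specific formatting.

# Minimal consumer-group skeleton (illustrative sketch; see the complete sync_data.py below).
import os
from aliyun.log.consumer import ConsumerProcessorBase, ConsumerWorker, LogHubConfig, CursorPosition
from aliyun.log.pulllog_response import PullLogResponse

class ForwardToSiem(ConsumerProcessorBase):
    def process(self, log_groups, check_point_tracker):
        # Flatten the pulled log groups into dictionaries, forward each one to the SIEM,
        # then save the checkpoint so a restart resumes from where it left off.
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        for log in logs:
            pass  # send `log` to Splunk HEC or Syslog here
        self.save_checkpoint(check_point_tracker)

option = LogHubConfig(os.environ['SLS_ENDPOINT'], os.environ['SLS_AK_ID'], os.environ['SLS_AK_KEY'],
                      os.environ['SLS_PROJECT'], os.environ['SLS_LOGSTORE'],
                      os.environ['SLS_CG'], 'consumer-1',
                      cursor_position=CursorPosition.END_CURSOR)
ConsumerWorker(ForwardToSiem, option, args=()).start(join=True)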

Prerequisites

  • Permission: Create a Resource Access Management (RAM) user, and attach the AliyunLogFullAccess policy to the user. For more information, see the related document.

  • Network requirements: The machine where the program runs must be able to access the SLS endpoint and be in the same network as the SIEM.

    • To obtain the endpoint:

      1. Log on to the SLS console. In the project list, click the target project.

      2. Click the icon to the right of the project name to go to the project overview page.

      3. In the Endpoint section, view the public endpoint. The full endpoint is https:// followed by the public endpoint, for example, https://cn-beijing.log.aliyuncs.com. (A short connectivity check that uses this endpoint is sketched at the end of the prerequisites.)

  • Environment requirements: A Python 3 runtime environment and the SLS Python SDK.

    1. Install the SLS Python SDK: pip install -U aliyun-log-python-sdk.

    2. Verify the installation: pip show aliyun-log-python-sdk. If output similar to the following is returned, the installation is successful (the version number may differ).

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun
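
Optionally, run a short script to confirm in one step that the SDK imports correctly and that the endpoint and credentials can reach the target logstore. This is a sketch that assumes the environment variables described in Step 2 are already set; the LogClient.get_logstore call is used here only as a simple access check.

# Preflight check: verify SDK installation, endpoint reachability, and credentials.
import os
from aliyun.log import LogClient

client = LogClient(os.environ['SLS_ENDPOINT'], os.environ['SLS_AK_ID'], os.environ['SLS_AK_KEY'])
client.get_logstore(os.environ['SLS_PROJECT'], os.environ['SLS_LOGSTORE'])
print("SDK, endpoint, credentials, and logstore all look reachable.")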

Procedure

Step 1: Prepare the application

SLS provides two shipping methods:

  • Splunk HEC: The HTTP Event Collector (HEC) is a token-based mechanism that lets you send logs in various data formats directly to Splunk over HTTP or HTTPS, securely and efficiently.

  • Syslog: A common log channel that is compatible with most SIEMs and transmits logs in text format.

Splunk HEC

To ship log data to Splunk, configure sync_data.py. The code has three main parts:

  • main() method: Main program entrypoint.

  • get_option() method: Consumption configuration options.

    • Basic configuration options: Includes connection settings for SLS and consumer group settings.

    • Advanced options for the consumer group: Performance-tuning parameters. Do not modify them unless necessary.

    • SIEM (Splunk)-related parameters and options.

    • To perform data cleansing during the shipping process (such as row filtering, column trimming, or data normalization), add rules using SPL queries. For example:

      # SPL statement
      query = "* | where instance_id in ('instance-1', 'instance-2')"
      # Create a consumer based on rules. Compared to normal consumption, the query parameter is added at the end of the parameter list.
      option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                            cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                            cursor_start_time=cursor_start_time,
                            heartbeat_interval=heartbeat_interval,
                            data_fetch_interval=data_fetch_interval,
                            query=query)
  • SyncData(ConsumerProcessorBase): Contains the logic for fetching data from SLS and shipping it to Splunk. Carefully review the comments in the code and make adjustments as needed.

The complete code is as follows:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# Configure a rotating log file for diagnostics and troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    The consumer consumes data from Simple Log Service and sends it to Splunk.
    """
    def __init__(self, splunk_setting=None):
        """Initialize and verify the connectivity to Splunk."""
        super(SyncData, self).__init__()   # remember to call base's init

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Test the connectivity to Splunk.
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # Modify the code here: Replace it with the synchronous processing code for sending logs to the remote end.
            # The log format is a dictionary. Example (Note: All strings must be Unicode encoded):
            #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                # Add retries or failure reporting here as needed before re-raising.
                raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # Basic configuration items
    ##########################

    # Load Simple Log Service parameters and options from environment variables.
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # Advanced options for the consumer group
    ##########################

    # It is generally not recommended to modify the consumer name, especially when concurrent consumption is required.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # The starting point of consumption. This parameter is valid when the program is run for the first time. Subsequent runs will resume consumption from the last saved checkpoint.
    # Use "begin", "end", or a specific ISO time format.
    cursor_start_time = "2018-12-26 0:0:0"

    # Heartbeat interval. If the server does not receive a heartbeat report for a specific shard within twice the interval, the server considers the corresponding consumer to be offline and reassigns the task.
    # When the network environment is poor, it is not recommended to set a short interval.
    heartbeat_interval = 20

    # The maximum interval for data consumption. If data is generated quickly, you do not need to adjust this parameter.
    data_fetch_interval = 1
    
    # Build a consumer group and a consumer.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    When building a consumer based on rules, use the following code:
    # Custom SPL statement
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    # Build consumption based on rules. Compared to normal consumption, the query parameter is added at the end of the parameter list.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Splunk options
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # Optional, bool
                'timeout': 120,             # Optional, int
                'ssl_verify': True,         # Optional, bool
                "sourcetype": "",            # Optional, sourcetype
                "index": "",                # Optional, index
                "source": "",               # Optional, source
            }

    return option, settings

# Main program entrypoint  
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()
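
Before starting the consumer, you can verify that the HEC endpoint and token accept events by posting a single test event. The snippet below is a sketch: the host, port, and token are placeholders, and the request mirrors the URL format and Authorization header that sync_data.py builds.

# Send one test event to Splunk HEC using the same URL format and header as sync_data.py.
import json
import requests

host, port, token = "1.2.3.4", 8088, "a0*****123"   # placeholders: your HEC host, port, and token
url = "https://{0}:{1}/services/collector".format(host, port)

event = {
    "sourcetype": "sls_test",
    "event": json.dumps({"message": "test event from the SLS shipper"}),
}
resp = requests.post(url, data=json.dumps(event),
                     headers={"Authorization": "Splunk {}".format(token)},
                     timeout=30)
resp.raise_for_status()
print(resp.json())   # Splunk returns {"text": "Success", "code": 0} when the event is accepted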

Syslog

Syslog log formats are defined primarily by RFC 5424 and RFC 3164; we recommend the RFC 5424 protocol. Although both TCP and UDP can carry Syslog, TCP provides more reliable transmission than UDP. RFC 5424 also defines a secure transport layer over TLS. If your SIEM supports a TCP- or TLS-based Syslog channel, we recommend using that channel.

To ship log data to a SIEM by using Syslog, configure sync_data.py. The code has three main parts:

  • main() method: Main program entrypoint.

  • get_monitor_option() method: Consumption configuration options.

    • Basic configuration options: Includes connection settings for SLS and consumer group settings.

    • Advanced options for the consumer group: Performance-tuning parameters. Do not modify them unless necessary.

    • Parameters and options related to the SIEM's Syslog server.

      • Syslog facility: Program component. The example uses syslogclient.FAC_USER as the default.

      • Syslog severity: Log level. Set the log level for specific content as needed. Here, syslogclient.SEV_INFO is selected.

      • If the SIEM supports a Syslog channel over TLS, set the protocol option to tls and configure the correct SSL certificate (cert_path).

  • SyncData(ConsumerProcessorBase): Contains the logic for how to retrieve data from SLS and deliver it to the SIEM Syslog server. Read the comments in the code carefully and make adjustments as needed.

The complete code is as follows:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# Configure a rotating log file for diagnostics and troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    The consumer consumes data from Simple Log Service and sends it to the Syslog server.
    """
    def __init__(self, target_setting=None):
        """
        Initialize and verify the connectivity to the Syslog server.
        """

        super(SyncData, self).__init__()   # remember to call base's init

        assert target_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # Test connectivity. 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # Place the synchronous code for sending logs to the remote end here.
                    # The log format is a dictionary. Example (Note: All strings must be Unicode encoded):
                    #    Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    # Modify the format as needed. Here, Key=Value is used for transmission, and the default double vertical line (||) is used for separation.
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # Modify the facility or severity as needed.
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
            # You need to add some error handling code, such as retries or notifications. 
            raise err

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # Basic configuration items
    ##########################

    # Load Simple Log Service parameters and options from environment variables.
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # Advanced options for the consumer group
    ##########################

    # It is generally not recommended to modify the consumer name, especially when concurrent consumption is required.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # The starting point of consumption. This parameter is valid when the program is run for the first time. Subsequent runs will resume consumption from the last saved checkpoint.
    # Use "begin", "end", or a specific ISO time format.
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # Heartbeat interval. If the server does not receive a heartbeat report for a specific shard within twice the interval, the server considers the corresponding consumer to be offline and reassigns the task.
    # When the network environment is poor, it is not recommended to set a short interval.
    heartbeat_interval = 20

    # The maximum interval for data consumption. If data is generated quickly, you do not need to adjust this parameter.
    data_fetch_interval = 1

    # Build a consumer group and a consumer.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Parameters and options related to the Syslog server
    settings = {
                "host": "1.2.3.4", # Required
                "port": 514,       # Required, port
                "protocol": "tcp", # Required, TCP, UDP, or TLS (Python 3 only)
                "sep": "||",      # Required, separator for key=value pairs. Here, a double vertical line (||) is used.
                "cert_path": None,  # Optional, location of the TLS certificate
                "timeout": 120,   # Optional, timeout period, default is 120 seconds
                "facility": syslogclient.FAC_USER,  # Optional, refer to other syslogclient.FAC_* values
                "severity": syslogclient.SEV_INFO,  # Optional, refer to other syslogclient.SEV_* values
                "hostname": None, # Optional, machine name, the local machine name is used by default
                "tag": None # Optional, tag, the default is a hyphen (-)
    }

    return option, settings

# Main program entrypoint
def main():
    option, settings = get_monitor_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()
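
Before starting the consumer, you can send a single test message through the same SyslogClient class used above to confirm that the Syslog server is reachable. This is a sketch; the host, port, and protocol are placeholders and should match the values that you configure in get_monitor_option().

# Send one RFC 5424 test message using the same client class and call signature as sync_data.py.
from datetime import datetime
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient

host, port, protocol = "1.2.3.4", 514, "tcp"   # placeholders: your Syslog server settings

with SyslogClient(host, port, proto=protocol, timeout=30, cert_path=None) as client:
    client.log("connectivity test from the SLS shipper",
               facility=syslogclient.FAC_USER,
               severity=syslogclient.SEV_INFO,
               timestamp=datetime.now(),
               program="sls-test",
               hostname=None)
print("Test message sent; search the SIEM for the 'connectivity test' event.")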

Step 2: Configure environment variables

After configuring the application, set the following system environment variables.

  • SLS_ENDPOINT

    Value: The SLS endpoint, prefixed with https://. To obtain it:

      1. Log on to the Simple Log Service console. In the project list, click the target project.

      2. Click the icon to the right of the project name to go to the project overview page.

      3. In the Endpoint section, copy the public endpoint and prefix it with https://.

    If the endpoint is prefixed with https://, such as https://cn-beijing.log.aliyuncs.com, the program automatically uses HTTPS to encrypt the connection with SLS. The server certificate *.aliyuncs.com is issued by GlobalSign and is trusted by most systems by default. If your system does not trust this certificate, download and install it through Certificate installation.

    Example: https://cn-beijing.log.aliyuncs.com

  • SLS_PROJECT

    Value: In the SLS console, copy the target project name.

    Example: my-sls-project-one

  • SLS_LOGSTORE

    Value: In the SLS console, copy the target logstore name.

    Example: my-sls-logstore-a1

  • SLS_AK_ID

    Value: The AccessKey ID of a RAM user.

    Important
      • An Alibaba Cloud account's AccessKey pair grants full API access. For better security, always use a RAM user's AccessKey pair for API calls or routine O&M.
      • To prevent credential leaks, avoid hard-coding the AccessKey ID and AccessKey secret in your code.

    Example: L***ky

  • SLS_AK_KEY

    Value: The AccessKey secret of a RAM user.

    Example: x***Xl

  • SLS_CG

    Value: The consumer group name. If the specified group does not exist, the application creates it automatically.

    Example: syc_data
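
After setting the variables, you can optionally confirm that they are visible to the Python process that will run sync_data.py. This short check is a sketch; it mirrors the validation already performed in get_option() and get_monitor_option().

# Confirm that all required environment variables are set before starting the consumers.
import os

required = ["SLS_ENDPOINT", "SLS_PROJECT", "SLS_LOGSTORE", "SLS_AK_ID", "SLS_AK_KEY", "SLS_CG"]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise SystemExit("Missing environment variables: {}".format(", ".join(missing)))
if not os.environ["SLS_ENDPOINT"].startswith("https://"):
    print("Warning: SLS_ENDPOINT is not prefixed with https://, so the connection to SLS will not be encrypted.")
print("All required environment variables are set.")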

Step 3: Start and verify

  1. Start multiple consumers for concurrent consumption. The maximum number of consumers equals the total number of shards.

    # Start the first consumer process
    nohup python3 sync_data.py &
    # Start the second consumer process
    nohup python3 sync_data.py &
  2. Check the status of the consumer group in the SLS console.

    1. In the project list, click the target project. On the Log Storage > Logstores tab, click the Expand icon next to the target logstore, then click the Expand icon next to Data Consumption.

    2. In the consumer group list, click the one you want. On the Consumer Group Status page, view the client and time for data consumption of each shard.
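
As an alternative to the console, you can inspect the consumer group and its per-shard checkpoints with the SDK. This is a sketch and assumes the SDK's list_consumer_group and get_check_point methods from the consumer group API; adjust the calls if your SDK version differs.

# Inspect the consumer group and checkpoints programmatically (sketch).
import os
from aliyun.log import LogClient

client = LogClient(os.environ['SLS_ENDPOINT'], os.environ['SLS_AK_ID'], os.environ['SLS_AK_KEY'])
project, logstore, consumer_group = os.environ['SLS_PROJECT'], os.environ['SLS_LOGSTORE'], os.environ['SLS_CG']

# List the consumer groups attached to the logstore.
client.list_consumer_group(project, logstore).log_print()

# Show the checkpoints of the shards consumed by this consumer group.
client.get_check_point(project, logstore, consumer_group).log_print()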

FAQ

ConsumerGroupQuotaExceed error occurs

This error indicates that the consumer group quota is exceeded. A single logstore can have a maximum of 30 consumer groups. Delete unused consumer groups in the SLS console, or delete them by using the SDK as sketched below.
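
If you are not sure which consumer groups are still in use, you can list them and delete the ones you no longer need with the SDK. This is a sketch; the delete_consumer_group call is assumed from the SDK's consumer group API, and the group name to delete is a placeholder.

# List consumer groups on the logstore, then delete one that is no longer needed (sketch).
import os
from aliyun.log import LogClient

client = LogClient(os.environ['SLS_ENDPOINT'], os.environ['SLS_AK_ID'], os.environ['SLS_AK_KEY'])
project, logstore = os.environ['SLS_PROJECT'], os.environ['SLS_LOGSTORE']

client.list_consumer_group(project, logstore).log_print()

# Replace the placeholder name with an unused consumer group before running this line.
client.delete_consumer_group(project, logstore, "unused-consumer-group")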