Log Service and SIEM (Such as Splunk) Integration Solution
Abstract: This article describes how to connect Alibaba Cloud Log Service to your SIEM solution (such as Splunk), so that all compliance, audit, and other related logs on Alibaba Cloud can be imported into your Security Operations Center (SOC).
Background Information
Goal
This article describes how to connect Alibaba Cloud Log Service to your SIEM solution (such as Splunk), so that all compliance, audit, and other related logs on Alibaba Cloud can be imported into your Security Operations Center (SOC).
Glossary
LOG (SLS) - Alibaba Cloud Log Service, abbreviated as SLS (Simple Log Service).
SIEM - Security Information and Event Management system, such as Splunk or QRadar.
Splunk HEC - Splunk HTTP Event Collector, an HTTP(S) interface for receiving logs.
Audit-related logs
Security operations teams are generally interested in Alibaba Cloud audit-related logs. The logs currently available in Log Service include (but are not limited to) the following:
Regions - updated from time to time; refer to the latest product documentation.
Alibaba Cloud Log Service
Alibaba Cloud Log Service is a one-stop service for log data. It handles the collection, consumption, shipping, query, and analysis of massive log data without requiring any development, improving both O&M and operational efficiency. Log Service provides real-time collection and consumption, data shipping, and query and real-time analysis, and fits a wide range of development, O&M, operations, and security scenarios, from real-time monitoring to data warehousing:
The Alibaba Cloud products listed above are already integrated with Log Service, providing near-real-time automatic log collection and storage, along with Log Service-based query and analysis, reports and alerts, and integration with downstream computing and shipping.
Suggested Integration Solution
Concepts
Project
A project is the resource management unit in Log Service, used for resource isolation and control. You can manage all the logs and related log sources of an application through a project. A project holds all of a user's Logstores, the machine configurations for log collection, and other information, and it is also the entry point for accessing Log Service resources.
Logstore
A Logstore is the unit of collection, storage, and query of log data in Log Service. Each Logstore belongs to a project, and a project can contain multiple Logstores.
Partition (Shard)
Each Logstore is divided into several shards (partitions). Each shard covers a left-closed, right-open MD5 interval; the intervals do not overlap, and together they cover the entire MD5 value range.
Service entry (Endpoint)
A Log Service endpoint is a URL used to access a project and the log data within it. It depends on the Alibaba Cloud region where the project resides and the project name.
https://help.aliyun.com/document_detail/29008.html
AccessKey
An Alibaba Cloud AccessKey is a "secure password" designed for accessing cloud resources through APIs (rather than the console). It is used to sign API request content so that the server can verify the request.
https://help.aliyun.com/document_detail/29009.html
Assumption
This solution assumes that your SIEM (such as Splunk) is deployed on premises within your organization, not in the cloud, and that for security reasons no ports are opened to allow external access to the SIEM.
Overview
It is recommended to build a program based on SLS consumer groups to consume logs from SLS in real time, and then send the logs to Splunk through the Splunk HEC API.
Programming with consumer groups
The Consumer Library is an advanced mode for consuming logs from Log Service. It introduces the concept of a consumer group to abstract and manage consumers. Compared with reading data directly with the SDK, you do not need to care about the implementation details of Log Service and can focus on your business logic; concerns such as load balancing and failover between consumers are also handled for you.
The Spark Streaming, Storm, and Flink connectors are all implemented on top of the Consumer Library.
Basic concepts
Consumer Group - A consumer group consists of multiple consumers. Consumers in the same consumer group jointly consume the data in a Logstore, and no two consumers consume the same data.
Consumer - The unit that makes up a consumer group and actually performs the consumption work. Consumer names within the same consumer group must be unique.
In Log Service, a Logstore has multiple shards. The Consumer Library allocates shards to the consumers in a consumer group according to the following rules:
Each shard is assigned to only one consumer.
A consumer can hold multiple shards at the same time.
When a new consumer joins a consumer group, the shard assignments within the group are rebalanced to even out the consumption load. The rules above still apply, and the reallocation is transparent to the user.
The Consumer Library also saves checkpoints, so that after a failure the program can resume consuming from where it left off, ensuring that data is not consumed repeatedly.
Deployment Recommendations
Hardware Recommendations
Hardware parameters:
You need a machine running Linux (such as Ubuntu x64) to run the program. The recommended hardware specifications are as follows:
2.0+ GHz CPU, 8 cores
16GB RAM, 32GB recommended
1 Gbps network card
At least 2GB free disk space, 10GB or more recommended
Network parameters:
The bandwidth from your on-premises environment to Alibaba Cloud should be higher than the rate at which data is generated on Alibaba Cloud; otherwise logs cannot be consumed in real time. Assuming data is generated at a roughly uniform rate with peaks of about 2x, and 100TB of raw logs per day, with 5x compression the recommended bandwidth is around 4 MB/s (32 Mbps).
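As a rough rule of thumb, the required bandwidth is the daily raw log volume divided by 86,400 seconds and by the compression ratio, multiplied by the peak factor. The following is a minimal sketch of this arithmetic; the 1 TB/day volume, 5x compression, and 2x peak factor are illustrative assumptions only, not figures from any particular deployment:

# Rough bandwidth estimate for real-time consumption (illustrative assumptions only)
daily_raw_bytes = 1 * 10**12   # assumption: 1 TB of raw logs per day
compression = 5                # assumption: ~5x transport compression
peak_factor = 2                # assumption: peaks at ~2x the average rate

avg_mb_s = daily_raw_bytes / 86400.0 / compression / (1024 * 1024)
print("average: %.1f MB/s, peak: %.1f MB/s (%.0f Mbps)"
      % (avg_mb_s, avg_mb_s * peak_factor, avg_mb_s * peak_factor * 8))
# With these assumptions: roughly 2.2 MB/s on average and about 4.4 MB/s (~35 Mbps) at peak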
Usage (Python)
This section describes how to program with consumer groups in Python. For Java, refer to this article.
Note: The code in this article may be updated; the latest version can be found in the GitHub samples.
Installation
Environment
Running this program with PyPy3 is strongly recommended instead of the standard CPython interpreter.
The Python SDK of Log Service can be installed as follows:
pypy3 -m pip install aliyun-log-python-sdk -U
For more on the SLS Python SDK, refer to the manual here.
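As a quick, optional check that the SDK and its consumer module installed correctly, the classes used by the program below can simply be imported:

pypy3 -c "from aliyun.log.consumer import ConsumerWorker, LogHubConfig; print('aliyun-log-python-sdk OK')"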
Program configuration
The following shows how to configure the program:
Configure the program log file for subsequent testing or diagnosing possible problems.
Configuration options for basic logging service connections and consumer groups.
Some advanced options for the consumer group (performance tuning, modification is not recommended).
SIEM (Splunk) related parameters and options.
Please read the relevant comments in the code carefully and adjust the options as needed:
#encoding: utf8
import os
import logging
from logging.handlers import RotatingFileHandler
from multiprocessing import current_process
from aliyun.log.consumer import LogHubConfig, CursorPosition

root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid),
                              maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(
    fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


def get_option():
    ##########################
    # Basic options
    ##########################

    # Load SLS parameters and options from environment variables
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # The starting point of consumption. This parameter takes effect the first time the program runs;
    # subsequent runs resume from the checkpoint saved during the last run.
    # Can be "begin", "end", or a specific time in ISO format.
    cursor_start_time = "2018-12-26 0:0:0"

    ##########################
    # Some advanced options
    ##########################

    # Generally do not modify the consumer name, especially when running consumers concurrently
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Heartbeat interval. If the server receives no heartbeat for a particular shard within twice this
    # interval, it considers the corresponding consumer offline and reassigns the shard.
    # Do not set this too small when the network is unreliable.
    heartbeat_interval = 20

    # The maximum interval between data fetches. If data is generated quickly, there is no need to adjust this.
    data_fetch_interval = 1

    # Build the consumer group and consumer
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Splunk options
    settings = {
        "host": "10.1.2.3",
        "port": 80,
        "token": "a023nsdu123123123",
        'https': False,       # optional, bool
        'timeout': 120,       # optional, int
        'ssl_verify': True,   # optional, bool
        "sourcetype": "",     # optional, sourcetype
        "index": "",          # optional, index
        "source": "",         # optional, source
    }

    return option, settings
Data consumption and forwarding
The following code shows how to get data from SLS and forward it to Splunk.
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requests


class SyncData(ConsumerProcessorBase):
    """
    This consumer consumes data from SLS and sends it to Splunk
    """
    def __init__(self, splunk_setting):
        """Initialize and verify Splunk connectivity"""
        super(SyncData, self).__init__()

        assert splunk_setting, ValueError("You need to configure settings of remote target")
        assert isinstance(splunk_setting, dict), ValueError("The settings should be a dict including the necessary address and credentials.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Test Splunk connectivity
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))
        s.close()

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector/event".format(
            "http" if not self.option.get('https') else "https",
            self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # Send data to Splunk
            # The following code is just a sample (note: all strings are unicode)
            # Python2: {u"__time__": u"12312312", u"__topic__": u"topic", u"field1": u"value1", u"field2": u"value2"}
            # Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            if log.get(u"__topic__") == 'audit_log':
                # suppose we only care about the audit log
                event['time'] = log[u'__time__']
                event['fields'] = {}
                del log['__time__']
                event['fields'].update(log)

                data = json.dumps(event, sort_keys=True)

                try:
                    req = self.r.post(self.url, data=data, timeout=self.timeout)
                    req.raise_for_status()
                except Exception as err:
                    logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}".format(self.url, err))
                    # TODO: Add some retry or reporting logic as needed

        logger.info("Complete sending data to remote")
        self.save_checkpoint(check_point_tracker)
Main logic
The following code shows the main program control logic:
def main():
    option, settings = get_option()

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,))
    worker.start(join=True)


if __name__ == '__main__':
    main()
Startup
Assuming the program is named "sync_data.py", it can be started as follows:
export SLS_ENDPOINT=
export SLS_AK_ID=
export SLS_AK_KEY=
export SLS_PROJECT=
export SLS_LOGSTORE=
export SLS_CG=
pypy3 sync_data.py
Limits and Constraints
Each Logstore can have at most 10 consumer groups. If the error ConsumerGroupQuotaExceed is returned, this limit has been reached; it is recommended to delete unused consumer groups in the console.
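Unused consumer groups can also be inspected and removed programmatically rather than in the console. The following is a minimal sketch with the Log Service Python SDK, assuming the LogClient methods list_consumer_group and delete_consumer_group behave as in current SDK versions; the consumer group name "old_cg" is only a placeholder:

import os
from aliyun.log import LogClient

# Reuse the same environment variables as the main program
client = LogClient(os.environ['SLS_ENDPOINT'], os.environ['SLS_AK_ID'], os.environ['SLS_AK_KEY'])
project = os.environ['SLS_PROJECT']
logstore = os.environ['SLS_LOGSTORE']

# List the consumer groups currently configured on this Logstore
client.list_consumer_group(project, logstore).log_print()

# Delete a consumer group that is no longer needed ("old_cg" is a placeholder name)
client.delete_consumer_group(project, logstore, "old_cg")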
Monitoring
View the status of the consumer group in the console
View consumer group latency through cloud monitoring and configure alarms
Performance Considerations
Starting multiple consumers
A consumer-group-based program can simply be started multiple times to achieve concurrent consumption:
nohup pypy3 sync_data.py &
nohup pypy3 sync_data.py &
nohup pypy3 sync_data.py &
...
Notice:
All consumers use the same consumer group name and different consumer names (because the consumer name is suffixed with the process ID).
Because a shard (partition) can be consumed by only one consumer at a time, a Logstore with 10 shards supports at most 10 consumers consuming simultaneously (see the sketch below for checking the shard count).
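To check how many shards a Logstore has, and therefore how many consumers can usefully run in parallel, the shard list can be queried with the SDK. The following is a minimal sketch, assuming the LogClient methods list_shards and get_shards_info behave as in current SDK versions:

import os
from aliyun.log import LogClient

client = LogClient(os.environ['SLS_ENDPOINT'], os.environ['SLS_AK_ID'], os.environ['SLS_AK_KEY'])

# The number of shards is the upper bound on the number of concurrent consumers
res = client.list_shards(os.environ['SLS_PROJECT'], os.environ['SLS_LOGSTORE'])
print("shard count:", len(res.get_shards_info()))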
HTTPS
If the endpoint is configured with an https:// prefix, such as https://cn-beijing.log.aliyuncs.com, the connection between the program and SLS is automatically encrypted with HTTPS.
The server certificate for *.aliyuncs.com is issued by GlobalSign, and most Linux/Windows machines trust it by default. If a machine does not trust this certificate in some special case, refer to here to download and install it.
Performance and throughput
In testing, running the sample above with pypy3 on the recommended hardware, without bandwidth limits or receiver-side (e.g., Splunk) rate limits, a single consumer can consume raw logs at up to about 5 MB/s while occupying roughly 10% of one CPU core. In theory, this works out to about 50 MB/s of raw logs per CPU core, i.e., each CPU core can consume roughly 4 TB of raw logs per day.
Note: these figures depend on bandwidth, hardware, and whether the SIEM receiver (such as Splunk) can accept the data quickly enough.
High availability
The consumer group saves checkpoints on the server side. When one consumer stops, another consumer automatically takes over and continues consuming from the checkpoint.
Consumers can be started on different machines, so that if one machine stops or crashes, consumers on other machines automatically take over and continue from the checkpoint.
In theory, more consumers than shards can be started so that spare consumers are available as backups.
More references
Log Service Python consumer group hands-on (1): Log Service and SIEM (such as Splunk) integration
Log Service Python consumer group hands-on (2): real-time log distribution
Log Service Python consumer group hands-on (3): real-time cross-region monitoring of data from multiple Logstores
GitHub samples for this article
Author: Cheng Zhe