All Products
Search
Document Center

Simple Log Service:Kirimkan log ke SIEM

Last Updated:Mar 26, 2026

Untuk mengirimkan log ke sistem Security Information and Event Management (SIEM), Anda dapat men-deploy aplikasi yang menghubungkan SLS ke SIEM Anda. Aplikasi ini menggunakan consumer group SLS untuk menarik log, lalu meneruskannya ke SIEM Anda melalui Splunk HEC atau Syslog, sehingga mengintegrasikan log cloud dengan platform analitik keamanan on-premises Anda.

Latar Belakang

Perusahaan sering men-deploy platform Security Information and Event Management (SIEM), seperti Splunk atau QRadar, di pusat data on-premises. Untuk menjaga keamanan, platform tersebut biasanya tidak menyediakan titik akhir publik untuk menerima data. Saat memigrasikan bisnis ke cloud, Anda perlu mengonsolidasikan log dari sumber daya cloud ke SIEM on-premises guna pemantauan, audit, dan analisis ancaman terpadu. Oleh karena itu, Anda harus membuat pipeline pengiriman log yang aman dari SLS ke SIEM on-premises tanpa mengorbankan keamanan sistem yang sudah ada.

Cara Kerja

Untuk pengiriman data secara real-time, gunakan consumer group SLS. Aplikasi khusus menarik log dari SLS dan meneruskannya ke SIEM Anda melalui Splunk HTTP Event Collector (HEC) atau Syslog melalui TCP/TLS.

image

Logika Inti

  1. Penarikan log: Aplikasi berbasis consumer group menarik data dari SLS. Mekanisme ini mendukung konsumsi konkuren dan failover.

    • Konkurensi dan throughput

      • Untuk mencapai throughput lebih tinggi, jalankan beberapa instans aplikasi konsumen. Setiap instans konsumen harus berada dalam consumer group yang sama dan memiliki nama unik, misalnya dengan menggunakan ID proses sebagai akhiran.

      • Hanya satu konsumen yang dapat memproses satu shard dalam satu waktu. Oleh karena itu, jumlah maksimum konsumen konkuren dibatasi oleh jumlah shard dalam penyimpanan log. Misalnya, jika sebuah penyimpanan log memiliki 10 shard, Anda dapat menjalankan hingga 10 konsumen secara paralel.

      • Dalam kondisi jaringan ideal:

        • Satu konsumen (menggunakan sekitar 20% dari satu core CPU) dapat mengonsumsi log mentah dengan laju 10 MB/detik.

        • Sepuluh konsumen dapat memproses hingga 100 MB/detik log mentah.

    • Ketersediaan tinggi

      • Consumer group menyimpan progres setiap konsumen sebagai checkpoint di server.

      • Jika sebuah instans konsumen gagal, instans lain yang tersedia secara otomatis mengambil alih shard yang ditugaskan kepadanya dan melanjutkan pemrosesan dari checkpoint terakhir yang disimpan. Untuk memastikan failover yang andal, Anda dapat menjalankan instans konsumen pada mesin yang berbeda.

      • Anda dapat menjalankan lebih banyak instans konsumen daripada jumlah shard. Instans tambahan tersebut bertindak sebagai standby untuk failover segera.

  2. Penerusan data: Setelah menarik log, aplikasi memformat dan meneruskannya ke SIEM on-premises Anda sesuai konfigurasi Anda.

Prasyarat

  • Buat Pengguna RAM dan berikan izin: Pengguna RAM harus memiliki kebijakan AliyunLogFullAccess.

  • Persyaratan jaringan: Mesin yang menjalankan aplikasi harus dapat mengakses titik akhir SLS dan berada dalam jaringan yang sama dengan SIEM.

    • Untuk mendapatkan titik akhir:

      1. Masuk ke Konsol SLS. Di daftar proyek, klik proyek target Anda.

      2. Klik ikon image di sebelah kanan nama proyek untuk membuka halaman ikhtisar proyek.

      3. Di bagian Endpoint, salin titik akhir publik. Titik akhir lengkapnya adalah https:// + titik akhir publik.

  • Persyaratan lingkungan: Siapkan lingkungan runtime Python 3 dan instal SDK Python SLS.

    1. Instal SDK Python SLS: pip install -U aliyun-log-python-sdk.

    2. Verifikasi instalasi: pip show aliyun-log-python-sdk. Instalasi yang berhasil akan mengembalikan informasi seperti berikut.

      Name: aliyun-log-python-sdk
      Version: 0.9.12
      Summary: Aliyun log service Python client SDK
      Home-page: https://github.com/aliyun/aliyun-log-python-sdk
      Author: Aliyun

Prosedur

Langkah 1: Siapkan aplikasi

SLS menyediakan skrip contoh untuk dua metode pengiriman: Splunk HEC dan Syslog. Pilih metode yang sesuai dengan SIEM Anda dan konfigurasikan skrip yang sesuai.

  • Splunk HEC: HTTP Event Collector (HEC) adalah mekanisme berbasis token yang memungkinkan Anda mengirim data dalam berbagai format secara aman dan efisien ke Splunk melalui HTTP.

  • Syslog: Protokol logging umum yang kompatibel dengan sebagian besar sistem SIEM dan mendukung format teks biasa.

Splunk HEC

Untuk mengirimkan data log ke Splunk, konfigurasikan skrip sync_data.py yang disediakan. Skrip ini terdiri dari tiga bagian utama:

  • Metode main(): Logika kontrol program utama.

  • Metode get_option(): Menentukan opsi konfigurasi konsumsi.

    • Konfigurasi dasar: Termasuk pengaturan koneksi untuk SLS dan consumer group.

    • Opsi consumer group lanjutan: Termasuk parameter penyetelan performa. Jangan ubah ini kecuali diperlukan.

    • Parameter dan opsi SIEM (Splunk).

    • Tambahkan kueri SPL untuk memfilter atau mentransformasi data selama pengiriman untuk tugas seperti filter baris, pemangkasan kolom, atau normalisasi data. Contoh:

      # Kueri SPL
          query = "* | where instance_id in ('instance-1', 'instance-2')"
      # Buat konsumen dengan aturan filter. Parameter 'query' ditambahkan ke konfigurasi.
          option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                                cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                                cursor_start_time=cursor_start_time,
                                heartbeat_interval=heartbeat_interval,
                                data_fetch_interval=data_fetch_interval,
                                query=query)
  • Kelas SyncData(ConsumerProcessorBase): Berisi logika untuk mengambil data dari SLS dan mengirimkannya ke Splunk. Tinjau komentar dalam kode dan sesuaikan logika sesuai kebutuhan.

Skrip lengkap disediakan di bawah ini:

sync_data.py

# -*- coding: utf-8 -*-
import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import json
import socket
import requests

# Konfigurasikan file log rotasi untuk diagnostik dan troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
    Kelas konsumen ini menarik data dari SLS dan mengirimkannya ke Splunk.
    """
    def __init__(self, splunk_setting=None):
        
        """Inisialisasi konsumen dan verifikasi konektivitas ke Splunk."""
        super(SyncData, self).__init__()   # ingat untuk memanggil init milik base class

        assert splunk_setting, ValueError("Anda perlu mengonfigurasi pengaturan target remote")
        assert isinstance(splunk_setting, dict), ValueError("Pengaturan harus berupa dict untuk menyertakan alamat dan kredensial yang diperlukan.")

        self.option = splunk_setting
        self.timeout = self.option.get("timeout", 120)

        # Uji konektivitas ke Splunk.
        s = socket.socket()
        s.settimeout(self.timeout)
        s.connect((self.option["host"], self.option['port']))

        self.r = requests.session()
        self.r.max_redirects = 1
        self.r.verify = self.option.get("ssl_verify", True)
        self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
        self.url = "{0}://{1}:{2}/services/collector".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])

        self.default_fields = {}
        if self.option.get("sourcetype"):
            self.default_fields['sourcetype'] = self.option.get("sourcetype")
        if self.option.get("source"):
            self.default_fields['source'] = self.option.get("source")
        if self.option.get("index"):
            self.default_fields['index'] = self.option.get("index")

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Dapatkan data dari shard {0}, jumlah log: {1}".format(self.shard_id, len(logs)))
        for log in logs:
            # TODO: Ganti ini dengan logika Anda sendiri untuk memproses dan mengirim log.
            # Log berupa dictionary. Contoh untuk Python 3 (semua string harus Unicode):
            #    {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
            event = {}
            event.update(self.default_fields)
            event['time'] = log[u'__time__']
            del log['__time__']

            json_topic = {"actiontrail_audit_event": ["event"] }
            topic = log.get("__topic__", "")
            if topic in json_topic:
                try:
                    for field in json_topic[topic]:
                        log[field] = json.loads(log[field])
                except Exception as ex:
                    pass
            event['event'] = json.dumps(log)

            data = json.dumps(event, sort_keys=True)

            try:
                req = self.r.post(self.url, data=data, timeout=self.timeout)
                req.raise_for_status()
            except Exception as err:
                logger.debug("Gagal terhubung ke server Splunk remote ({0}). Exception: {1}".format(self.url, err))
                raise err

                # Tambahkan logika retry atau mekanisme pelaporan sesuai kebutuhan.

        logger.info("Selesai mengirim data ke remote")

        self.save_checkpoint(check_point_tracker)


def get_option():
    ##########################
    # Konfigurasi dasar
    ##########################

    # Muat parameter dan opsi SLS dari variabel lingkungan.
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group tidak boleh kosong")

    ##########################
    # Opsi consumer group lanjutan
    ##########################

    # Tidak disarankan mengubah nama konsumen, terutama untuk konsumsi konkuren.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Titik awal konsumsi. Parameter ini hanya digunakan saat pertama kali dijalankan.
    # Eksekusi berikutnya dilanjutkan dari checkpoint terakhir yang disimpan.
    # Nilai valid: "begin", "end", atau waktu spesifik dalam format ISO 8601.
    cursor_start_time = "2018-12-26 0:0:0"

    # Interval heartbeat dalam detik. Jika server tidak menerima heartbeat dari konsumen
    # untuk shard tertentu dalam waktu 2 * heartbeat_interval, server menganggap konsumen offline
    # dan menugaskan ulang tugasnya. Jangan atur nilai ini terlalu rendah dalam kondisi jaringan buruk.
    heartbeat_interval = 20

    # Interval maksimum pengambilan data. Jika data baru dihasilkan dengan cepat,
    # Anda tidak perlu menyesuaikan parameter ini.
    data_fetch_interval = 1
    
    # Buat objek konfigurasi consumer group.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)
"""
    Untuk membuat konsumen dengan aturan filter, gunakan kode berikut:
    # Kueri SPL kustom
    query = "* | where instance_id in ('instance-1', 'instance-2')"
    # Buat konsumen dengan aturan filter. Parameter 'query' ditambahkan ke konfigurasi.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval,
                          query=query)
    """

    # Opsi Splunk
    settings = {
                "host": "1.2.3.4",
                "port": 80,
                "token": "a0*****123",
                'https': False,              # Opsional, bool
                'timeout': 120,             # Opsional, int
                'ssl_verify': True,         # Opsional, bool
                "sourcetype": "",            # Opsional, sourcetype
                "index": "",                # Opsional, index
                "source": "",               # Opsional, source
            }

    return  option, settings

# Logika kontrol program utama 
def main():
    option, settings = get_option()

    logger.info("*** mulai mengonsumsi data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)

if __name__ == '__main__':
    main()

Syslog

Syslog menentukan spesifikasi format log berdasarkan protokol seperti RFC 5424 dan RFC 3164. Kami merekomendasikan penggunaan RFC 5424. Meskipun Syslog dapat ditransmisikan melalui UDP maupun TCP, TCP memberikan transmisi data yang lebih andal. RFC 5424 juga menentukan lapisan transport aman menggunakan TLS. Jika SIEM Anda mendukung Syslog melalui saluran TCP atau TLS, gunakanlah.

Untuk mengirimkan data log ke SIEM menggunakan Syslog, Anda dapat mengonfigurasi skrip sync_data.py yang disediakan. Skrip ini terdiri dari tiga bagian utama:

  • Metode main(): Logika kontrol program utama.

  • Metode get_monitor_option(): Menentukan opsi konfigurasi konsumsi.

    • Konfigurasi dasar: Termasuk pengaturan koneksi untuk SLS dan consumer group.

    • Opsi consumer group lanjutan: Termasuk parameter penyetelan performa. Jangan ubah ini kecuali diperlukan.

    • Parameter dan opsi server Syslog SIEM.

      • Fasilitas Syslog: Komponen program yang menghasilkan log. Contoh ini menggunakan syslogclient.FAC_USER sebagai default.

      • Tingkat keparahan Syslog: tingkat log dari pesan. Anda dapat menyesuaikannya berdasarkan konten log. Contoh ini menggunakan syslogclient.SEV_INFO.

      • Jika SIEM Anda mendukung Syslog melalui TCP atau TLS, atur parameter proto ke TLS dan berikan path ke sertifikat SSL yang valid.

  • Kelas SyncData(ConsumerProcessorBase): Berisi logika untuk mengambil data dari SLS dan mengirimkannya ke server Syslog. Tinjau komentar dalam kode dan sesuaikan logika sesuai kebutuhan.

Skrip lengkap disediakan di bawah ini:

sync_data.py

# -*- coding: utf-8 -*-

import os
import logging
from logging.handlers import RotatingFileHandler
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import aliyun.log.ext.syslogclient as syslogclient
from aliyun.log.ext.syslogclient import SyslogClientRFC5424 as SyslogClient
import six
from datetime import datetime

# Konfigurasikan file log rotasi untuk diagnostik dan troubleshooting.
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

logger = logging.getLogger(__name__)


class SyncData(ConsumerProcessorBase):
    """
   Kelas konsumen ini menarik data dari SLS dan mengirimkannya ke server Syslog.
    """
    def __init__(self, target_setting=None):
        """
        Inisialisasi konsumen dan verifikasi konektivitas ke server Syslog.
        """

        super(SyncData, self).__init__()   # ingat untuk memanggil init milik base class

        assert target_setting, ValueError("Anda perlu mengonfigurasi pengaturan target remote")
        assert isinstance(target_setting, dict), ValueError("Pengaturan harus berupa dict untuk menyertakan alamat dan kredensial yang diperlukan.")

        self.option = target_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))
        self.cert_path=self.option.get('cert_path', None)

        # Uji konektivitas. 
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
            pass

    def process(self, log_groups, check_point_tracker):
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Dapatkan data dari shard {0}, jumlah log: {1}".format(self.shard_id, len(logs)))

        try:
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
                for log in logs:
                    # TODO: Tempatkan logika Anda sendiri di sini untuk memproses dan mengirim log.
                    # Log berupa dictionary. Contoh (semua string harus Unicode):
                    #    Python 2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
                    #    Python 3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
                
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    io = six.StringIO()
                    first = True
                    # Anda dapat memodifikasi format sesuai kebutuhan. Contoh ini menggunakan pasangan key=value
                    # yang dipisahkan oleh double vertical bar (||).
                    for k, v in six.iteritems(log):
                        io.write("{0}{1}={2}".format(self.sep, k, v))

                    data = io.getvalue()
                    # Anda dapat memodifikasi facility atau severity sesuai kebutuhan.
                    client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))

        except Exception as err:
            logger.debug("Gagal terhubung ke server syslog remote ({0}). Exception: {1}".format(self.option, err))
            # Tambahkan logika penanganan error, seperti retry atau notifikasi.
            raise err

        logger.info("Selesai mengirim data ke remote")

        self.save_checkpoint(check_point_tracker)


def get_monitor_option():
    ##########################
    # Konfigurasi dasar
    ##########################

    # Muat parameter dan opsi SLS dari variabel lingkungan.
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    assert endpoint and accessKeyId and accessKey and project and logstore and consumer_group, \
        ValueError("endpoint/accessKeyId/accessKey/project/logstore/consumer_group tidak boleh kosong")

    ##########################
    # Opsi consumer group lanjutan
    ##########################

    # Tidak disarankan mengubah nama konsumen, terutama untuk konsumsi konkuren.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Titik awal konsumsi. Parameter ini hanya digunakan saat pertama kali dijalankan.
    # Eksekusi berikutnya dilanjutkan dari checkpoint terakhir yang disimpan.
    # Nilai valid: "begin", "end", atau waktu spesifik dalam format ISO 8601.
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # Interval heartbeat dalam detik. Jika server tidak menerima heartbeat dari konsumen
    # untuk shard tertentu dalam waktu 2 * heartbeat_interval, server menganggap konsumen offline
    # dan menugaskan ulang tugasnya. Jangan atur nilai ini terlalu rendah dalam kondisi jaringan buruk.
    heartbeat_interval = 20

    # Interval maksimum pengambilan data. Jika data baru dihasilkan dengan cepat,
    # Anda tidak perlu menyesuaikan parameter ini.
    data_fetch_interval = 1

    # Buat objek konfigurasi consumer group.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # Parameter dan opsi server Syslog
    settings = {
                "host": "1.2.3.4", # Wajib.
                "port": 514,       # Wajib. Nomor port.
                "protocol": "tcp", # Wajib. Bisa TCP, UDP, atau TLS (hanya Python 3).
                "sep": "||",      # Wajib. Pemisah untuk pasangan key=value. Default '||'.
                "cert_path": None,  # Opsional. Path ke file sertifikat TLS.
                "timeout": 120,   # Opsional. Timeout dalam detik. Default 120.
                "facility": syslogclient.FAC_USER,  # Opsional. Lihat nilai syslogclient.FAC_* lainnya.
                "severity": syslogclient.SEV_INFO,  # Opsional. Lihat nilai syslogclient.SEV_* lainnya.
                "hostname": None, # Opsional. Hostname. Default ke hostname mesin lokal.
                "tag": None # Opsional. Tag. Default ke tanda hubung (-).
    }

    return option, settings

# Logika kontrol program utama
def main():
    option, settings = get_monitor_option()

    logger.info("*** mulai mengonsumsi data...")
    worker = ConsumerWorker(SyncData, option, args=(settings,) )
    worker.start(join=True)


if __name__ == '__main__':
    main()

Langkah 2: Konfigurasikan variabel lingkungan

Setelah mengonfigurasi program, lakukan konfigurasi variabel lingkungan sistem sesuai tabel berikut.

Parameter

Nilai

Contoh

SLS_ENDPOINT

  1. Masuk ke Konsol SLS dan klik proyek target Anda.

  2. Klik ikon image di sebelah kanan nama proyek untuk membuka halaman ikhtisar proyek.

  3. Di bagian Endpoint, salin titik akhir publik. Titik akhir lengkapnya adalah https:// + titik akhir publik Anda.

Jika titik akhir diawali dengan https://, misalnya https://cn-beijing.log.aliyuncs.com, aplikasi secara otomatis menggunakan HTTPS untuk terhubung ke SLS. Sertifikat server *.aliyuncs.com dikeluarkan oleh GlobalSign dan dipercaya oleh sebagian besar sistem. Jika sistem Anda tidak mempercayai sertifikat ini, unduh sertifikat tersebut dan ikuti petunjuk Instalasi Sertifikat.

https://cn-beijing.log.aliyuncs.com

SLS_PROJECT

Nama proyek target Anda di Konsol SLS.

my-sls-project-one

SLS_LOGSTORE

Nama penyimpanan log target Anda di Konsol SLS.

my-sls-logstore-a1

SLS_AK_ID

ID AccessKey Pengguna RAM Anda.

Penting
  • Pasangan AccessKey Akun Alibaba Cloud memberikan akses API penuh. Untuk keamanan yang lebih baik, gunakan pasangan AccessKey Pengguna RAM untuk panggilan API atau operasi harian.

  • Untuk mencegah risiko keamanan akibat kebocoran kredensial, jangan pernah menyematkan ID AccessKey dan Secret AccessKey Anda secara langsung dalam kode aplikasi.

L***ky

SLS_AK_KEY

Secret AccessKey Pengguna RAM Anda.

x***Xl

SLS_CG

Nama consumer group. Anda dapat menggunakan nama sederhana seperti "sync_data". Jika grup yang ditentukan belum ada, aplikasi akan membuatnya secara otomatis.

sync_data

Langkah 3: Mulai dan verifikasi

  1. Jalankan beberapa proses konsumen untuk mengaktifkan pemrosesan konkuren. Jumlah maksimum proses konkuren sama dengan jumlah shard dalam penyimpanan log Anda.

    # Jalankan proses konsumen pertama
    nohup python3 sync_data.py &
    # Jalankan proses konsumen kedua
    nohup python3 sync_data.py &
  2. Lihat status consumer group di Konsol SLS.

    1. Di daftar proyek, klik proyek target Anda. Buka tab Log Storage > Logstores. Klik ikon 展开节点 di sebelah nama penyimpanan log target Anda, lalu klik ikon 展开节点 di sebelah Data Consumption.

    2. Di daftar consumer group, klik consumer group target Anda. Di tab Consumer Group Status, lihat klien konsumen dan progres untuk setiap shard.

FAQ

ConsumerGroupQuotaExceed error

Error ini menunjukkan bahwa Anda telah melebihi kuota untuk consumer group. Satu penyimpanan log dapat memiliki maksimal 30 consumer group. Untuk mengatasi masalah ini, hapus consumer group yang tidak digunakan di Konsol SLS.