
Simple Log Service: Use a consumer group to consume logs

Last updated: Jul 02, 2025

When you use a consumer group to consume logs, you can focus on your business logic without handling implementation details such as Simple Log Service operations, load balancing, or failover among consumers. This topic describes how to use Simple Log Service SDK for Python to create a consumer group and use it to consume logs, and provides sample code.

Prerequisites

  • A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.

    • We recommend that you do not save the AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources in your account may be compromised.

  • Simple Log Service SDK for Python is installed. For more information, see Install Simple Log Service SDK for Python.

  • A project is created. For more information, see Sample code that is used to create a project.
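
As a minimal sketch of the credential prerequisite, the following helper reads the AccessKey pair from the two environment variables named above and fails early if either one is missing. The function name load_credentials and its environ parameter are hypothetical and are not part of the SDK; only the environment variable names come from this topic.

```python
import os


def load_credentials(environ=None):
    """Return (access_key_id, access_key_secret) read from environment variables.

    load_credentials is a hypothetical helper for illustration, not an SDK call.
    The optional environ mapping exists only to make the helper easy to test.
    """
    environ = os.environ if environ is None else environ
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # Collect every variable that is unset or empty so that the error message
    # lists all missing values at once.
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))

    return environ[names[0]], environ[names[1]]
```

Failing early in this way avoids the empty-string defaults reaching the client, and it keeps the AccessKey pair out of the project code.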

Usage notes

In this example, the public Simple Log Service endpoint for the China (Hangzhou) region is used: https://cn-hangzhou.log.aliyuncs.com. If you access Simple Log Service from another Alibaba Cloud service that resides in the same region as your project, you can use the internal endpoint instead: https://cn-hangzhou-intranet.log.aliyuncs.com. For more information about the regions and endpoints that Simple Log Service supports, see Endpoints.
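
The choice between the two endpoints can be sketched as a small helper. The function name build_endpoint is hypothetical, and the URL pattern is an assumption generalized from the two China (Hangzhou) endpoints quoted above. Confirm the actual endpoint for any other region in the Endpoints topic before you rely on it.

```python
def build_endpoint(region_id, internal=False):
    """Build a Simple Log Service endpoint URL for a region.

    Hypothetical helper: the "-intranet" suffix pattern is inferred from the
    China (Hangzhou) endpoints in this topic and may not hold for every region.
    """
    suffix = "-intranet" if internal else ""
    return "https://{0}{1}.log.aliyuncs.com".format(region_id, suffix)


# Public endpoint, for access over the Internet.
public_endpoint = build_endpoint("cn-hangzhou")

# Internal endpoint, for callers in the same region as the project.
internal_endpoint = build_endpoint("cn-hangzhou", internal=True)
```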

Sample code

The following sample code shows how to create a Logstore, write logs to the Logstore, create a consumer group, and use the consumer group to consume logs:

import os
import time

from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock


class SampleConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0
    log_results = []
    lock = RLock()

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items

            with SampleConsumer.lock:
                SampleConsumer.log_results.append(log_items)
                print(log_items)

        # Persist the checkpoint to the server at most once every 3 seconds.
        # Between persists, record the checkpoint in memory only.
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()

        # Returning None means that the log groups were processed successfully.
        # If you need to roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


test_item_count = 20

# Write logs to the Logstore. 
def _prepare_data(client, project, logstore):
    topic = 'python-ide-test'
    source = ''

    for i in range(0, test_item_count):
        logitemList = []  # LogItem list

        contents = [
            ('user', 'magic_user_' + str(i)),
            ('avg', 'magic_age_' + str(i))
        ]
        logItem = LogItem()
        logItem.set_time(int(time.time()))
        logItem.set_contents(contents)

        logitemList.append(logItem)
        # Write logs to the Logstore. 
        request = PutLogsRequest(project, logstore, topic, source, logitemList)

        response = client.put_logs(request)
        print("successfully put logs in logstore")


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return

    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def sample_consumer_group():
    # The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint. 
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

    # Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables. 
    accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

    # The name of the project. In this example, the project is not created by using the SDK. Before you can run the code, you must create a project. 
    project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')

    # The name of the Logstore. In this example, the Logstore is automatically created by using the SDK. You do not need to create a Logstore in advance. 
    logstore = 'ali-test-logstore'

    # The name of the consumer group. When the SDK runs, a consumer group is automatically created. You do not need to create a consumer group in advance. 
    consumer_group = 'consumer-group-1'
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    token = ""

    if not logstore:
        logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')

    assert endpoint and accessKeyId and accessKey and project, ValueError("endpoint/access_id/key and "
                                                                          "project cannot be empty")

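    # Create the Logstore with a 2-day data retention period and 4 shards.
    client = LogClient(endpoint, accessKeyId, accessKey, token)
    ret = client.create_logstore(project, logstore, 2, 4)
    print("successfully create logstore")

    # Wait for the new Logstore to become available before logs are written to it.
    time.sleep(60)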
    SampleConsumer.log_results = []

    try:
        # Write logs to the Logstore. 
        _prepare_data(client, project, logstore)

        # Create two consumers in the consumer group to consume data. 
        option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)
        option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)

        print("*** start to consume data...")
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker1.start()
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
        client_worker2.start()

        sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)

        print("*** consumer group status ***")
        ret = client.list_consumer_group(project, logstore)
        print("successfully list consumergroup")

        for c in ret.get_consumer_groups():
            ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
            print("successfully get checkpoint fixed")

        print("*** stopping workers")
        client_worker1.shutdown()
        client_worker2.shutdown()

    finally:
        # clean-up
        # ret = client.delete_logstore(project, logstore)
        ret = client.list_logstore(project, logstore)
        print("successfully list logstore")

    # validate
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)

    assert 'magic_user_0' in ret and 'magic_age_0' in ret \
           and 'magic_user_' + str(test_item_count-1) in ret \
           and 'magic_age_' + str(test_item_count-1) in ret


if __name__ == '__main__':
    sample_consumer_group()

Expected results:

successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......}]
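
The process method in the sample code persists its checkpoint at most once every 3 seconds and otherwise records it in memory only. The following sketch isolates that timing logic so that it can be reused and tested without a live Logstore. The class name CheckpointThrottle and the injectable clock parameter are hypothetical and are not part of the SDK.

```python
import time


class CheckpointThrottle:
    """Decide whether a checkpoint should be persisted on this call.

    Hypothetical helper that mirrors the 3-second check in SampleConsumer.process.
    """

    def __init__(self, interval_seconds=3, clock=time.time):
        self.interval_seconds = interval_seconds
        self.clock = clock  # Injectable so that tests can supply a fake clock.
        self.last_persist_time = 0

    def should_persist(self):
        """Return True if more than interval_seconds passed since the last persist."""
        now = self.clock()
        if now - self.last_persist_time > self.interval_seconds:
            self.last_persist_time = now
            return True
        return False
```

Inside a processor, the result could be passed directly to the tracker, for example check_point_tracker.save_check_point(throttle.should_persist()), assuming one throttle instance per consumer instance as with last_check_time in the sample.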

References