When you use consumer groups to consume logs, you can focus on your business logic and leave implementation details to the SDK, such as Simple Log Service operations, load balancing, and failover among consumers during log consumption. This topic describes how to use Simple Log Service SDK for Python to create a consumer group and use it to consume logs, and provides sample code.
Prerequisites
A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
Important: The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
We recommend that you do not save your AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources in your account may be compromised.
Simple Log Service SDK for Python is installed. For more information, see Install Simple Log Service SDK for Python.
A project is created. For more information, see Sample code that is used to create a project.
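The environment-variable prerequisite above can be checked before any client is created. The following is a minimal sketch that uses only the Python standard library. The helper name load_credentials and its error message are hypothetical and are not part of the SDK. Only the two variable names come from this topic.

```python
import os


def load_credentials(environ=None):
    """Return (access_key_id, access_key_secret) read from environment variables.

    `environ` defaults to os.environ; a plain dict can be passed for testing.
    Raises RuntimeError if either variable is missing or empty.
    """
    env = os.environ if environ is None else environ
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    # Collect every missing or empty variable so the error names all of them.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

Failing fast in this way keeps the AccessKey pair out of the source code and reports a configuration problem before any request is sent.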
Usage notes
In this example, the public Simple Log Service endpoint for the China (Hangzhou) region is used, which is https://cn-hangzhou.log.aliyuncs.com. If you want to access Simple Log Service from another Alibaba Cloud service that resides in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com. For more information about the regions and endpoints that Simple Log Service supports, see Endpoints.
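The two China (Hangzhou) endpoints above differ only in the -intranet suffix. The following is a minimal sketch of a helper that builds either form. The function name sls_endpoint is hypothetical, and the assumption that other regions follow the same naming pattern is not confirmed by this topic, so check the Endpoints topic before you rely on it.

```python
def sls_endpoint(region_id, internal=False):
    """Build a Simple Log Service endpoint URL for a region ID.

    Assumption: every region follows the pattern shown for cn-hangzhou,
    with "-intranet" appended to the region ID for the internal endpoint.
    """
    suffix = "-intranet" if internal else ""
    return "https://{}{}.log.aliyuncs.com".format(region_id, suffix)


public_endpoint = sls_endpoint("cn-hangzhou")
internal_endpoint = sls_endpoint("cn-hangzhou", internal=True)
```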
Sample code
The following sample code shows how to create a Logstore, write logs to the Logstore, create a consumer group, and use the consumer group to consume logs:
import os
import time
from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock


class SampleConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0
    # Shared by all consumer instances; guarded by the lock below.
    log_results = []
    lock = RLock()

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            # Convert each log into a dict keyed by field name.
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items

            with SampleConsumer.lock:
                SampleConsumer.log_results.append(log_items)

            print(log_items)

        # Call save_check_point(True) at most once every 3 seconds;
        # otherwise call save_check_point(False).
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()

        # None means the logs were processed successfully.
        # If you need to roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


test_item_count = 20


# Write logs to the Logstore.
def _prepare_data(client, project, logstore):
    topic = 'python-ide-test'
    source = ''
    for i in range(0, test_item_count):
        logitemList = []  # LogItem list

        contents = [
            ('user', 'magic_user_' + str(i)),
            ('avg', 'magic_age_' + str(i))
        ]
        logItem = LogItem()
        logItem.set_time(int(time.time()))
        logItem.set_contents(contents)

        logitemList.append(logItem)

        # Write logs to the Logstore.
        request = PutLogsRequest(project, logstore, topic, source, logitemList)

        response = client.put_logs(request)
        print("successfully put logs in logstore")


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return

    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def sample_consumer_group():
    # The Simple Log Service endpoint. In this example, the Simple Log Service endpoint for the China (Hangzhou) region is used. Replace the parameter value with the actual endpoint.
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

    # Configure environment variables. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
    accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

    # The name of the project. In this example, the project is not created by using the SDK. Before you can run the code, you must create a project.
    project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')

    # The name of the Logstore. In this example, the Logstore is automatically created by using the SDK. You do not need to create a Logstore in advance.
    logstore = 'ali-test-logstore'

    # The name of the consumer group. When the SDK runs, a consumer group is automatically created. You do not need to create a consumer group in advance.
    consumer_group = 'consumer-group-1'
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    token = ""

    if not logstore:
        logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')

    assert endpoint and accessKeyId and accessKey and project, ValueError(
        "endpoint/access_id/key and project cannot be empty")

    # Create the Logstore.
    client = LogClient(endpoint, accessKeyId, accessKey, token)

    ret = client.create_logstore(project, logstore, 2, 4)
    print("successfully create logstore")
    time.sleep(60)

    SampleConsumer.log_results = []

    try:
        # Write logs to the Logstore.
        _prepare_data(client, project, logstore)

        # Create two consumers in the consumer group to consume data.
        option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)
        option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)

        print("*** start to consume data...")
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker1.start()
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
        client_worker2.start()

        sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)

        print("*** consumer group status ***")
        ret = client.list_consumer_group(project, logstore)
        print("successfully list consumergroup")
        for c in ret.get_consumer_groups():
            ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
            print("successfully get checkpoint fixed")

        print("*** stopping workers")
        client_worker1.shutdown()
        client_worker2.shutdown()

    finally:
        # clean-up
        # ret = client.delete_logstore(project, logstore)
        ret = client.list_logstore(project, logstore)
        print("successfully list logstore")

    # validate
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)

    assert 'magic_user_0' in ret and 'magic_age_0' in ret \
        and 'magic_user_' + str(test_item_count-1) in ret \
        and 'magic_age_' + str(test_item_count-1) in ret


if __name__ == '__main__':
    sample_consumer_group()
Expected results:
successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......}]
References
Simple Log Service is also compatible with Alibaba Cloud SDKs. For more information, see Simple Log Service_SDK Center_Alibaba Cloud OpenAPI Explorer.
Simple Log Service provides a command-line interface (CLI) to meet the requirements for automated configuration in Simple Log Service. For more information, see Overview of Simple Log Service CLI.
For more sample code, see Alibaba Cloud Simple Log Service SDK for Python on GitHub.
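In the sample code, SampleConsumer.process calls save_check_point(True) only when more than 3 seconds have passed since the previous such call, and calls save_check_point(False) otherwise. The following is a minimal sketch of that timing rule isolated in a small class so that it can be tested without the SDK. The class name CheckpointThrottle and the injectable clock parameter are hypothetical. The reading that True means the checkpoint is persisted to the server is an assumption based on how the sample uses the flag.

```python
import time


class CheckpointThrottle:
    """Report True at most once per interval, mirroring the sample's 3-second rule."""

    def __init__(self, interval_seconds=3.0, clock=time.time):
        self.interval_seconds = interval_seconds
        self._clock = clock          # injectable so tests can supply a fake clock
        self._last_persist = 0.0     # matches last_check_time = 0 in the sample

    def should_persist(self):
        now = self._clock()
        if now - self._last_persist > self.interval_seconds:
            self._last_persist = now
            return True
        return False


# Hypothetical use inside a process() method (not executed here):
#     check_point_tracker.save_check_point(throttle.should_persist())
```

Keeping the timing rule separate from the processing loop makes it easier to change the interval or to verify the behavior with a fake clock.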