全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan normal

更新时间:Jul 02, 2025

Pesan normal adalah pesan tanpa fitur khusus yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini berbeda dari pesan berciri khas seperti pesan terjadwal, pesan tertunda, pesan terurut, dan pesan transaksional. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan normal menggunakan SDK Klien HTTP untuk Python.

Prasyarat

Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:

  • Instal SDK untuk Python. Untuk informasi lebih lanjut, lihat Siapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Dapatkan pasangan AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan normal

Contoh kode berikut menunjukkan cara mengirim pesan normal menggunakan SDK Klien HTTP untuk Python:

import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

# Inisialisasi klien produser.
mq_client = MQClient(
    # Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
     "${HTTP_ENDPOINT}",
    # Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
    # ID AccessKey yang digunakan untuk otentikasi.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # Rahasia AccessKey yang digunakan untuk otentikasi.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
# Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic_name = "${TOPIC}"
# ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
# Jika instance memiliki namespace, ID instance harus ditentukan. Jika instance tidak memiliki namespace, atur parameter instanceID ke string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# Kirim empat pesan secara siklik.
msg_count = 4
print("%sPublikasikan Pesan Ke %s\nNamaTopik:%s\nJumlahPesan:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
            msg = TopicMessage(
                    # Isi pesan.
                    "Saya adalah pesan uji %s.halo" % i,
                    # Tag pesan.
                    "tag %s" % i
                        )
            # Properti khusus pesan.
            msg.put_property("a", i)
            # Kunci pesan.
            msg.set_message_key("KunciPesan")
            re_msg = producer.publish_message(msg)
            print("Publikasikan Pesan Berhasil. IDPesan:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))

except MQExceptionBase as e:
    if e.type == "TopicNotExist":
         print("Topik tidak ada, silakan buat.")
         sys.exit(1)
    print("Gagal Memublikasikan Pesan. Pengecualian:%s" % e)

Berlangganan pesan normal

Contoh kode berikut menunjukkan cara berlangganan pesan normal menggunakan SDK Klien HTTP untuk Python:

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Inisialisasi klien konsumen.
mq_client = MQClient(
    # Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
    "${HTTP_ENDPOINT}",
    # Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
    # ID AccessKey yang digunakan untuk otentikasi.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # Rahasia AccessKey yang digunakan untuk otentikasi.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
     )
# Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic_name = "${TOPIC}"
# ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
group_id = "${GROUP_ID}"
# ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
# Jika instance memiliki namespace, ID instance harus ditentukan. Jika instance tidak memiliki namespace, atur parameter instanceID ke string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instance_id = "${INSTANCE_ID}"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, respons segera dikirim ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
# Periode polling panjang. Satuan: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
wait_seconds = 3
# Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
batch = 3
print(("%sKonsumsi Dan Akui Pesan Dari Topik%s\nNamaTopik:%s\nMQKonsumen:%s\nTungguDetik:%s\n" \
        % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:
    try:
        # Konsumsi pesan dalam mode polling panjang.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print(("Terima, IDPesan: %s\nBodyMD5Pesan: %s \
                              \nTagPesan: %s\nDikonsumsiBerapaKali: %s \
                              \nWaktuPublikasi: %s\nIsi: %s \
                              \nWaktuKonsumsiBerikutnya: %s \
                              \nHandlePenerimaan: %s \
                              \nProperti: %s\n" % \
                  (msg.message_id, msg.message_body_md5,
                   msg.message_tag, msg.consumed_times,
                   msg.publish_time, msg.message_body,
                   msg.next_consume_time, msg.receipt_handle, msg.properties)))
    except MQExceptionBase as e:
        # Tidak ada pesan dalam topik yang tersedia untuk dikonsumsi.
        if e.type == "MessageNotExist":
            print(("Tidak ada pesan baru! IDPermintaan: %s" % e.req_id))
            continue

        print(("Gagal Mengonsumsi Pesan! Pengecualian:%s\n" % e))
        time.sleep(2)
        continue

    # Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter msg.next_consume_time berakhir, broker akan mengirimkan pesan untuk dikonsumsi lagi.
    # Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print(("Akui %s Pesan Berhasil.\n\n" % len(receipt_handle_list)))
    except MQExceptionBase as e:
        print(("\nGagal Mengakui Pesan! Pengecualian:%s" % e))
        # Jika handle pesan kedaluwarsa, broker tidak dapat menerima ACK untuk pesan dari konsumen.
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print(("\tErrorHandle:%s,KodeError:%s,PesanError:%s" % \
                      (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))