Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan first-in-first-out (FIFO) yang ketat. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan terurut menggunakan SDK klien HTTP untuk Python.
Informasi latar belakang
Pesan terurut dibagi menjadi jenis-jenis berikut:
Pesan terurut global: Jika pesan dalam sebuah topik termasuk jenis ini, pesan tersebut diterbitkan dan dikonsumsi dalam urutan FIFO.
Pesan terurut berpartisi: Jika pesan dalam sebuah topik termasuk jenis ini, pesan tersebut didistribusikan ke partisi berbeda menggunakan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO. Kunci sharding adalah bidang kunci yang digunakan untuk pesan terurut guna mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.
Untuk informasi lebih lanjut, lihat Pesan Terurut.
Prasyarat
Sebelum memulai, pastikan operasi berikut telah dilakukan:
Instal SDK untuk Python. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan terurut
Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman menggunakan satu produser atau thread untuk mengirim pesan. Jika pengirim menggunakan beberapa produser atau thread untuk mengirim pesan secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.
Contoh kode berikut menunjukkan cara mengirim pesan terurut menggunakan SDK klien HTTP untuk Python:
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
# Inisialisasi klien produser.
mq_client = MQClient(
# Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
# ID AccessKey yang digunakan untuk otentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Rahasia AccessKey yang digunakan untuk otentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic_name = "${TOPIC}"
# ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
# Jika instance memiliki namespace, ID instance harus ditentukan. Jika instance tidak memiliki namespace, atur parameter instanceID sebagai string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# Kirim delapan pesan secara siklik.
msg_count = 8
print("%sPublikasikan Pesan Ke %s\nNamaTopik:%s\nJumlahPesan:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
msg = TopicMessage(
# Isi pesan.
"Saya adalah pesan uji %s.halo" % i,
# Tag pesan.
"tag %s" % i
)
# Properti kustom pesan.
msg.put_property("a", str(i))
# Kunci sharding yang digunakan untuk mendistribusikan pesan terurut ke partisi tertentu. Kunci sharding dapat digunakan untuk mengidentifikasi partisi yang berbeda. Kunci sharding berbeda dari kunci pesan.
msg.set_sharding_key(str(i % 3))
re_msg = producer.publish_message(msg)
print("Publikasikan Pesan Berhasil. IDPesan:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topik tidak ada, silakan buat.")
sys.exit(1)
print("Gagal Memublikasikan Pesan. Pengecualian:%s" % e)
Berlangganan pesan terurut
Contoh kode berikut menunjukkan cara berlangganan pesan terurut menggunakan SDK klien HTTP untuk Python:
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Inisialisasi klien konsumen.
mq_client = MQClient(
# Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
# ID AccessKey yang digunakan untuk otentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Rahasia AccessKey yang digunakan untuk otentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic_name = "${TOPIC}"
# ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
group_id = "${GROUP_ID}"
# ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
# Jika instance memiliki namespace, ID instance harus ditentukan. Jika instance tidak memiliki namespace, atur parameter instanceID sebagai string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instance_id = "${INSTANCE_ID}"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# Konsumsi pesan dalam mode polling panjang. Konsumen mungkin menarik pesan terurut berpartisi dari beberapa partisi. Konsumen mengonsumsi pesan dalam partisi yang sama sesuai dengan urutan pengiriman pesan.
# Jika broker gagal menerima pengakuan (ACK) untuk pesan dalam partisi dari konsumen, broker akan mengirimkan pesan yang sama dalam partisi ke konsumen lagi.
# Konsumen hanya dapat mengonsumsi batch pesan berikutnya dari partisi setelah semua pesan yang ditarik dari partisi dalam batch sebelumnya diakui sebagai dikonsumsi.
# Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, respons segera dikirim ke konsumen. Dalam contoh ini, ditentukan sebagai 3 detik.
wait_seconds = 3
# Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai terbesar yang dapat Anda tentukan adalah 16.
batch = 3
print(("%sKonsumsi Dan Ak Pesan Dari Topik%s\nNamaTopik:%s\nMQKonsumen:%s\nTungguDetik:%s\n" \
% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:
try:
recv_msgs = consumer.consume_message_orderly(batch, wait_seconds)
print("=======>Terima %d pesan:" % len(recv_msgs))
for msg in recv_msgs:
print("\tIDPesan: %s, BodyMD5Pesan: %s,WaktuKonsumsiBerikutnya: %s,JumlahDikonsumsi: %s,WaktuPublikasi: %s\n\tIsi: %s \
\n\tHandlePenerimaan: %s \
\n\tProperti: %s,KunciSharding: %s\n" % \
(msg.message_id, msg.message_body_md5,
msg.next_consume_time, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.receipt_handle, msg.properties, msg.get_sharding_key()))
except MQExceptionBase as e:
if e.type == "MessageNotExist":
print(("Tidak ada pesan baru! RequestId: %s" % e.req_id))
continue
print(("Gagal Mengonsumsi Pesan! Pengecualian:%s\n" % e))
time.sleep(2)
continue
# Jika broker tidak menerima ACK dari konsumen sebelum periode waktu yang ditentukan oleh parameter msg.next_consume_time berakhir, broker akan mengirimkan pesan ke konsumen lagi.
# Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print(("========>Ak %s Pesan Berhasil.\n\n" % len(receipt_handle_list)))
except MQExceptionBase as e:
print(("\nGagal Mengakui Pesan! Pengecualian:%s" % e))
# Jika handle pesan habis waktu, broker tidak dapat menerima ACK untuk pesan dari konsumen.
if e.sub_errors:
for sub_error in e.sub_errors:
print(("\tHandleError:%s,KodeKesalahan:%s,PesanKesalahan:%s" % \
(sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))