ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang mirip dengan eXtended Architecture (X/Open XA). ApsaraMQ for RocketMQ menggunakan pesan transaksional untuk memastikan konsistensi transaksi. Topik ini memberikan contoh kode tentang cara mengirim dan menerima pesan transaksional menggunakan SDK klien HTTP untuk Python.
Informasi latar belakang
Gambar berikut menunjukkan proses interaksi pesan transaksional.

Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
Sebelum memulai, pastikan operasi berikut telah dilakukan:
Instal SDK untuk Python. Untuk informasi lebih lanjut, lihat Siapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Dapatkan pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan transaksional
Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan SDK klien HTTP untuk Python:
#!/usr/bin/env python
# coding=utf8
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
import threading
# Inisialisasi klien produser.
mq_client = MQClient(
# Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
# ID AccessKey yang digunakan untuk autentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Rahasia AccessKey yang digunakan untuk autentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic_name = "${TOPIC}"
# ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
group_id = "${GROUP_ID}"
# ID instance tempat topik berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
# Jika instance memiliki namespace, ID instance harus ditentukan. Jika instance tidak memiliki namespace, atur parameter instanceID ke string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instance_id = "${INSTANCE_ID}"
# Kirim empat pesan transaksional secara siklik.
msg_count = 4
print("%sPublikasikan Pesan Transaksi Ke %s\nNamaTopik:%s\nJumlahPesan:%s\n" \
% (10 * "=", 10 * "=", topic_name, msg_count))
def process_trans_error(exp):
print("\nCommit/Roll Pesan Transaksi Gagal! Pengecualian:%s" % exp)
# Jika jumlah waktu yang diperlukan untuk melakukan commit atau rollback pesan transaksional melebihi nilai parameter TransCheckImmunityTime atau periode timeout yang ditentukan untuk handle consumeHalfMessage, operasi commit atau rollback gagal. Parameter TransCheckImmunityTime menentukan periode timeout untuk handle pesan transaksional. Dalam contoh ini, periode timeout untuk handle consumeHalfMessage adalah 10 detik.
if exp.sub_errors:
for sub_error in exp.sub_errors:
print("\tHandleKesalahan:%s,KodeKesalahan:%s,PesanKesalahan:%s" % \
(sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))
# Klien memerlukan thread atau proses untuk memproses pesan transaksional yang belum diakui.
# Mulai thread untuk memproses pesan transaksional yang belum diakui.
class ConsumeHalfMessageThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.count = 0
# Buat klien lain.
self.mq_client = MQClient(
# Atur titik akhir HTTP. Di konsol Message Queue for Apache RocketMQ, pergi ke bagian HTTP endpoint pada halaman Detail Instance.
"${HTTP_ENDPOINT}",
# ID AccessKey yang Anda buat di konsol Resource Access Management (RAM) untuk autentikasi.
"${ACCESS_KEY}",
# Rahasia AccessKey yang Anda buat di konsol RAM untuk autentikasi.
"${SECRET_KEY}"
)
self.trans_producer = self.mq_client.get_trans_producer(instance_id, topic_name, group_id)
def run(self):
while 1:
if self.count == 3:
break;
try:
half_msgs = self.trans_producer.consume_half_message(1, 3)
for half_msg in half_msgs:
print("Terima Pesan Setengah, IDPesan: %s\nMD5IsiPesan: %s \
\nTagPesan: %s\nWaktuDikonsumsi: %s \
\nWaktuPublikasi: %s\nIsi: %s \
\nWaktuKonsumsiBerikutnya: %s \
\nHandlePenerimaan: %s \
\nProperti: %s" % \
(half_msg.message_id, half_msg.message_body_md5,
half_msg.message_tag, half_msg.consumed_times,
half_msg.publish_time, half_msg.message_body,
half_msg.next_consume_time, half_msg.receipt_handle, half_msg.properties))
a = int(half_msg.get_property("a"))
try:
if a == 1:
# Konfirmasi untuk commit pesan transaksional.
self.trans_producer.commit(half_msg.receipt_handle)
self.count += 1
print("------>commit")
elif a == 2 and half_msg.consumed_times > 1:
# Konfirmasi untuk commit pesan transaksional.
self.trans_producer.commit(half_msg.receipt_handle)
self.count += 1
print("------>commit")
elif a == 3:
# Konfirmasi untuk rollback pesan transaksional.
self.trans_producer.rollback(half_msg.receipt_handle)
self.count += 1
print("------>rollback")
else:
# Periksa status lain kali.
print("------>tidak diketahui")
except MQExceptionBase as rec_commit_roll_e:
process_trans_error(rec_commit_roll_e)
except MQExceptionBase as half_e:
if half_e.type == "MessageNotExist":
print("Tidak ada pesan setengah! RequestId: %s" % half_e.req_id)
continue
print("Konsumsi pesan setengah Gagal! Pengecualian:%s\n" % half_e)
break
consume_half_thread = ConsumeHalfMessageThread()
consume_half_thread.setDaemon(True)
consume_half_thread.start()
try:
trans_producer = mq_client.get_trans_producer(instance_id, topic_name, group_id)
for i in range(msg_count):
msg = TopicMessage(
# Isi pesan.
"Saya adalah pesan uji %s." % i,
# Tag pesan.
"tagA"
)
# Properti kustom pesan.
msg.put_property("a", i)
# Kunci pesan.
msg.set_message_key("KunciPesan")
# Interval waktu antara waktu pengiriman pesan transaksional dan waktu mulai pemeriksaan pertama status transaksi lokal. Interval waktu ini menentukan waktu relatif saat status pertama kali diperiksa. Unit: detik. Nilai valid: 10 hingga 300.
# Jika pesan tidak dicommit atau dirollback setelah pemeriksaan pertama status transaksi lokal, broker memulai permintaan pemeriksaan status transaksi lokal setiap 10 detik dalam 24 jam berikutnya.
msg.set_trans_check_immunity_time(10)
re_msg = trans_producer.publish_message(msg)
print("Publikasikan Pesan Transaksi Berhasil. IDPesan:%s, MD5Isi:%s, Handle:%s" \
% (re_msg.message_id, re_msg.message_body_md5, re_msg.receipt_handle))
time.sleep(1)
if i == 0:
# Setelah produser mengirim pesan transaksional, broker mendapatkan handle pesan setengah yang sesuai dengan pesan transaksional, dan dapat langsung commit atau rollback pesan setengah.
try:
trans_producer.commit(re_msg.receipt_handle)
except MQExceptionBase as pub_commit_roll_e:
process_trans_error(pub_commit_roll_e)
except MQExceptionBase as pub_e:
if pub_e.type == "TopicNotExist":
print("Topik tidak ada, silakan buat.")
sys.exit(1)
print("Publikasi Pesan Gagal. Pengecualian:%s" % pub_e)
while 1:
if not consume_half_thread.is_alive():
break
time.sleep(1)Berlangganan pesan transaksional
Contoh kode berikut menunjukkan cara berlangganan pesan transaksional menggunakan SDK klien HTTP untuk Python:
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Inisialisasi klien konsumen.
mq_client = MQClient(
# Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
# Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
# ID AccessKey yang digunakan untuk autentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Rahasia AccessKey yang digunakan untuk autentikasi.
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
topic_name = "${TOPIC}"
# ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
group_id = "${GROUP_ID}"
# ID instance tempat topik berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
# Jika instance memiliki namespace, ID instance harus ditentukan. Jika instance tidak memiliki namespace, atur parameter instanceID ke string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
instance_id = "${INSTANCE_ID}"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu tertentu. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu tertentu, respons segera dikirim ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
# Periode polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
wait_seconds = 3
# Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
batch = 3
print(("%sKonsumsi Dan Ak Pesan Dari Topik%s\nNamaTopik:%s\nMQKonsumen:%s\nDetikTunggu:%s\n" \
% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:
try:
# Konsumsi pesan dalam mode polling panjang.
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print(("Terima, IDPesan: %s\nMD5IsiPesan: %s \
\nTagPesan: %s\nWaktuDikonsumsi: %s \
\nWaktuPublikasi: %s\nIsi: %s \
\nWaktuKonsumsiBerikutnya: %s \
\nHandlePenerimaan: %s \
\nProperti: %s\n" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle, msg.properties)))
except MQExceptionBase as e:
# Tidak ada pesan dalam topik yang tersedia untuk dikonsumsi.
if e.type == "MessageNotExist":
print(("Tidak ada pesan baru! RequestId: %s" % e.req_id))
continue
print(("Konsumsi Pesan Gagal! Pengecualian:%s\n" % e))
time.sleep(2)
continue
# Jika broker gagal menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter msg.next_consume_time berakhir, broker mengirimkan pesan untuk dikonsumsi lagi.
# Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print(("Ak %s Pesan Berhasil.\n\n" % len(receipt_handle_list)))
except MQExceptionBase as e:
print(("\nAk Pesan Gagal! Pengecualian:%s" % e))
# Jika handle pesan habis waktu, broker tidak dapat menerima ACK untuk pesan dari konsumen.
if e.sub_errors:
for sub_error in e.sub_errors:
print(("\tHandleKesalahan:%s,KodeKesalahan:%s,PesanKesalahan:%s" % \
(sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))