ApsaraMQ for RocketMQ menyediakan fitur pemrosesan transaksi terdistribusi yang serupa dengan eXtended Architecture (X/Open XA). ApsaraMQ for RocketMQ menggunakan pesan transaksional untuk memastikan konsistensi akhir dari transaksi terdistribusi. Topik ini memberikan contoh kode tentang cara mengirim dan menerima pesan transaksional menggunakan HTTP client SDK untuk Node.js.
Informasi latar belakang
Gambar berikut mengilustrasikan proses interaksi pesan transaksional.

Untuk informasi lebih lanjut, lihat Pesan Transaksional.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk Node.js. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan transaksional
Contoh kode berikut menunjukkan cara mengirim pesan transaksional menggunakan HTTP client SDK untuk Node.js:
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const endpoint = "${HTTP_ENDPOINT}";
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
// Rahasia AccessKey yang digunakan untuk autentikasi.
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
const topic = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
const groupId = "${GROUP_ID}";
// ID instance tempat topik berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const instanceId = "${INSTANCE_ID}";
const mqTransProducer = client.getTransProducer(instanceId, topic, groupId);
async function processTransResult(res, msgId) {
if (!res) {
return;
}
if (res.code != 204) {
// Jika pesan transaksional tidak dikomit atau dibatalkan sebelum periode timeout yang ditentukan oleh parameter TransCheckImmunityTime untuk penanganan pesan transaksional berakhir atau sebelum periode timeout yang ditentukan untuk penanganan consumeHalfMessage berakhir, operasi commit atau rollback gagal. Dalam contoh ini, periode timeout untuk penanganan consumeHalfMessage ditentukan sebagai 10 detik.
console.log("Commit/Rollback Pesan Gagal:");
const failHandles = res.body.map((error) => {
console.log("\tErrorHandle:%s, Kode:%s, Alasan:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
} else {
console.log("Commit/Rollback Pesan berhasil!!! %s", msgId);
}
}
var halfMessageCount = 0;
var halfMessageConsumeCount = 0;
(async function(){
try {
// Kirim empat pesan transaksional secara siklik.
for(var i = 0; i < 4; i++) {
let res;
msgProps = new MessageProperties();
// Atribut kustom pesan.
msgProps.putProperty("a", i);
// Kunci pesan.
msgProps.messageKey("MessageKey");
// Interval waktu antara waktu pengiriman pesan transaksional dan waktu mulai pemeriksaan pertama status transaksi lokal. Interval waktu ini menentukan waktu relatif saat status diperiksa pertama kali. Satuan: detik. Nilai valid: 10 hingga 300.
// Jika pesan tidak dikomit atau dibatalkan setelah pemeriksaan balik pertama status transaksi lokal, broker memulai permintaan pemeriksaan balik status transaksi lokal setiap 10 detik dalam 24 jam.
msgProps.transCheckImmunityTime(10);
res = await mqTransProducer.publishMessage("hello mq.", "tagA", msgProps);
console.log("Publikasikan pesan: MessageID:%s,BodyMD5:%s,Handle:%s", res.body.MessageId, res.body.MessageBodyMD5, res.body.ReceiptHandle);
if (res && i == 0) {
// Setelah produsen mengirim pesan transaksional, broker memperoleh handle dari half message yang sesuai dengan pesan transaksional dan melakukan commit atau rollback pesan transaksional berdasarkan status handle.
const msgId = res.body.MessageId;
res = await mqTransProducer.commit(res.body.ReceiptHandle);
console.log("Commit pesan saat publikasi, %s", msgId);
// Jika pesan transaksional tidak dikomit atau dibatalkan sebelum periode timeout yang ditentukan oleh parameter TransCheckImmunityTime berakhir, operasi commit atau rollback gagal.
processTransResult(res, msgId);
}
}
} catch(e) {
// Tentukan logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
console.log(e)
}
})();
// Klien memerlukan thread atau proses untuk memproses pesan transaksional yang belum diakui.
// Proses pesan transaksional yang belum diakui.
(async function() {
// Periksa pesan setengah jadi secara siklik. Ini mirip dengan mengonsumsi pesan normal.
while(halfMessageCount < 3 && halfMessageConsumeCount < 15) {
try {
halfMessageConsumeCount++;
res = await mqTransProducer.consumeHalfMessage(3, 3);
if (res.code == 200) {
// Logika konsumsi pesan.
console.log("Konsumsi Pesan, requestId:%s", res.requestId);
res.body.forEach(async (message) => {
console.log("\tMessageId:%s,Tag:%s,WaktuPublikasi:%d,WaktuKonsumsiBerikutnya:%d,WaktuKonsumsiPertama:%d,JumlahKonsumsi:%d,Isi:%s" +
",Prop:%j,KunciPesan:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
var propA = message.Properties && message.Properties.a ? parseInt(message.Properties.a) : 0;
var opResp;
if (propA == 1 || (propA == 2 && message.ConsumedTimes > 1)) {
opResp = await mqTransProducer.commit(message.ReceiptHandle);
console.log("Commit pesan saat memeriksa setengah, %s", message.MessageId);
halfMessageCount++;
} else if (propA == 3) {
opResp = await mqTransProducer.rollback(message.ReceiptHandle);
console.log("Rollback pesan saat memeriksa setengah, %s", message.MessageId);
halfMessageCount++;
}
processTransResult(opResp, message.MessageId);
});
}
} catch(e) {
if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
// Jika tidak ada pesan yang tersedia untuk dikonsumsi di topik, mode polling panjang tetap berlaku.
console.log("Konsumsi Pesan Transaksi Setengah: tidak ada pesan baru, RequestId:%s, Kode:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})(); Berlangganan pesan transaksional
Contoh kode berikut menunjukkan cara berlangganan pesan transaksional menggunakan HTTP client SDK untuk Node.js:
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const endpoint = "${HTTP_ENDPOINT}";
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
// Rahasia AccessKey yang digunakan untuk autentikasi.
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
const topic = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
const groupId = "${GROUP_ID}";
// ID instance tempat topik berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
// Konsumsi pesan secara siklik.
while(true) {
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan di topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirim respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
res = await consumer.consumeMessage(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai terbesar yang dapat Anda tentukan adalah 16.
3 // Durasi periode polling panjang. Satuan: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
if (res.code == 200) {
// Logika konsumsi pesan.
console.log("Konsumsi Pesan, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,WaktuPublikasi:%d,WaktuKonsumsiBerikutnya:%d,WaktuKonsumsiPertama:%d,JumlahKonsumsi:%d,Isi:%s" +
",Prop:%j,KunciPesan:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
return message.ReceiptHandle;
});
// Jika broker tidak menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter message.NextConsumeTime berakhir, pesan dikonsumsi lagi.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
res = await consumer.ackMessage(handles);
if (res.code != 204) {
// Jika handle pesan kedaluwarsa, broker gagal menerima ACK untuk pesan dari konsumen.
console.log("Ack Pesan Gagal:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Kode:%s, Alasan:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
// Memperoleh ACK dari konsumen.
console.log("Ack Pesan berhasil, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
// Jika tidak ada pesan yang tersedia untuk dikonsumsi di topik, mode polling panjang tetap berlaku.
console.log("Konsumsi Pesan: tidak ada pesan baru, RequestId:%s, Kode:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();