Pesan normal adalah pesan tanpa fitur khusus yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini berbeda dari pesan berciri khas seperti pesan terjadwal, pesan tertunda, pesan terurut, dan pesan transaksional. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan normal menggunakan SDK klien HTTP untuk Node.js.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk Node.js. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan normal
Contoh kode berikut menunjukkan cara mengirim pesan normal menggunakan SDK klien HTTP untuk Node.js:
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const endpoint = "${HTTP_ENDPOINT}";
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
// Rahasia AccessKey yang digunakan untuk autentikasi.
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
const topic = "${TOPIC}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const instanceId = "${INSTANCE_ID}";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
// Kirim empat pesan secara siklik.
for(var i = 0; i < 4; i++) {
let res;
msgProps = new MessageProperties();
// Atribut kustom pesan.
msgProps.putProperty("a", i);
// Kunci pesan.
msgProps.messageKey("MessageKey");
// Tubuh dan tag pesan.
res = await producer.publishMessage("hello mq.", "TagA", msgProps);
console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// Tentukan logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang.
console.log(e)
}
})();Berlangganan pesan normal
Contoh kode berikut menunjukkan cara berlangganan pesan normal menggunakan SDK klien HTTP untuk Node.js:
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const endpoint = "${HTTP_ENDPOINT}";
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
// Rahasia AccessKey yang digunakan untuk autentikasi.
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
const topic = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
const groupId = "${GROUP_ID}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
// Konsumsi pesan secara siklik.
while(true) {
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
res = await consumer.consumeMessage(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai terbesar yang dapat Anda tentukan adalah 16.
3 // Durasi periode polling panjang. Satuan: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
if (res.code == 200) {
// Logika konsumsi pesan.
console.log("Konsumsi Pesan, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,WaktuPublikasi:%d,WaktuKonsumsiBerikutnya:%d,WaktuKonsumsiPertama:%d,JumlahKonsumsi:%d,Tubuh:%s" +
",Prop:%j,KunciPesan:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
return message.ReceiptHandle;
});
// Jika broker tidak menerima pengakuan (ACK) untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter message.NextConsumeTime berakhir, pesan tersebut dikonsumsi lagi.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan tersebut dikonsumsi.
res = await consumer.ackMessage(handles);
if (res.code != 204) {
// Jika handle pesan kedaluwarsa, broker gagal menerima ACK untuk pesan dari konsumen.
console.log("Gagal Mengonfirmasi Pesan:");
const failHandles = res.body.map((error)=>{
console.log("\tHandleError:%s, Kode:%s, Alasan:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tHandleSukses:%s\n", handle);
}
});
} else {
// Memperoleh ACK dari konsumen.
console.log("Konfirmasi Pesan sukses, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
// Jika tidak ada pesan yang tersedia untuk dikonsumsi dalam topik, mode polling panjang terus berlaku.
console.log("Konsumsi Pesan: tidak ada pesan baru, RequestId:%s, Kode:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();