Pesan normal adalah pesan tanpa fitur khusus yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini berbeda dari pesan berciri khas seperti pesan terjadwal, pesan tertunda, pesan terurut, dan pesan transaksional. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan normal menggunakan SDK klien HTTP untuk C++.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk C++. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di Konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Dapatkan pasangan AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan normal
Kode sampel berikut menunjukkan cara mengirim pesan normal menggunakan SDK klien HTTP untuk C++:
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
// ID AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Rahasia AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
string topic = "${TOPIC}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
// Kirim empat pesan secara siklik.
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
// Isi pesan.
TopicMessage pubMsg("Halo, mq! punya kunci!");
// Atribut kustom pesan.
pubMsg.putProperty("a",std::to_string(i));
// Kunci pesan.
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
cout << "Pengiriman pesan mq berhasil. Topik adalah: " << topic
<< ", msgId adalah:" << pmResp.getMessageId()
<< ", bodyMD5 adalah:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Permintaan Gagal: " + me.GetErrorCode() << ", requestId adalah:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Permintaan Gagal: " + mb.ToString() << endl;
return -2;
}
return 0;
}Berlangganan pesan normal
Kode sampel berikut menunjukkan cara berlangganan pesan normal menggunakan SDK klien HTTP untuk C++:
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
// ID AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Rahasia AccessKey yang digunakan untuk otentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
string topic = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
string groupId = "${GROUP_ID}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
consumer->consumeMessage(
3, // Jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3, // Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
messages
);
cout << "Konsumsi: " << messages.size() << " Pesan!" << endl;
// Logika konsumsi pesan.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " WaktuPublikasi: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Isi: " << iter->getMessageBody()
<< " WaktuKonsumsiPertama: " << iter->getFirstConsumeTime()
<< " WaktuKonsumsiBerikutnya: " << iter->getNextConsumeTime()
<< " JumlahDikonsumsi: " << iter->getConsumedTimes()
<< " Properti: " << iter->getPropertiesAsString()
<< " Kunci: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Dapatkan pengakuan (ACK) dari konsumen.
// Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode waktu yang ditentukan oleh parameter Message.NextConsumeTime berakhir, broker akan mengirimkan pesan untuk dikonsumsi lagi.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// Jika handle pesan kedaluwarsa, broker tidak dapat menerima ACK untuk pesan dari konsumen.
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " pesan sukses!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "Tidak ada pesan untuk dikonsumsi! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Permintaan Gagal: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Permintaan Gagal: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}