Instance ApsaraMQ for RocketMQ 5.x kompatibel dengan klien yang menggunakan RocketMQ 1.x SDK untuk C++. Anda dapat menggunakan RocketMQ 1.x SDK untuk C++ untuk terhubung ke instance ApsaraMQ for RocketMQ 5.x guna mengirim dan menerima pesan. Topik ini menyediakan contoh kode untuk mengirim dan menerima pesan menggunakan RocketMQ 1.x SDK untuk C++.
Kami merekomendasikan penggunaan RocketMQ 5.x SDK terbaru. SDK ini sepenuhnya kompatibel dengan broker ApsaraMQ for RocketMQ 5.x dan menawarkan lebih banyak fungsi serta fitur yang ditingkatkan. Untuk informasi lebih lanjut, lihat Deskripsi Versi.
Alibaba Cloud hanya memelihara RocketMQ 3.x, 4.x, dan TCP client SDK. Kami merekomendasikan penggunaannya hanya untuk bisnis yang sudah ada.
Mengirim dan menerima pesan normal
Mengirim pesan normal
#include "ONSFactory.h"
#include "ONSClientException.h"
using namespace ons;
int main()
{
// Parameter yang diperlukan untuk membuat produser dan mengirim pesan.
ONSFactoryProperty factoryInfo;
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Catatan: Masukkan nama domain dan nomor port yang disediakan di konsol ApsaraMQ for RocketMQ. Jangan tambahkan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// Isi pesan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance tersebut adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
// Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Catatan: Jika Anda menggunakan RocketMQ 1.x atau 2.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.
// Buat produser.
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
pProducer->start();
Message msg(
// Topik tempat pesan termasuk.
factoryInfo.getPublishTopics(),
// Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
"TagA",
// Tubuh pesan. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan. Anda tidak dapat meninggalkan parameter ini kosong.
factoryInfo.getMessageContent()
);
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
// Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ.
// Catatan: Pesan masih dapat dikirim dan diterima meskipun Anda tidak menentukan kunci.
msg.setKey("ORDERID_100");
// Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// Metode yang digunakan untuk menangani pengecualian.
}
// Sebelum Anda keluar dari aplikasi, matikan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi.
pProducer->shutdown();
return 0;
}Berlangganan pesan normal
#include "ONSFactory.h"
#include <iostream>
#include <thread>
#include <mutex>
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(Message& message, ConsumeContext& context) {
// Konsumen menerima pesan dan mencoba mengonsumsinya. Setelah pesan dikonsumsi, CommitMessage dikembalikan.
// Jika konsumen gagal mengonsumsi pesan atau ingin mengonsumsi pesan lagi, ReconsumeLater dikembalikan. Kemudian, pesan dikirim ulang ke konsumen setelah periode waktu yang telah ditentukan.
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Menerima pesan. Topik: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
int main(int argc, char* argv[]) {
std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
ONSFactoryProperty factoryInfo;
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. Instance ApsaraMQ for RocketMQ menggunakan ID grup alih-alih ID produser dan ID konsumen. Mengonfigurasi parameter ini membantu memastikan kompatibilitas dengan versi sebelumnya.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance tersebut adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
// Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Catatan: Masukkan nama domain dan nomor port yang disediakan di konsol ApsaraMQ for RocketMQ. Jangan tambahkan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Catatan: Jika Anda menggunakan RocketMQ 1.x atau 2.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.
PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
const char* topic_1 = "topic-1";
// Berlangganan pesan yang dilampirkan dengan tag-1 di topik-1.
const char* tag_1 = "tag-1";
const char* topic_2 = "topic-2";
// Berlangganan semua pesan di topik-2.
const char* tag_2 = "*";
// Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan kembalikan hasil pemrosesan.
ExampleMessageListener * message_listener = new ExampleMessageListener();
consumer->subscribe(topic_1, tag_1, message_listener);
consumer->subscribe(topic_2, tag_2, message_listener);
// Persiapan selesai. Anda harus memanggil fungsi start() untuk memulai konsumen.
consumer->start();
// Pertahankan thread tetap berjalan dan jangan matikan konsumen.
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
delete message_listener;
std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
return 0;
}Mengirim dan menerima pesan terurut
Mengirim pesan terurut
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;
int main()
{
// Parameter yang diperlukan untuk membuat dan menggunakan produser.
ONSFactoryProperty factoryInfo;
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Catatan: Masukkan nama domain dan nomor port yang disediakan di konsol ApsaraMQ for RocketMQ. Jangan tambahkan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// Isi pesan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance tersebut adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
// Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Catatan: Jika Anda menggunakan RocketMQ 1.x atau 2.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.
// Buat produser.
OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
// Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
pProducer->start();
Message msg(
// Topik tempat pesan termasuk.
factoryInfo.getPublishTopics(),
// Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
"TagA",
// Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan.
factoryInfo.getMessageContent()
);
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global.
// Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ.
// Catatan: Pesan masih dapat dikirim dan diterima meskipun Anda tidak menentukan kunci.
msg.setKey("ORDERID_100");
// Bidang kunci yang digunakan untuk mengidentifikasi partisi untuk pesan terurut berpartisi.
// Bidang ini dapat diatur ke string non-kosong jika pesan tersebut adalah pesan terurut global.
std::string shardingKey = "abc";
// Pesan yang memiliki kunci sharding yang sama dikirim secara berurutan.
try
{
// Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
SendResultONS sendResult = pProducer->send(msg, shardingKey);
std::cout << "pengiriman berhasil" << std::endl;
}
catch(ONSClientException & e)
{
// Metode yang digunakan untuk menangani pengecualian.
}
// Sebelum Anda keluar dari aplikasi, matikan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi.
pProducer->shutdown();
return 0;
} Berlangganan pesan terurut
#include "ONSFactory.h"
using namespace std;
using namespace ons;
// Buat instance konsumen.
// Setelah konsumen dorongan menarik pesan, fungsi consumeMessage dari instance dipanggil.
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
{
// Konsumsi pesan berdasarkan kebutuhan bisnis Anda.
return Success; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// Parameter yang diperlukan untuk membuat dan menggunakan konsumen pesanan.
ONSFactoryProperty factoryInfo;
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance tersebut adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
// Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Catatan: Masukkan nama domain dan nomor port yang disediakan di konsol ApsaraMQ for RocketMQ. Jangan tambahkan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Catatan: Jika Anda menggunakan RocketMQ 1.x atau 2.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.
// Buat konsumen pesanan.
OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
MyMsgListener msglistener;
// Topik dan tag pesan yang dikonsumsi oleh konsumen pesanan.
orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
// Daftarkan instance yang digunakan untuk mendengarkan pesan. Setelah konsumen pesanan menarik pesan, fungsi consumeMessage dari kelas pendengar pesan dipanggil.
//Mulai konsumen pesanan.
orderConsumer->start();
for(volatile int i = 0; i < 10; ++i) {
// tunggu
}
// Sebelum Anda keluar dari aplikasi, matikan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi.
orderConsumer->shutdown();
return 0;
} Mengirim dan menerima pesan terjadwal atau tertunda
Mengirim dan menerima pesan terjadwal atau tertunda
#include "ONSFactory.h"
#include "ONSClientException.h"
#include <windows.h>
using namespace ons;
int main()
{
// Parameter yang diperlukan untuk membuat produser dan mengirim pesan.
ONSFactoryProperty factoryInfo;
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
// Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
// Catatan: Masukkan nama domain dan nomor port yang disediakan di konsol ApsaraMQ for RocketMQ. Jangan tambahkan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
// Isi pesan.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance tersebut adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
// Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// Catatan: Jika Anda menggunakan RocketMQ 1.x atau 2.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.
// Buat produser.
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
pProducer->start();
Message msg(
// Topik tempat pesan termasuk.
factoryInfo.getPublishTopics(),
// Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
"TagA",
// Tubuh pesan. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan. Anda tidak dapat meninggalkan parameter ini kosong.
factoryInfo.getMessageContent()
);
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
// Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ.
// Catatan: Pesan masih dapat dikirim dan diterima meskipun Anda tidak menentukan kunci.
msg.setKey("ORDERID_100");
// Waktu ketika broker ApsaraMQ for RocketMQ mengirimkan pesan ke konsumen. Unit: milidetik. Pesan hanya dapat dikonsumsi setelah waktu yang ditentukan. Dalam contoh ini, pesan dapat dikonsumsi 3 detik kemudian.
long deliverTime = GetTickCount64() + 3000;
msg.setStartDeliverTime(deliverTime);
// Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// Metode yang digunakan untuk menangani pengecualian.
}
// Sebelum Anda keluar dari aplikasi, matikan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi.
pProducer->shutdown();
return 0;
}
Berlangganan pesan terjadwal atau tertunda
Contoh kode untuk berlangganan pesan terjadwal atau tertunda sama dengan contoh kode untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Mengirim dan Menerima Pesan Normal.
Mengirim dan menerima pesan transaksional
Mengirim pesan transaksional
Kirim pesan setengah dan jalankan transaksi lokal yang sesuai. Contoh kode:
#include "ONSFactory.h" #include "ONSClientException.h" using namespace ons; class MyLocalTransactionExecuter : LocalTransactionExecuter { MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } virtual TransactionStatus execute(Message &value) { // ID pesan. Dua pesan dapat memiliki tubuh pesan yang sama tetapi tidak ID yang sama. Anda tidak dapat menanyakan ID pesan saat ini di konsol ApsaraMQ for RocketMQ. ) string msgId = value.getMsgID(); // Hitung tubuh pesan menggunakan algoritma seperti CRC32 dan MD5. // ID pesan dan ID CRC32 digunakan untuk mencegah pesan duplikat. // Anda tidak perlu menentukan ID pesan atau ID CRC32 jika bisnis Anda bersifat idempoten. Jika tidak, tentukan ID pesan atau ID CRC32 untuk memastikan idempotensi. // Untuk mencegah pesan duplikat, kami merekomendasikan Anda menghitung tubuh pesan menggunakan algoritma CRC32 atau MD5. TransactionStatus transactionStatus = Unknow; try { boolean isCommit = Hasil eksekusi transaksi lokal; if (isCommit) { // Komit pesan jika transaksi lokal dieksekusi. transactionStatus = CommitTransaction; } else { // Gulung balik pesan jika transaksi lokal gagal dieksekusi. transactionStatus = RollbackTransaction; } } catch (...) { //penanganan pengecualian } return transactionStatus; } } int main(int argc, char* argv[]) { // Parameter yang diperlukan untuk membuat produser dan mengirim pesan. ONSFactoryProperty factoryInfo; // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX"); // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. // Catatan: Masukkan nama domain dan nomor port yang disediakan di konsol ApsaraMQ for RocketMQ. Jangan tambahkan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ. factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" ); // Isi pesan. factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX"); /** * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda. * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC. * Jika instance tersebut adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sand ma sandi. */ // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME"); factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" ); // Catatan: Jika Anda menggunakan RocketMQ 1.x atau 2.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. // Buat produser. ApsaraMQ for RocketMQ tidak melepaskan pChecker. Anda harus melepaskan pChecker sendiri. MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker(); g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker); // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser. pProducer->start(); Message msg( // Topik tempat pesan termasuk. factoryInfo.getPublishTopics(), // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ. "TagA", // Tubuh pesan. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan. Anda tidak dapat meninggalkan parameter ini kosong. factoryInfo.getMessageContent() ); // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan. // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ. // Catatan: Pesan masih dapat dikirim dan diterima meskipun Anda tidak menentukan kunci. msg.setKey("ORDERID_100"); // Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim. try { // ApsaraMQ for RocketMQ tidak melepaskan pExecuter. Anda harus melepaskan pExecuter sendiri. MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer->send(msg,pExecuter); } catch(ONSClientException & e) { // Metode yang digunakan untuk menangani pengecualian. } // Sebelum Anda keluar dari aplikasi, matikan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi. pProducer->shutdown(); return 0; }Komit status pesan transaksional. Contoh kode:
class MyLocalTransactionChecker : LocalTransactionChecker { MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } virtual TransactionStatus check(Message &value) { // ID pesan. Dua pesan dapat memiliki tubuh pesan yang sama tetapi tidak ID yang sama. Anda tidak dapat menanyakan ID pesan saat ini di konsol ApsaraMQ for RocketMQ. string msgId = value.getMsgID(); // Hitung tubuh pesan menggunakan algoritma seperti CRC32 dan MD5. // ID pesan dan ID CRC32 digunakan untuk mencegah pesan duplikat. // Anda tidak perlu menentukan ID pesan atau ID CRC32 jika bisnis Anda bersifat idempoten. Jika tidak, tentukan ID pesan atau ID CRC32 untuk memastikan idempotensi. // Untuk mencegah pesan duplikat, kami merekomendasikan Anda menghitung tubuh pesan menggunakan algoritma CRC32 atau MD5. TransactionStatus transactionStatus = Unknow; try { boolean isCommit = Hasil eksekusi transaksi lokal; if (isCommit) { // Komit pesan jika transaksi lokal dieksekusi. transactionStatus = CommitTransaction; } else { // Gulung balik pesan jika transaksi lokal gagal dieksekusi. transactionStatus = RollbackTransaction; } } catch(...) { //penanganan kesalahan pengecualian } return transactionStatus; } }
Berlangganan pesan transaksional
Contoh kode untuk berlangganan pesan transaksional sama dengan contoh kode untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Mengirim dan Menerima Pesan Normal.