Pesan terurut, juga dikenal sebagai pesan first in, first out (FIFO), adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Jenis pesan ini dipublikasikan dan dikonsumsi dalam urutan yang ketat. Topik ini memberikan contoh kode berdasarkan SDK klien TCP untuk C++ Edisi Komunitas yang dapat digunakan untuk mengirim dan menerima pesan terurut.
Pesan terurut dalam topik tertentu dipartisi berdasarkan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO yang ketat. Kunci sharding adalah bidang kunci yang digunakan untuk mengidentifikasi partisi berbeda dalam pesan terurut. Kunci sharding berbeda dari kunci pesan normal.
Untuk informasi lebih lanjut, lihat Pesan Terurut.
Prasyarat
Pustaka dinamis C++ telah diinstal. Untuk informasi lebih lanjut, lihat Instal Pustaka Dinamis C++.
Pasangan AccessKey telah dibuat untuk akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan terurut
Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman menggunakan satu produser atau thread. Jika pengirim menggunakan beberapa produser atau thread secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.
Salin kode berikut ke file OrderProducerDemo.cpp. Dalam file ini, konfigurasikan parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu hasilkan file yang dapat dieksekusi untuk dijalankan.
g++ -o order_producer_demo -std=c++11 -lz -lrocketmq OrderProducerDemo.cppKode contoh berikut menunjukkan cara mengirim pesan terurut menggunakan SDK klien TCP untuk C++ Edisi Komunitas:
#include <iostream> #include <chrono> #include <thread> #include "DefaultMQProducer.h" using namespace std; using namespace rocketmq; class ExampleSelectMessageQueueByHash : public MessageQueueSelector { public: MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) { // Tentukan logika khusus untuk partisi. Sistem menghitung antrian ke mana pesan dirutekan berdasarkan parameter arg yang menentukan kunci partisi. Dalam contoh ini, parameter arg bertipe INT digunakan. int orderId = *static_cast<int *>(arg); int index = orderId % mqs.size(); return mqs[0]; } }; int main() { std::cout << "=======Sebelum mengirim pesan=======" << std::endl; // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ. DefaultMQProducer producer("GID_XXXXXXXX"); // Titik akhir TCP yang Anda peroleh dari halaman Detail Instans di konsol ApsaraMQ for RocketMQ. producer.setNamesrvAddr("http://MQ_INST_XXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda, yang digunakan untuk verifikasi identitas. // Saluran pengguna. Nilai default: ALIYUN. producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); // Setelah Anda mengonfigurasi parameter yang diperlukan, mulailah produser. producer.start(); auto start = std::chrono::system_clock::now(); int count = 32; // Jumlah percobaan ulang yang dapat dilakukan pesan setelah gagal dikirim. Ini memastikan pengiriman pesan yang berhasil. int retryTimes = 1; // Gunakan parameter kustom yang dikonfigurasi arg untuk menghitung antrian ke mana pesan dikirim dan kirim pesan ke antrian yang ditentukan. Parameter arg menentukan ID partisi. Untuk informasi lebih lanjut tentang cara memilih antrian pesan, lihat operasi MessageQueueSelector. ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash(); for (int i = 0; i < count; ++i) { // Sebelum Anda mengirim pesan, tentukan topik yang Anda ajukan di konsol ApsaraMQ for RocketMQ. MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message."); try { SendResult sendResult = producer.send(msg, pSelector, &i, 1, false); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << "MessageQueue:" << sendResult.getMessageQueue().toString() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } auto interval = std::chrono::system_clock::now() - start; std::cout << "Kirim " << count << " pesan OK, biaya " << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl; producer.shutdown(); std::cout << "=======Setelah mengirim pesan=======" << std::endl; return 0; }
Konsumsi pesan terurut
Salin kode berikut ke file OrderConsumerDemo.cpp. Dalam file ini, konfigurasikan parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu hasilkan file yang dapat dieksekusi untuk dijalankan.
g++ -o order_consumer_demo -std=c++11 -lz -lrocketmq OrderConsumerDemo.cppKode contoh berikut menunjukkan cara mengonsumsi pesan terurut menggunakan edisi komersial SDK klien TCP untuk C++:
#include <iostream> #include <thread> #include "DefaultMQPushConsumer.h" using namespace rocketmq; class ExampleOrderlyMessageListener : public MessageListenerOrderly { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Pesan Diterima Topik:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl; // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ. DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXXX"); // Titik akhir TCP yang Anda peroleh dari halaman Detail Instans di konsol ApsaraMQ for RocketMQ. consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80"); // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda, yang digunakan untuk verifikasi identitas. // Saluran pengguna. Nilai default: ALIYUN. consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN"); auto start = std::chrono::system_clock::now(); //Daftarkan pendengar Anda sendiri di sini untuk menangani pesan yang diterima. // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasilnya. ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener(); consumer->subscribe("YOUR ORDERLY TOPIC", "*"); consumer->registerMessageListener(messageListener); //Mulai konsumen ini // Persiapan selesai. Anda harus memanggil fungsi startup untuk memulai konsumen. // ******************************************** // 1. Sebelum Anda memulai konsumen, pastikan langganan telah dikonfigurasi. // 2. Pastikan langganan konsumen dalam grup yang sama konsisten. // ********************************************* consumer->start(); //Jaga agar utas utama tetap berjalan sampai proses selesai. // Pertahankan utas tetap berjalan dan jangan matikan konsumen. std::this_thread::sleep_for(std::chrono::seconds (60 )); consumer->shutdown(); std::cout << "=======Setelah mengonsumsi pesan======" << std::endl; return 0; }