全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kode contoh

更新时间:Jul 02, 2025

Instance ApsaraMQ for RocketMQ 5.x kompatibel dengan klien menggunakan RocketMQ 3.x atau 4.x SDK. Anda dapat menggunakan SDK ini untuk terhubung ke instance ApsaraMQ for RocketMQ 5.x guna mengirim dan menerima pesan. Topik ini menyediakan kode contoh untuk mengirim dan menerima pesan menggunakan RocketMQ 3.x atau 4.x SDK untuk C++.

Penting
  • Disarankan untuk menggunakan RocketMQ 5.x SDK versi terbaru. SDK ini sepenuhnya kompatibel dengan broker ApsaraMQ for RocketMQ 5.x dan menawarkan lebih banyak fungsi serta fitur yang ditingkatkan. Untuk informasi lebih lanjut, lihat Deskripsi Versi.

  • Alibaba Cloud hanya memelihara RocketMQ 4.x, 3.x, dan TCP client SDK. Disarankan untuk hanya menggunakannya pada bisnis yang sudah ada.

Kirim dan terima pesan normal

Kirim pesan normal

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Sebelum mengirim pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.       
    DefaultMQProducer producer("GID_XXX");          
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.  
    producer.setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN. 
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam virtual private cloud (VPC), Anda tidak perlu menentukan nama pengguna atau kata sandi.   
    // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 

    // Mulai produser setelah Anda mengonfigurasi parameter yang diperlukan. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // Topik yang digunakan untuk mengirim pesan normal. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ sebelumnya. 
        MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK.");
        try {
            SendResult sendResult = producer.send(msg);
            std::cout <<"Hasil Pengiriman:"<<sendResult.getSendStatus()<< ", ID Pesan: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "Kode Kesalahan: " << e.GetError() << " Pengecualian:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Kirim " << count << " pesan OK, membutuhkan waktu "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======Setelah mengirim pesan=======" << std::endl;
    return 0;
}

Berlangganan pesan normal

#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Pesan Diterima Topik:" << item->getTopic() << ", ID Pesan:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN. 
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 
    
    auto start = std::chrono::system_clock::now();

    // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasil pemrosesan. 
    ExampleMessageListener *messageListener = new ExampleMessageListener();
    consumer->subscribe("YOURTOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // Persiapan selesai. Anda harus memanggil fungsi start() untuk memulai konsumen. 
    // ********************************************
    // 1. Pastikan langganan dikonfigurasi sebelum Anda memulai konsumen. 
    // 2. Pastikan langganan konsumen dalam grup konsumen yang sama sama. 
    // *********************************************
    consumer->start();

    // Biarkan thread tetap berjalan dan jangan matikan konsumen. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
    return 0;
}

Kirim dan terima pesan terurut

Kirim pesan terurut

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) {
        // Untuk menyesuaikan partisi, hitung antrian ke mana pesan dirutekan berdasarkan parameter arg yang menentukan kunci partisi. Dalam contoh ini, parameter arg bertipe INT digunakan. 
        int orderId = *static_cast<int *>(arg);
        int index = orderId % mqs.size();
        return mqs[0];
    }
};

int main() {
    std::cout << "=======Sebelum mengirim pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    DefaultMQProducer producer("GID_XXX");
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. 
    producer.setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN. 
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 

    // Mulai produser setelah Anda mengonfigurasi parameter yang diperlukan. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    // Anda dapat mengonfigurasi jumlah percobaan ulang untuk pesan untuk memastikan bahwa pesan dikirim. 
    int retryTimes = 1;
    // Gunakan parameter arg yang telah dikonfigurasi untuk menghitung antrian ke mana pesan dikirim dan kirim pesan ke antrian yang ditentukan. Parameter arg menentukan ID partisi. Untuk informasi tentang cara memilih antrian pesan, lihat operasi MessageQueueSelector. 
    ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
    for (int i = 0; i < count; ++i) {
        // Topik yang digunakan untuk mengirim pesan terurut. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ sebelumnya. 
        MQMessage msg("YOUR ORDERLY TOPIC", "HiTAG", "Hello,CPP SDK, Orderly Message.");
        try {
            SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
            std::cout << "Hasil Pengiriman:" << sendResult.getSendStatus() << ", ID Pesan: " << sendResult.getMsgId()
                      << "Antrian Pesan:" << sendResult.getMessageQueue().toString() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "Kode Kesalahan: " << e.GetError() << " Pengecualian:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Kirim " << count << " pesan OK, membutuhkan waktu "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======Setelah mengirim pesan=======" << std::endl;
    return 0;
}

Berlangganan pesan terurut


#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;


class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Pesan Diterima Topik:" << item->getTopic() << ", ID Pesan:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN. 
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 
 
    auto start = std::chrono::system_clock::now();

    // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasil pemrosesan. 
    ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
    consumer->subscribe("YOUR ORDERLY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // Persiapan selesai. Anda harus memanggil fungsi start() untuk memulai konsumen. 
    // ********************************************
    // 1. Pastikan langganan dikonfigurasi sebelum Anda memulai konsumen. 
    // 2. Pastikan langganan konsumen dalam grup konsumen yang sama sama. 
    // *********************************************
    consumer->start();

    // Biarkan thread tetap berjalan dan jangan matikan konsumen. 
    std::this_thread::sleep_for(std::chrono::seconds (60 ));
    consumer->shutdown();
    std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
    return 0;
}

Kirim dan terima pesan terjadwal atau tertunda

Kirim pesan terjadwal atau tertunda

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    std::cout << "=======Sebelum mengirim pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    DefaultMQProducer producer("GID_XXX");
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. 
    producer.setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN. 
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
     // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 

    // Mulai produser setelah Anda mengonfigurasi parameter yang diperlukan. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 32;
    for (int i = 0; i < count; ++i) {
        // Topik yang digunakan untuk mengirim pesan terjadwal atau tertunda. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ sebelumnya. 
        MQMessage msg("YOUR DELAY TOPIC", "HiTAG", "Hello,CPP SDK, Delay Message.");
        chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
        chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
        // Waktu ketika broker ApsaraMQ for RocketMQ mengirimkan pesan ke konsumen. Unit: milidetik. Misalnya, jika Anda mengatur parameter ini ke 2022-08-07 16:21:00, broker mengirimkan pesan pada 16:21:00 tanggal 7 Agustus 2022. Nilainya harus lebih dari waktu saat ini. 
        Jika Anda mengatur parameter ini ke waktu yang lebih awal dari waktu saat ini, pesan langsung dikirimkan ke konsumen. 
        // Waktu terjadwal atau tertunda. Misalnya, jika Anda mengatur waktu terjadwal atau tertunda ke 10000, pesan dikirim 10 detik setelah waktu saat ini. 
        long exp = mil.count() + 10000;
        msg.setProperty("__STARTDELIVERTIME", to_string(exp));
        std::cout << "Sekarang: " << mil.count() << " Exp:" << exp << std::endl;
        try {
            SendResult sendResult = producer.send(msg);
            std::cout << "Hasil Pengiriman:" << sendResult.getSendStatus() << ", ID Pesan: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "Kode Kesalahan: " << e.GetError() << " Pengecualian:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Kirim " << count << " pesan OK, membutuhkan waktu "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    producer.shutdown();
    std::cout << "=======Setelah mengirim pesan=======" << std::endl;
    return 0;
}

Berlangganan pesan terjadwal atau tertunda


#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;
using namespace std;

class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            std::cout << "Sekarang: " << mil.count() << " Pesan Diterima Topik:" << item->getTopic() << ", ID Pesan:"
                      << item->getMsgId() << " Waktu Tertunda:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXX");
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080. 
    // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. 
    consumer->setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN. 
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    consumer->setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 
    
    auto start = std::chrono::system_clock::now();

    // Daftarkan pendengar Anda sendiri di sini untuk menangani pesan yang diterima.
    // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasil pemrosesan. 
    ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
    consumer->subscribe("YOUR DELAY TOPIC", "*");
    consumer->registerMessageListener(messageListener);

    // Persiapan selesai. Anda harus memanggil fungsi startup untuk memulai konsumen. 
    // ********************************************
    // 1. Pastikan langganan dikonfigurasi sebelum Anda memulai konsumen. 
    // 2. Pastikan langganan konsumen dalam grup konsumen yang sama sama. 
    // *********************************************
    consumer->start();

    // Biarkan thread tetap berjalan dan jangan matikan konsumen. 
    std::this_thread::sleep_for(std::chrono::seconds(600));
    consumer->shutdown();
    std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
    return 0;
}

Kirim dan terima pesan transaksional

Kirim pesan transaksional

#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"

using namespace std;
using namespace rocketmq;

class ExampleTransactionListener : public TransactionListener {
public:
    LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
        // Jalankan transaksi lokal. Jika transaksi lokal dijalankan, COMMIT_MESSAGE dikembalikan. Jika transaksi lokal gagal dijalankan, ROLLBACK_MESSAGE dikembalikan. Jika status eksekusi transaksi lokal tidak diketahui, UNKNOWN dikembalikan. 
        // Jika UNKNOWN dikembalikan, tugas terjadwal untuk memeriksa status transaksi lokal dipicu. 
        std::cout << "Jalankan Transaksi Lokal, Pesan Diterima Topik:" << msg.getTopic() << ", ID Pesan:"
                  << msg.getBody() << std::endl;
        return UNKNOWN;
    }

    LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
        // Periksa status eksekusi transaksi lokal. Jika transaksi lokal dijalankan, COMMIT_MESSAGE dikembalikan. Jika transaksi lokal gagal dijalankan, ROLLBACK_MESSAGE dikembalikan. Jika status eksekusi transaksi lokal tidak diketahui, UNKNOWN dikembalikan. 
        // Jika UNKNOWN dikembalikan, tunggu hingga tugas terjadwal berikutnya untuk memeriksa status transaksi lokal dipicu. 
        std::cout << "Periksa Transaksi Lokal, Pesan Diterima Topik:" << msg.getTopic() << ", ID Pesan:" << msg.getMsgId()
                  << std::endl;
        return COMMIT_MESSAGE;
    }
};

int main() {
    std::cout << "=======Sebelum mengirim pesan=======" << std::endl;
    // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.   
    TransactionMQProducer producer("GID_XXX");
    // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir mirip dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.  // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan. producer.setNamesrvAddr("ACCESS POINT");
    // Tentukan nama pengguna instance, kata sandi instance, dan saluran pengguna. Saluran pengguna default adalah ALIYUN.      
    // Jika Anda menghubungkan instance ApsaraMQ for RocketMQ melalui Internet, Anda harus menentukan nama pengguna dan kata sandi. Jika Anda menghubungkan instance ApsaraMQ for RocketMQ dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa autentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi. 
    // Anda bisa mendapatkan nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ. 
    producer.setSessionCredentials("INSTANCE USER NAME","INSTANCE PASSWORD","ALIYUN");
    // Catatan: Jika Anda menggunakan RocketMQ 3.x atau 4.x SDK untuk C++ untuk mengakses instance ApsaraMQ for RocketMQ 5.0, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal. 

    // Pendengar transaksi. 
    ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
    producer.setTransactionListener(exampleTransactionListener);
    // Mulai produser setelah Anda mengonfigurasi parameter yang diperlukan. 
    producer.start();
    auto start = std::chrono::system_clock::now();
    int count = 3;
    for (int i = 0; i < count; ++i) {
        // Topik yang digunakan untuk mengirim pesan transaksional. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ sebelumnya. 
        MQMessage msg("YOUR TRANSACTION TOPIC", "HiTAG", "Hello,CPP SDK, Transaction Message.");
        try {
            SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
            std::cout << "Hasil Pengiriman:" << sendResult.getSendStatus() << ", ID Pesan: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException e) {
            std::cout << "Kode Kesalahan: " << e.GetError() << " Pengecualian:" << e.what() << std::endl;
        }
    }
    auto interval = std::chrono::system_clock::now() - start;
    std::cout << "Kirim " << count << " pesan OK, membutuhkan waktu "
              << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;

    std::cout << "Menunggu pemeriksaan transaksi lokal..... " << std::endl;
    for (int i = 0; i < 6; ++i) {
        this_thread::sleep_for(chrono::seconds(10));
        std::cout << "Berjalan "<< i*10 + 10 << " Detik......"<< std::endl;
    }
    producer.shutdown();
    std::cout << "=======Setelah mengirim pesan=======" << std::endl;
    return 0;
}

Berlangganan pesan transaksional

Kode contoh untuk berlangganan pesan transaksional sama dengan kode contoh untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Mengirim dan Menerima Pesan Normal.