全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan normal

更新时间:Jun 28, 2025

Pesan normal adalah pesan tanpa fitur khusus yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini berbeda dari jenis pesan lain seperti pesan terjadwal, tertunda, terurut, dan transaksional. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan normal menggunakan SDK Klien TCP untuk C++ Edisi Komunitas.

Prasyarat

Kirim pesan normal

  1. Salin kode berikut ke file ProducerDemo.cpp. Ubah parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu hasilkan file eksekusi.

    g++ -o producer_demo -std=c++11 -lz -lrocketmq ProducerDemo.cpp

  2. Contoh kode berikut menunjukkan cara mengirim pesan normal menggunakan SDK Klien TCP untuk C++ Edisi Komunitas:

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    int main() {
        std::cout << "=======Sebelum mengirim pesan=======" << std::endl;
        // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ.
        DefaultMQProducer producer("GID_XXXXXXXXX");
        // Titik akhir TCP yang Anda peroleh dari halaman Detail Instans di konsol ApsaraMQ for RocketMQ.
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda, yang digunakan untuk verifikasi identitas.
        // Saluran pengguna. Nilai default: ALIYUN.
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        // Setelah Anda mengonfigurasi parameter yang diperlukan, mulailah produsen.
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            // Sebelum Anda mengirim pesan, tentukan topik yang Anda ajukan di konsol ApsaraMQ for RocketMQ.
            MQMessage msg("YOURTOPIC","HiTAG","HelloCPPSDK.");
            try {
                SendResult sendResult = producer.send(msg);
                std::cout <<"Hasil Pengiriman:"<<sendResult.getSendStatus()<< ", ID Pesan: " << sendResult.getMsgId() << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "Kode Kesalahan: " << e.GetError() << " Pengecualian:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Kirim " << count << " pesan OK, membutuhkan waktu "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======Setelah mengirim pesan=======" << std::endl;
        return 0;
    }

Konsumsi pesan normal

  1. Salin kode berikut ke file ConsumerDemo.cpp. Ubah parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu hasilkan file eksekusi.

    g++ -o consumer_demo -std=c++11 -lz -lrocketmq ConsumerDemo.cpp

  2. Contoh kode berikut menunjukkan cara mengonsumsi pesan normal menggunakan SDK Klien TCP untuk C++ Edisi Komunitas:

    #include <iostream>
    #include <thread>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    
    
    class ExampleMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                std::cout << "Pesan Diterima Topik:" << item->getTopic() << ", ID Pesan:" << item->getMsgId() << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
        // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ.
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXX");
        // Titik akhir TCP yang Anda peroleh dari halaman Detail Instans di konsol ApsaraMQ for RocketMQ.
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan Rahasia AccessKey dari akun Alibaba Cloud Anda, yang digunakan untuk verifikasi identitas.
        // Saluran pengguna. Nilai default: ALIYUN.
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
    
    
        // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasilnya.
        ExampleMessageListener *messageListener = new ExampleMessageListener();
        consumer->subscribe("YOURTOPIC", "*");
        consumer->registerMessageListener(messageListener);
    
        // Persiapan selesai. Anda harus memanggil fungsi startup untuk memulai konsumen.
        // ********************************************
        // 1. Sebelum Anda memulai konsumen, pastikan bahwa langganan telah dikonfigurasi.
        // 2. Pastikan bahwa langganan konsumen dalam grup yang sama konsisten.
        // *********************************************
        consumer->start();
    
        // Pertahankan utas utama tetap berjalan hingga proses dimatikan.
        std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
        consumer->shutdown();
        std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
        return 0;
    }