全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan terjadwal dan pesan tertunda

更新时间:Jun 28, 2025

Topik ini menyediakan contoh kode untuk menggunakan TCP client SDK C++ dari Community Edition dalam mengirim dan menerima pesan terjadwal serta pesan tertunda.

Informasi latar belakang

  • Pesan terjadwal adalah pesan yang dikirim oleh broker ApsaraMQ for RocketMQ kepada konsumen pada waktu tertentu.

  • Pesan tertunda adalah pesan yang dikirim oleh broker ApsaraMQ for RocketMQ kepada konsumen setelah jangka waktu tertentu.

Untuk informasi lebih lanjut, lihat Pesan Terjadwal dan Pesan Tertunda.

Penting

Metode konfigurasi dan hasil pesan terjadwal serta pesan tertunda berbeda antara Apache RocketMQ dan ApsaraMQ for RocketMQ. Apache RocketMQ mendukung pesan tertunda tetapi tidak mendukung pesan terjadwal. Oleh karena itu, tidak ada antarmuka khusus untuk penjadwalan pesan. ApsaraMQ for RocketMQ mendukung kedua jenis pesan tersebut, memungkinkan pengaturan waktu terjadwal dan periode penundaan hingga detik serta menawarkan konkurensi yang lebih tinggi. Kami merekomendasikan penggunaan cloud untuk mengirim dan menerima pesan terjadwal serta pesan tertunda. Untuk informasi lebih lanjut, lihat bagian-bagian berikut.

Prasyarat

Kirim pesan terjadwal

  1. Salin kode berikut ke file DelayProducerDemo.cpp. Modifikasi parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu buat file eksekusi yang dapat dijalankan.

    g++ -o delay_producer_demo -std=c++11 -lz -lrocketmq DelayProducerDemo.cpp

  2. Kode sampel berikut menunjukkan cara mengirim pesan terjadwal menggunakan TCP client SDK C++ dari Community Edition:

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "DefaultMQProducer.h"
    
    using namespace std;
    using namespace rocketmq;
    
    int main() {
        std::cout << "=======Sebelum mengirim pesan=======" << std::endl;
        // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ. 
        DefaultMQProducer producer("GID_XXXXXXXXXXX");
        // Titik akhir TCP yang Anda peroleh dari halaman Detail Instansi di konsol ApsaraMQ for RocketMQ. 
        producer.setNamesrvAddr("http://MQ_INST_XXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan rahasia AccessKey dari akun Alibaba Cloud Anda. Mereka digunakan untuk otentikasi. 
        // Saluran pengguna. Nilai default: ALIYUN. 
        producer.setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
    
        // Setelah Anda mengonfigurasi parameter yang diperlukan, mulailah produser. 
        producer.start();
        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            // Topik yang Anda ajukan di konsol ApsaraMQ for RocketMQ. 
            MQMessage msg("TOPIK TERTUNDA ANDA", "HiTAG", "Halo,CPP SDK, Pesan Tertunda.");
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            // Cap waktu yang menunjukkan waktu setelah pesan terjadwal dikirim. Unit: milidetik. Sebagai contoh, Anda dapat menentukan cap waktu untuk mengirim pesan terjadwal pada pukul 16:21:00 tanggal 7 Maret 2020. 
            // Jika Anda menentukan cap waktu yang lebih awal dari titik waktu saat ini, pesan akan segera dikirimkan ke konsumen. 
            // Periode penundaan untuk pesan tertunda. Sebagai contoh, jika Anda ingin mengirim pesan 10 detik setelah titik waktu saat ini, tentukan nilainya sebagai 10000. 
            long exp = mil.count() + 10000;
            msg.setProperty("__STARTDELIVERTIME", to_string(exp));
            std::cout << "Sekarang: " << mil.count() << " Exp:" << exp << std::endl;
            try {
                SendResult sendResult = producer.send(msg);
                std::cout << "Hasil Pengiriman:" << sendResult.getSendStatus() << ", ID Pesan: " << sendResult.getMsgId()
                          << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
            } catch (MQException e) {
                std::cout << "Kode Kesalahan: " << e.GetError() << " Pengecualian:" << e.what() << std::endl;
            }
        }
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Kirim " << count << " pesan OK, biaya "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count() << "ms" << std::endl;
    
        producer.shutdown();
        std::cout << "=======Setelah mengirim pesan=======" << std::endl;
        return 0;
    }

Konsumsi pesan terjadwal

  1. Salin kode berikut ke file DelayConsumerDemo.cpp. Modifikasi parameter yang sesuai, jalankan perintah g++ untuk mengompilasi kode, lalu buat file eksekusi yang dapat dijalankan.

    g++ -o delay_consumer_demo -std=c++11 -lz -lrocketmq DelayConsumerDemo.cpp

  2. Kode berikut menunjukkan cara mengonsumsi pesan terjadwal menggunakan TCP client SDK C++ dari Community Edition:

    
    #include <iostream>
    #include <thread>
    #include <chrono>
    #include "DefaultMQPushConsumer.h"
    
    using namespace rocketmq;
    using namespace std;
    
    class ExampleDelayMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); item++) {
                chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
                chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
                std::cout << "Sekarang: " << mil.count() << " Pesan Diterima Topik:" << item->getTopic() << ", ID Pesan:"
                          << item->getMsgId() << " Waktu Penundaan:" << item->getProperty("__STARTDELIVERTIME") << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };
    
    int main(int argc, char *argv[]) {
        std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
        // ID grup yang Anda ajukan di konsol ApsaraMQ for RocketMQ. 
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_XXXXXXXXXX");
        // Titik akhir TCP yang Anda peroleh dari halaman Detail Instansi di konsol ApsaraMQ for RocketMQ. 
        consumer->setNamesrvAddr("http://MQ_INST_XXXXXXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. 
    	  // ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET menentukan ID AccessKey dan rahasia AccessKey dari akun Alibaba Cloud Anda. Mereka digunakan untuk otentikasi. 
        // Saluran pengguna. Nilai default: ALIYUN. 
        consumer->setSessionCredentials(getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), "ALIYUN");
        auto start = std::chrono::system_clock::now();
    
        // Daftarkan pendengar Anda sendiri di sini untuk menangani pesan yang diterima.
        // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasil pemrosesan. 
        ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
        consumer->subscribe("TOPIK TERTUNDA ANDA", "*");
        consumer->registerMessageListener(messageListener);
    
        // Mulai konsumen ini
        // Persiapan selesai. Anda harus memanggil fungsi startup untuk memulai konsumen. 
        // ********************************************
        // 1. Sebelum Anda memulai konsumen, pastikan bahwa langganan telah dikonfigurasi. 
        // 2. Pastikan bahwa langganan konsumen dalam grup yang sama konsisten. 
        // *********************************************
        consumer->start();
    
        // Pertahankan thread utama tetap berjalan sampai proses selesai.
        // Pertahankan thread tetap berjalan dan jangan matikan konsumen. 
        std::this_thread::sleep_for(std::chrono::seconds(600));
        consumer->shutdown();
        std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
        return 0;
    }