全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kirim dan terima pesan terurut

更新时间:Jul 02, 2025

Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan pertama masuk pertama keluar (FIFO) yang ketat. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan terurut menggunakan SDK klien TCP untuk C atau C++.

Informasi latar belakang

Pesan terurut berpartisi dalam topik tertentu dipartisi berdasarkan kunci sharding. Pesan di setiap partisi dikonsumsi dalam urutan FIFO yang ketat. Kunci sharding adalah bidang kunci yang digunakan untuk pesan terurut guna mengidentifikasi partisi yang berbeda. Kunci sharding berbeda dari kunci pesan normal.

    Untuk informasi lebih lanjut, lihat Pesan Terurut.

    Prasyarat

    Pastikan langkah-langkah berikut telah dilakukan:

    • SDK untuk C atau C++ telah diunduh. Untuk informasi lebih lanjut, lihat Catatan Rilis.

    • Lingkungan telah dipersiapkan. Untuk informasi lebih lanjut, lihat Persiapan Lingkungan (V1.x.x).

    • Sumber daya yang ingin Anda tentukan dalam kode telah dibuat di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

    • Pasangan AccessKey akun Alibaba Cloud Anda telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

    Kirim pesan terurut

    Penting

    Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman menggunakan satu produser atau thread. Jika pengirim menggunakan beberapa produser atau thread secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.

    Berikut ini adalah contoh kode untuk mengirim pesan terurut menggunakan SDK klien TCP untuk C atau C++:

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    #include <iostream>
    using namespace ons;
    
    int main()
    {
        // Parameter yang diperlukan untuk membuat dan menggunakan produser.
        ONSFactoryProperty factoryInfo;
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
        // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); 
        // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
        // Isi pesan.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        // ID AccessKey yang digunakan untuk autentikasi.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    		// Rahasia AccessKey yang digunakan untuk autentikasi.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    
        // Buat produser.
        OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);
    
    
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        pProducer->start();
    
        Message msg(
                    //Topik Pesan
                    factoryInfo.getPublishTopics(),
                    // Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan oleh konsumen untuk menyaring pesan di broker ApsaraMQ for RocketMQ.
                    "TagA",
                    // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus sepakat pada metode serialisasi dan deserialisasi.
                    factoryInfo.getMessageContent()
        );
    
        // Kunci pesan. Kunci adalah atribut spesifik bisnis dari sebuah pesan dan harus unik secara global.
        // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ.
        // Catatan: Anda dapat mengirim dan menerima pesan bahkan jika Anda tidak menentukan kunci.
        msg.setKey("ORDERID_100");
        // Bidang kunci yang digunakan untuk mengidentifikasi partisi untuk pesan terurut berpartisi.
        // Bidang ini dapat diatur ke string non-kosong untuk pesan dengan urutan global.
        std::string shardingKey = "abc";  
        // Pesan yang memiliki kunci sharding yang sama dikirim sesuai urutan.
        try
        {
            // Kirim pesan. Jika tidak ada pengecualian yang dilempar, pesan terkirim.
            SendResultONS sendResult = pProducer->send(msg, shardingKey);
        std::cout << "pengiriman berhasil" << std::endl;
        }
        catch(ONSClientException & e)
        {
            // Tentukan logika untuk menangani kesalahan.
        }
        // Sebelum Anda keluar dari aplikasi Anda, hancurkan produser. Jika Anda tidak menghancurkan produser, masalah seperti kebocoran memori mungkin terjadi.
        pProducer->shutdown();
    
        return 0;
    }           

    Berlangganan pesan terurut

    Berikut ini adalah contoh kode untuk berlangganan pesan terurut menggunakan SDK klien TCP untuk C atau C++:

    #include "ONSFactory.h"
    using namespace std;
    using namespace ons;
    
    // Buat instance konsumen.
    //Setelah konsumen push menarik pesan, konsumen push memanggil fungsi consumeMessage dari instance.
    class ONSCLIENT_API MyMsgListener : public MessageOrderListener
    {
    public:
        MyMsgListener()
        {
        }
        virtual ~MyMsgListener()
        {
        }
    
        virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
        {
            // Konsumsi pesan berdasarkan kebutuhan bisnis Anda.
            return Success; //CONSUME_SUCCESS;
        }
    };
    
    
    int main(int argc, char* argv[])
    {
        // Parameter yang diperlukan untuk membuat dan menggunakan konsumen.
        ONSFactoryProperty factoryInfo;
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
        // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        // ID AccessKey yang digunakan untuk autentikasi.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    		// Rahasia AccessKey yang digunakan untuk autentikasi.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
    
    
        // Buat konsumen.
        OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
        MyMsgListener  msglistener;
        // Topik dan tag pesan yang dikonsumsi oleh konsumen.
        orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
    
        // Daftarkan instance untuk mendengarkan pesan. Setelah konsumen menarik pesan, konsumen memanggil fungsi consumeMessage dari kelas pendengar pesan.
    
        //Mulai konsumen.
        orderConsumer->start();
    
        for(volatile int i = 0; i < 1000000000; ++i) {
            //menunggu
        }
    
        // Hancurkan konsumen. Sebelum Anda keluar dari aplikasi, hancurkan konsumen. Jika tidak, masalah seperti kebocoran memori mungkin terjadi.
        orderConsumer->shutdown();
    
       return 0;
    }