全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Berlangganan pesan

更新时间:Jul 06, 2025

Topik ini menjelaskan cara berlangganan pesan menggunakan TCP Client SDK untuk C atau C++ yang disediakan oleh ApsaraMQ for RocketMQ.

Mode langganan

ApsaraMQ for RocketMQ mendukung mode langganan berikut:

  • Langganan Klustering

    Semua konsumen dengan ID grup yang sama mengonsumsi jumlah pesan yang sama. Sebagai contoh, sebuah topik berisi sembilan pesan dan grup konsumen terdiri dari tiga konsumen. Dalam mode konsumsi klustering, setiap konsumen mengonsumsi tiga pesan. Kode berikut menunjukkan cara mengonfigurasi mode langganan klustering:

    // Konfigurasikan mode langganan klustering. Ini adalah mode default.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • Langganan Siaran

    Setiap konsumen dengan ID grup yang sama mengonsumsi semua pesan sekali. Sebagai contoh, sebuah topik berisi sembilan pesan dan grup konsumen terdiri dari tiga konsumen. Dalam mode konsumsi siaran, setiap konsumen mengonsumsi sembilan pesan. Kode berikut menunjukkan cara mengonfigurasi mode langganan siaran:

    // Konfigurasikan mode langganan siaran.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
Catatan
  • Pastikan langganan tetap konsisten di semua instance konsumen dengan ID grup yang sama. Untuk informasi lebih lanjut, lihat Konsistensi Langganan.

  • Batas berbeda diberlakukan pada mode langganan di atas. Sebagai contoh, dalam mode langganan siaran, Anda tidak dapat mengirim atau menerima pesan terurut, mempertahankan kemajuan konsumsi, atau menyetel ulang offset konsumen. Untuk informasi lebih lanjut, lihat Konsumsi Klustering dan Konsumsi Siaran.

Kode contoh:

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // Konsumen menerima pesan dan mencoba mengonsumsinya. Setelah pesan dikonsumsi, CommitMessage dikembalikan. 
        // Jika konsumen gagal mengonsumsi pesan atau ingin mengonsumsi pesan lagi, ReconsumeLater dikembalikan. Kemudian, pesan dikirimkan ke konsumen lagi setelah periode waktu yang telah ditentukan. 
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Menerima pesan. Topik: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Sebelum mengonsumsi pesan=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. Instance ApsaraMQ for RocketMQ menggunakan ID grup alih-alih ID produsen dan ID konsumen. Parameter ini dikonfigurasi untuk memastikan kompatibilitas dengan versi sebelumnya. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
    // ID AccessKey yang digunakan untuk otentikasi. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
		// Rahasia AccessKey yang digunakan untuk otentikasi. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // Titik akhir yang digunakan untuk mengakses instance ApsaraMQ for RocketMQ. Anda bisa mendapatkan titik akhir di konsol ApsaraMQ for RocketMQ. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ. 
    const char* topic_1 = "topic-1";
    // Berlangganan pesan yang dilampirkan dengan tag tag-1 di topik-1. 
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // Berlangganan semua pesan di topik-2. 
    const char* tag_2 = "*";


    // Gunakan fungsi pendengar kustom untuk memproses pesan yang diterima dan mengembalikan hasilnya. 
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // Persiapan selesai. Anda harus memanggil fungsi startup untuk memulai konsumen. 
    consumer->start();

    // Pertahankan thread tetap berjalan dan jangan hentikan konsumen. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======Setelah mengonsumsi pesan======" << std::endl;
    return 0;
}

Informasi tambahan

Untuk informasi tentang praktik terbaik pembatasan konsumen di ApsaraMQ for RocketMQ, lihat Desain Kontrol Lalu Lintas Klien RocketMQ.