This topic describes how to use Message Queue for Apache RocketMQ SDK for C and C++ to subscribe to messages.

Subscription modes

Message Queue for Apache RocketMQ supports the following two subscription modes:

  • Clustering subscription

    The consumers identified by the same group ID share the messages in a topic evenly. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes three messages in clustering subscription mode. To use this mode, set MessageModel to CLUSTERING.

    // Configure clustering subscription (default).
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
  • Broadcasting subscription

    Each consumer identified by the same group ID consumes every message once. For example, if a topic contains nine messages and a group contains three consumer instances, each instance consumes all nine messages in broadcasting subscription mode. To use this mode, set MessageModel to BROADCASTING.

    // Configure broadcasting subscription.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);
Note
  • You must maintain consistent subscriptions for all consumer instances identified by the same group ID. For more information, see Subscription consistency.
  • The two subscription modes have different functional limits. For example, the broadcasting subscription mode does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.
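
The difference between the two modes can be illustrated with a small simulation that does not use the SDK. The following is a minimal sketch under stated assumptions: the Mode enum and the messagesPerConsumer function are hypothetical names introduced for illustration, and round-robin assignment only stands in for the real load-balancing strategy, which may not split messages exactly evenly.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical type for this sketch only; it is not part of the SDK.
enum class Mode { Clustering, Broadcasting };

// Hypothetical helper: returns how many of `messages` messages each of
// `consumers` instances in one group would receive under the given mode.
std::vector<std::size_t> messagesPerConsumer(Mode mode,
                                             std::size_t messages,
                                             std::size_t consumers) {
    std::vector<std::size_t> counts(consumers, 0);
    if (consumers == 0) {
        return counts;  // No instances, so nothing is consumed.
    }
    for (std::size_t m = 0; m < messages; ++m) {
        if (mode == Mode::Broadcasting) {
            // Every instance receives every message.
            for (std::size_t& count : counts) {
                ++count;
            }
        } else {
            // Assumption: round-robin stands in for the real load balancing.
            ++counts[m % consumers];
        }
    }
    return counts;
}
```

With nine messages and three instances, this sketch yields three messages per instance for Mode::Clustering and nine messages per instance for Mode::Broadcasting, which matches the examples above.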

Sample code

#include "ONSFactory.h"

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // The consumer receives the message and attempts to consume it. Return CommitMessage after the message is consumed. 
        // Return ReconsumeLater if consumption fails or the message must be consumed again. The message is then delivered to the consumer again after a predefined period of time. 
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // Specify the group ID that you created in the Message Queue for Apache RocketMQ console. Message Queue for Apache RocketMQ instances use the group ID instead of the producer ID and consumer ID. Specifying this value ensures compatibility with earlier versions. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    // Specify the AccessKey ID of your Alibaba Cloud account. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
    // Specify the AccessKey secret of your Alibaba Cloud account. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
    // Specify the TCP endpoint of your Message Queue for Apache RocketMQ instance. You can view the endpoint in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify a topic that you created in the Message Queue for Apache RocketMQ console. 
    const char* topic_1 = "topic-1";
    // Subscribe to the messages attached with tag-1 in topic-1. 
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // Subscribe to all messages in topic-2. 
    const char* tag_2 = "*";


    // Use a custom listener object to process the received messages and return the results. 
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // The preparation is complete. Call start() to start the consumer. 
    consumer->start();

    // Keep the main thread alive for 60 seconds so that the consumer can receive messages, and then shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages=======" << std::endl;
    return 0;
}
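
The sample listener always returns CommitMessage. The comments in consume describe a second outcome, ReconsumeLater, for failed consumption. The following is a minimal sketch of that decision, written so that it compiles without ONSFactory.h: the Action enum here is a stand-in for the SDK type of the same name, and consumeSafely and its process callback are hypothetical names introduced for illustration.

```cpp
#include <exception>
#include <functional>
#include <string>

// Stand-in for the SDK's Action values so that this sketch compiles on its own.
enum Action { CommitMessage, ReconsumeLater };

// Hypothetical helper: runs the business logic on a message body and maps
// the outcome to the value that a listener would return.
Action consumeSafely(const std::string& body,
                     const std::function<bool(const std::string&)>& process) {
    try {
        // process returns true when the message was handled successfully.
        return process(body) ? CommitMessage : ReconsumeLater;
    } catch (const std::exception&) {
        // Treat an exception as a failure so that the message is redelivered.
        return ReconsumeLater;
    }
}
```

In a real listener, the same pattern could be applied inside consume so that an exception thrown by the business logic results in ReconsumeLater instead of leaving the callback.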

References

For more information about the best practices for consumer-side traffic control in Message Queue for Apache RocketMQ, see Message Queue for Apache RocketMQ client traffic control design.