This topic describes how to use the Message Queue for Apache RocketMQ SDK for C and C++ to subscribe to messages.

Subscription modes

Message Queue for Apache RocketMQ supports the following two subscription modes:

  • Clustering subscription

    All consumers identified by the same group ID consume an equal number of messages. For example, a topic contains nine messages and a group contains three consumer instances. In clustering consumption mode, each instance consumes three messages. Set the value to CLUSTERING.

    // Configure clustering subscription, which is the default mode.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • Broadcasting subscription

    Each of the consumers identified by the same group ID consumes all messages once. For example, a topic contains nine messages and a group contains three consumer instances. In broadcasting consumption mode, each instance consumes nine messages. Set the value to BROADCASTING.

    // Configure broadcasting subscription.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
Note
  • You must maintain consistent subscriptions for all consumer instances identified by the same group ID. For more information, see Subscription consistency.
  • The two subscription modes have different functional limits. For example, the broadcasting subscription mode does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.

Sample code

#include "rocketmq/ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // The system processes the message. After the processing result is accepted by the consumer, the system returns CommitMessage to the producer.
        // If a message consumption failure occurs or the consumer wants to consume the message again, the system returns ReconsumeLater to the producer. Then, the message is delivered to the consumer again after a specific period.
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // Specify the group ID that you created in the Message Queue for Apache RocketMQ console. For the service versions that involve instances, producer IDs and consumer IDs are replaced with group IDs. This configuration ensures the compatibility with earlier versions.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    // Specify the AccessKey ID of your Alibaba Cloud account.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
    // Specify the AccessKey secret of your Alibaba Cloud account.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
    // Specify the TCP endpoint of your Message Queue for Apache RocketMQ instance. You can view the endpoint in the Message Queue for Apache RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
                                   "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify a topic that you created in the Message Queue for Apache RocketMQ console.
    const char* topic_1 = "topic-1";
    // Subscribe to the messages attached with the tag-1 tag in topic-1.
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // Subscribe to all messages in topic-2.
    const char* tag_2 = "*";


    // Use a custom listener function to process the received messages and return the processing results.
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // The preparation is complete. You must invoke the startup function to start the consumer.
    consumer->start();

    // Keep the thread running and do not shut down the consumer.
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

References

For more information about the best practices for consumer throttling in Message Queue for Apache RocketMQ, see RocketMQ client traffic control design.