This topic describes how to use the C/C++ SDK of Message Queue for Apache RocketMQ to subscribe to messages.

Subscription modes

Message Queue for Apache RocketMQ supports the following modes:

  • Clustering subscription

    All consumers identified by the same group ID consume messages in an even manner. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes three messages.

    // The configuration of clustering subscription (default mode).
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
  • Broadcasting subscription

    Each of the consumers identified by the same group ID consumes all messages once. For example, a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes nine messages.

    // The configuration of broadcasting subscription.
    factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
Notice
  • Maintain consistent subscription for all consumer instances with the same group ID. For more information, see Subscription consistency.
  • The two subscription patterns have different functional restrictions. For example, the broadcasting consumption pattern does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.

Sample code

For more information about the sample code, see Message Queue for Apache RocketMQ code library.

#include "ONSFactory.h"
using namespace ons;

// MyMsgListener: Create an instance for message consumption.
// After pulling the message, pushConsumer proactively calls the consumer function of the instance.
class MyMsgListener : public MessageListener
{

    public:

        MyMsgListener()
        {
        }

        virtual ~MyMsgListener()
        {
        }

        virtual Action consume(Message &message, ConsumeContext &context)
        {
            // Customize message processing details.
            return CommitMessage; //CONSUME_SUCCESS;
        }
};


int main(int argc, char* argv[])
{

    // The parameter required for the creation and operation of pushConsumer, which must be set.
    ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//The group ID you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//The topic you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");//The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
      // Clustering subscription (default)
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
      // Broadcasting subscription
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);

    //create pushConsumer
    PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify the message topic and tag that pushConsumer subscribes to and register the message callback function.
    MyMsgListener  msglistener;
    pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // Start pushConsumer.
    pushConsumer->start();

    // Note: The shutdown() method can be called only when no messages are received. After the shutdown() method is called, the consumer instance exits and no longer receives messages.

    // Destroy pushConsumer. Before exiting the application, the consumer object must be destroyed. Otherwise, memory leakage and other problems may occur.
    pushConsumer->shutdown();
    return 0;

}

References

For the best practices of consumer throttling in Message Queue for Apache RocketMQ, see RocketMQ client traffic control design.