ApsaraMQ for RocketMQ: Subscribe to messages

Last Updated: Aug 18, 2023

This topic describes how to subscribe to messages by using the TCP client SDK for C or C++ provided by ApsaraMQ for RocketMQ.

Subscription modes

ApsaraMQ for RocketMQ supports the following two subscription modes:

  • Clustering subscription

    The consumers identified by the same group ID evenly share the messages. For example, if a topic contains nine messages and the group contains three consumer instances, each instance consumes three messages in clustering subscription mode. To use this mode, set the MessageModel property to CLUSTERING.

    // Configure the clustering subscription mode. This is the default mode.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);
  • Broadcasting subscription

    Each consumer identified by the same group ID consumes every message once. For example, if a topic contains nine messages and the group contains three consumer instances, each instance consumes all nine messages in broadcasting subscription mode. To use this mode, set the MessageModel property to BROADCASTING.

    // Configure the broadcasting subscription mode.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);
Note
  • You must maintain consistent subscriptions for all consumer instances identified by the same group ID. For more information, see Subscription consistency.
  • The two subscription modes have different functional limits. For example, the broadcasting subscription mode does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.
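
The nine-message, three-instance example above can be sketched as a small stand-alone model. This is an illustration only, not SDK code: `SubscriptionMode` and `messagesPerInstance` are hypothetical names, and the even split is the idealized case described in this topic. The distribution that real consumers observe depends on how the service assigns messages and is not guaranteed to be exactly even.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical model of the two subscription modes (not part of the SDK).
enum class SubscriptionMode { Clustering, Broadcasting };

// Returns how many messages each consumer instance in one group receives.
// Clustering: the messages are split across the instances (idealized even split).
// Broadcasting: every instance receives every message.
std::vector<std::size_t> messagesPerInstance(std::size_t messageCount,
                                             std::size_t instanceCount,
                                             SubscriptionMode mode) {
    std::vector<std::size_t> counts(instanceCount, 0);
    for (std::size_t i = 0; i < instanceCount; ++i) {
        if (mode == SubscriptionMode::Broadcasting) {
            counts[i] = messageCount;
        } else {
            // When the split is uneven, the first (messageCount % instanceCount)
            // instances take one extra message in this model.
            std::size_t extra = (i < messageCount % instanceCount) ? 1 : 0;
            counts[i] = messageCount / instanceCount + extra;
        }
    }
    return counts;
}
```

Under this model, calling `messagesPerInstance(9, 3, SubscriptionMode::Clustering)` yields three messages per instance, and the same call with `SubscriptionMode::Broadcasting` yields nine messages per instance.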

Sample code:

#include "ONSFactory.h"

#include <chrono>   // std::chrono::milliseconds
#include <cstdlib>  // getenv
#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // Process the received message here. Return CommitMessage after the message is consumed. 
        // If consumption fails or the message must be consumed again, return ReconsumeLater. The message is then redelivered to the consumer after a predefined period of time. 
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. ApsaraMQ for RocketMQ instances use group IDs instead of producer IDs and consumer IDs. The ConsumerId parameter name is retained to ensure compatibility with earlier versions. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    // The AccessKey secret that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // The endpoint that is used to access the ApsaraMQ for RocketMQ instance. You can obtain the endpoint in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // The topic that you created in the ApsaraMQ for RocketMQ console. 
    const char* topic_1 = "topic-1";
    // Subscribe only to the messages in topic-1 that carry the tag-1 tag. 
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // Subscribe to all messages in topic-2. 
    const char* tag_2 = "*";


    // Use a custom listener object to process the received messages and return the consumption results. 
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // The preparation is complete. You must call start() to start the consumer. 
    consumer->start();

    // Keep the main thread alive for 60 seconds so that the consumer can receive messages, and then shut down the consumer. 
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}
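
The sample code passes the return value of getenv directly to setFactoryProperty. getenv returns a null pointer when a variable is not set, so a missing ALIBABA_CLOUD_ACCESS_KEY_ID or ALIBABA_CLOUD_ACCESS_KEY_SECRET can go unnoticed until the client fails. A minimal sketch of a guard is shown below. `requireEnv` is a hypothetical helper that uses only the C++ standard library and is not part of the SDK.

```cpp
#include <cstdlib>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the SDK): reads an environment variable
// and throws if the variable is missing or empty, instead of returning a null pointer.
std::string requireEnv(const char* name) {
    const char* value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        throw std::runtime_error(std::string("Environment variable not set: ") + name);
    }
    return std::string(value);
}
```

Assuming that setFactoryProperty accepts a C string for the value, as the calls in the sample code suggest, the helper could be used as `factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID").c_str());`, with the exception handled in main before the consumer is created.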

Additional information

For information about the best practices for consumer throttling in ApsaraMQ for RocketMQ, see RocketMQ client traffic control design.