All Products
Search
Document Center

ApsaraMQ for RocketMQ:Subscribe to messages

Last Updated:Mar 10, 2026

Subscribe to messages from ApsaraMQ for RocketMQ by using the TCP client SDK for C/C++. This topic explains the two subscription modes, provides a complete consumer implementation, and covers consumer lifecycle management.

Subscription modes

ApsaraMQ for RocketMQ supports two subscription modes that determine how messages are distributed across consumer instances within the same group ID.

Clustering subscription (default)

Each message is delivered to only one consumer instance in the group. All consumers identified by the same group ID consume an equal number of messages.

For example, if a topic contains nine messages and the group has three consumer instances, each instance receives three messages.

// Clustering subscription is the default mode. To set it explicitly:
factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::CLUSTERING);

Use clustering subscription when each message only needs to be processed once, such as order processing or task distribution.

Broadcasting subscription

Every message is delivered to all consumer instances in the group. Each consumer receives a full copy of every message.

For example, if a topic contains nine messages and the group has three consumer instances, each instance receives all nine messages.

// Set broadcasting subscription mode.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel, ONSFactoryProperty::BROADCASTING);

Use broadcasting subscription when every instance needs the same data, such as local cache refresh or configuration sync.

Note

Sample code

The following example creates a PushConsumer that subscribes to two topics, processes incoming messages through a listener callback, and handles the consumer lifecycle.

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

// Mutex to synchronize access to shared resources across listener threads.
std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // Return CommitMessage after successful processing.
        // Return ReconsumeLater if processing fails or you want to consume
        // the message again -- the broker redelivers the message after a
        // predefined interval.
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;

    // Group ID created in the ApsaraMQ for RocketMQ console.
    // ApsaraMQ for RocketMQ instances use group IDs instead of producer IDs
    // and consumer IDs. This parameter maintains backward compatibility.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");

    // Retrieve credentials from environment variables.
    // Make sure ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
    // are set before running this program.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

    // Endpoint of the ApsaraMQ for RocketMQ instance.
    // Get this value from the ApsaraMQ for RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Subscribe to messages with a specific tag in topic-1.
    const char* topic_1 = "topic-1";
    const char* tag_1 = "tag-1";

    // Subscribe to all messages in topic-2 by using the wildcard tag "*".
    const char* topic_2 = "topic-2";
    const char* tag_2 = "*";

    // Register the listener and subscribe to both topics.
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // Start the consumer. Messages begin arriving after this call.
    consumer->start();

    // Keep the main thread alive. In production, replace this with
    // your application's main loop or signal handler.
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

What's next