This topic describes how to use ApsaraMQ for RocketMQ SDK for C and C++ to subscribe to messages.
Subscription modes
ApsaraMQ for RocketMQ supports the following two subscription modes:
- Clustering subscription
All consumers identified by the same group ID consume an equal number of messages. For example, a topic contains nine messages and a group contains three consumer instances. In clustering consumption mode, each instance consumes three messages. Set the value to CLUSTERING.
// Configure clustering subscription (default). factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
- Broadcasting subscription
Each of the consumers identified by the same group ID consumes all messages once. For example, a topic contains nine messages and a group contains three consumer instances. In broadcasting consumption mode, each instance consumes nine messages. Set the value to BROADCASTING.
// Configure broadcasting subscription. factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
- You must maintain consistent subscriptions for all consumer instances identified by the same group ID. For more information, see Subscription consistency.
- The two subscription modes have different functional limits. For example, the broadcasting subscription mode does not support ordered messages, consumption progress maintenance, or consumer offset resetting. For more information, see Clustering consumption and broadcasting consumption.
Sample code
#include "ONSFactory.h"
#include <iostream>
#include <thread>
#include <mutex>
using namespace ons;
std::mutex console_mtx;
class ExampleMessageListener : public MessageListener {
public:
Action consume(Message& message, ConsumeContext& context) {
// The consumer receives the message and attempts to consume it. After the message is consumed, CommitMessage is returned.
// If the consumer fails to consume the message or wants to consume the message again, ReconsumeLater is returned. Then, the message is delivered to the consumer again after a predefined period of time.
std::lock_guard<std::mutex> lk(console_mtx);
std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
<< message.getMsgID() << std::endl;
return CommitMessage;
}
};
int main(int argc, char* argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
ONSFactoryProperty factoryInfo;
// Specify the group ID that you created in the ApsaraMQ for RocketMQ console. Message Queue for Apache RocketMQ instances use the group ID instead of the producer ID and consumer ID. Specifying this value ensures compatibility with earlier versions.
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
// An AccessKey ID is used as the identifier for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "Your Access Key");
// An AccessKey secret is used as the password for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "Your Secret Key");
// Specify the TCP endpoint of your Message Queue for Apache RocketMQ instance. You can view the endpoint in the ApsaraMQ for RocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR,
"http://xxxxxxxxxxxxxxxx.aliyuncs.com:80");
PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
// Specify a topic that you created in the ApsaraMQ for RocketMQ console.
const char* topic_1 = "topic-1";
// Subscribe to the messages attached with tag-1 in topic-1.
const char* tag_1 = "tag-1";
const char* topic_2 = "topic-2";
// Subscribe to all messages in topic-2.
const char* tag_2 = "*";
// Use a custom listener function to process the received messages and return the results.
ExampleMessageListener * message_listener = new ExampleMessageListener();
consumer->subscribe(topic_1, tag_1, message_listener);
consumer->subscribe(topic_2, tag_2, message_listener);
// The preparation is complete. You must invoke the startup function to start the consumer.
consumer->start();
// Keep the thread running and do not shut down the consumer.
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
delete message_listener;
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}
References
For more information about the best practices for traffic control over the ApsaraMQ for RocketMQ consumer, see ApsaraMQ for RocketMQ client traffic control design.