ApsaraMQ for RocketMQ 5.x instances are compatible with RocketMQ 3.x and 4.x SDK clients. This topic provides C++ sample code for sending and receiving normal, ordered, scheduled/delayed, and transactional messages.
We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provides more functions and enhanced features. For more information, see Version Guide.
Alibaba Cloud only maintains RocketMQ 4.x, 3.x, and TCP client SDKs. We recommend that you use them only for existing business.
Prerequisites
Before you begin, make sure that you have:
The C++ dynamic library installed. For details, see Install the C++ dynamic library
A topic and a group ID created in the ApsaraMQ for RocketMQ console
The instance endpoint obtained from the ApsaraMQ for RocketMQ console
Common configuration
All examples in this topic share the same connection setup. Replace the following placeholders with your values before running the code.
| Placeholder | Description | Example |
|---|---|---|
<your-group-id> | Group ID created in the console | GID_example |
<your-access-point> | Instance endpoint in host:port format. Do not include http:// or https://, and do not use a resolved IP address. | rmq-cn-xxx.rmq.aliyuncs.com:8080 |
<your-topic> | Topic created in the console | normal_topic_01 |
<instance-username> | Instance username from the Intelligent Authentication tab on the Access Control page | N/A |
<instance-password> | Instance password from the Intelligent Authentication tab on the Access Control page | N/A |
Authentication:
Internet access: Username and password required.
VPC access: No username or password needed.
Serverless instance over the Internet: Username and password required.
Serverless instance in a VPC with authentication-free access enabled: No username or password needed.
Do not specify the instance ID when you use the RocketMQ 3.x or 4.x SDK for C++ with an ApsaraMQ for RocketMQ 5.0 instance. Specifying it causes connection failures.
Common producer setup:
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
// Initialize the producer with your group ID
DefaultMQProducer producer("<your-group-id>");
// Set the endpoint (domain:port only, no http/https prefix)
producer.setNamesrvAddr("<your-access-point>");
// Set credentials (required for Internet access; skip for VPC access)
producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
producer.start();Common consumer setup:
#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
// Initialize the push consumer with your group ID
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
// Set the endpoint (domain:port only, no http/https prefix)
consumer->setNamesrvAddr("<your-access-point>");
// Set credentials (required for Internet access; skip for VPC access)
consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");Before you start a consumer:
Configure all subscriptions before calling
start().Make sure that all consumers in the same group use identical subscriptions.
Normal messages
Use normal messages for general-purpose messaging that does not require ordering, delayed delivery, or transaction support.
Send normal messages
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main() {
cout << "=======Before sending messages=======" << endl;
DefaultMQProducer producer("<your-group-id>");
producer.setNamesrvAddr("<your-access-point>");
producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
producer.start();
auto start = chrono::system_clock::now();
int count = 32;
for (int i = 0; i < count; ++i) {
MQMessage msg("<your-topic>", "HiTAG", "HelloCPPSDK.");
try {
SendResult sendResult = producer.send(msg);
cout << "SendResult:" << sendResult.getSendStatus()
<< ", Message ID: " << sendResult.getMsgId() << endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException& e) {
cout << "ErrorCode: " << e.GetError()
<< " Exception:" << e.what() << endl;
}
}
auto interval = chrono::system_clock::now() - start;
cout << "Send " << count << " messages OK, costs "
<< chrono::duration_cast<chrono::milliseconds>(interval).count()
<< "ms" << endl;
producer.shutdown();
cout << "=======After sending messages=======" << endl;
return 0;
}Subscribe to normal messages
#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
class ExampleMessageListener : public MessageListenerConcurrently {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
std::cout << "Received Message Topic:" << item->getTopic()
<< ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
consumer->setNamesrvAddr("<your-access-point>");
consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
// Register the listener and subscribe to the topic
ExampleMessageListener *messageListener = new ExampleMessageListener();
consumer->subscribe("<your-topic>", "*");
consumer->registerMessageListener(messageListener);
// Start after subscriptions are configured
consumer->start();
// Keep the main thread alive
std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}Ordered messages
Use ordered messages when messages within the same business entity must be processed in sequence, such as order status updates or inventory changes.
Ordered messages route messages with the same partition key to the same queue through a MessageQueueSelector, which guarantees processing order.
Send ordered messages
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
MQMessageQueue select(const std::vector<MQMessageQueue> &mqs,
const MQMessage &msg, void *arg) {
// Route messages to a queue based on the partition key
int orderId = *static_cast<int *>(arg);
int index = orderId % mqs.size();
return mqs[index];
}
};
int main() {
cout << "=======Before sending messages=======" << endl;
DefaultMQProducer producer("<your-group-id>");
producer.setNamesrvAddr("<your-access-point>");
producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
producer.start();
auto start = chrono::system_clock::now();
int count = 32;
ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
for (int i = 0; i < count; ++i) {
MQMessage msg("<your-topic>", "HiTAG", "Hello,CPP SDK, Orderly Message.");
try {
// Pass the partition key (i) and the queue selector
SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
cout << "SendResult:" << sendResult.getSendStatus()
<< ", Message ID: " << sendResult.getMsgId()
<< " MessageQueue:" << sendResult.getMessageQueue().toString()
<< endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException& e) {
cout << "ErrorCode: " << e.GetError()
<< " Exception:" << e.what() << endl;
}
}
auto interval = chrono::system_clock::now() - start;
cout << "Send " << count << " messages OK, costs "
<< chrono::duration_cast<chrono::milliseconds>(interval).count()
<< "ms" << endl;
producer.shutdown();
cout << "=======After sending messages=======" << endl;
return 0;
}Subscribe to ordered messages
The consumer uses MessageListenerOrderly instead of MessageListenerConcurrently to process messages in order within each queue.
#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
std::cout << "Received Message Topic:" << item->getTopic()
<< ", MsgId:" << item->getMsgId() << std::endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]) {
std::cout << "=======Before consuming messages=======" << std::endl;
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
consumer->setNamesrvAddr("<your-access-point>");
consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
// Register the orderly listener and subscribe to the topic
ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
consumer->subscribe("<your-topic>", "*");
consumer->registerMessageListener(messageListener);
// Start after subscriptions are configured
consumer->start();
// Keep the main thread alive
std::this_thread::sleep_for(std::chrono::seconds(60));
consumer->shutdown();
std::cout << "=======After consuming messages======" << std::endl;
return 0;
}Scheduled and delayed messages
Use scheduled or delayed messages to deliver messages at a specific time or after a delay, for example, timed notifications, retry mechanisms, or deferred task execution.
Set the __STARTDELIVERTIME message property to a Unix timestamp in milliseconds. The broker holds the message until the specified time.
If the timestamp is in the future, the broker delivers the message at that time.
If the timestamp is in the past, the broker delivers the message immediately.
Send scheduled or delayed messages
#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"
using namespace std;
using namespace rocketmq;
int main() {
cout << "=======Before sending messages=======" << endl;
DefaultMQProducer producer("<your-group-id>");
producer.setNamesrvAddr("<your-access-point>");
producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
producer.start();
auto start = chrono::system_clock::now();
int count = 32;
for (int i = 0; i < count; ++i) {
MQMessage msg("<your-topic>", "HiTAG", "Hello,CPP SDK, Delay Message.");
// Calculate a delivery time 10 seconds from now
chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
long exp = mil.count() + 10000; // Delay: 10,000 ms (10 seconds)
msg.setProperty("__STARTDELIVERTIME", to_string(exp));
cout << "Now: " << mil.count() << " Exp:" << exp << endl;
try {
SendResult sendResult = producer.send(msg);
cout << "SendResult:" << sendResult.getSendStatus()
<< ", Message ID: " << sendResult.getMsgId() << endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException& e) {
cout << "ErrorCode: " << e.GetError()
<< " Exception:" << e.what() << endl;
}
}
auto interval = chrono::system_clock::now() - start;
cout << "Send " << count << " messages OK, costs "
<< chrono::duration_cast<chrono::milliseconds>(interval).count()
<< "ms" << endl;
producer.shutdown();
cout << "=======After sending messages=======" << endl;
return 0;
}Subscribe to scheduled or delayed messages
The consumer uses MessageListenerConcurrently, the same listener type as normal messages. The broker handles timing; consumer code is identical.
#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"
using namespace rocketmq;
using namespace std;
class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
cout << "Now: " << mil.count()
<< " Received Message Topic:" << item->getTopic()
<< ", MsgId:" << item->getMsgId()
<< " DelayTime:" << item->getProperty("__STARTDELIVERTIME")
<< endl;
}
return CONSUME_SUCCESS;
}
};
int main(int argc, char *argv[]) {
cout << "=======Before consuming messages=======" << endl;
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
consumer->setNamesrvAddr("<your-access-point>");
consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
// Register the listener and subscribe to the topic
ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
consumer->subscribe("<your-topic>", "*");
consumer->registerMessageListener(messageListener);
// Start after subscriptions are configured
consumer->start();
// Keep the main thread alive (10 minutes to allow delayed messages to arrive)
this_thread::sleep_for(chrono::seconds(600));
consumer->shutdown();
cout << "=======After consuming messages======" << endl;
return 0;
}Transactional messages
Use transactional messages when local business logic and message delivery must succeed or fail as a unit, for example, balance transfers between accounts.
Transactional messaging works in three steps:
The producer sends a half message to the broker.
The broker invokes
executeLocalTransactionon the producer side. ReturnCOMMIT_MESSAGEto deliver,ROLLBACK_MESSAGEto discard, orUNKNOWNto defer the decision.If the result is
UNKNOWN, the broker periodically invokescheckLocalTransactionuntil it receives a definitive result.
Send transactional messages
#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"
using namespace std;
using namespace rocketmq;
class ExampleTransactionListener : public TransactionListener {
public:
// Called after the half message is sent
LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
cout << "Execute Local Transaction, Received Message Topic:" << msg.getTopic()
<< ", Body:" << msg.getBody() << endl;
// Run local business logic here. Return:
// COMMIT_MESSAGE - deliver the message to consumers
// ROLLBACK_MESSAGE - discard the message
// UNKNOWN - defer; the broker will call checkLocalTransaction later
return UNKNOWN;
}
// Called by the broker when executeLocalTransaction returned UNKNOWN
LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
cout << "Check Local Transaction, Received Message Topic:" << msg.getTopic()
<< ", MsgId:" << msg.getMsgId() << endl;
// Query local transaction status and return the result
return COMMIT_MESSAGE;
}
};
int main() {
cout << "=======Before sending messages=======" << endl;
// Use TransactionMQProducer instead of DefaultMQProducer
TransactionMQProducer producer("<your-group-id>");
producer.setNamesrvAddr("<your-access-point>");
producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
// Register the transaction listener before starting the producer
ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
producer.setTransactionListener(exampleTransactionListener);
producer.start();
auto start = chrono::system_clock::now();
int count = 3;
for (int i = 0; i < count; ++i) {
MQMessage msg("<your-topic>", "HiTAG", "Hello,CPP SDK, Transaction Message.");
try {
SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
cout << "SendResult:" << sendResult.getSendStatus()
<< ", Message ID: " << sendResult.getMsgId() << endl;
this_thread::sleep_for(chrono::seconds(1));
} catch (MQException& e) {
cout << "ErrorCode: " << e.GetError()
<< " Exception:" << e.what() << endl;
}
}
auto interval = chrono::system_clock::now() - start;
cout << "Send " << count << " messages OK, costs "
<< chrono::duration_cast<chrono::milliseconds>(interval).count()
<< "ms" << endl;
// Wait 60 seconds for the broker to call checkLocalTransaction
cout << "Wait for local transaction check..... " << endl;
for (int i = 0; i < 6; ++i) {
this_thread::sleep_for(chrono::seconds(10));
cout << "Running " << i * 10 + 10 << " Seconds......" << endl;
}
producer.shutdown();
cout << "=======After sending messages=======" << endl;
return 0;
}Subscribe to transactional messages
Subscribe to transactional messages the same way as normal messages. Use MessageListenerConcurrently and DefaultMQPushConsumer. For the complete code, see Subscribe to normal messages.