Normal messages are the most basic message type in ApsaraMQ for RocketMQ. Unlike scheduled, delayed, ordered, and transactional messages, normal messages carry no special delivery semantics. Use normal messages when your application needs reliable delivery without ordering or timing constraints.
The following examples demonstrate how to send and consume normal messages with the TCP client SDK for C++ (Community Edition).
Prerequisites
Before you begin, make sure that you have:
Installed the C++ dynamic library for the RocketMQ SDK. For more information, see Install the C++ dynamic library.
Created an AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Set the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET to your AccessKey ID and AccessKey secret.
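The samples on this page read both variables with getenv, which returns a null pointer when a variable is not set. The following is a minimal sketch of a fail-fast lookup, assuming you prefer an exception over passing a null pointer to the SDK. The helper name requireEnv is hypothetical and is not part of the RocketMQ SDK.

```cpp
#include <cstdlib>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the RocketMQ SDK): returns the value of an
// environment variable, or throws std::runtime_error if it is unset or empty.
std::string requireEnv(const char* name) {
    const char* value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        throw std::runtime_error(std::string("Environment variable not set: ") + name);
    }
    return std::string(value);
}
```

In the samples, requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID") and requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") could replace the direct getenv calls, so a missing credential is reported before the client starts.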
Send normal messages
Step 1: Create the producer file
Save the following code as ProducerDemo.cpp. Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| `<your-group-id>` | The group ID created in the ApsaraMQ for RocketMQ console | GID_demo_producer |
| `<your-endpoint>` | The TCP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://MQ_INST_xxxxx.mq-internet-access.mq-internet.aliyuncs.com:80 |
| `<your-topic>` | The topic created in the ApsaraMQ for RocketMQ console | demo_topic |

    #include <chrono>
    #include <cstdlib>   // getenv
    #include <iostream>
    #include <thread>
    #include "DefaultMQProducer.h"

    using namespace std;
    using namespace rocketmq;

    int main() {
        std::cout << "=======Before sending messages=======" << std::endl;

        // Group ID from the ApsaraMQ for RocketMQ console
        DefaultMQProducer producer("<your-group-id>");
        // TCP endpoint from the Instance Details page
        producer.setNamesrvAddr("<your-endpoint>");
        // Authenticate with AccessKey credentials stored in environment variables.
        // User channel default: ALIYUN.
        producer.setSessionCredentials(
            getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
            "ALIYUN"
        );

        // Start the producer
        producer.start();

        auto start = std::chrono::system_clock::now();
        int count = 32;
        for (int i = 0; i < count; ++i) {
            // Construct a message with topic, tag, and body
            MQMessage msg("<your-topic>", "HiTAG", "HelloCPPSDK.");
            try {
                SendResult sendResult = producer.send(msg);
                std::cout << "SendResult:" << sendResult.getSendStatus()
                          << ", Message ID: " << sendResult.getMsgId() << std::endl;
                // Pause for one second between sends
                this_thread::sleep_for(chrono::seconds(1));
            } catch (const MQException &e) {
                // Catch by const reference to avoid copying the exception
                std::cout << "ErrorCode: " << e.GetError()
                          << " Exception:" << e.what() << std::endl;
            }
        }

        // Report how many sends were attempted and the elapsed time
        auto interval = std::chrono::system_clock::now() - start;
        std::cout << "Send " << count << " messages OK, costs "
                  << std::chrono::duration_cast<std::chrono::milliseconds>(interval).count()
                  << "ms" << std::endl;

        producer.shutdown();
        std::cout << "=======After sending messages=======" << std::endl;
        return 0;
    }

Step 2: Compile and run the producer

    g++ -o producer_demo -std=c++11 ProducerDemo.cpp -lrocketmq -lz
    ./producer_demo

Expected output:

    SendResult:0, Message ID: C0A8XXXXXXXXXXXXXXXXXXXX

A send status of 0 indicates success. Each message prints its unique message ID.
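The producer prints the send status as a number. The following sketch maps that number to a readable name for logging. Only the value 0 (success) is stated on this page. The other names and values are an assumption based on the declaration order of the SendStatus enum in the open source RocketMQ C++ client, so verify them against SendResult.h in your SDK version. The function name describeSendStatus is hypothetical.

```cpp
#include <string>

// Hypothetical helper: turns the numeric send status into a readable name.
// Assumed values, mirroring the declaration order of the SendStatus enum in
// the open source RocketMQ C++ client. Verify against SendResult.h.
std::string describeSendStatus(int status) {
    switch (status) {
        case 0: return "SEND_OK";
        case 1: return "SEND_FLUSH_DISK_TIMEOUT";
        case 2: return "SEND_FLUSH_SLAVE_TIMEOUT";
        case 3: return "SEND_SLAVE_NOT_AVAILABLE";
        default: return "UNKNOWN";
    }
}
```

With this helper, the producer could log describeSendStatus(sendResult.getSendStatus()) instead of the raw number.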
Consume normal messages
Step 1: Create the consumer file
Save the following code as ConsumerDemo.cpp. Use the same placeholder values as in the producer example above.

    #include <chrono>
    #include <cstdlib>   // getenv
    #include <iostream>
    #include <thread>
    #include <vector>
    #include "DefaultMQPushConsumer.h"

    using namespace rocketmq;

    // Implement a message listener to process incoming messages
    class ExampleMessageListener : public MessageListenerConcurrently {
    public:
        ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
            for (auto item = msgs.begin(); item != msgs.end(); ++item) {
                std::cout << "Received Message Topic:" << item->getTopic()
                          << ", MsgId:" << item->getMsgId() << std::endl;
            }
            return CONSUME_SUCCESS;
        }
    };

    int main(int argc, char *argv[]) {
        std::cout << "=======Before consuming messages=======" << std::endl;

        // Group ID from the ApsaraMQ for RocketMQ console
        DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
        // TCP endpoint from the Instance Details page
        consumer->setNamesrvAddr("<your-endpoint>");
        // Authenticate with AccessKey credentials stored in environment variables.
        // User channel default: ALIYUN.
        consumer->setSessionCredentials(
            getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
            "ALIYUN"
        );

        // Subscribe to the topic and register a listener
        ExampleMessageListener *messageListener = new ExampleMessageListener();
        consumer->subscribe("<your-topic>", "*");
        consumer->registerMessageListener(messageListener);

        // Start the consumer after subscriptions are configured.
        // All consumers in the same group must have identical subscriptions.
        consumer->start();

        // Keep the main thread alive for 60 seconds to receive messages
        std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));

        consumer->shutdown();
        // Release the consumer and listener once the consumer has stopped
        delete consumer;
        delete messageListener;
        std::cout << "=======After consuming messages=======" << std::endl;
        return 0;
    }

Step 2: Compile and run the consumer

    g++ -o consumer_demo -std=c++11 ConsumerDemo.cpp -lrocketmq -lz
    ./consumer_demo

Expected output:

    Received Message Topic:demo_topic, MsgId:C0A8XXXXXXXXXXXXXXXXXXXX

Best practices
Group ID consistency: All consumers in the same group must subscribe to the same topics with identical tag filters. Mismatched subscriptions cause undefined delivery behavior.
Consumer lifecycle: This sample runs for 60 seconds before shutting down. In production, keep the consumer process alive for as long as your application needs to receive messages.
Error handling: The producer catches MQException and prints the error code. Extend this pattern with retries or alerting as needed.