This topic provides C++ sample code for sending and receiving normal messages by using the ApsaraMQ for RocketMQ HTTP client SDK. Normal messages are featureless messages provided by ApsaraMQ for RocketMQ -- unlike scheduled, delayed, ordered, and transactional messages, they carry no special delivery semantics. Use normal messages for general-purpose messaging where ordering, timing, and transaction guarantees are not required.
Prerequisites
Before you start, make sure that the following operations are performed:
Install the SDK for C++. For more information, see Prepare the environment.
Create the resources that you want to specify in the code in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
Obtain the AccessKey pair of your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send normal messages
Initialize an MQClient with your HTTP endpoint and AccessKey pair, then publish messages to a topic.
Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
${HTTP_ENDPOINT} | HTTP access endpoint | HTTP Endpoint section on the Instance Details page |
${TOPIC} | Target topic name | Created in the ApsaraMQ for RocketMQ console |
${INSTANCE_ID} | Instance ID. Set to an empty string if the instance has no namespace | Instance Details page in the console |
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// The HTTP endpoint, found in the HTTP Endpoint section
// of the Instance Details page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// The AccessKey ID and AccessKey secret for authentication.
// Read from environment variables to avoid hardcoding credentials.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// The topic to publish messages to. Create the topic in the ApsaraMQ for RocketMQ console.
string topic = "${TOPIC}";
// The instance ID. If the instance has no namespace, set this to an empty string.
// Check the namespace on the Instance Details page in the console.
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
// Send four messages in a loop.
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
// The message body.
TopicMessage pubMsg("Hello, mq!have key!");
// Set a custom property on the message.
pubMsg.putProperty("a",std::to_string(i));
// The message key.
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}Key points:
Set the
ALIBABA_CLOUD_ACCESS_KEY_IDandALIBABA_CLOUD_ACCESS_KEY_SECRETenvironment variables before running the code.Each message supports custom properties (
putProperty) and a message key (setMessageKey).If the instance has no namespace, pass an empty string for
instanceId. The SDK selects the correctgetProducerRefoverload automatically.
Subscribe to normal messages
How consumption works
The consumer uses HTTP long polling to retrieve messages:
The consumer sends a request to the broker.
If messages are available, the broker responds immediately.
If no messages are available, the broker holds the request open for up to the specified polling duration (maximum: 30 seconds) and responds as soon as a message arrives.
After processing each batch, the consumer sends an acknowledgment (ACK) to the broker.
If the broker does not receive an ACK before
NextConsumeTime, it redelivers the message.
Consumer parameters
| Parameter | Description | Example value | Maximum |
|---|---|---|---|
| Batch size | Maximum number of messages consumed per request | 3 | 16 |
| Polling duration | How long the broker waits for messages (seconds) | 3 | 30 |
Sample code
Replace these placeholders in addition to those listed in the Send normal messages section:
| Placeholder | Description | Where to find it |
|---|---|---|
${GROUP_ID} | Consumer group ID | Created in the ApsaraMQ for RocketMQ console |
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// The HTTP endpoint, found in the HTTP Endpoint section
// of the Instance Details page in the ApsaraMQ for RocketMQ console.
"${HTTP_ENDPOINT}",
// The AccessKey ID and AccessKey secret for authentication.
// Read from environment variables to avoid hardcoding credentials.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// The topic to consume messages from. Create the topic in the ApsaraMQ for RocketMQ console.
string topic = "${TOPIC}";
// The consumer group ID. Create the consumer group in the ApsaraMQ for RocketMQ console.
string groupId = "${GROUP_ID}";
// The instance ID. If the instance has no namespace, set this to an empty string.
// Check the namespace on the Instance Details page in the console.
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Consume messages in long polling mode.
// If no messages are available, the broker holds the request for the specified
// polling duration and responds immediately when a message arrives.
consumer->consumeMessage(
3, // Batch size: max messages per request. Maximum allowed: 16.
3, // Polling duration in seconds. Maximum allowed: 30.
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// Process each message and collect receipt handles for acknowledgment.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Send an ACK to the broker. If the broker does not receive an ACK before
// NextConsumeTime, it redelivers the message. Each receipt handle carries
// a unique timestamp that expires after the NextConsumeTime window.
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// Log failed ACK items. A handle that has expired causes ACK failure,
// and the broker redelivers the corresponding message.
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}Key points:
The consumer runs in an infinite loop, polling continuously. When no messages are available, it catches
MessageNotExistand retries.After processing a batch, call
ackMessageto confirm receipt. Unacknowledged messages are redelivered afterNextConsumeTime.Each receipt handle carries a unique timestamp. An expired handle causes ACK failure, which triggers redelivery.
On errors other than
MessageNotExist, the consumer pauses for 2 seconds before retrying to avoid tight loops.