Ordered messages, which are also known as first in, first out (FIFO) messages, are a type of messages provided by Message Queue for Apache RocketMQ. The messages are published and consumed in a strict order. This topic provides sample code that shows how to use the HTTP client SDK for C++ to send and subscribe to ordered messages.
Background
Ordered messages are classified into the following types:
- Globally ordered messages in a topic are consumed in a first in, first out (FIFO) manner.
- Partitionally ordered messages in a topic are distributed to different partitions by using sharding keys. The messages in each partition are consumed in a FIFO manner. A sharding key is a key field that is used for ordered messages to identify different partitions. The sharding Key of an ordered message is different from the message key of a normal message.
For more information, see Ordered message.
Prerequisites
The following operations are performed:
- The SDK for C++ is installed. For more information, see Prepare the environment.
- Resources, such as instances, topics, and groups, that you want to specify in the code, are created in the Message Queue for Apache RocketMQ console. For more information, see Create resources.
Send ordered messages
Important A Message Queue for Apache RocketMQ broker determines the order in which messages are generated based on the order in
which the sender uses a single producer or thread to send messages. If the sender
uses multiple producers or multiple threads to concurrently send messages, the message
order is determined based on the order in which the messages are received by the Message Queue for Apache RocketMQ broker. This order may be different from the sending order on the business side.
The following sample code shows how to send ordered messages:
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// The HTTP endpoint. To obtain the HTTP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. The endpoint is displayed on the Endpoints tab of the Instance Details page.
"${HTTP_ENDPOINT}",
// The AccessKey ID is used as an identifier of Alibaba Cloud service users. For more information, see Create an AccessKey pair.
"${ACCESS_KEY}",
// The AccessKey Secret is used to verify the identity of Alibaba Cloud service users. For more information, see Create an AccessKey pair.
"${SECRET_KEY}"
);
// The topic of the message. The topic is created in the Message Queue for Apache RocketMQ console.
string topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. The instance is created in the Message Queue for Apache RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instance ID to null or an empty string. You can check whether your instance has a namespace on the Instance Details page in the Message Queue for Apache RocketMQ console.
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
// Cyclically send four messages.
for (int i = 0; i < 8; i++)
{
PublishMessageResponse pmResp;
// The content of the message.
TopicMessage pubMsg("Hello, mq!order msg!");
// The Sharding Key that is used to distribute ordered messages to a specific partition. The Sharding Key is used to identify a partition. A Sharding Key is different from a message key.
pubMsg.setShardingKey(std::to_string(i % 2));
// The custom attributes of the message.
pubMsg.putProperty("a",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}
Subscribe to ordered messages
The following sample code shows how to subscribe to ordered messages:
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// The HTTP endpoint. To obtain the HTTP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. The endpoint is displayed on the Endpoints tab of the Instance Details page.
"${HTTP_ENDPOINT}",
// The AccessKey ID is used as an identifier of Alibaba Cloud service users. For more information, see Create an AccessKey pair.
"${ACCESS_KEY}",
// The AccessKey Secret is used to verify the identity of Alibaba Cloud service users. For more information, see Create an AccessKey pair.
"${SECRET_KEY}"
);
// The topic of the message. The topic is created in the Message Queue for Apache RocketMQ console.
string topic = "${TOPIC}";
// The ID of the group that you created in the Message Queue for Apache RocketMQ console.
string groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. The instance is created in the Message Queue for Apache RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instance ID to null or an empty string. You can check whether your instance has a namespace on the Instance Details page in the Message Queue for Apache RocketMQ console.
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Consume messages in long polling mode. The consumer may pull partitionally ordered messages from multiple partitions. The consumer consumes messages from the same partition in the order in which the messages are sent.
// A consumer pulls partitionally ordered messages from a partition. If the broker does not receive the acknowledgment (ACK) for a message, the consumer consumes the message again.
// The consumer can consume the next batch of messages from a partition only after all messages that are pulled from the partition in the previous batch are acknowledged when they are consumed.
// In long polling mode, if no message is available for consumption in the topic, requests are suspended on the broker for a specified period of time. If a message becomes available for consumption during this time period, the broker immediately sends a response to the consumer. In this example, the time period is set to 3 seconds.
consumer->consumeMessageOrderly(
3, // The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value of this parameter is 16.
3, // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// Specify the message consumption logic.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " ShardingKey: " << iter->getShardingKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Obtain an ACK from the consumer.
// If the broker does not receive an ACK from the consumer before the period of time that is specified by the Message.NextConsumeTime parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// The broker may fail to receive an ACK for a message from the consumer if the handle of the message times out.
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}