Ordered messages, also known as first-in-first-out (FIFO) messages, are a type of message provided by Message Queue for Apache RocketMQ. They are delivered and consumed in strict order. This topic provides sample code for sending and subscribing to ordered messages by using the C/C++ SDK over TCP.

Prerequisites

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of the specified topic are delivered and consumed strictly in FIFO order.
  • Partitionally ordered messages: All messages of the specified topic are partitioned by the shard key. Messages in one shard are published and consumed strictly in FIFO order. A shard key is a key field that is used in ordered messages to distinguish different shards. It is completely different from the key used in normal messages.

For more information, see Ordered messages.

Send ordered messages

Note: For more information about the sample code, see Message Queue for Apache RocketMQ code library.

The sample code for sending ordered messages is as follows:

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // The parameter required for the creation and normal operations of a producer, which must be set.
    ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");//The group ID you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//The topic you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");//The message content.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );//The AccessKey secret you created in the Alibaba Cloud console for identity authentication.

    // Create a producer instance.
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // Before sending a message, call the start method once to start the producer.
    pProducer->start();

    Message msg(
                // The message topic.
                factoryInfo.getPublishTopics(),
                // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
                "TagA",
                // The message body in any binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must negotiate consistent serialization and deserialization methods.
                factoryInfo.getMessageContent()
    );

    // The message key, which must be globally unique.
    // A unique identifier enables you to query a message and resend it in the console if you fail to receive the message.
    // Note: Messages can still be sent and received even if this attribute is not set.
    msg.setKey("ORDERID_100");
    // The key field that is used to distinguish different shards in a partially ordered message.
    // This field can be set to any non-empty string for globally ordered messages.
  std::string shardingKey = "abc";  //Messages with the same shard key are sent in sequence.
    try
    {
        // The message sending result, which is successful if no exception occurs.
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // Handle exceptions.
    }
    // Destroy the producer object before exiting the application. Otherwise, memory leakage may occur.
    pProducer->shutdown();

    return 0;
}        

Subscribe to ordered messages

The sample code for subscribing to ordered messages is as follows:

#include "ONSFactory.h"
#include <iostream>
using namespace std;
using namespace ons;

// Listener type used for message consumption.
// An instance of this class is handed to the consumer through subscribe(); the consumer is
// expected to call its consume() member for each message it delivers.
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener() {}
    virtual ~MyMsgListener() {}

    // Callback for one delivered message.
    //   message - the message handed over by the consumer.
    //   context - the consumption context supplied with the message.
    // Returns the consumption outcome; this sample always reports Success.
    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // Place the business logic that processes `message` here.
        const OrderAction outcome = Success; // CONSUME_SUCCESS
        return outcome;
    }
};


// Sample consumer: creates an OrderConsumer, subscribes a MyMsgListener to the configured topic,
// keeps the process alive until the user presses Enter, then shuts the consumer down.
// argc/argv are accepted but not used. Always returns 0.
int main(int argc, char* argv[])
{
    // The parameters required to create and run OrderConsumer. All of them must be set.
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");    // The group ID you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX"); // The topic you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");     // The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");     // The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");  // The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.

    // Create orderConsumer.
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);

    // Specify the message topic and tag ("*" here) that orderConsumer subscribes to, and register
    // the listener whose consume() function handles the messages. The listener is a local object
    // passed by address, so it must stay alive until orderConsumer has been shut down.
    MyMsgListener msglistener;
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*", &msglistener);

    // Start orderConsumer.
    orderConsumer->start();

    // Keep the process alive while messages are being consumed.
    // Blocking on standard input replaces a busy-wait loop, which would occupy a CPU core and
    // end after an arbitrary, machine-dependent amount of time.
    // NOTE(review): in a non-interactive deployment (no stdin) replace this with an
    // application-specific wait, because the read returns immediately at end-of-file.
    std::cout << "Consumer started. Press Enter to exit." << std::endl;
    std::cin.get();

    // Shut down orderConsumer before exiting the application.
    // The original sample warns that memory leakage and other problems may occur otherwise.
    orderConsumer->shutdown();

    return 0;
}