Ordered messages, also known as first-in-first-out (FIFO) messages, are provided by Message Queue for Apache RocketMQ. Ordered messages are published and consumed in a strict order. This topic provides sample code to show you how to send and subscribe to ordered messages by using TCP client SDK for C/C++.

Background

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.
  • Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using Sharding Keys. The messages in each partition are published and consumed in FIFO order. A Sharding Key is a key field that is used for ordered messages to identify different partitions. The Sharding Key is different from the key of a normal message.

For more information, see Ordered messages 2.0.

Prerequisites

You have completed the following operations:

Send ordered messages

Notice The Message Queue for Apache RocketMQ broker determines the order in which messages are generated based on the order in which the sender uses a single producer to concurrently send messages in a single thread. If the sender uses multiple producers or multiple threads to concurrently send messages, the message order is determined based on the order in which the messages are received by the Message Queue for Apache RocketMQ broker. This order may be different from the sending order on the business side.

The following sample code shows how to send ordered messages:

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // Set the parameters that are required to create and use a producer. 
    ONSFactoryProperty factoryInfo;
    .factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");// The ID of the group that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX"); // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// The message content. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 

    // Create a producer. 
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    //Before you send a message, call the start() method to start the producer. You can call the start() method only once. 
    pProducer->start();

    Message msg(
                //Message Topic
                factoryInfo.getPublishTopics(),
                // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                "TagA",
                // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and the consumer must agree on the serialization and deserialization methods. 
                factoryInfo.getMessageContent()
    );

    // The key of the message. The key is the business-specific attribute of the message and must be globally unique. 
    // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
    // Note: Messages can be sent and received even if you do not specify the message key. 
    msg.setKey("ORDERID_100");
    // The key field that is used to identify partitions for partitionally ordered messages. 
    // This field can be set to a non-empty string for globally ordered messages. 
    std::string shardingKey = "abc";  
    // Messages that have the same Sharding Key are sent in order. 
    try
    {
        // Send the message. If no exception is thrown, the message is sent. 
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // Add the exception handling operation. 
    }
    // Before you exit your application, shut down the producer. If you do not shut down the producer, issues such as memory leaks may occur. 
    pProducer->shutdown();

    return 0;
}           

Subscribe to ordered messages

The following sample code shows how to subscribe to ordered messages:

#include "ONSFactory.h"
using namespace std;
using namespace ons;

// Create a consumer instance. 
//After pushConsumer pulls the message, pushConsumer calls the consumeMessage function of the instance. 
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // Consume messages based on business requirements. 
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    // Set the parameters that are required to create and use orderConsumer. 
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// The ID of the group that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the Message Queue for Apache RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");// The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");// The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 


    // Create orderConsumer. 
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    // Specify the message topic and tag to which orderConsumer subscribes. 
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // Register the instance to listen to messages. After orderConsumer pulls the messages, orderConsumer calls the consumeMessage function of the message listening class. 

    //Start orderConsumer. 
    orderConsumer->start();

    for(volatile int i = 0; i < 1000000000; ++i) {
        //wait
    }

    // Shut down orderConsumer. Before you exit the application, shut down orderConsumer. If you do not shut down orderConsumer, issues such as memory leaks may occur. 
    orderConsumer->shutdown();

   return 0;
}