ApsaraMQ for RocketMQ: Send and receive ordered messages

Last Updated: Aug 17, 2023

Ordered messages are a type of message provided by ApsaraMQ for RocketMQ. Ordered messages are published and consumed in strict first-in-first-out (FIFO) order. This topic provides sample code that shows how to send and receive ordered messages by using the TCP client SDK for C or C++.

Background information

Ordered messages are classified into the following types:

  • Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.
  • Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using sharding keys. The messages in each partition are published and consumed in FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. A sharding key is different from the key of a normal message.

For more information, see Ordered messages.
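
The relationship between sharding keys and partitions can be illustrated with a small in-memory model. The following is only a sketch under stated assumptions: partitionFor, distribute, and the Partitions type are hypothetical names, and std::hash stands in for whatever mapping the broker actually uses, which this topic does not describe. The sketch shows that messages with the same sharding key always land in the same partition and keep their relative order there.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: maps a sharding key to one of partitionCount partitions.
// std::hash is an assumption used for illustration, not the broker's real mapping.
std::size_t partitionFor(const std::string& shardingKey, std::size_t partitionCount)
{
    assert(partitionCount > 0);
    return std::hash<std::string>{}(shardingKey) % partitionCount;
}

// Hypothetical in-memory model: each partition is a FIFO list of message bodies.
using Partitions = std::vector<std::vector<std::string>>;

// Appends each (shardingKey, body) pair to the partition selected by its key.
// Messages that share a key keep their relative order inside that partition.
Partitions distribute(const std::vector<std::pair<std::string, std::string>>& messages,
                      std::size_t partitionCount)
{
    Partitions partitions(partitionCount);
    for (const auto& m : messages) {
        partitions[partitionFor(m.first, partitionCount)].push_back(m.second);
    }
    return partitions;
}
```

In this model, a globally ordered topic corresponds to a partition count of 1: every message goes to the same FIFO list regardless of its sharding key.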

Prerequisites

Make sure that the following operations are performed:

Send ordered messages

Important

An ApsaraMQ for RocketMQ broker determines the order in which messages are generated based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.
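
One way to keep the sending order under your control when several business threads produce messages is to funnel them through a single sending thread. The following sketch is not part of the SDK: SingleSenderQueue and its methods are hypothetical names, and the sendFn callback is assumed to wrap a call such as the send method of an ordered producer. Business threads call enqueue, and exactly one thread calls drain.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper, not part of the SDK: many threads enqueue, one thread sends.
class SingleSenderQueue
{
public:
    // May be called from any thread. The order in which calls acquire the lock
    // becomes the order in which messages are later handed to the sender.
    void enqueue(std::string shardingKey, std::string body)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.emplace_back(std::move(shardingKey), std::move(body));
    }

    // Call from exactly one thread. Passes each pending message to sendFn in
    // FIFO order and returns the number of messages that were handed over.
    std::size_t drain(const std::function<void(const std::string&, const std::string&)>& sendFn)
    {
        assert(static_cast<bool>(sendFn));
        std::vector<std::pair<std::string, std::string>> batch;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            batch.swap(pending_);
        }
        for (const auto& m : batch) {
            sendFn(m.first, m.second);
        }
        return batch.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::pair<std::string, std::string>> pending_;
};
```

The queue only fixes the order in which messages leave the application. Which business thread enqueues first is still decided by your own logic.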

The following sample code shows how to send ordered messages by using the TCP client SDK for C or C++:

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <cstdlib>
#include <iostream>
using namespace ons;

int main()
{
    // The parameters that are required to create and use a producer. 
    ONSFactoryProperty factoryInfo;
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. The producer uses this ID. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); 
    // The topic that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // The message content. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    // The AccessKey secret that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

    // Create the producer. 
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // Before you send the message, call the start() method only once to start the producer. 
    pProducer->start();

    Message msg(
                // The message topic. 
                factoryInfo.getPublishTopics(),
                // The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages in the ApsaraMQ for RocketMQ broker. 
                "TagA",
                // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and the consumer must agree on the serialization and deserialization methods. 
                factoryInfo.getMessageContent()
    );

    // The message key. A key is the business-specific attribute of a message and must be globally unique. 
    // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
    // Note: You can send and receive a message even if you do not specify the key. 
    msg.setKey("ORDERID_100");
    // The key field that is used to identify partitions for partitionally ordered messages. 
    // For globally ordered messages, set this field to any non-empty string. 
    std::string shardingKey = "abc";  
    // Messages that have the same sharding key are sent in order. 
    try
    {
        // Send the message. If no exception is thrown, the message is sent. 
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
        std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // Specify the logic to handle errors. 
    }
    // Before you exit your application, destroy the producer. If you do not destroy the producer, issues such as memory leaks may occur. 
    pProducer->shutdown();

    return 0;
}           
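
The sample passes the result of getenv directly to setFactoryProperty. getenv returns a null pointer when a variable is not set, so it can help to fail early with a clear error. The following is a minimal sketch, assuming a hypothetical helper named requireEnv that is not part of the SDK:

```cpp
#include <cassert>
#include <cstdlib>
#include <stdexcept>
#include <string>

// Hypothetical helper: returns the value of an environment variable, or throws
// std::runtime_error if the variable is unset or empty.
std::string requireEnv(const char* name)
{
    assert(name != nullptr);
    const char* value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        throw std::runtime_error(std::string("environment variable not set: ") + name);
    }
    return std::string(value);
}
```

With such a helper, the credential lines in the sample could read, for example, requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID").c_str() in place of the bare getenv call, assuming that setFactoryProperty accepts a C string as it does in the sample.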

Subscribe to ordered messages

The following sample code shows how to subscribe to ordered messages by using the TCP client SDK for C or C++:

#include "ONSFactory.h"
#include <cstdlib>
using namespace std;
using namespace ons;

// Define the message listener. 
// After the push consumer pulls a message, the push consumer calls the consume function of the listener. 
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // Consume messages based on your business requirements. 
        // Report that the message was consumed successfully. 
        return Success;
    }
};


int main(int argc, char* argv[])
{
    // The parameters that are required to create and use the consumer. 
    ONSFactoryProperty factoryInfo;
    // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    // The topic that you created in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
    // The AccessKey secret that is used for authentication. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");


    // Create the consumer. 
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    // Create the listener instance. After the consumer pulls messages, the consumer calls the consume function of the listener. 
    MyMsgListener msglistener;
    // Subscribe to the topic and register the listener. The tag expression "*" matches all tags. 
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*", &msglistener);

    // Start the consumer. 
    orderConsumer->start();

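    // Keep the main thread alive so that the consumer can continue to receive messages. 
    // This busy-wait loop is for demonstration only. 
    for(volatile int i = 0; i < 1000000000; ++i) {
        // wait
    }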

    // Destroy the consumer. Before you exit the application, destroy the consumer. Otherwise, issues such as memory leaks may occur. 
    orderConsumer->shutdown();

    return 0;
}
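
The sample keeps the main thread alive with a busy-wait loop, which consumes CPU time and ends after a fixed number of iterations. In a real application, you might instead block until the process is asked to stop. The following is a minimal sketch, assuming hypothetical names (g_stopRequested, requestStop, waitForStop) that are not part of the SDK. It polls a flag that a signal handler sets.

```cpp
#include <cassert>
#include <chrono>
#include <csignal>
#include <thread>

// Hypothetical stop flag. sig_atomic_t is safe to write from a signal handler.
volatile std::sig_atomic_t g_stopRequested = 0;

// Hypothetical signal handler: records that a stop was requested.
extern "C" void requestStop(int)
{
    g_stopRequested = 1;
}

// Sleeps in short intervals until a stop is requested or maxWait elapses.
// Returns true if a stop was requested and false if the wait timed out.
bool waitForStop(std::chrono::milliseconds maxWait,
                 std::chrono::milliseconds pollInterval = std::chrono::milliseconds(100))
{
    assert(pollInterval.count() > 0);
    const auto deadline = std::chrono::steady_clock::now() + maxWait;
    while (g_stopRequested == 0) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;
        }
        std::this_thread::sleep_for(pollInterval);
    }
    return true;
}
```

In the consumer sample, the loop could then be replaced by registering the handler with std::signal(SIGINT, requestStop) before start() and calling waitForStop with a suitable timeout before shutdown().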