All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive messages with the RocketMQ 3.x/4.x SDK for C++

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ 5.x instances are compatible with RocketMQ 3.x and 4.x SDK clients. This topic provides C++ sample code for sending and receiving normal, ordered, scheduled/delayed, and transactional messages.

Important
  • We recommend that you use the latest RocketMQ 5.x SDKs. These SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and provides more functions and enhanced features. For more information, see Version Guide.

  • Alibaba Cloud only maintains RocketMQ 4.x, 3.x, and TCP client SDKs. We recommend that you use them only for existing business.

Prerequisites

Before you begin, make sure that you have:

  • The C++ dynamic library installed. For details, see Install the C++ dynamic library

  • A topic and a group ID created in the ApsaraMQ for RocketMQ console

  • The instance endpoint obtained from the ApsaraMQ for RocketMQ console

Common configuration

All examples in this topic share the same connection setup. Replace the following placeholders with your values before running the code.

PlaceholderDescriptionExample
<your-group-id>Group ID created in the consoleGID_example
<your-access-point>Instance endpoint in host:port format. Do not include http:// or https://, and do not use a resolved IP address.rmq-cn-xxx.rmq.aliyuncs.com:8080
<your-topic>Topic created in the consolenormal_topic_01
<instance-username>Instance username from the Intelligent Authentication tab on the Access Control pageN/A
<instance-password>Instance password from the Intelligent Authentication tab on the Access Control pageN/A

Authentication:

  • Internet access: Username and password required.

  • VPC access: No username or password needed.

  • Serverless instance over the Internet: Username and password required.

  • Serverless instance in a VPC with authentication-free access enabled: No username or password needed.

Important

Do not specify the instance ID when you use the RocketMQ 3.x or 4.x SDK for C++ with an ApsaraMQ for RocketMQ 5.0 instance. Specifying it causes connection failures.

Common producer setup:

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

// Initialize the producer with your group ID
DefaultMQProducer producer("<your-group-id>");

// Set the endpoint (domain:port only, no http/https prefix)
producer.setNamesrvAddr("<your-access-point>");

// Set credentials (required for Internet access; skip for VPC access)
producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");

producer.start();

Common consumer setup:

#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;

// Initialize the push consumer with your group ID
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");

// Set the endpoint (domain:port only, no http/https prefix)
consumer->setNamesrvAddr("<your-access-point>");

// Set credentials (required for Internet access; skip for VPC access)
consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
Important

Before you start a consumer:

  • Configure all subscriptions before calling start().

  • Make sure that all consumers in the same group use identical subscriptions.

Normal messages

Use normal messages for general-purpose messaging that does not require ordering, delayed delivery, or transaction support.

Send normal messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    cout << "=======Before sending messages=======" << endl;

    DefaultMQProducer producer("<your-group-id>");
    producer.setNamesrvAddr("<your-access-point>");
    producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
    producer.start();

    auto start = chrono::system_clock::now();
    int count = 32;

    for (int i = 0; i < count; ++i) {
        MQMessage msg("<your-topic>", "HiTAG", "HelloCPPSDK.");
        try {
            SendResult sendResult = producer.send(msg);
            cout << "SendResult:" << sendResult.getSendStatus()
                 << ", Message ID: " << sendResult.getMsgId() << endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException& e) {
            cout << "ErrorCode: " << e.GetError()
                 << " Exception:" << e.what() << endl;
        }
    }

    auto interval = chrono::system_clock::now() - start;
    cout << "Send " << count << " messages OK, costs "
         << chrono::duration_cast<chrono::milliseconds>(interval).count()
         << "ms" << endl;

    producer.shutdown();
    cout << "=======After sending messages=======" << endl;
    return 0;
}

Subscribe to normal messages

#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;

class ExampleMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic()
                      << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;

    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
    consumer->setNamesrvAddr("<your-access-point>");
    consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");

    // Register the listener and subscribe to the topic
    ExampleMessageListener *messageListener = new ExampleMessageListener();
    consumer->subscribe("<your-topic>", "*");
    consumer->registerMessageListener(messageListener);

    // Start after subscriptions are configured
    consumer->start();

    // Keep the main thread alive
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Ordered messages

Use ordered messages when messages within the same business entity must be processed in sequence, such as order status updates or inventory changes.

Ordered messages route messages with the same partition key to the same queue through a MessageQueueSelector, which guarantees processing order.

Send ordered messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class ExampleSelectMessageQueueByHash : public MessageQueueSelector {
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs,
                          const MQMessage &msg, void *arg) {
        // Route messages to a queue based on the partition key
        int orderId = *static_cast<int *>(arg);
        int index = orderId % mqs.size();
        return mqs[index];
    }
};

int main() {
    cout << "=======Before sending messages=======" << endl;

    DefaultMQProducer producer("<your-group-id>");
    producer.setNamesrvAddr("<your-access-point>");
    producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
    producer.start();

    auto start = chrono::system_clock::now();
    int count = 32;

    ExampleSelectMessageQueueByHash *pSelector = new ExampleSelectMessageQueueByHash();
    for (int i = 0; i < count; ++i) {
        MQMessage msg("<your-topic>", "HiTAG", "Hello,CPP SDK, Orderly Message.");
        try {
            // Pass the partition key (i) and the queue selector
            SendResult sendResult = producer.send(msg, pSelector, &i, 1, false);
            cout << "SendResult:" << sendResult.getSendStatus()
                 << ", Message ID: " << sendResult.getMsgId()
                 << " MessageQueue:" << sendResult.getMessageQueue().toString()
                 << endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException& e) {
            cout << "ErrorCode: " << e.GetError()
                 << " Exception:" << e.what() << endl;
        }
    }

    auto interval = chrono::system_clock::now() - start;
    cout << "Send " << count << " messages OK, costs "
         << chrono::duration_cast<chrono::milliseconds>(interval).count()
         << "ms" << endl;

    producer.shutdown();
    cout << "=======After sending messages=======" << endl;
    return 0;
}

Subscribe to ordered messages

The consumer uses MessageListenerOrderly instead of MessageListenerConcurrently to process messages in order within each queue.

#include <iostream>
#include <thread>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;

class ExampleOrderlyMessageListener : public MessageListenerOrderly {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            std::cout << "Received Message Topic:" << item->getTopic()
                      << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;

    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
    consumer->setNamesrvAddr("<your-access-point>");
    consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");

    // Register the orderly listener and subscribe to the topic
    ExampleOrderlyMessageListener *messageListener = new ExampleOrderlyMessageListener();
    consumer->subscribe("<your-topic>", "*");
    consumer->registerMessageListener(messageListener);

    // Start after subscriptions are configured
    consumer->start();

    // Keep the main thread alive
    std::this_thread::sleep_for(std::chrono::seconds(60));
    consumer->shutdown();
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

Scheduled and delayed messages

Use scheduled or delayed messages to deliver messages at a specific time or after a delay, for example, timed notifications, retry mechanisms, or deferred task execution.

Set the __STARTDELIVERTIME message property to a Unix timestamp in milliseconds. The broker holds the message until the specified time.

  • If the timestamp is in the future, the broker delivers the message at that time.

  • If the timestamp is in the past, the broker delivers the message immediately.

Send scheduled or delayed messages

#include <iostream>
#include <chrono>
#include <thread>
#include "DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main() {
    cout << "=======Before sending messages=======" << endl;

    DefaultMQProducer producer("<your-group-id>");
    producer.setNamesrvAddr("<your-access-point>");
    producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");
    producer.start();

    auto start = chrono::system_clock::now();
    int count = 32;

    for (int i = 0; i < count; ++i) {
        MQMessage msg("<your-topic>", "HiTAG", "Hello,CPP SDK, Delay Message.");

        // Calculate a delivery time 10 seconds from now
        chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
        chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
        long exp = mil.count() + 10000;  // Delay: 10,000 ms (10 seconds)
        msg.setProperty("__STARTDELIVERTIME", to_string(exp));

        cout << "Now: " << mil.count() << " Exp:" << exp << endl;
        try {
            SendResult sendResult = producer.send(msg);
            cout << "SendResult:" << sendResult.getSendStatus()
                 << ", Message ID: " << sendResult.getMsgId() << endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException& e) {
            cout << "ErrorCode: " << e.GetError()
                 << " Exception:" << e.what() << endl;
        }
    }

    auto interval = chrono::system_clock::now() - start;
    cout << "Send " << count << " messages OK, costs "
         << chrono::duration_cast<chrono::milliseconds>(interval).count()
         << "ms" << endl;

    producer.shutdown();
    cout << "=======After sending messages=======" << endl;
    return 0;
}

Subscribe to scheduled or delayed messages

The consumer uses MessageListenerConcurrently, the same listener type as normal messages. The broker handles timing; consumer code is identical.

#include <iostream>
#include <thread>
#include <chrono>
#include "DefaultMQPushConsumer.h"

using namespace rocketmq;
using namespace std;

class ExampleDelayMessageListener : public MessageListenerConcurrently {
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
        for (auto item = msgs.begin(); item != msgs.end(); item++) {
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            cout << "Now: " << mil.count()
                 << " Received Message Topic:" << item->getTopic()
                 << ", MsgId:" << item->getMsgId()
                 << " DelayTime:" << item->getProperty("__STARTDELIVERTIME")
                 << endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[]) {
    cout << "=======Before consuming messages=======" << endl;

    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("<your-group-id>");
    consumer->setNamesrvAddr("<your-access-point>");
    consumer->setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");

    // Register the listener and subscribe to the topic
    ExampleDelayMessageListener *messageListener = new ExampleDelayMessageListener();
    consumer->subscribe("<your-topic>", "*");
    consumer->registerMessageListener(messageListener);

    // Start after subscriptions are configured
    consumer->start();

    // Keep the main thread alive (10 minutes to allow delayed messages to arrive)
    this_thread::sleep_for(chrono::seconds(600));
    consumer->shutdown();
    cout << "=======After consuming messages======" << endl;
    return 0;
}

Transactional messages

Use transactional messages when local business logic and message delivery must succeed or fail as a unit, for example, balance transfers between accounts.

Transactional messaging works in three steps:

  1. The producer sends a half message to the broker.

  2. The broker invokes executeLocalTransaction on the producer side. Return COMMIT_MESSAGE to deliver, ROLLBACK_MESSAGE to discard, or UNKNOWN to defer the decision.

  3. If the result is UNKNOWN, the broker periodically invokes checkLocalTransaction until it receives a definitive result.

Send transactional messages

#include <iostream>
#include <chrono>
#include <thread>
#include "TransactionMQProducer.h"
#include "MQClientException.h"
#include "TransactionListener.h"

using namespace std;
using namespace rocketmq;

class ExampleTransactionListener : public TransactionListener {
public:
    // Called after the half message is sent
    LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) {
        cout << "Execute Local Transaction, Received Message Topic:" << msg.getTopic()
             << ", Body:" << msg.getBody() << endl;

        // Run local business logic here. Return:
        //   COMMIT_MESSAGE  - deliver the message to consumers
        //   ROLLBACK_MESSAGE - discard the message
        //   UNKNOWN         - defer; the broker will call checkLocalTransaction later
        return UNKNOWN;
    }

    // Called by the broker when executeLocalTransaction returned UNKNOWN
    LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) {
        cout << "Check Local Transaction, Received Message Topic:" << msg.getTopic()
             << ", MsgId:" << msg.getMsgId() << endl;

        // Query local transaction status and return the result
        return COMMIT_MESSAGE;
    }
};

int main() {
    cout << "=======Before sending messages=======" << endl;

    // Use TransactionMQProducer instead of DefaultMQProducer
    TransactionMQProducer producer("<your-group-id>");
    producer.setNamesrvAddr("<your-access-point>");
    producer.setSessionCredentials("<instance-username>", "<instance-password>", "ALIYUN");

    // Register the transaction listener before starting the producer
    ExampleTransactionListener *exampleTransactionListener = new ExampleTransactionListener();
    producer.setTransactionListener(exampleTransactionListener);
    producer.start();

    auto start = chrono::system_clock::now();
    int count = 3;

    for (int i = 0; i < count; ++i) {
        MQMessage msg("<your-topic>", "HiTAG", "Hello,CPP SDK, Transaction Message.");
        try {
            SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
            cout << "SendResult:" << sendResult.getSendStatus()
                 << ", Message ID: " << sendResult.getMsgId() << endl;
            this_thread::sleep_for(chrono::seconds(1));
        } catch (MQException& e) {
            cout << "ErrorCode: " << e.GetError()
                 << " Exception:" << e.what() << endl;
        }
    }

    auto interval = chrono::system_clock::now() - start;
    cout << "Send " << count << " messages OK, costs "
         << chrono::duration_cast<chrono::milliseconds>(interval).count()
         << "ms" << endl;

    // Wait 60 seconds for the broker to call checkLocalTransaction
    cout << "Wait for local transaction check..... " << endl;
    for (int i = 0; i < 6; ++i) {
        this_thread::sleep_for(chrono::seconds(10));
        cout << "Running " << i * 10 + 10 << " Seconds......" << endl;
    }

    producer.shutdown();
    cout << "=======After sending messages=======" << endl;
    return 0;
}

Subscribe to transactional messages

Subscribe to transactional messages the same way as normal messages. Use MessageListenerConcurrently and DefaultMQPushConsumer. For the complete code, see Subscribe to normal messages.