All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and subscribe to normal messages with HTTP client SDKs

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ provides HTTP client SDKs for seven programming languages. Use these SDKs to send normal messages to a topic, consume them from a consumer group, and acknowledge delivery.

Important

Topics created for normal messages cannot be used for other message types such as scheduled messages, delayed messages, ordered messages, or transactional messages. Create a separate topic for each message type.

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for RocketMQ instance, a topic (message type: normal), and a group ID. For more information, see Create resources

  • An AccessKey pair (AccessKey ID and AccessKey secret) for authentication. For more information, see Create an AccessKey pair

Placeholders

Replace the following placeholders in all code examples with your actual values:

PlaceholderDescriptionExample
<your-http-endpoint>HTTP endpoint of your instancehttp://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com
<your-access-key-id>AccessKey IDLTAI5tXxx
<your-access-key-secret>AccessKey secretxXxXxXx
<your-topic>Topic namenormal-topic-http
<your-instance-id>Instance IDMQ_INST_1380xxx_BbXbx0Y4
<your-group-id>Group ID (consumer ID)GID_http_test

Step 1: Install the SDK

Choose the SDK for your programming language and install it.

Java

For more information, see the Java SDK description and Release notes.

Go

go get github.com/aliyunmq/mq-http-go-sdk

For more information, see the Go SDK description and Release notes.

Python

pip install mq_http_sdk

For more information, see the Python SDK description and Release notes.

PHP

composer require aliyunmq/mq-http-sdk

For more information, see the PHP SDK description and Release notes.

Node.js

npm install @aliyunmq/mq-http-sdk --save

For more information, see the Node.js SDK description and Release notes.

C\#

For more information, see the C# SDK description and Release notes.

C++

Download the SDK source and build it with CMake. For more information, see the C++ SDK description and Release notes.

Step 2: Send normal messages

Each producer example follows this pattern:

  1. Create an MQClient with your HTTP endpoint and AccessKey pair.

  2. Get a producer for your instance and topic.

  3. Publish messages in a loop, with optional delayed delivery.

The publishMessage call is synchronous. If no exception is thrown, the message was sent.

Java

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                "<your-http-endpoint>",
                "<your-access-key-id>",
                "<your-access-key-secret>"
        );

        final String topic = "<your-topic>";
        final String instanceId = "<your-instance-id>";

        // Get a producer for the specified instance and topic
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // Send a normal message with a tag, properties, and message key
                    pubMsg = new TopicMessage(
                            "hello mq!".getBytes(),
                            "A"  // Message tag
                    );
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    // Send a delayed message (delivered 10 seconds later)
                    pubMsg = new TopicMessage(
                            "hello mq!".getBytes(),
                            "A"
                    );
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // publishMessage is synchronous. No exception means success.
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                System.out.println(new Date() + " Send mq message success. Topic is:" + topic
                        + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // Handle send failure: retry or persist the message
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}

Go

package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := "<your-access-key-id>"
    secretKey := "<your-access-key-secret>"
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    mqProducer := client.GetProducer(instanceId, topic)

    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",
                MessageTag:  "",
                Properties:  map[string]string{},
            }
            msg.MessageKey = "MessageKey"
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            // Delayed message: delivered 10 seconds later
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",
                MessageTag:  "",
                Properties:  map[string]string{},
            }
            msg.Properties["a"] = strconv.Itoa(i)
            // StartDeliverTime is a UNIX timestamp in milliseconds
            msg.StartDeliverTime = time.Now().UTC().Unix()*1000 + 10*1000
        }
        ret, err := mqProducer.PublishMessage(msg)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
            ret.MessageId, ret.MessageBodyMD5)
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}

Python

#!/usr/bin/env python
# coding=utf8
import sys
import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *

# Initialize the client
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
instance_id = "<your-instance-id>"

producer = mq_client.get_producer(instance_id, topic_name)

# Send 4 messages
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n"
      % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            # Normal message with tag, properties, and message key
            msg = TopicMessage(
                "I am test message %s. Hello" % i,
                ""  # Message tag
            )
            msg.put_property("a", "i")
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        else:
            # Delayed message: delivered 5 seconds later
            msg = TopicMessage(
                "I am test message %s." % i,
                ""
            )
            msg.put_property("a", i)
            # Absolute time in milliseconds
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );

        $topic = "<your-topic>";
        $instanceId = "<your-instance-id>";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx" // Message body
                );
                $publishMessage->putProperty("a", $i);
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // Delayed message: delivered 10 seconds later
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId()
                    . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}

$instance = new ProducerTest();
$instance->run();

?>

Node.js

const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

const topic = "<your-topic>";
const instanceId = "<your-instance-id>";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        // Normal message with properties and message key
        msgProps = new MessageProperties();
        msgProps.putProperty("a", i);
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        // Delayed message: delivered 10 seconds later
        msgProps = new MessageProperties();
        msgProps.putProperty("a", i);
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s",
          res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // Handle send failure: retry or persist the message
    console.log(e)
  }
})();

C++

//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;

int main() {

    MQClient mqClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
            );

    string topic = "<your-topic>";
    string instanceId = "<your-instance-id>";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // Message with body only
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // Message with body and tag
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // Message with body, tag, properties, and key
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // Delayed message: delivered 10 seconds later
                // StartDeliverTime is an absolute time in milliseconds
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic
                << ", msgId is:" << pmResp.getMessageId()
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode()
            << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}

C\#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        private const string _endpoint = "<your-http-endpoint>";
        private const string _accessKeyId = "<your-access-key-id>";
        private const string _secretAccessKey = "<your-access-key-secret>";
        private const string _topicName = "<your-topic>";
        private const string _instanceId = "<your-instance-id>";

        private static MQClient _client = new Aliyun.MQ.MQClient(
            _accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        // Normal message with properties and message key
                        sendMsg = new TopicMessage("dfadfadfadf");
                        sendMsg.PutProperty("a", i.ToString());
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        // Delayed message: delivered 10 seconds later
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        sendMsg.PutProperty("a", i.ToString());
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp()
                            + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}
Note

To send messages from the console, log on to the ApsaraMQ for RocketMQ console, find your instance, and choose More > Quick Start in the Actions column.

Step 3: Consume normal messages

After messages are sent, start a consumer to receive and process them. Each consumer example follows this pattern:

  1. Create an MQClient with your HTTP endpoint and AccessKey pair.

  2. Get a consumer for your instance, topic, and group ID.

  3. Poll for messages in a loop using long polling.

  4. Process each message, then acknowledge it by sending the receipt handle back to the broker.

Long polling keeps the connection open for a specified duration (up to 30 seconds). If a message arrives during that window, the broker responds immediately rather than waiting for the timeout.

Important

If the broker does not receive an acknowledgment (ACK) before the NextConsumeTime for a message, the message is delivered again. Each delivery generates a new receipt handle.

ParameterDescriptionLimit
Batch sizeMaximum messages per requestUp to 16
Wait secondsLong polling timeoutUp to 30 seconds

Java

 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;

 import java.util.ArrayList;
 import java.util.List;

 public class Consumer {

     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 "<your-http-endpoint>",
                 "<your-access-key-id>",
                 "<your-access-key-secret>"
         );

         final String topic = "<your-topic>";
         final String groupId = "<your-group-id>";
         final String instanceId = "<your-instance-id>";

         final MQConsumer consumer;
         if (instanceId != null && instanceId != "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }

         // Use multiple threads for concurrent consumption in production
         do {
             List<Message> messages = null;

             try {
                 // Long polling: wait up to 3 seconds for messages
                 messages = consumer.consumeMessage(
                         3, // Max messages per batch (up to 16)
                         3  // Long polling timeout in seconds (up to 30)
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName()
                         + ": no new message, continue!");
                 continue;
             }

             // Process messages
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }

             // Acknowledge messages to prevent redelivery
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }

                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:"
                                 + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :
                                     errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle
                                     + ", ErrorCode:" + errors.getErrorMessages()
                                         .get(errorHandle).getErrorCode()
                                     + ", ErrorMsg:" + errors.getErrorMessages()
                                         .get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
}

Go

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := "<your-access-key-id>"
    secretKey := "<your-access-key-secret>"
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"
    groupId := "<your-group-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
                            v.Properties)
                    }

                    // Acknowledge processed messages
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        fmt.Println(ackerr)
                        for _, errAckItem := range ackerr.(errors.ErrCode).
                                Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
                            fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                errAckItem.ErrorHandle, errAckItem.ErrorCode,
                                errAckItem.ErrorMsg)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    if strings.Contains(err.(errors.ErrCode).Error(),
                            "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout of consumer message ??")
                    endChan <- 1
                }
            }
        }()

        // Long polling: wait up to 3 seconds for messages
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // Max messages per batch (up to 16)
            3, // Long polling timeout in seconds (up to 30)
        )
        <-endChan
    }
}

Python

#!/usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the client
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# Long polling: wait up to 3 seconds for messages
wait_seconds = 3
# Max messages per batch (up to 16)
batch = 3

print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
      % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))

while True:
    try:
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s"
                             % (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue

        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue

    # Acknowledge processed messages to prevent redelivery
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
                      % (sub_error["ReceiptHandle"],
                         sub_error["ErrorCode"],
                         sub_error["ErrorMessage"]))

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );

        $topic = "<your-topic>";
        $groupId = "<your-group-id>";
        $instanceId = "<your-instance-id>";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // Use multiple threads for concurrent consumption in production
        while (True) {
            try {
                // Long polling: wait up to 3 seconds for messages
                $messages = $this->consumer->consumeMessage(
                    3, // Max messages per batch (up to 16)
                    3  // Long polling timeout in seconds (up to 30)
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    printf("No message, continue long polling!RequestId:%s\n",
                        $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // Process messages
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, "
                    . "FirstConsumeTime:%d, \nConsumedTimes:%d, "
                    . "NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(),
                    $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(),
                    $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // Acknowledge processed messages
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                            $errorItem->getReceiptHandle(),
                            $errorItem->getErrorCode(),
                            $errorItem->getErrorCode());
                    }
                }
            }
            print "ack finish\n";
        }

    }
}

$instance = new ConsumerTest();
$instance->run();

?>

Node.js

const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

const topic = "<your-topic>";
const groupId = "<your-group-id>";
const instanceId = "<your-instance-id>";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  while(true) {
    try {
      // Long polling: wait up to 3 seconds for messages
      res = await consumer.consumeMessage(
          3, // Max messages per batch (up to 16)
          3  // Long polling timeout in seconds (up to 30)
          );

      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,"
            + "FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s"
            + ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime,
              message.NextConsumeTime, message.FirstConsumeTime,
              message.ConsumedTimes,
              message.MessageBody, message.Properties, message.MessageKey,
              message.Properties.a);
          return message.ReceiptHandle;
        });

        // Acknowledge processed messages
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n",
                error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          console.log("Ack Message suc, RequestId:%s\n\t",
              res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code.indexOf("MessageNotExist") > -1) {
        console.log("Consume Message: no new message, RequestId:%s, Code:%s",
            e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();

C++

#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;

int main() {

    MQClient mqClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
            );

    string topic = "<your-topic>";
    string groupId = "<your-group-id>";
    string instanceId = "<your-instance-id>";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Long polling: wait up to 3 seconds for messages
            consumer->consumeMessage(
                    3, // Max messages per batch (up to 16)
                    3, // Long polling timeout in seconds (up to 30)
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // Process messages
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes()
                    << " Properties: " << iter->getPropertiesAsString()
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // Acknowledge processed messages
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter =
                        failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: "
                    + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode()
                + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

C\#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        private const string _endpoint = "<your-http-endpoint>";
        private const string _accessKeyId = "<your-access-key-id>";
        private const string _secretAccessKey = "<your-access-key-secret>";
        private const string _topicName = "<your-topic>";
        private const string _instanceId = "<your-instance-id>";
        private const string _groupId = "<your-group-id>";

        private static MQClient _client = new Aliyun.MQ.MQClient(
            _accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(
            _instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Use multiple threads for concurrent consumption in production
            while (true)
            {
                try
                {
                    List<Message> messages = null;

                    try
                    {
                        // Long polling: wait up to 3 seconds for messages
                        messages = consumer.ConsumeMessage(
                            3, // Max messages per batch (up to 16)
                            3  // Long polling timeout in seconds (up to 30)
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name
                                + " No new message, "
                                + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name
                        + " Receive Messages:");
                    // Process messages
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:"
                            + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }

                    // Acknowledge processed messages
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp =
                                (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:"
                                + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem
                                    in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:"
                                    + errorItem.ReceiptHandle
                                    + ",ErrorCode:" + errorItem.ErrorCode
                                    + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}

Verify the result

After running the producer and consumer, verify message delivery:

  1. Log on to the ApsaraMQ for RocketMQ console.

  2. Find your instance and go to the message query page.

  3. Search by topic, message ID, or message key to confirm the messages were sent.

  4. Check the message trace to verify that the consumer received and acknowledged each message.

For more information, see Query messages and Query message traces.