After you create all the resources in the console, you can call an HTTP SDK to send and subscribe to messages in Message Queue for Apache RocketMQ.

Download and install an HTTP SDK

Message Queue for Apache RocketMQ provides the following multi-language HTTP SDKs. Download and install the client SDK in a specific language as needed.

Call HTTP SDKs to send messages

After you obtain the client SDK in a specific language, you can run the following sample code to send messages.

                           
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
                "${HTTP_ENDPOINT}",
                // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
                "${ACCESS_KEY}",
                // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
                "${SECRET_KEY}"
        );

        // The topic of the message.
        final String topic = "${TOPIC}";
        // The instance ID of the topic. This parameter is null by default.
        final String instanceId = "${INSTANCE_ID}";

        // Obtain the producer of the message.
        MQProducer producer;
        if (instanceId ! = null && instanceId ! = "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // Send four messages cyclically.
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // A normal message.
                    pubMsg = new TopicMessage(
                            // The message content.
                            "hello mq! ".getBytes(),
                            // The message tag.
                            "A"
                    );
                    // Set the properties.
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Set the message key.
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // The message content.
                            "hello mq! ".getBytes(),
                            // The message tag.
                            "A"
                    );
                    // Set the properties.
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Schedule the message to be sent 10 seconds later.
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // Send the message in synchronous mode. The message is sent if no exception occurs.
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // Send the message in synchronous mode. The message is sent if no exception occurs.
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}
                           
package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
    endpoint := "${HTTP_ENDPOINT}"
    // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    accessKey := "${ACCESS_KEY}"
    // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    secretKey := "${SECRET_KEY}"
    // The topic of the message.
    topic := "${TOPIC}"
    // The instance ID of the topic. This parameter is null by default.
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Send four messages cyclically.
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq! ", // The message content.
                MessageTag:  "",                  // The message tag.
                Properties:  map[string]string{}, // The message properties.
            }
            // Set the message key.
            msg.MessageKey = "MessageKey"
            // Set the properties.
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer! ", // The message content.
                MessageTag:  "",                  // The message tag.
                Properties:  map[string]string{}, // The message properties.
            }
            // Set the properties.
            msg.Properties["a"] = strconv.Itoa(i)
            // Schedule the message to be sent 10 later seconds. The scheduled time is in milliseconds in UNIX.
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
        }
        ret, err := mqProducer.PublishMessage(msg)

        if err ! = nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}
                           
<?php

// Load the Composer autoloader so the MQ SDK classes can be resolved.
require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

/**
 * Sample producer: publishes four messages to a topic, scheduling every
 * second one for delivery 10 seconds later.
 */
class ProducerTest
{
    private $client;
    private $producer;

    /**
     * Builds the MQ client and obtains a producer for the configured topic.
     */
    public function __construct()
    {
        // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
        $endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        $accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        $accessKeySecret = "${SECRET_KEY}";
        $this->client = new MQClient($endpoint, $accessKeyId, $accessKeySecret);

        // The instance ID of the topic. This parameter is null by default.
        $instanceId = "${INSTANCE_ID}";
        // The topic of the message.
        $topic = "${TOPIC}";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    /**
     * Publishes messages numbered 1 to 4 and prints the ID and body MD5 of each.
     * Stops at the first exception and prints its message.
     */
    public function run()
    {
        try {
            foreach (range(1, 4) as $seq) {
                // The message content.
                $outgoing = new TopicMessage("xxxxxxxx");
                // Set the properties.
                $outgoing->putProperty("a", $seq);
                // Set the message key.
                $outgoing->setMessageKey("MessageKey");

                $isScheduled = ($seq % 2 == 0);
                if ($isScheduled) {
                    // Schedule the message to be sent 10 seconds later.
                    $deliverAt = time() * 1000 + 10 * 1000;
                    $outgoing->setStartDeliverTime($deliverAt);
                }

                $published = $this->producer->publishMessage($outgoing);

                print "Send mq message success. msgId is:" . $published->getMessageId() . ", bodyMD5 is:" . $published->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $failure) {
            print_r($failure->getMessage() . "\n");
        }
    }
}


// Build the sample producer and publish the messages.
$instance = new ProducerTest();
$instance->run();

?>
                           
#!/usr/bin/env python
# coding=utf8
import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

# Initialize the client.
mq_client = MQClient(
    # The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
    "${HTTP_ENDPOINT}",
    # The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    "${ACCESS_KEY}",
    # The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    "${SECRET_KEY}"
    )
# The topic of the message.
topic_name = "${TOPIC}"
# The instance ID of the topic. This parameter is null by default.
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# Publish multiple messages cyclically.
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            # Even iterations: a normal message with a message key.
            msg = TopicMessage(
                    # The message content.
                    "I am test message %s. 你好" % i, 
                    # The message tag.
                    ""
                    )
            # Set the properties. Pass the loop index itself, not the literal string "i".
            msg.put_property("a", i)
            # Set the message key.
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        else:
            # Odd iterations: a scheduled (timer) message.
            msg = TopicMessage(
                    # The message content.
                    "I am test message %s." % i, 
                    # The message tag.
                    ""
                    )
            # Set the properties.
            msg.put_property("a", i)
            # Schedule an absolute time in milliseconds to send the message (5 seconds from now).
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        # Pause for one second between publishes.
        time.sleep(1)
except MQExceptionBase as e:
    # A missing topic cannot be recovered from here, so exit with a non-zero status.
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)
                           
const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// The topic of the message.
const topic = "${TOPIC}";
// The instance ID of the topic. This parameter is null by default.
const instanceId = "${INSTANCE_ID}";

const producer = client.getProducer(instanceId, topic);

// Publish four messages: even iterations send a normal message with a key,
// odd iterations send a message scheduled for delivery 10 seconds later.
(async function(){
  try {
    // Send four messages cyclically.
    for (let i = 0; i < 4; i++) {
      // Declared locally so the sample does not create an implicit global
      // (which throws a ReferenceError in strict mode and ES modules).
      const msgProps = new MessageProperties();
      // Set the properties.
      msgProps.putProperty("a", i);

      let res;
      if (i % 2 == 0) {
        // Set the message key.
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        // Schedule the message to be sent 10 seconds later.
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg! ", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
    console.log(e)
  }
})();
                           
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


// Publishes four messages that exercise the different publishMessage forms:
// body only, body + tag, body + properties + key, and a scheduled (timer) message.
// Returns 0 on success, -1 on a server-side error, -2 on any other SDK error.
int main() {

    MQClient mqClient(
            // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            "${SECRET_KEY}"
            );

    // The topic of the message.
    string topic = "${TOPIC}";
    // The instance ID of the topic. This parameter is null by default.
    string instanceId = "${INSTANCE_ID}";

    // Use the instance-scoped producer only when an instance ID is configured.
    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // Publish a message that has only a body.
                producer->publishMessage("Hello, mq! ", pmResp);
            } else if (i % 4 == 1) {
                // Publish a message that has a body and a tag.
                producer->publishMessage("Hello, mq! have tag! ", "tag", pmResp);
            } else if (i % 4 == 2) {
                // Publish a message that has a body, a property and a key (no tag).
                TopicMessage pubMsg("Hello, mq! have key! ");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // Publish a timer message; it can be consumed only after StartDeliverTime.
                TopicMessage pubMsg("Hello, mq! timer msg! ", "tag");
                // StartDeliverTime is an absolute time in milliseconds. Widen before
                // multiplying so the value cannot overflow where time_t is 32-bit.
                pubMsg.setStartDeliverTime(static_cast<long long>(time(NULL)) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic 
                << ", msgId is:" << pmResp.getMessageId() 
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        // Error reported by the server; the request ID is printed alongside the error code.
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        // Any other SDK exception.
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    /// <summary>
    /// Sample producer that publishes four messages: even iterations send a normal
    /// message with a message key, odd iterations send a tagged message scheduled
    /// for delivery 10 seconds later.
    /// </summary>
    public class ProducerSample
    {
        // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic of the message.
        private const string _topicName = "${TOPIC}";
        // The instance ID of the topic. This parameter is null by default.
        private const string _instanceId = "${INSTANCE_ID}";

        // Note the argument order used here: AccessKey ID, AccessKey secret, then endpoint.
        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        // Declared after _client because static initializers run in textual order.
        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        /// <summary>
        /// Publishes the sample messages and writes each result, or the first exception, to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                // Send four messages cyclically.
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        sendMsg = new TopicMessage("dfadfadfadf");
                        // Set the properties.
                        sendMsg.PutProperty("a", i.ToString());
                        // Set the message key.
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        // Set the properties.
                        sendMsg.PutProperty("a", i.ToString());
                        // Schedule the message to be sent 10 seconds later.
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publish message success:" + result);
                }
            }
            catch (Exception ex)
            {
                // Publishing stops at the first failure; the exception is written to the console.
                Console.Write(ex);
            }
        }
    }
}

On the Topics page, find the target topic and click Send Message in the Actions column.

Check whether messages are sent

After a message is sent, you can check its sending status in the console by performing the following operations:

  1. On the details page of the target instance, choose Message Query > By Topic from the left-side navigation pane.
  2. In the search box, enter the topic to which the message is sent, and click Search to check its sending status.

    Stored At indicates the time when the Message Queue for Apache RocketMQ broker stores the message. If the message can be queried, it has been sent to the Message Queue for Apache RocketMQ broker.

Notice This step demonstrates the situation where Message Queue for Apache RocketMQ is used for the first time and the consumer has not been started yet. Therefore, no consumption data is displayed in the message status information. To start the consumer and subscribe to messages, go to the next step. For more information about the message status, see Query messages and Query a message trace.

Call HTTP SDKs to subscribe to messages

Once a message is sent, you need to start the consumer to subscribe to the message. Run the following sample code as needed to start the consumer and test message subscription. You must set related parameters correctly according to the instructions.

                           
import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class Consumer {
 
     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
                 "${HTTP_ENDPOINT}",
                 // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
                 "${ACCESS_KEY}",
                 // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
                 "${SECRET_KEY}"
         );
 
         // The topic of the message.
         final String topic = "${TOPIC}";
         // The group ID (consumer ID) you created in the console.
         final String groupId = "${GROUP_ID}";
         // The instance ID of the topic. This parameter is null by default.
         final String instanceId = "${INSTANCE_ID}";
 
         final MQConsumer consumer;
         if (instanceId ! = null && instanceId ! = "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }
 
         // Consume messages cyclically in the current thread. We recommend that you use multiple threads to concurrently consume messages.
         do {
             List<Message> messages = null;
 
             try {
                 // Consume messages in long-polling mode.
                 // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
                 messages = consumer.consumeMessage(
                         3,// A maximum of 3 messages can be consumed at a time. The largest value is 16.
                         3// The duration of a long-polling cycle is 3 seconds. The largest value is 30 seconds.
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             // No messages.
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName() + ": no new message, continue! ");
                 continue;
             }
 
             // Data consumption logic.
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }
 
             // If the consumption of a message is not confirmed before Message.nextConsumeTime, the message will be consumed again.
             // A unique timestamp is specified for the handle of a message each time the message is consumed.
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }
 
                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     // The consumption confirmation of some messages may fail due to timeout of the message handles.
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() ! = null) {
                             for (String errorHandle :errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                         + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
 }
 
                           
package main

import (
	"fmt"
	"github.com/gogap/errors"
	"strings"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
	// The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
	endpoint := "${HTTP_ENDPOINT}"
	// The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
	accessKey := "${ACCESS_KEY}"
	// The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
	secretKey := "${SECRET_KEY}"
	// The topic of the message.
	topic := "${TOPIC}"
	// The instance ID of the topic. This parameter is null by default.
	instanceId := "${INSTANCE_ID}"
	// The group ID (consumer ID) that you created in the console.
	groupId := "${GROUP_ID}"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

	mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

	for {
		endChan := make(chan int)
		respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
		errChan := make(chan error)
		go func() {
			select {
			case resp := <-respChan:
				{
					// Data consumption logic.
					var handles []string
					fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
					for _, v := range resp.Messages {
						handles = append(handles, v.ReceiptHandle)
						fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
							"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
							v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
							v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
					}

                    // If the consumption of a message is not confirmed before NextConsumeTime, the message will be consumed again.
                    // A unique timestamp is specified for the handle of a message each time the message is consumed.
                    ackerr := mqConsumer.AckMessage(handles)
					if ackerr ! = nil {
						// The consumption confirmation of some messages may fail due to timeout of the message handles.
						fmt.Println(ackerr)
						for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
							fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
						time.Sleep(time.Duration(3) * time.Second)
					} else {
						fmt.Printf("Ack ---->\n\t%s\n", handles)
					}

					endChan <- 1
				}
			case err := <-errChan:
				{
					// No messages.
					if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
						fmt.Println("\nNo new message, continue! ")
					} else {
						fmt.Println(err)
						time.Sleep(time.Duration(3) * time.Second)
					}
					endChan <- 1
				}
			case <-time.After(35 * time.Second):
				{
					fmt.Println("Timeout of consumer message ?? ")
					endChan <- 1
				}
			}
		}()

		// Consume messages in long-polling mode.
		// In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
		mqConsumer.ConsumeMessage(respChan, errChan,
			3, // A maximum of 3 messages can be consumed at a time. The largest value is 16.
			3, // The duration of a long-polling cycle is 3 seconds. The largest value is 30 seconds.
		)
		<-endChan
	}
}
                           
<?php

// Load the Composer autoloader so the MQ SDK classes can be resolved.
require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

/**
 * Sample consumer: long-polls a topic in an endless loop, prints every
 * message it receives, and then acknowledges the batch.
 */
class ConsumerTest
{
    private $client;
    private $consumer;

    /**
     * Builds the MQ client and obtains a consumer for the configured topic and group.
     */
    public function __construct()
    {
        $this->client = new MQClient(
            // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            "${SECRET_KEY}"
        );

        // The topic of the message.
        $topic = "${TOPIC}";
        // The group ID (consumer ID) that you created in the console.
        $groupId = "${GROUP_ID}";
        // The instance ID of the topic. This parameter is null by default.
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    /**
     * Runs the consume-and-acknowledge loop. The loop never exits.
     */
    public function run()
    {
        // Consume messages cyclically in the current thread. We recommend that you use multiple threads to concurrently consume messages.
        while (true) {
            try {
                // Consume messages in long-polling mode.
                // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
                $messages = $this->consumer->consumeMessage(
                    3, // A maximum of 3 messages can be consumed at a time. The largest value is 16.
                    3 // The duration of a long-polling cycle is 3 seconds. The largest value is 30 seconds.
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // If no message can be consumed, the polling continues.
                    printf("No message, continue long polling! RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                // Back off for 3 seconds before polling again.
                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // Data consumption logic.
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // If the consumption of a message is not confirmed before $message->getNextConsumeTime(), the message will be consumed again.
            // A unique timestamp is specified for the handle of a message each time the message is consumed.
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    // The consumption confirmation of some messages may fail due to timeout of the message handles.
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        // Print the error message, not the error code a second time.
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorMessage());
                    }
                } else {
                    // Report any other acknowledgement failure instead of dropping it silently.
                    print_r($e->getMessage() . "\n");
                }
            }
            print "ack finish\n";
        }
    }
}


// Build the sample consumer and start the polling loop.
$instance = new ConsumerTest();
$instance->run();

?>
                           
#!/usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the client.
mq_client = MQClient(
    # The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
    "",
    # The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    "${ACCESS_KEY}",
    # The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
    "${SECRET_KEY}"
  )
# The topic of the message.
topic_name = "${TOPIC}"
# The group ID that you created in the console.
group_id = "GID_test"
# The instance ID of the topic. This parameter is null by default.
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
# The long polling interval is 3 seconds (the maximum valid value is 30 seconds).
wait_seconds = 3
# A maximum of 3 messages can be consumed at a time. The largest value is 16.
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
    try:
        # Consume messages in long-polling mode.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print "Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s" % \
                             (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle)
    except MQExceptionBase, e:
        if e.type == "MessageNotExist":
            print "No new message! RequestId: %s" % e.req_id
            continue

        print "Consume Message Fail! Exception:%s\n" % e
        time.sleep(2)
        continue

    # If the consumption of a message is not confirmed before msg.next_consume_time, the message will be consumed again.
    # A unique timestamp is specified for the handle of a message each time the message is consumed.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
    except MQExceptionBase, e:
        print "\nAk Message Fail! Exception:%s" % e
        # The consumption confirmation of some messages may fail due to timeout of the message handles.
        if e.sub_errors:
          for sub_error in e.sub_errors:
            print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
                           
// Sample HTTP consumer: polls a topic in long-polling mode, logs every
// received message, and then acknowledges the whole batch.
const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
const accessKeySecret = "${SECRET_KEY}";

const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// The topic of the message.
const topic = "${TOPIC}";
// The group ID (consumer ID) that you created in the console.
const groupId = "${GROUP_ID}";
// The instance ID of the topic. This parameter is null by default.
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

// Resolves after `ms` milliseconds; used to back off after unexpected errors.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

(async function(){
  // Consume messages cyclically.
  while(true) {
    try {
      // Consume messages in long-polling mode.
      // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
      const consumeRes = await consumer.consumeMessage(
          3, // A maximum of 3 messages can be consumed at a time. The largest value is 16.
          3 // The duration of a long-polling cycle is 3 seconds. The largest value is 30 seconds.
          );

      if (consumeRes.code == 200) {
        // Consume messages and create data consumption logic.
        console.log("Consume Messages, requestId:%s", consumeRes.requestId);
        const handles = consumeRes.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
          return message.ReceiptHandle;
        });

        // If the consumption of a message is not confirmed before message.NextConsumeTime, the message will be consumed again.
        // A unique timestamp is specified for the handle of a message each time the message is consumed.
        const ackRes = await consumer.ackMessage(handles);
        if (ackRes.code != 204) {
          // The consumption confirmation of some messages may fail due to timeout of the message handles.
          console.log("Ack Message Fail:");
          const failHandles = ackRes.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          // Every handle that is not reported as failed was acknowledged.
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          // The consumption of the message is confirmed.
          console.log("Ack Message suc, RequestId:%s\n\t", ackRes.requestId, handles.join(','));
        }
      }
    } catch(e) {
      // Not every thrown value carries a string `Code` (for example a network
      // error), so normalise it before inspecting it; otherwise the handler
      // itself would throw and terminate the loop.
      const errorCode = (e && typeof e.Code === "string") ? e.Code : "";
      if (errorCode.indexOf("MessageNotExist") > -1) {
        // If no message can be consumed, the polling continues.
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
        // Back off for 2 seconds so a persistent failure does not spin the loop.
        await sleep(2000);
      }
    }
  }
})();
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    /// <summary>
    /// Sample HTTP consumer: polls a topic in long-polling mode, prints every
    /// received message, and then acknowledges the whole batch.
    /// </summary>
    public class ConsumerSample
    {
        // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic of the message.
        private const string _topicName = "${TOPIC}";
        // The instance ID of the topic. This parameter is null by default.
        private const string _instanceId = "${INSTANCE_ID}";
        // The group ID (consumer ID) that you created in the console.
        private const string _groupId = "${GROUP_ID}";

        private static readonly MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        private static readonly MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        /// <summary>
        /// Runs the consume/acknowledge loop forever on the current thread.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Consume messages cyclically in the current thread. We recommend that you use multiple threads to concurrently consume messages.
            while (true)
            {
                try
                {
                    // Consume messages in long-polling mode.
                    // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, // A maximum of 3 messages can be consumed at a time. The largest value is 16.
                            3 // The duration of a long-polling cycle is 3 seconds. The largest value is 30 seconds.
                        );
                    }
                    catch (MessageNotExistException noMessage)
                    {
                        // Nothing to consume right now: poll again immediately.
                        Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + noMessage.RequestId);
                        continue;
                    }
                    catch (Exception consumeError)
                    {
                        // Any other failure: report it and back off before retrying.
                        Console.WriteLine(consumeError);
                        Thread.Sleep(2000);
                        continue;
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // Data consumption logic.
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }

                    // If the consumption of a message is not confirmed before Message.nextConsumeTime, the message will be consumed again.
                    // A unique timestamp is specified for the handle of a message each time the message is consumed.
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (AckMessageException ackExp)
                    {
                        // The consumption confirmation of some messages may fail due to timeout of the message handles.
                        Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                        foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                        {
                            Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                        }
                    }
                    // Any other exception raised while acknowledging is deliberately
                    // not caught here: it propagates to the outer handler, which logs
                    // it and backs off, instead of being dropped silently.
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
                           
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // The domain name used for HTTP access. (The production environment hosted on Alibaba Cloud is used here as an example.)
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud console for verifying the Alibaba Cloud account.
            "${SECRET_KEY}"
            );

    // The topic of the message.
    string topic = "${TOPIC}";
    // The group ID (consumer ID) that you created in the console.
    string groupId = "${GROUP_ID}";
    // The instance ID of the topic. This parameter is null by default.
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Consume messages in long-polling mode.
            // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
            consumer->consumeMessage(
                    3,// A maximum of 3 messages can be consumed at a time. The largest value is 16.
                    3,// The duration of a long-polling cycle is 3 seconds. The largest value is 30 seconds.
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages! " << endl;

            // Process the message.
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter ! = messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes() 
                    << " Properties: " << iter->getPropertiesAsString() 
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // The consumption of the message is confirmed.
            // If the consumption of a message is not confirmed before Message.NextConsumeTime, the message will be consumed again.
            // A unique timestamp is specified for the handle of a message each time the message is consumed.
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (! bdmResp.isSuccess()) {
                // The consumption confirmation of some messages may fail due to timeout of the message handles.
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter ! = failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc! " << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}