After you create all the resources in the console, you can call an HTTP SDK to send and receive messages in Message Queue for Apache RocketMQ.

Prerequisites

  • Create resources
    Note: This example uses normal messages. A topic that is created for normal messages cannot be used to send or receive other types of messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. You must create a separate topic for each message type.
  • Create an AccessKey pair

Download and install an HTTP SDK

Message Queue for Apache RocketMQ provides the following HTTP SDKs for multiple programming languages. Download and install the client SDK for a specific language as needed.

Call HTTP SDKs to send messages

After you obtain the client SDK for a specific language, you can run the following sample code to send messages:

                           
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // Set an HTTP endpoint. 
                "${HTTP_ENDPOINT}",
                // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
                "${ACCESS_KEY}",
                // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
                "${SECRET_KEY}"
        );

        // The topic of the message. 
        final String topic = "${TOPIC}";
        // The instance ID of the topic. The default value is NULL. 
        final String instanceId = "${INSTANCE_ID}";

        // Obtain the producer of the message. 
        MQProducer producer;
        if (instanceId ! = null && instanceId ! = "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // Send four messages. 
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // A message. 
                    pubMsg = new TopicMessage(
                            // The message content. 
                            "hello mq! ".getBytes(),
                            // The message tag. 
                            "A"
                    );
                    // Set properties. 
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Set keys. 
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // The message content. 
                            "hello mq! ".getBytes(),
                            // The message tag. 
                            "A"
                    );
                    // Set properties. 
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Schedule to send the message 10 seconds later. 
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // Synchronously send the message. The message is sent if no error occurs. 
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // Synchronously send the message. The message is sent if no error occurs. 
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // Specify the logic to resend or persist the message if an error occurs. 
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}
                           
package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // Set an HTTP endpoint. 
    endpoint := "${HTTP_ENDPOINT}"
    // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
    accessKey := "${ACCESS_KEY}"
    // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
    secretKey := "${SECRET_KEY}"
    // The topic of the message. 
    topic := "${TOPIC}"
    // The instance ID of the topic. The default value is NULL. 
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Send four messages. 
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq! ",         // The message content. 
                MessageTag:  "",                  // The message tag. 
                Properties:  map[string]string{}, // The message properties. 
            }
            // Set keys. 
            msg.MessageKey = "MessageKey"
            // Set properties. 
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer! ",         // The message content. 
                MessageTag:  "",                  // The message tag. 
                Properties:  map[string]string{}, // The message properties. 
            }
            // Set properties. 
            msg.Properties["a"] = strconv.Itoa(i)
            // Schedule to send the message 10 seconds later. The value is a UNIX timestamp in milliseconds. 
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
        }
        ret, err := mqProducer.PublishMessage(msg)

        if err ! = nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}
                           
<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

/**
 * Sample producer that publishes four messages to a topic through the HTTP SDK.
 * Messages with an even index are scheduled for delivery 10 seconds later.
 */
class ProducerTest
{
    private $client;
    private $producer;

    /**
     * Creates the client and obtains the producer for the configured topic.
     */
    public function __construct()
    {
        $this->client = new MQClient(
            // Set an HTTP endpoint.
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud Management Console.
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud Management Console.
            "${SECRET_KEY}"
        );

        // The topic of the message.
        $topic = "${TOPIC}";
        // The instance ID of the topic. The default value is NULL.
        $instanceId = "${INSTANCE_ID}";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    /**
     * Sends the sample messages and prints the result of each send.
     * Any exception stops the loop and its message is printed.
     */
    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx" // The message content.
                );
                // Set properties.
                $publishMessage->putProperty("a", $i);
                // Set keys.
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // Schedule to send the message 10 seconds later.
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}


$instance = new ProducerTest();
$instance->run();

?>
                           
#! /usr/bin/env python
# coding=utf8
import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

# Sample producer script: publishes msg_count messages to the topic.
# Messages with an even index carry a message key; messages with an odd index
# are scheduled for delivery 5 seconds later.

# Initialize the client.
mq_client = MQClient(
    # Set an HTTP endpoint.
    "${HTTP_ENDPOINT}",
    # The AccessKey ID that you created in the Alibaba Cloud Management Console.
    "${ACCESS_KEY}",
    # The AccessKey secret that you created in the Alibaba Cloud Management Console.
    "${SECRET_KEY}"
    )
# The topic of the message.
topic_name = "${TOPIC}"
# The instance ID of the topic. The default value is None.
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# Publish multiple messages.
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            msg = TopicMessage(
                    # The message content.
                    "I am test message %s. Hello" % i,
                    # The message tag.
                    ""
                    )
            # Set properties. Pass the loop index as a string, not the literal "i".
            msg.put_property("a", str(i))
            # Set keys.
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        else:
            msg = TopicMessage(
                    # The message content.
                    "I am test message %s." % i,
                    # The message tag.
                    ""
                    )
            # Set properties.
            msg.put_property("a", str(i))
            # Schedule an absolute time in milliseconds to send the message.
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        # Wait one second between two consecutive sends.
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)
                           
const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// Sample producer script: publishes four messages to the topic.
// Messages with an even index carry a message key; messages with an odd index
// are scheduled for delivery 10 seconds later.

// Set an HTTP endpoint.
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID that you created in the Alibaba Cloud Management Console.
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret that you created in the Alibaba Cloud Management Console.
const accessKeySecret = "${SECRET_KEY}";

const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// The topic of the message.
const topic = "${TOPIC}";
// The instance ID of the topic. The default value is NULL.
const instanceId = "${INSTANCE_ID}";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    // Send four messages.
    for (let i = 0; i < 4; i++) {
      let res;
      // Declared locally so that no implicit global is created.
      const msgProps = new MessageProperties();
      // Set properties.
      msgProps.putProperty("a", i);
      if (i % 2 == 0) {
        // Set keys.
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        // Schedule to send the message 10 seconds later.
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg! ", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // Specify the logic to resend or persist the message if an error occurs.
    console.log(e)
  }
})();
                           
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


// Sample producer: publishes four messages to the topic, one of each form
// (body only; body and tag; body with property and key; scheduled message).
// Returns 0 on success, -1 on an MQServerException, -2 on any other MQExceptionBase.
int main() {

    MQClient mqClient(
            // Set an HTTP endpoint.
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud Management Console.
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud Management Console.
            "${SECRET_KEY}"
            );

    // The topic of the message.
    string topic = "${TOPIC}";
    // The instance ID of the topic. The default value is NULL.
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // Publish a message that has only a body.
                producer->publishMessage("Hello, mq! ", pmResp);
            } else if (i % 4 == 1) {
                // Publish a message that has a body and a tag.
                producer->publishMessage("Hello, mq! have tag! ", "tag", pmResp);
            } else if (i % 4 == 2) {
                // Publish a message that has a body, a property, and a key.
                TopicMessage pubMsg("Hello, mq! have key! ");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // Publish a timer message; it is scheduled for delivery at StartDeliverTime.
                TopicMessage pubMsg("Hello, mq! timer msg! ", "tag");
                // StartDeliverTime is an absolute time in milliseconds.
                // Widen before multiplying so the value cannot overflow where time_t is 32-bit.
                pubMsg.setStartDeliverTime(static_cast<long long>(time(NULL)) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic
                << ", msgId is:" << pmResp.getMessageId()
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " << me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " << mb.ToString() << endl;
        return -2;
    }

    return 0;
}
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    /// <summary>
    /// Sample producer that publishes four messages to a topic through the HTTP SDK.
    /// Messages with an even index carry a message key; messages with an odd index
    /// carry a tag and are scheduled for delivery 10 seconds later.
    /// </summary>
    public class ProducerSample
    {
        // Set an HTTP endpoint.
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the Alibaba Cloud Management Console.
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the Alibaba Cloud Management Console.
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic of the message.
        private const string _topicName = "${TOPIC}";
        // The instance ID of the topic. The default value is NULL.
        private const string _instanceId = "${INSTANCE_ID}";

        // The client must be declared before the producer, which is created from it.
        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        /// <summary>
        /// Sends four messages and prints each publish result.
        /// Any exception stops the loop and is written to the console.
        /// </summary>
        static void Main(string[] args)
        {
            try
            {
                // Send four messages.
                for (int index = 0; index < 4; index++)
                {
                    TopicMessage published = producer.PublishMessage(BuildMessage(index));
                    Console.WriteLine("publis message success:" + published);
                }
            }
            catch (Exception error)
            {
                Console.Write(error);
            }
        }

        /// <summary>
        /// Builds the message for the given loop index.
        /// </summary>
        private static TopicMessage BuildMessage(int index)
        {
            string propertyValue = index.ToString();

            if (index % 2 != 0)
            {
                TopicMessage timed = new TopicMessage("dfadfadfadf", "tag");
                // Set properties.
                timed.PutProperty("a", propertyValue);
                // Schedule to send the message 10 seconds later.
                timed.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                return timed;
            }

            TopicMessage keyed = new TopicMessage("dfadfadfadf");
            // Set properties.
            keyed.PutProperty("a", propertyValue);
            // Set keys.
            keyed.MessageKey = "MessageKey";
            return keyed;
        }
    }
}

You can also start your instance by performing the following steps: Log on to the Message Queue for Apache RocketMQ console. Find the created instance and click More in the Actions column. Select Quick Start from the drop-down list.

Call HTTP SDKs to receive messages

After a message is sent, you must enable a consumer client to receive the message. You can use the following sample code for a specific programming language based on your business requirements. Set the parameters based on the instructions.

                           
import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class Consumer {
 
     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 // Set an HTTP endpoint. 
                 "${HTTP_ENDPOINT}",
                 // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
                 "${ACCESS_KEY}",
                 // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
                 "${SECRET_KEY}"
         );
 
         // The topic of the message. 
         final String topic = "${TOPIC}";
         // The group ID (consumer ID) that you created in the console. 
         final String groupId = "${GROUP_ID}";
         // The instance ID of the topic. The default value is NULL. 
         final String instanceId = "${INSTANCE_ID}";
 
         final MQConsumer consumer;
         if (instanceId ! = null && instanceId ! = "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }
 
         // Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages. 
         do {
             List<Message> messages = null;
 
             try {
                 // Consume messages in long-polling mode. 
                 // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client. 
                 messages = consumer.consumeMessage(
                         3,// A maximum of 3 messages can be consumed at a time. You can set the value up to 16. 
                         3 // The duration of a long-polling cycle is 3 seconds. You can set the value up to 30 seconds. 
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             // No messages. 
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName() + ": no new message, continue! ");
                 continue;
             }
 
             // Specify business logic. 
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }
 
             // If the consumption of a message is not confirmed before the time that is specified by the Message.nextConsumeTime parameter, the message will be consumed again. 
             // A unique timestamp is specified for the handle of a message each time the message is consumed. 
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }
 
                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     // The confirmation of message consumption may fail due to the timeout of message handles. 
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() ! = null) {
                             for (String errorHandle :errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                         + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
 }
 
                           
package main

import (
	"fmt"
	"github.com/gogap/errors"
	"strings"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
	// Set an HTTP endpoint. 
	endpoint := "${HTTP_ENDPOINT}"
	// The AccessKey ID that you created in the Alibaba Cloud Management Console. 
	accessKey := "${ACCESS_KEY}"
	// The AccessKey secret that you created in the Alibaba Cloud Management Console. 
	secretKey := "${SECRET_KEY}"
	// The topic of the message. 
	topic := "${TOPIC}"
	// The instance ID of the topic. The default value is NULL. 
	instanceId := "${INSTANCE_ID}"
	// The group ID (consumer ID) that you created in the console. 
	groupId := "${GROUP_ID}"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

	mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

	for {
		endChan := make(chan int)
		respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
		errChan := make(chan error)
		go func() {
			select {
			case resp := <-respChan:
				{
					// Specify business logic. 
					var handles []string
					fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
					for _, v := range resp.Messages {
						handles = append(handles, v.ReceiptHandle)
						fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
							"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
							v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
							v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
					}

                    // If the consumption of a message is not confirmed before the time that is specified by the Message.nextConsumeTime parameter, the message will be consumed again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    ackerr := mqConsumer.AckMessage(handles)
					if ackerr ! = nil {
						// The confirmation of message consumption may fail due to the timeout of message handles. 
						fmt.Println(ackerr)
						for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
							fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
						time.Sleep(time.Duration(3) * time.Second)
					} else {
						fmt.Printf("Ack ---->\n\t%s\n", handles)
					}

					endChan <- 1
				}
			case err := <-errChan:
				{
					// No messages. 
					if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
						fmt.Println("\nNo new message, continue! ")
					} else {
						fmt.Println(err)
						time.Sleep(time.Duration(3) * time.Second)
					}
					endChan <- 1
				}
			case <-time.After(35 * time.Second):
				{
					fmt.Println("Timeout of consumer message ?? ")
					endChan <- 1
				}
			}
		}()

		// Consume messages in long-polling mode. 
		// In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client. 
		mqConsumer.ConsumeMessage(respChan, errChan,
			3, // A maximum of 3 messages can be consumed at a time. You can set the value up to 16. 
			3 // The duration of a long-polling cycle is 3 seconds. You can set the value up to 30 seconds. 
		)
		<-endChan
	}
}
                           
<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

/**
 * Sample consumer that repeatedly pulls messages from a topic in long-polling
 * mode, prints them, and acknowledges their receipt handles.
 */
class ConsumerTest
{
    private $client;
    private $consumer;

    /**
     * Creates the client and obtains the consumer for the configured topic and group.
     */
    public function __construct()
    {
        $this->client = new MQClient(
            // Set an HTTP endpoint.
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud Management Console.
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud Management Console.
            "${SECRET_KEY}"
        );

        // The topic of the message.
        $topic = "${TOPIC}";
        // The group ID (consumer ID) that you created in the console.
        $groupId = "${GROUP_ID}";
        // The instance ID of the topic. The default value is NULL.
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    /**
     * Consumes and acknowledges messages in an endless loop.
     */
    public function run()
    {
        // Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages.
        while (True) {
            try {
                // Consume messages in long-polling mode.
                // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
                $messages = $this->consumer->consumeMessage(
                    3, // A maximum of 3 messages can be consumed at a time. You can set the value up to 16.
                    3 // The duration of a long-polling cycle is 3 seconds. You can set the value up to 30 seconds.
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // If no message can be consumed, the polling continues.
                    printf("No message, contine long polling! RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                // Back off for three seconds before the next poll.
                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // Specify business logic.
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // If the consumption of a message is not confirmed before the time that is specified by the $message->getNextConsumeTime() parameter, the message will be consumed again.
            // A unique timestamp is specified for the handle of a message each time the message is consumed.
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    // The confirmation of message consumption may fail due to the timeout of message handles.
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        // Print the error message, not the error code a second time.
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorMessage());
                    }
                }
            }
            print "ack finish\n";


        }

    }
}


$instance = new ConsumerTest();
$instance->run();

?>
                           
#! /usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Sample consumer script: repeatedly pulls messages from the topic in
# long-polling mode, prints them, and acknowledges their receipt handles.

# time.sleep is used below to back off after a failed poll.
import time

# Initialize the client.
mq_client = MQClient(
    # Set an HTTP endpoint.
    "${HTTP_ENDPOINT}",
    # The AccessKey ID that you created in the Alibaba Cloud Management Console.
    "${ACCESS_KEY}",
    # The AccessKey secret that you created in the Alibaba Cloud Management Console.
    "${SECRET_KEY}"
  )
# The topic of the message.
topic_name = "${TOPIC}"
# The group ID that you created in the console.
group_id = "${GROUP_ID}"
# The instance ID of the topic. The default value is None.
instance_id = "${INSTANCE_ID}"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
# The long polling interval is 3 seconds. You can set the value up to 30 seconds.
wait_seconds = 3
# A maximum of 3 messages can be consumed at a time. You can set the value up to 16.
batch = 3
print("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
while True:
    try:
        # Consume messages in long-polling mode.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s" %
                  (msg.message_id, msg.message_body_md5,
                   msg.message_tag, msg.consumed_times,
                   msg.publish_time, msg.message_body,
                   msg.next_consume_time, msg.receipt_handle))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue

        print("Consume Message Fail! Exception:%s\n" % e)
        # Back off for two seconds before the next poll.
        time.sleep(2)
        continue

    # If the consumption of a message is not confirmed before the time that is specified by the msg.next_consume_time parameter, the message will be consumed again.
    # A unique timestamp is specified for the handle of a message each time the message is consumed.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ak %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAk Message Fail! Exception:%s" % e)
        # The confirmation of message consumption may fail due to the timeout of message handles.
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))
                           
const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// Set an HTTP endpoint.
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID that you created in the Alibaba Cloud Management Console.
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret that you created in the Alibaba Cloud Management Console.
const accessKeySecret = "${SECRET_KEY}";

const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// The topic of the message.
const topic = "${TOPIC}";
// The group ID (consumer ID) that you created in the console.
const groupId = "${GROUP_ID}";
// The instance ID of the topic. The default value is NULL.
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

// Delay (in milliseconds) before polling again after an unexpected error.
const retryDelayMs = 2000;

(async function(){
  // Cyclically consume messages.
  while(true) {
    try {
      // Consume messages in long-polling mode.
      // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client.
      let res = await consumer.consumeMessage(
          3, // A maximum of 3 messages can be consumed at a time. You can set the value up to 16.
          3 // The duration of a long-polling cycle is 3 seconds. You can set the value up to 30 seconds.
          );

      if (res.code == 200) {
        // Specify the logic to consume messages.
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          // Guard the lookup so a message without a Properties object does not throw.
          const propA = message.Properties ? message.Properties.a : undefined;
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody, message.Properties, message.MessageKey, propA);
          return message.ReceiptHandle;
        });

        // If the consumption of a message is not confirmed before the time that is specified by the message.NextConsumeTime parameter, the message will be consumed again.
        // A unique timestamp is specified for the handle of a message each time the message is consumed.
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          // The confirmation of message consumption may fail due to the timeout of message handles.
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          // Every handle that is not reported as failed was confirmed.
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          // The consumption of the message is confirmed.
          console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      // Errors that carry no Code field (for example a network failure or a
      // TypeError) must not throw again inside this handler, or the loop dies
      // with an unhandled rejection.
      if (e && e.Code && e.Code.indexOf("MessageNotExist") > -1) {
        // If no message can be consumed, the polling continues.
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
        // Back off before retrying so a persistent failure does not spin in a tight loop.
        await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
      }
    }
  }
})();
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    /// <summary>
    /// Sample consumer: polls a topic in long-polling mode, prints every
    /// received message, and acknowledges each batch.
    /// </summary>
    public class ConsumerSample
    {
        // Set an HTTP endpoint. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic of the message. 
        private const string _topicName = "${TOPIC}";
        // The instance ID of the topic. The default value is NULL. 
        private const string _instanceId = "${INSTANCE_ID}";
        // The group ID (consumer ID) that you created in the console. 
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        /// <summary>
        /// Runs the consume/acknowledge loop forever on the current thread.
        /// </summary>
        static void Main(string[] args)
        {
            // Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages. 
            while (true)
            {
                try
                {
                    // Consume messages in long-polling mode. 
                    // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client. 
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, // A maximum of 3 messages can be consumed at a time. You can set the value up to 16. 
                            3 // The duration of a long-polling cycle is 3 seconds. You can set the value up to 30 seconds. 
                        );
                    }
                    catch (MessageNotExistException noMessageExp)
                    {
                        // Nothing to consume during this poll; poll again immediately.
                        Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + noMessageExp.RequestId);
                        continue;
                    }
                    catch (Exception consumeExp)
                    {
                        // Any other failure: report it and back off. messages stays null,
                        // so the check below restarts the loop.
                        Console.WriteLine(consumeExp);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // Specify business logic. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // If the consumption of a message is not confirmed before the time that is specified by the Message.nextConsumeTime parameter, the message will be consumed again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (AckMessageException ackExp)
                    {
                        // The confirmation of message consumption may fail due to the timeout of message handles. 
                        Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                        foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                        {
                            Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                        }
                    }
                    // Any other exception thrown while acknowledging is deliberately not
                    // caught here: it reaches the outer handler, which logs it and backs
                    // off, instead of being silently discarded.
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
                           
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // Set an HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
            "${ACCESS_KEY}",
            // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
            "${SECRET_KEY}"
            );

    // The topic of the message. 
    string topic = "${TOPIC}";
    // The group ID (consumer ID) that you created in the console. 
    string groupId = "${GROUP_ID}";
    // The instance ID of the topic. The default value is NULL. 
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Consume messages in long-polling mode. 
            // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for 3 seconds. If any message is available for consumption within the duration, a response is immediately sent to the client. 
            consumer->consumeMessage(
                    3, // A maximum of 3 messages can be consumed at a time. You can set the value up to 16. 
                    3,// The duration of a long-polling cycle is 3 seconds. You can set the value up to 30 seconds. 
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages! " << endl;

            // Process the message. 
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter ! = messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes() 
                    << " Properties: " << iter->getPropertiesAsString() 
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // The consumption of the message is confirmed. 
            // If the consumption of a message is not confirmed before the time that is specified by the Message.NextConsumeTime parameter, the message will be consumed again. 
            // A unique timestamp is specified for the handle of a message each time the message is consumed. 
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (! bdmResp.isSuccess()) {
                // The confirmation of message consumption may fail due to the timeout of message handles. 
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter ! = failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc! " << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

What to do next

You can query messages and their traces to verify whether messages are consumed. For more information, see Query messages and Query a message trace.