After you create the required resources in the Message Queue for Apache RocketMQ console, you can use Message Queue for Apache RocketMQ HTTP client SDK to send and subscribe to normal messages.

Prerequisites

  • Create resources
    Note In this topic, normal messages are used in the example. The topic that is created for normal messages cannot be used to send or subscribe to other types of messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. You must create topics based on message types.
  • An AccessKey pair is obtained.

Download and install an HTTP client SDK

Message Queue for Apache RocketMQ provides the following HTTP client SDKs for multiple programming languages. Download and install a client SDK for a specific language based on your business requirements.

Use HTTP client SDKs to send normal messages

After you obtain the client SDK for a specific programming language, you can run the sample code of the programming language to send normal messages:

                           
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // Specify the HTTP endpoint. 
                "${HTTP_ENDPOINT}",
                // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
                "${ACCESS_KEY}",
                // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
                "${SECRET_KEY}"
        );

        // Specify the topic that you want to send the messages. 
        final String topic = "${TOPIC}";
        // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
        final String instanceId = "${INSTANCE_ID}";

        // Obtain the producer that sends the messages to the topic. 
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // Cyclically send four messages. 
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // Send the messages as normal messages. 
                    pubMsg = new TopicMessage(
                            // Specify the message content. 
                            "hello mq!".getBytes(),
                            // Specify the message tag. 
                            "A"
                    );
                    // Specify the attributes. 
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Specify the message key. 
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // Specify the message content. 
                            "hello mq!".getBytes(),
                            // Specify the message tag. 
                            "A"
                    );
                    // Specify the attributes. 
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Schedule to send the messages 10 seconds later. 
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // Send the messages in synchronous mode. If no exception is thrown, the messages are sent. 
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // Send the messages in synchronous mode. If no exception is thrown, the messages are sent. 
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // A message failed to be sent and needs to be sent again. You can resend the message or persist the message. 
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}
                           
package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // Specify the HTTP endpoint. 
    endpoint := "${HTTP_ENDPOINT}"
    // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
    accessKey := "${ACCESS_KEY}"
    // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
    secretKey := "${SECRET_KEY}"
    // Specify the topic that you want to send the messages. 
    topic := "${TOPIC}"
    // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Cyclically send four messages. 
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",         // Specify the message content. 
                MessageTag:  "",                  // Specify the message tag. 
                Properties:  map[string]string{}, // Specify the format of the attributes. 
            }
            // Specify the message key. 
            msg.MessageKey = "MessageKey"
            // Specify the attributes. 
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",         // Specify the message content. 
                MessageTag:  "",                  // Specify the message tag. 
                Properties:  map[string]string{}, // Specify the format of the attributes. 
            }
            // Specify the attributes. 
            msg.Properties["a"] = strconv.Itoa(i)
            // Schedule to send the messages 10 seconds later. The value is a UNIX timestamp in milliseconds. 
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
        }
        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}
                           
<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
        );

        // Specify the topic that you want to send the messages. 
        $topic = "${TOPIC}";
        // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
        $instanceId = "${INSTANCE_ID}";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx"// Specify the message content. 
                );
                // Specify the attributes. 
                $publishMessage->putProperty("a", $i);
                // Specify the message key. 
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // Schedule to send the messages 10 seconds later. 
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}


$instance = new ProducerTest();
$instance->run();

?>
                           
#!/usr/bin/env python
# coding=utf8
import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

# Initialize the Message Queue for Apache RocketMQ client. 
mq_client = MQClient(
    # Specify the HTTP endpoint. 
    "${HTTP_ENDPOINT}",
    # Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
    "${ACCESS_KEY}",
    # Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
    "${SECRET_KEY}"
	)
# Specify the topic that you want to send the messages. 
topic_name = "${TOPIC}"
# Specify the ID of the instance to which the topic belongs. The default value is None. 
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# Cyclically send multiple messages. 
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            msg = TopicMessage(
                    # Specify the message content. 
                    "I am test message %s.Hello" % i, 
                    # Specify the message tag. 
                    ""
                    )
            # Specify the attributes. 
            msg.put_property("a", "i")
            # Specify the message key. 
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        else:
            msg = TopicMessage(
                    # Specify the message content. 
                    "I am test message %s." % i, 
                    # Specify the message tag. 
                    ""
                    )
            msg.put_property("a", i)
            # Schedule the absolute time in milliseconds to send the messages. 
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)
                           
const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// Specify the HTTP endpoint. 
const endpoint = "${HTTP_ENDPOINT}";
// Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
const accessKeyId = "${ACCESS_KEY}";
// Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// Specify the topic that you want to send the messages. 
const topic = "${TOPIC}";
// Specify the ID of the instance to which the topic belongs. The default value is NULL. 
const instanceId = "${INSTANCE_ID}";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    // Cyclically send four messages. 
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        msgProps = new MessageProperties();
        // Specify the attributes. 
        msgProps.putProperty("a", i);
        // Specify the message key. 
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        msgProps = new MessageProperties();
        // Specify the attributes. 
        msgProps.putProperty("a", i);
        // Schedule to send the messages 10 seconds later. 
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // A message failed to be sent and needs to be sent again. You can resend the message or persist the message. 
    console.log(e)
  }
})();
                           
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
            );

    // Specify the topic that you want to send the messages. 
    string topic = "${TOPIC}";
    // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // publish message, only have body. 
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // publish message, only have body and tag.
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // publish message, have body,tag,properties and key.
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // publish timer message, message will be consumed after StartDeliverTime
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                // StartDeliverTime is an absolute time in millisecond.
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic 
                << ", msgId is:" << pmResp.getMessageId() 
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        // Specify the HTTP endpoint. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // Specify the topic that you want to send the messages. 
        private const string _topicName = "${TOPIC}";
        // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // Cyclically send four messages. 
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        sendMsg = new TopicMessage("dfadfadfadf");
                        // Specify the attributes. 
                        sendMsg.PutProperty("a", i.ToString());
                        // Specify the message key. 
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        // Specify the attributes. 
                        sendMsg.PutProperty("a", i.ToString());
                        // Schedule to send the messages 10 seconds later. 
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}

You can also start your instance by performing the following steps: Log on to the Message Queue for Apache RocketMQ console. Find the created instance and click More in the Actions column. Select Quick Start from the drop-down list.

Use HTTP client SDKs to consume normal messages

After normal messages are sent, you must start consumers to consume the messages. You can use the following sample code based on the programming language and your business requirements to start consumers and test the message consumption feature. Specify the parameters based on comments in the following code:

                           
import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class Consumer {
 
     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 // Specify the HTTP endpoint. 
                 "${HTTP_ENDPOINT}",
                 // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
                 "${ACCESS_KEY}",
                 // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
                 "${SECRET_KEY}"
         );
 
         // Specify the topic to which the messages belong. 
         final String topic = "${TOPIC}";
         // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. A group ID was called a consumer ID. 
         final String groupId = "${GROUP_ID}";
         // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
         final String instanceId = "${INSTANCE_ID}";
 
         final MQConsumer consumer;
         if (instanceId != null && instanceId != "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }
 
         // Cyclically consume the messages in the thread. We recommend that you use multiple threads to concurrently consume the messages. 
         do {
             List<Message> messages = null;
 
             try {
                 // Consume the messages in long polling mode. 
                 // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
                 messages = consumer.consumeMessage(
                         3,// Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                         3// Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             // Specify how the system processes the scenarios in which no messages exist. 
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                 continue;
             }
 
             // Specify the logic to process the messages. 
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }
 
             // If a broker fails to receive an acknowledgment (ACK) for a message from a consumer before the period of time specified by the Message.nextConsumeTime parameter elapses, the message is consumed again. 
             // A unique timestamp is specified for the handle of a message each time the message is consumed. 
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }
 
                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     // A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                         + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
 }
 
                           
package main

import (
	"fmt"
	"github.com/gogap/errors"
	"strings"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
	// Specify the HTTP endpoint. 
	endpoint := "${HTTP_ENDPOINT}"
	// Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
	accessKey := "${ACCESS_KEY}"
	// Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
	secretKey := "${SECRET_KEY}"
	// Specify the topic to which the messages belong. 
	topic := "${TOPIC}"
	// Specify the ID of the instance to which the topic belongs. The default value is NULL. 
	instanceId := "${INSTANCE_ID}"
	// Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. A group ID was called a consumer ID. 
	groupId := "${GROUP_ID}"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

	mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

	for {
		endChan := make(chan int)
		respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
		errChan := make(chan error)
		go func() {
			select {
			case resp := <-respChan:
				{
					// Specify the logic to process the messages. 
					var handles []string
					fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
					for _, v := range resp.Messages {
						handles = append(handles, v.ReceiptHandle)
						fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
							"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
							v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
							v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
					}

                    // If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the NextConsumeTime parameter elapses, the message is consumed again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    ackerr := mqConsumer.AckMessage(handles)
					if ackerr != nil {
						// A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
						fmt.Println(ackerr)
						for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
							fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
						time.Sleep(time.Duration(3) * time.Second)
					} else {
						fmt.Printf("Ack ---->\n\t%s\n", handles)
					}

					endChan <- 1
				}
			case err := <-errChan:
				{
					// Specify how the system processes the scenarios in which no messages exist. 
					if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
						fmt.Println("\nNo new message, continue!")
					} else {
						fmt.Println(err)
						time.Sleep(time.Duration(3) * time.Second)
					}
					endChan <- 1
				}
			case <-time.After(35 * time.Second):
				{
					fmt.Println("Timeout of consumer message ??")
					endChan <- 1
				}
			}
		}()

		// Consume the messages in long polling mode. 
		// In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
		mqConsumer.ConsumeMessage(respChan, errChan,
			3, //  Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
			3, // Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
		)
		<-endChan
	}
}
                           
<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
        );

        // Specify the topic to which the messages belong. 
        $topic = "${TOPIC}";
        // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. A group ID was called a consumer ID. 
        $groupId = "${GROUP_ID}";
        // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // Cyclically consume the messages in the thread. We recommend that you use multiple threads to concurrently consume the messages. 
        while (True) {
            try {
                // Consume the messages in long polling mode. 
                // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
                $messages = $this->consumer->consumeMessage(
                    3, //  Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                    3 // Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // If no message can be consumed, the polling continues. 
                    printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // Specify the logic to process the messages. 
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // If a broker fails to receive an ACK for a message from a consumer before the period of time specified in $message->getNextConsumeTime() elapses, the message is consumed again. 
            // A unique timestamp is specified for the handle of a message each time the message is consumed. 
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    // A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
                    }
                }
            }
            print "ack finish\n";


        }

    }
}


$instance = new ConsumerTest();
$instance->run();

?>
                           
#!/usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the Message Queue for Apache RocketMQ client.
mq_client = MQClient(
    # Specify the HTTP endpoint. 
    "",
    # Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
    "${ACCESS_KEY}",
    # Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
    "${SECRET_KEY}"
  )
# Specify the topic to which the messages belong. 
topic_name = "${TOPIC}"
# Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. 
group_id = "GID_test"
# Specify the ID of the instance to which the topic belongs. The default value is None. 
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
# Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
wait_seconds = 3
# Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
    try:
        # Consume the messages in long polling mode. 
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print "Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s" % \
                             (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle)
    except MQExceptionBase, e:
        if e.type == "MessageNotExist":
            print "No new message! RequestId: %s" % e.req_id
            continue

        print "Consume Message Fail! Exception:%s\n" % e
        time.sleep(2)
        continue

    # If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the msg.next_consume_time parameter elapses, the message is consumed again. 
    # A unique timestamp is specified for the handle of a message each time the message is consumed. 
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
    except MQExceptionBase, e:
        print "\nAk Message Fail! Exception:%s" % e
        # A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
        if e.sub_errors:
          for sub_error in e.sub_errors:
            print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
                           
const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// Specify the HTTP endpoint. 
const endpoint = "${HTTP_ENDPOINT}";
// Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
const accessKeyId = "${ACCESS_KEY}";
// Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// Specify the topic to which the messages belong. 
const topic = "${TOPIC}";
// Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. A group ID was called a consumer ID. 
const groupId = "${GROUP_ID}";
// Specify the ID of the instance to which the topic belongs. The default value is NULL. 
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  // Cyclically consume the messages. 
  while(true) {
    try {
      // Consume the messages in long polling mode. 
      // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
      res = await consumer.consumeMessage(
          3, //  Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
          3 // Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
          );

      if (res.code == 200) {
        // Specify the logic to consume the messages. 
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" + 
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
          return message.ReceiptHandle;
        });

        // If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the message.NextConsumeTime parameter elapses, the message is consumed again. 
        // A unique timestamp is specified for the handle of a message each time the message is consumed. 
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          // A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          // Obtain an ACK from a consumer. 
          console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code.indexOf("MessageNotExist") > -1) {
        // If no message is available for consumption in the topic, the long polling mode continues to take effect. 
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();
                           
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
	public class ConsumerSample
    {
        // Specify the HTTP endpoint. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // Specify the topic to which the messages belong. 
        private const string _topicName = "${TOPIC}";
        // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
        private const string _instanceId = "${INSTANCE_ID}";
        // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. A group ID was called a consumer ID. 
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Cyclically consume the messages in the thread. We recommend that you use multiple threads to concurrently consume the messages. 
            while (true)
            {
                try
                {
                    // Consume the messages in long polling mode. 
                    // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, //  Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                            3 // Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // Specify the logic to process the messages. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the Message.nextConsumeTime parameter elapses, the message is consumed again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
                           
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // Specify the AccessKey ID. The AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Obtain an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // Specify the AccessKey secret. The AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Obtain an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
            );

    // Specify the topic to which the messages belong. 
    string topic = "${TOPIC}";
    // Specify the ID of the group that you created in the Message Queue for Apache RocketMQ console. A group ID was called a consumer ID. 
    string groupId = "${GROUP_ID}";
    // Specify the ID of the instance to which the topic belongs. The default value is NULL. 
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Consume the messages in long polling mode. 
            // In long-polling mode, if no message on the topic is available for consumption, the request is hung on the server for three seconds. If a message is available for consumption within the duration, a response is immediately sent to the client. 
            consumer->consumeMessage(
                    3,// Specify the maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                    3,// Specify the long polling period. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // Process the messages. 
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes() 
                    << " Properties: " << iter->getPropertiesAsString() 
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // Obtain an ACK from a consumer. 
            // If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the Message.NextConsumeTime parameter elapses, the message is consumed again. 
            // A unique timestamp is specified for the handle of a message each time the message is consumed. 
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // A broker fails to receive an ACK for a message from a consumer if the handle of the message times out. 
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

What to do next

You can query messages and the traces of the messages to verify whether the messages are consumed. For more information, see Query a message and Query a message trace.