ApsaraMQ for RocketMQ provides HTTP client SDKs for seven programming languages. Use these SDKs to send normal messages to a topic, consume them from a consumer group, and acknowledge delivery.
Topics created for normal messages cannot be used for other message types such as scheduled messages, delayed messages, ordered messages, or transactional messages. Create a separate topic for each message type.
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for RocketMQ instance, a topic (message type: normal), and a group ID. For more information, see Create resources.
An AccessKey pair (AccessKey ID and AccessKey secret) for authentication. For more information, see Create an AccessKey pair.
Placeholders
Replace the following placeholders in all code examples with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-http-endpoint> | HTTP endpoint of your instance | http://1234567890123456.mqrest.cn-hangzhou.aliyuncs.com |
| <your-access-key-id> | AccessKey ID | LTAI5tXxx |
| <your-access-key-secret> | AccessKey secret | xXxXxXx |
| <your-topic> | Topic name | normal-topic-http |
| <your-instance-id> | Instance ID | MQ_INST_1380xxx_BbXbx0Y4 |
| <your-group-id> | Group ID (consumer ID) | GID_http_test |
Step 1: Install the SDK
Choose the SDK for your programming language and install it.
Java SDK
PHP SDK
Go SDK
Python SDK
Node.js SDK
C# SDK
C++ SDK
Java
For more information, see the Java SDK description and Release notes.
Go
go get github.com/aliyunmq/mq-http-go-sdk
For more information, see the Go SDK description and Release notes.
Python
pip install mq_http_sdk
For more information, see the Python SDK description and Release notes.
PHP
composer require aliyunmq/mq-http-sdk
For more information, see the PHP SDK description and Release notes.
Node.js
npm install @aliyunmq/mq-http-sdk --save
For more information, see the Node.js SDK description and Release notes.
C\#
For more information, see the C# SDK description and Release notes.
C++
Download the SDK source and build it with CMake. For more information, see the C++ SDK description and Release notes.
Step 2: Send normal messages
Each producer example follows this pattern:
Create an MQClient with your HTTP endpoint and AccessKey pair.
Get a producer for your instance and topic.
Publish messages in a loop, with optional delayed delivery.
The publishMessage call is synchronous. If no exception is thrown, the message was sent.
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
/**
 * Sample producer that publishes four messages to a topic over HTTP.
 *
 * <p>Even iterations send a message that carries a message key; odd iterations send a
 * message whose start-deliver time is 10 seconds in the future.
 */
public class Producer {
    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );
        final String topic = "<your-topic>";
        final String instanceId = "<your-instance-id>";
        try {
            // Get a producer for the specified instance and topic.
            // isEmpty() checks the content; the != operator would only compare references.
            MQProducer producer;
            if (instanceId != null && !instanceId.isEmpty()) {
                producer = mqClient.getProducer(instanceId, topic);
            } else {
                producer = mqClient.getProducer(topic);
            }
            try {
                for (int i = 0; i < 4; i++) {
                    TopicMessage pubMsg;
                    if (i % 2 == 0) {
                        // Send a normal message with a tag, properties, and message key
                        pubMsg = new TopicMessage(
                            // Encode explicitly so the body does not depend on the platform charset
                            "hello mq!".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                            "A" // Message tag
                        );
                        pubMsg.getProperties().put("a", String.valueOf(i));
                        pubMsg.setMessageKey("MessageKey");
                    } else {
                        // Send a delayed message (delivered 10 seconds later)
                        pubMsg = new TopicMessage(
                            "hello mq!".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                            "A"
                        );
                        pubMsg.getProperties().put("a", String.valueOf(i));
                        pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                    }
                    // publishMessage is synchronous. No exception means success.
                    TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
                    System.out.println(new Date() + " Send mq message success. Topic is:" + topic
                        + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
                }
            } catch (Throwable e) {
                // Handle send failure: retry or persist the message
                System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
                e.printStackTrace();
            }
        } finally {
            // Release the client even when creating the producer fails.
            mqClient.close();
        }
    }
}
Go
package main
import (
"fmt"
"time"
"strconv"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
endpoint := "<your-http-endpoint>"
accessKey := "<your-access-key-id>"
secretKey := "<your-access-key-secret>"
topic := "<your-topic>"
instanceId := "<your-instance-id>"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
if i%2 == 0 {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!",
MessageTag: "",
Properties: map[string]string{},
}
msg.MessageKey = "MessageKey"
msg.Properties["a"] = strconv.Itoa(i)
} else {
// Delayed message: delivered 10 seconds later
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq timer!",
MessageTag: "",
Properties: map[string]string{},
}
msg.Properties["a"] = strconv.Itoa(i)
// StartDeliverTime is a UNIX timestamp in milliseconds
msg.StartDeliverTime = time.Now().UTC().Unix()*1000 + 10*1000
}
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
ret.MessageId, ret.MessageBodyMD5)
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Python
#!/usr/bin/env python
# coding=utf8
import sys
import time
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
# Initialize the client
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
instance_id = "<your-instance-id>"
producer = mq_client.get_producer(instance_id, topic_name)
# Send 4 messages
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n"
      % (10 * "=", 10 * "=", topic_name, msg_count))
try:
    for i in range(msg_count):
        if i % 2 == 0:
            # Normal message with a property and a message key (the tag is left empty)
            msg = TopicMessage(
                "I am test message %s. Hello" % i,
                ""  # Message tag
            )
            # Store the loop index itself, not the literal string "i"
            msg.put_property("a", str(i))
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        else:
            # Delayed message: delivered 5 seconds later
            msg = TopicMessage(
                "I am test message %s." % i,
                ""
            )
            msg.put_property("a", str(i))
            # Absolute time in milliseconds
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s"
                  % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)
PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
/**
 * Sample producer: publishes four messages, numbered 1 to 4, to the configured topic.
 * Even-numbered messages get a start-deliver time 10 seconds in the future.
 */
class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );
        $topic = "<your-topic>";
        $instanceId = "<your-instance-id>";
        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    /**
     * Publishes the messages and prints the ID and body MD5 of each one.
     * Any exception stops the loop and its message is printed.
     */
    public function run()
    {
        try {
            foreach (range(1, 4) as $sequence) {
                // The single constructor argument is the message body
                $message = new TopicMessage("xxxxxxxx");
                $message->putProperty("a", $sequence);
                $message->setMessageKey("MessageKey");
                if ($sequence % 2 == 0) {
                    // Delayed message: delivered 10 seconds later
                    $message->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $sent = $this->producer->publishMessage($message);
                echo "Send mq message success. msgId is:" . $sent->getMessageId()
                    . ", bodyMD5 is:" . $sent->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            echo $e->getMessage() . "\n";
        }
    }
}
$instance = new ProducerTest();
$instance->run();
?>
Node.js
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "<your-topic>";
const instanceId = "<your-instance-id>";
const producer = client.getProducer(instanceId, topic);
// Publishes four messages: even iterations carry a message key, odd iterations
// carry a start-deliver time 10 seconds in the future.
(async function () {
  try {
    for (let i = 0; i < 4; i++) {
      // Declared per iteration so it does not leak as an implicit global.
      const msgProps = new MessageProperties();
      msgProps.putProperty("a", i);
      let res;
      if (i % 2 == 0) {
        // Normal message with properties and message key
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        // Delayed message: delivered 10 seconds later
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s",
        res.body.MessageId, res.body.MessageBodyMD5);
    }
  } catch (e) {
    // Handle send failure: retry or persist the message
    console.log(e)
  }
})();
C++
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
string topic = "<your-topic>";
string instanceId = "<your-instance-id>";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
if (i % 4 == 0) {
// Message with body only
producer->publishMessage("Hello, mq!", pmResp);
} else if (i % 4 == 1) {
// Message with body and tag
producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
} else if (i % 4 == 2) {
// Message with body, tag, properties, and key
TopicMessage pubMsg("Hello, mq!have key!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
} else {
// Delayed message: delivered 10 seconds later
// StartDeliverTime is an absolute time in milliseconds
TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
pubMsg.putProperty("b",std::to_string(i));
pubMsg.putProperty("c",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
}
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode()
<< ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}C\#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
/// <summary>
/// Sample producer that publishes four messages: even iterations carry a message key,
/// odd iterations carry a tag and a start-deliver time 10 seconds in the future.
/// </summary>
public class ProducerSample
{
    private const string _endpoint = "<your-http-endpoint>";
    private const string _accessKeyId = "<your-access-key-id>";
    private const string _secretAccessKey = "<your-access-key-secret>";
    private const string _topicName = "<your-topic>";
    private const string _instanceId = "<your-instance-id>";
    // Note the argument order: AccessKey ID, AccessKey secret, then endpoint.
    private static MQClient _client = new Aliyun.MQ.MQClient(
        _accessKeyId, _secretAccessKey, _endpoint);
    static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

    static void Main(string[] args)
    {
        try
        {
            for (int i = 0; i < 4; i++)
            {
                TopicMessage sendMsg;
                if (i % 2 == 0)
                {
                    // Normal message with properties and message key
                    sendMsg = new TopicMessage("dfadfadfadf");
                    sendMsg.PutProperty("a", i.ToString());
                    sendMsg.MessageKey = "MessageKey";
                }
                else
                {
                    // Delayed message: delivered 10 seconds later
                    sendMsg = new TopicMessage("dfadfadfadf", "tag");
                    sendMsg.PutProperty("a", i.ToString());
                    sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp()
                        + 10 * 1000;
                }
                TopicMessage result = producer.PublishMessage(sendMsg);
                // Log text corrected from "publis" to "publish".
                Console.WriteLine("publish message success:" + result);
            }
        }
        catch (Exception ex)
        {
            Console.Write(ex);
        }
    }
}
}
To send messages from the console, log on to the ApsaraMQ for RocketMQ console, find your instance, and choose More > Quick Start in the Actions column.
Step 3: Consume normal messages
After messages are sent, start a consumer to receive and process them. Each consumer example follows this pattern:
Create an MQClient with your HTTP endpoint and AccessKey pair.
Get a consumer for your instance, topic, and group ID.
Poll for messages in a loop using long polling.
Process each message, then acknowledge it by sending the receipt handle back to the broker.
Long polling keeps the connection open for a specified duration (up to 30 seconds). If a message arrives during that window, the broker responds immediately rather than waiting for the timeout.
If the broker does not receive an acknowledgment (ACK) before the NextConsumeTime for a message, the message is delivered again. Each delivery generates a new receipt handle.
| Parameter | Description | Limit |
|---|---|---|
| Batch size | Maximum messages per request | Up to 16 |
| Wait seconds | Long polling timeout | Up to 30 seconds |
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
/**
 * Sample consumer that long-polls a topic over HTTP, prints each received message,
 * and acknowledges the batch by sending the receipt handles back to the broker.
 *
 * <p>The loop runs until the thread is interrupted while backing off after a failed poll.
 */
public class Consumer {
    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );
        final String topic = "<your-topic>";
        final String groupId = "<your-group-id>";
        final String instanceId = "<your-instance-id>";
        final MQConsumer consumer;
        // isEmpty() checks the content; the != operator would only compare references.
        if (instanceId != null && !instanceId.isEmpty()) {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }
        // Use multiple threads for concurrent consumption in production
        do {
            List<Message> messages = null;
            try {
                // Long polling: wait up to 3 seconds for messages
                messages = consumer.consumeMessage(
                    3, // Max messages per batch (up to 16)
                    3  // Long polling timeout in seconds (up to 30)
                );
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    // Back off briefly before polling again
                    Thread.sleep(2000);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and stop polling instead of swallowing it
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (messages == null || messages.isEmpty()) {
                System.out.println(Thread.currentThread().getName()
                    + ": no new message, continue!");
                continue;
            }
            // Process messages
            for (Message message : messages) {
                System.out.println("Receive message: " + message);
            }
            // Acknowledge messages to prevent redelivery
            {
                List<String> handles = new ArrayList<>();
                for (Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }
                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    if (e instanceof AckMessageException) {
                        // Report each receipt handle the broker refused to acknowledge
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:"
                            + errors.getRequestId() + ", fail handles:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle :
                                    errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle
                                    + ", ErrorCode:" + errors.getErrorMessages()
                                        .get(errorHandle).getErrorCode()
                                    + ", ErrorMsg:" + errors.getErrorMessages()
                                        .get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
        // Reached only when the polling loop was interrupted
        mqClient.close();
    }
}
Go
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
endpoint := "<your-http-endpoint>"
accessKey := "<your-access-key-id>"
secretKey := "<your-access-key-secret>"
topic := "<your-topic>"
instanceId := "<your-instance-id>"
groupId := "<your-group-id>"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
v.Properties)
}
// Acknowledge processed messages
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
fmt.Println(ackerr)
for _, errAckItem := range ackerr.(errors.ErrCode).
Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode,
errAckItem.ErrorMsg)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
if strings.Contains(err.(errors.ErrCode).Error(),
"MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// Long polling: wait up to 3 seconds for messages
mqConsumer.ConsumeMessage(respChan, errChan,
3, // Max messages per batch (up to 16)
3, // Long polling timeout in seconds (up to 30)
)
<-endChan
}
}Python
#!/usr/bin/env python
# coding=utf8
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Imported explicitly: the loop below calls time.sleep(), and relying on the
# wildcard imports above to expose `time` is fragile.
import time

# Initialize the client
mq_client = MQClient(
    "<your-http-endpoint>",
    "<your-access-key-id>",
    "<your-access-key-secret>"
)
topic_name = "<your-topic>"
group_id = "<your-group-id>"
instance_id = "<your-instance-id>"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# Long polling: wait up to 3 seconds for messages
wait_seconds = 3
# Max messages per batch (up to 16)
batch = 3
print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
      % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
while True:
    try:
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s "
                  "\nMessageTag: %s\nConsumedTimes: %s "
                  "\nPublishTime: %s\nBody: %s "
                  "\nNextConsumeTime: %s "
                  "\nReceiptHandle: %s"
                  % (msg.message_id, msg.message_body_md5,
                     msg.message_tag, msg.consumed_times,
                     msg.publish_time, msg.message_body,
                     msg.next_consume_time, msg.receipt_handle))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue
        print("Consume Message Fail! Exception:%s\n" % e)
        # Back off briefly before polling again
        time.sleep(2)
        continue
    # Acknowledge processed messages to prevent redelivery
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
                      % (sub_error["ReceiptHandle"],
                         sub_error["ErrorCode"],
                         sub_error["ErrorMessage"]))
PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
/**
 * Sample consumer: long-polls the topic in an endless loop, prints every received
 * message, and acknowledges the batch with the collected receipt handles.
 */
class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            "<your-http-endpoint>",
            "<your-access-key-id>",
            "<your-access-key-secret>"
        );
        $topic = "<your-topic>";
        $groupId = "<your-group-id>";
        $instanceId = "<your-instance-id>";
        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    /**
     * Runs the consume/acknowledge loop. Never returns.
     */
    public function run()
    {
        // Use multiple threads for concurrent consumption in production
        while (True) {
            try {
                // Long polling: wait up to 3 seconds for messages
                $messages = $this->consumer->consumeMessage(
                    3, // Max messages per batch (up to 16)
                    3  // Long polling timeout in seconds (up to 30)
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    printf("No message, continue long polling!RequestId:%s\n",
                        $e->getRequestId());
                    continue;
                }
                print_r($e->getMessage() . "\n");
                // Back off briefly before polling again
                sleep(3);
                continue;
            }
            print "consume finish, messages:\n";
            // Process messages
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, "
                    . "FirstConsumeTime:%d, \nConsumedTimes:%d, "
                    . "NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(),
                    $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(),
                    $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }
            // Acknowledge processed messages
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        // ErrorMsg prints the error message, not the error code a second time
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                            $errorItem->getReceiptHandle(),
                            $errorItem->getErrorCode(),
                            $errorItem->getErrorMessage());
                    }
                }
            }
            print "ack finish\n";
        }
    }
}
$instance = new ConsumerTest();
$instance->run();
?>
Node.js
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "<your-http-endpoint>";
const accessKeyId = "<your-access-key-id>";
const accessKeySecret = "<your-access-key-secret>";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "<your-topic>";
const groupId = "<your-group-id>";
const instanceId = "<your-instance-id>";
const consumer = client.getConsumer(instanceId, topic, groupId);
// Long-polls the topic in an endless loop, logs every received message, and
// acknowledges the batch with the collected receipt handles.
(async function () {
  while (true) {
    try {
      // Long polling: wait up to 3 seconds for messages.
      // Declared with let so it does not leak as an implicit global.
      let res = await consumer.consumeMessage(
        3, // Max messages per batch (up to 16)
        3  // Long polling timeout in seconds (up to 30)
      );
      if (res.code == 200) {
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,"
            + "FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s"
            + ",Props:%j,MessageKey:%s,Prop-A:%s",
            message.MessageId, message.MessageTag, message.PublishTime,
            message.NextConsumeTime, message.FirstConsumeTime,
            message.ConsumedTimes,
            message.MessageBody, message.Properties, message.MessageKey,
            message.Properties.a);
          return message.ReceiptHandle;
        });
        // Acknowledge processed messages
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error) => {
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n",
              error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          // Every handle that is not in the failure list was acknowledged
          handles.forEach((handle) => {
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          console.log("Ack Message suc, RequestId:%s\n\t",
            res.requestId, handles.join(','));
        }
      }
    } catch (e) {
      // Errors without a string Code are logged instead of throwing a
      // TypeError here, which would end the polling loop.
      if (e && typeof e.Code === "string" && e.Code.indexOf("MessageNotExist") > -1) {
        console.log("Consume Message: no new message, RequestId:%s, Code:%s",
          e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();
C++
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
"<your-http-endpoint>",
"<your-access-key-id>",
"<your-access-key-secret>"
);
string topic = "<your-topic>";
string groupId = "<your-group-id>";
string instanceId = "<your-instance-id>";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Long polling: wait up to 3 seconds for messages
consumer->consumeMessage(
3, // Max messages per batch (up to 16)
3, // Long polling timeout in seconds (up to 30)
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// Process messages
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Acknowledge processed messages
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter =
failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: "
+ me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode()
+ ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}C\#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
/// <summary>
/// Sample consumer: long-polls the topic in an endless loop, prints every received
/// message, and acknowledges the batch with the collected receipt handles.
/// </summary>
public class ConsumerSample
{
    private const string _endpoint = "<your-http-endpoint>";
    private const string _accessKeyId = "<your-access-key-id>";
    private const string _secretAccessKey = "<your-access-key-secret>";
    private const string _topicName = "<your-topic>";
    private const string _instanceId = "<your-instance-id>";
    private const string _groupId = "<your-group-id>";
    // Note the argument order: AccessKey ID, AccessKey secret, then endpoint.
    private static MQClient _client = new Aliyun.MQ.MQClient(
        _accessKeyId, _secretAccessKey, _endpoint);
    static MQConsumer consumer = _client.GetConsumer(
        _instanceId, _topicName, _groupId, null);

    static void Main(string[] args)
    {
        // Use multiple threads for concurrent consumption in production
        while (true)
        {
            try
            {
                List<Message> batch = null;
                try
                {
                    // Long polling: wait up to 3 seconds for messages
                    batch = consumer.ConsumeMessage(
                        3, // Max messages per batch (up to 16)
                        3  // Long polling timeout in seconds (up to 30)
                    );
                }
                catch (MessageNotExistException noMessage)
                {
                    Console.WriteLine(Thread.CurrentThread.Name
                        + " No new message, "
                        + noMessage.RequestId);
                    continue;
                }
                catch (Exception pollError)
                {
                    Console.WriteLine(pollError);
                    // Back off briefly; batch stays null, so the check below polls again
                    Thread.Sleep(2000);
                }

                if (batch == null)
                {
                    continue;
                }

                List<string> receiptHandles = new List<string>();
                Console.WriteLine(Thread.CurrentThread.Name
                    + " Receive Messages:");
                // Process messages
                foreach (Message received in batch)
                {
                    Console.WriteLine(received);
                    Console.WriteLine("Property a is:"
                        + received.GetProperty("a"));
                    receiptHandles.Add(received.ReceiptHandle);
                }

                // Acknowledge processed messages
                try
                {
                    consumer.AckMessage(receiptHandles);
                    Console.WriteLine("Ack message success:");
                    foreach (string receiptHandle in receiptHandles)
                    {
                        Console.Write("\t" + receiptHandle);
                    }
                    Console.WriteLine();
                }
                catch (AckMessageException ackError)
                {
                    Console.WriteLine("Ack message fail, RequestId:"
                        + ackError.RequestId);
                    foreach (AckMessageErrorItem failedItem
                        in ackError.ErrorItems)
                    {
                        Console.WriteLine("\tErrorHandle:"
                            + failedItem.ReceiptHandle
                            + ",ErrorCode:" + failedItem.ErrorCode
                            + ",ErrorMsg:" + failedItem.ErrorMessage);
                    }
                }
                catch (Exception)
                {
                    // Other acknowledgment failures are ignored, as in the original sample
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                Thread.Sleep(2000);
            }
        }
    }
}
}
Verify the result
After running the producer and consumer, verify message delivery:
Log on to the ApsaraMQ for RocketMQ console.
Find your instance and go to the message query page.
Search by topic, message ID, or message key to confirm the messages were sent.
Check the message trace to verify that the consumer received and acknowledged each message.
For more information, see Query messages and Query message traces.