ApsaraMQ for RocketMQ: Send and receive normal messages

Last Updated: Mar 11, 2026

Normal messages are the basic message type in ApsaraMQ for RocketMQ. Unlike scheduled, delayed, ordered, and transactional messages, normal messages have no special delivery semantics.

The examples on this page use the HTTP client SDK for Node.js (@aliyunmq/mq-http-sdk) to publish and consume normal messages.

Before you begin

  1. Install the SDK -- Set up the Node.js development environment and install the @aliyunmq/mq-http-sdk package. For more information, see Prepare the environment.

  2. Create resources -- Create an instance, a topic, and a consumer group in the ApsaraMQ for RocketMQ console. For more information, see Create resources.

  3. Configure credentials -- Obtain an AccessKey pair for your Alibaba Cloud account and export the values as environment variables. For more information, see Create an AccessKey pair.

       export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
       export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>
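
If either variable is unset, the sample code passes undefined to the client and the problem only shows up when a request fails. The following is a minimal sketch of a fail-fast check. The helper name loadCredentials is hypothetical and not part of the SDK.

```javascript
// Hypothetical helper (not part of the SDK): read the AccessKey pair from an
// environment object and fail fast if either variable is missing or empty.
function loadCredentials(env = process.env) {
  const names = ['ALIBABA_CLOUD_ACCESS_KEY_ID', 'ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
  const missing = names.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error('Missing environment variables: ' + missing.join(', '));
  }
  return {
    accessKeyId: env.ALIBABA_CLOUD_ACCESS_KEY_ID,
    accessKeySecret: env.ALIBABA_CLOUD_ACCESS_KEY_SECRET,
  };
}
```

Calling loadCredentials() with no argument reads process.env, and the returned values can be passed to the MQClient constructor in place of the direct process.env lookups.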

Placeholders

Replace these placeholders in the sample code with your actual values:

| Placeholder | Description | Where to find it |
| --- | --- | --- |
| <http-endpoint> | HTTP access endpoint of your instance | Instance Details page > HTTP Endpoint in the ApsaraMQ for RocketMQ console |
| <topic> | Topic name | Topics page in the ApsaraMQ for RocketMQ console |
| <instance-id> | Instance ID. Set to null or "" if the instance has no namespace. | Instance Details page in the ApsaraMQ for RocketMQ console |
| <group-id> | Consumer group ID (consumer only) | Groups page in the ApsaraMQ for RocketMQ console |
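
A placeholder that is left in the code is easy to miss until a request fails. As a rough sketch, a small check can flag any setting that still looks like "<...>". The helper name findUnreplacedPlaceholders and the shape of the config object are assumptions made for illustration.

```javascript
// Hypothetical helper: return the names of settings that still hold a
// "<placeholder>" value copied from the sample code.
function findUnreplacedPlaceholders(config) {
  return Object.keys(config).filter((key) => {
    const value = config[key];
    // null and "" are valid for instanceId, so only "<...>" strings are flagged.
    return typeof value === 'string' && /^<[^<>]+>$/.test(value);
  });
}
```

For example, passing { endpoint: "<http-endpoint>", topic: "orders", instanceId: null } returns a list containing only "endpoint".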

Send normal messages

  1. Create a file named producer.js and add the following code:

       const {
         MQClient,
         MessageProperties
       } = require('@aliyunmq/mq-http-sdk');
    
       // HTTP endpoint of your ApsaraMQ for RocketMQ instance.
       const endpoint = "<http-endpoint>";
       // Read credentials from environment variables.
       const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
       const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
    
       const client = new MQClient(endpoint, accessKeyId, accessKeySecret);
    
       // Topic to publish to. Create this topic in the console first.
       const topic = "<topic>";
       // Instance ID. Set to null or "" if the instance has no namespace.
       const instanceId = "<instance-id>";
    
       const producer = client.getProducer(instanceId, topic);
    
       (async function () {
         try {
           for (let i = 0; i < 4; i++) {
             const msgProps = new MessageProperties();
             // Custom property.
             msgProps.putProperty("a", i);
             // Message key for tracing or deduplication.
             msgProps.messageKey("MessageKey");
    
             // Publish the message with a body, tag, and properties.
             const res = await producer.publishMessage("hello mq.", "TagA", msgProps);
             console.log("Publish message: MessageID:%s, BodyMD5:%s",
               res.body.MessageId, res.body.MessageBodyMD5);
           }
         } catch (e) {
           // Handle failures: retry or persist the message for later redelivery.
           console.log(e);
         }
       })();
  2. Run the producer:

       node producer.js
  3. Verify the output. A successful run prints four lines similar to:

       Publish message: MessageID:7F00000100246A4F0F2906B5BE4F0000, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
       Publish message: MessageID:7F00000100246A4F0F2906B5BE650001, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
       Publish message: MessageID:7F00000100246A4F0F2906B5BE780002, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
       Publish message: MessageID:7F00000100246A4F0F2906B5BE890003, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
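
The catch block in producer.js only logs the error, while its comment suggests retrying or persisting the message. One possible shape for the retry part is sketched below, assuming exponential backoff is acceptable for your workload. The helper withRetry and its options are hypothetical and not part of the SDK.

```javascript
// Hypothetical helper: retry an async operation with exponential backoff.
// `operation` is any function that returns a promise, for example
//   () => producer.publishMessage("hello mq.", "TagA", msgProps)
async function withRetry(operation, { attempts = 3, baseDelayMs = 200 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (e) {
      lastError = e;
      if (attempt < attempts) {
        // With the defaults, wait 200 ms, then 400 ms, before the next attempt.
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  // All attempts failed: the caller can persist the message for later redelivery.
  throw lastError;
}
```

A retried publish can produce a duplicate if an earlier attempt reached the broker but its response was lost, so consumers should tolerate receiving the same content more than once.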

Receive and acknowledge normal messages

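The consumer uses long polling. If no message is available when a request arrives, the broker holds the request for up to the configured number of seconds (at most 30) and responds as soon as a message arrives within that window. If the window expires with no message, the SDK call fails with a MessageNotExist error, which the sample code treats as a normal empty result rather than a failure.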

  1. Create a file named consumer.js and add the following code:

       const {
         MQClient
       } = require('@aliyunmq/mq-http-sdk');
    
       // HTTP endpoint of your ApsaraMQ for RocketMQ instance.
       const endpoint = "<http-endpoint>";
       // Read credentials from environment variables.
       const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
       const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
    
       const client = new MQClient(endpoint, accessKeyId, accessKeySecret);
    
       // Topic to consume from.
       const topic = "<topic>";
       // Consumer group ID. Create this in the console first.
       const groupId = "<group-id>";
       // Instance ID. Set to null or "" if the instance has no namespace.
       const instanceId = "<instance-id>";
    
       const consumer = client.getConsumer(instanceId, topic, groupId);
    
       (async function () {
         while (true) {
           try {
             // consumeMessage(batchSize, pollingSeconds)
             //   batchSize:      max messages per request (1-16)
             //   pollingSeconds: long-polling timeout in seconds (max 30)
             const res = await consumer.consumeMessage(3, 3);
    
             if (res.code === 200) {
               console.log("Consume messages, requestId: %s", res.requestId);
    
               const handles = res.body.map((message) => {
                 console.log(
                   "\tMessageId:%s, Tag:%s, PublishTime:%d, NextConsumeTime:%d, " +
                   "FirstConsumeTime:%d, ConsumedTimes:%d, Body:%s, Props:%j, MessageKey:%s, Prop-A:%s",
                   message.MessageId,
                   message.MessageTag,
                   message.PublishTime,
                   message.NextConsumeTime,
                   message.FirstConsumeTime,
                   message.ConsumedTimes,
                   message.MessageBody,
                   message.Properties,
                   message.MessageKey,
                   message.Properties.a
                 );
                 return message.ReceiptHandle;
               });
    
               // Acknowledge consumed messages. If the broker does not receive
               // an ACK before NextConsumeTime, the message is delivered again.
               const ackRes = await consumer.ackMessage(handles);
               if (ackRes.code !== 204) {
                 // Some ACKs failed, typically because a receipt handle expired
                 // (the message was not acknowledged before NextConsumeTime).
                 console.log("Ack failed:");
                 const failHandles = ackRes.body.map((error) => {
                   console.log(
                     "\tErrorHandle:%s, Code:%s, Reason:%s",
                     error.ReceiptHandle, error.ErrorCode, error.ErrorMessage
                   );
                   return error.ReceiptHandle;
                 });
                 handles.forEach((handle) => {
                   if (failHandles.indexOf(handle) < 0) {
                     console.log("\tSucHandle:%s", handle);
                   }
                 });
               } else {
                 console.log("Ack succeeded, requestId: %s\n\t", ackRes.requestId, handles.join(','));
               }
             }
           } catch (e) {
             if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
               // No messages available -- long polling continues on next iteration.
               console.log("No new messages. requestId: %s, Code: %s", e.RequestId, e.Code);
             } else {
               console.log(e);
             }
           }
         }
       })();
  2. Run the consumer:

       node consumer.js
  3. Verify the output. When messages are available, the consumer prints:

       Consume messages, requestId: B1A341B6F2A6XXXX
       	MessageId:7F00000100246A4F0F2906B5BE4F0000, Tag:TagA, PublishTime:1620000000000, NextConsumeTime:1620000030000, FirstConsumeTime:1620000000000, ConsumedTimes:1, Body:hello mq., Props:{"a":"0"}, MessageKey:MessageKey, Prop-A:0
       Ack succeeded, requestId: B1A341B6F2A6XXXX
       	 <receipt-handle>

    When no messages are available, it prints:

       No new messages. requestId: B1A341B6F2A6XXXX, Code: MessageNotExist
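
The loop in consumer.js retries immediately after any error, so a persistent failure (for example, invalid credentials) would make it spin without pause. The sketch below shows one way to tell an empty poll apart from an unexpected error and to compute a growing delay. Both helper names are hypothetical and not part of the SDK.

```javascript
// Hypothetical helpers for the polling loop; neither is part of the SDK.

// True when the error only signals an empty long-polling window, matching the
// check in consumer.js (e.Code contains "MessageNotExist").
function isNoMessageError(e) {
  return Boolean(e && typeof e.Code === 'string' && e.Code.indexOf('MessageNotExist') > -1);
}

// Delay before the next poll after `consecutiveErrors` unexpected failures:
// 1 s, 2 s, 4 s, ... capped at maxMs. Returns 0 when there were no failures.
function backoffMs(consecutiveErrors, baseMs = 1000, maxMs = 30000) {
  if (consecutiveErrors <= 0) return 0;
  return Math.min(maxMs, baseMs * 2 ** (consecutiveErrors - 1));
}
```

In the catch branch, a counter could be reset to 0 when isNoMessageError(e) is true and incremented otherwise, with the loop sleeping for backoffMs(counter) milliseconds before the next consumeMessage call.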

Key API methods

| Method | Description |
| --- | --- |
| new MQClient(endpoint, accessKeyId, accessKeySecret) | Creates an HTTP client connected to the specified endpoint. |
| client.getProducer(instanceId, topic) | Returns a producer bound to the given instance and topic. |
| client.getConsumer(instanceId, topic, groupId) | Returns a consumer bound to the given instance, topic, and consumer group. |
| producer.publishMessage(body, tag, properties) | Publishes a message with the specified body, tag, and optional properties. The response contains MessageId and MessageBodyMD5. |
| consumer.consumeMessage(batchSize, pollingSeconds) | Fetches up to batchSize messages (max 16). Blocks for up to pollingSeconds seconds (max 30) if no messages are available. |
| consumer.ackMessage(handles) | Acknowledges messages by their receipt handles. Returns status code 204 on success. |

Message acknowledgment

After processing a message, acknowledge it by calling ackMessage with the message's receipt handle. If the broker does not receive an acknowledgment before the NextConsumeTime deadline, the message is re-delivered.

Each time a message is consumed, the broker assigns a fresh receipt handle with a unique timestamp. Always use the latest handle when acknowledging.
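
Because an ACK is only useful before the NextConsumeTime deadline, a retry loop around it should stop once that deadline can no longer be met. The following is a sketch under two assumptions: NextConsumeTime is an epoch timestamp in milliseconds (as the sample output suggests), and a thrown error means the ACK may be retried with the same handles. The helper ackBeforeDeadline is hypothetical.

```javascript
// Hypothetical helper: retry an ACK that threw (for example, on a network
// timeout) until it succeeds or the deadline can no longer be met.
// `ack` is any function that returns a promise, for example
//   () => consumer.ackMessage(handles)
// `deadlineMs` is an epoch-millisecond timestamp, such as the smallest
// NextConsumeTime in the batch (assumed to be in milliseconds).
async function ackBeforeDeadline(ack, deadlineMs, { retryDelayMs = 500, now = Date.now } = {}) {
  for (;;) {
    try {
      return await ack();
    } catch (e) {
      // Give up once another attempt could not start before the deadline.
      if (now() + retryDelayMs >= deadlineMs) throw e;
      await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
    }
  }
}
```

This only covers calls that throw. A response with a code other than 204 still needs the per-handle inspection shown in consumer.js.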

Common ACK failures:

| Scenario | Symptom | Resolution |
| --- | --- | --- |
| Handle expired | ACK returns an error with an expired-handle error code | Process and acknowledge messages faster to avoid handle expiration. |
| Network timeout | ACK call throws an exception | Retry the ACK with the same handle before it expires. |
| Duplicate delivery | ConsumedTimes on the message is greater than 1 | Design idempotent message processing to handle re-deliveries safely. |
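
Idempotent processing can take many forms. The sketch below shows one simple variant: an in-memory record of message IDs that have already been handled. It assumes that MessageId stays the same when a message is delivered again, and the class name ProcessedMessages is hypothetical. A production system would more likely use a shared store keyed by a business identifier such as the message key.

```javascript
// Hypothetical in-memory deduplicator keyed by MessageId. State is lost on
// restart and is not shared between consumer processes.
class ProcessedMessages {
  constructor(maxEntries = 10000) {
    this.maxEntries = maxEntries;
    this.seen = new Map(); // insertion-ordered: oldest entry first
  }

  // Runs handler(message) only if this MessageId has not completed before.
  // Returns true if the handler ran, false if the message was a duplicate.
  async processOnce(message, handler) {
    if (this.seen.has(message.MessageId)) return false;
    await handler(message);
    // Record the ID only after the handler succeeds, so that a failed
    // attempt is processed again on redelivery.
    this.seen.set(message.MessageId, true);
    if (this.seen.size > this.maxEntries) {
      this.seen.delete(this.seen.keys().next().value); // evict the oldest ID
    }
    return true;
  }
}
```

A duplicate should still be acknowledged after processOnce returns false. Otherwise the broker keeps delivering it.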