Normal messages are the basic message type in ApsaraMQ for RocketMQ. Unlike scheduled, delayed, ordered, and transactional messages, normal messages have no special delivery semantics.
The examples on this page use the HTTP client SDK for Node.js (@aliyunmq/mq-http-sdk) to publish and consume normal messages.
Before you begin
Install the SDK -- Set up the Node.js development environment and install the @aliyunmq/mq-http-sdk package. For more information, see Prepare the environment.
Create resources -- Create an instance, a topic, and a consumer group in the ApsaraMQ for RocketMQ console. For more information, see Create resources.
Configure credentials -- Obtain an AccessKey pair for your Alibaba Cloud account and export the values as environment variables. For more information, see Create an AccessKey pair.
    export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>
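A missing variable is easier to diagnose when it is reported by name before the client is created. The following is a minimal sketch of such a check; `readCredentials` is a hypothetical helper name and is not part of the SDK.

```javascript
// Hypothetical helper (not part of @aliyunmq/mq-http-sdk): reads the AccessKey
// pair from an environment object and fails fast if either value is missing.
function readCredentials(env = process.env) {
  const names = ['ALIBABA_CLOUD_ACCESS_KEY_ID', 'ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
  const missing = names.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error('Missing environment variables: ' + missing.join(', '));
  }
  return {
    accessKeyId: env.ALIBABA_CLOUD_ACCESS_KEY_ID,
    accessKeySecret: env.ALIBABA_CLOUD_ACCESS_KEY_SECRET,
  };
}
```

The returned `accessKeyId` and `accessKeySecret` can then be passed to `new MQClient(endpoint, accessKeyId, accessKeySecret)`.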
Placeholders
Replace these placeholders in the sample code with your actual values:
| Placeholder | Description | Where to find it |
|---|---|---|
| `<http-endpoint>` | HTTP access endpoint of your instance | Instance Details page > HTTP Endpoint in the ApsaraMQ for RocketMQ console |
| `<topic>` | Topic name | Topics page in the ApsaraMQ for RocketMQ console |
| `<instance-id>` | Instance ID. Set to `null` or `""` if the instance has no namespace. | Instance Details page in the ApsaraMQ for RocketMQ console |
| `<group-id>` | Consumer group ID (consumer only) | Groups page in the ApsaraMQ for RocketMQ console |
Send normal messages
Create a file named producer.js and add the following code:

    const { MQClient, MessageProperties } = require('@aliyunmq/mq-http-sdk');

    // HTTP endpoint of your ApsaraMQ for RocketMQ instance.
    const endpoint = "<http-endpoint>";
    // Read credentials from environment variables.
    const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
    const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];

    const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

    // Topic to publish to. Create this topic in the console first.
    const topic = "<topic>";
    // Instance ID. Set to null or "" if the instance has no namespace.
    const instanceId = "<instance-id>";

    const producer = client.getProducer(instanceId, topic);

    (async function () {
      try {
        for (let i = 0; i < 4; i++) {
          const msgProps = new MessageProperties();
          // Custom property.
          msgProps.putProperty("a", i);
          // Message key for tracing or deduplication.
          msgProps.messageKey("MessageKey");
          // Publish the message with a body, tag, and properties.
          const res = await producer.publishMessage("hello mq.", "TagA", msgProps);
          console.log("Publish message: MessageID:%s, BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
        }
      } catch (e) {
        // Handle failures: retry or persist the message for later redelivery.
        console.log(e);
      }
    })();

Run the producer:

    node producer.js

Verify the output. A successful run prints four lines similar to:
    Publish message: MessageID:7F00000100246A4F0F2906B5BE4F0000, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
    Publish message: MessageID:7F00000100246A4F0F2906B5BE650001, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
    Publish message: MessageID:7F00000100246A4F0F2906B5BE780002, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
    Publish message: MessageID:7F00000100246A4F0F2906B5BE890003, BodyMD5:4E69B245B8D0E47E5BE0B8520E8E608F
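The producer's catch block suggests retrying or persisting a message when publishing fails. One possible shape for the retry part is sketched below. `publishWithRetry`, its `maxAttempts` and `baseDelayMs` options, and their default values are assumptions for illustration; only `producer.publishMessage(body, tag, msgProps)` comes from the sample above.

```javascript
// Hypothetical helper (not part of the SDK): retries publishMessage with
// exponential backoff. `producer` is any object exposing the
// publishMessage(body, tag, msgProps) method used in producer.js.
async function publishWithRetry(producer, body, tag, msgProps, options = {}) {
  const maxAttempts = options.maxAttempts || 3; // assumed default
  const baseDelayMs = options.baseDelayMs === undefined ? 200 : options.baseDelayMs;
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await producer.publishMessage(body, tag, msgProps);
    } catch (e) {
      lastError = e;
      if (attempt < maxAttempts) {
        // Wait 1x, 2x, 4x ... the base delay before the next attempt.
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  // All attempts failed: the caller can persist the message for later redelivery.
  throw lastError;
}
```

A retry can publish the same message twice if an earlier attempt reached the broker but its response was lost, so this approach fits best when consumers process messages idempotently.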
Receive and acknowledge normal messages
The consumer uses long polling: if no message is available, the request blocks on the broker for a configurable number of seconds. If a message arrives within that window, the broker responds immediately.
Create a file named consumer.js and add the following code:

    const { MQClient } = require('@aliyunmq/mq-http-sdk');

    // HTTP endpoint of your ApsaraMQ for RocketMQ instance.
    const endpoint = "<http-endpoint>";
    // Read credentials from environment variables.
    const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
    const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];

    const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

    // Topic to consume from.
    const topic = "<topic>";
    // Consumer group ID. Create this in the console first.
    const groupId = "<group-id>";
    // Instance ID. Set to null or "" if the instance has no namespace.
    const instanceId = "<instance-id>";

    const consumer = client.getConsumer(instanceId, topic, groupId);

    (async function () {
      while (true) {
        try {
          // consumeMessage(batchSize, pollingSeconds)
          //   batchSize: max messages per request (1-16)
          //   pollingSeconds: long-polling timeout in seconds (max 30)
          const res = await consumer.consumeMessage(3, 3);
          if (res.code === 200) {
            console.log("Consume messages, requestId: %s", res.requestId);
            const handles = res.body.map((message) => {
              console.log(
                "\tMessageId:%s, Tag:%s, PublishTime:%d, NextConsumeTime:%d, " +
                "FirstConsumeTime:%d, ConsumedTimes:%d, Body:%s, Props:%j, MessageKey:%s, Prop-A:%s",
                message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime,
                message.FirstConsumeTime, message.ConsumedTimes, message.MessageBody,
                message.Properties, message.MessageKey, message.Properties.a
              );
              return message.ReceiptHandle;
            });

            // Acknowledge consumed messages. If the broker does not receive
            // an ACK before NextConsumeTime, the message is delivered again.
            const ackRes = await consumer.ackMessage(handles);
            if (ackRes.code !== 204) {
              // Some handles were not acknowledged, for example because they
              // expired before the ACK reached the broker.
              console.log("Ack failed:");
              const failHandles = ackRes.body.map((error) => {
                console.log(
                  "\tErrorHandle:%s, Code:%s, Reason:%s",
                  error.ReceiptHandle, error.ErrorCode, error.ErrorMessage
                );
                return error.ReceiptHandle;
              });
              handles.forEach((handle) => {
                if (failHandles.indexOf(handle) < 0) {
                  console.log("\tSucHandle:%s", handle);
                }
              });
            } else {
              console.log("Ack succeeded, requestId: %s\n\t", ackRes.requestId, handles.join(','));
            }
          }
        } catch (e) {
          if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
            // No messages available -- long polling continues on next iteration.
            console.log("No new messages. requestId: %s, Code: %s", e.RequestId, e.Code);
          } else {
            console.log(e);
          }
        }
      }
    })();

Run the consumer:

    node consumer.js

Verify the output. When messages are available, the consumer prints output similar to:
    Consume messages, requestId: B1A341B6F2A6XXXX
        MessageId:7F00000100246A4F0F2906B5BE4F0000, Tag:TagA, PublishTime:1620000000000, NextConsumeTime:1620000030000, FirstConsumeTime:1620000000000, ConsumedTimes:1, Body:hello mq., Props:{"a":"0"}, MessageKey:MessageKey
    Ack succeeded, requestId: B1A341B6F2A6XXXX

When no messages are available, it prints:

    No new messages. requestId: B1A341B6F2A6XXXX, Code: MessageNotExist
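Because an empty poll surfaces as a thrown error rather than an empty result, it can be convenient to wrap a single long-polling request in a function that always returns an array. The sketch below does that and also checks the documented parameter limits before sending the request. `pollOnce` is a hypothetical helper, and the lower bound of 1 on `pollingSeconds` is an assumption (this page states only the maximum of 30).

```javascript
// Hypothetical helper (not part of the SDK): performs one long-polling request
// and returns the received messages, or an empty array when none arrived.
// `consumer` is any object exposing consumeMessage(batchSize, pollingSeconds)
// as used in consumer.js.
async function pollOnce(consumer, batchSize = 3, pollingSeconds = 3) {
  // Limits stated on this page: 1-16 messages per request, at most 30 seconds.
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 16) {
    throw new RangeError('batchSize must be an integer from 1 to 16');
  }
  // The lower bound of 1 is an assumption; only the maximum is documented here.
  if (!Number.isInteger(pollingSeconds) || pollingSeconds < 1 || pollingSeconds > 30) {
    throw new RangeError('pollingSeconds must be an integer from 1 to 30');
  }
  try {
    const res = await consumer.consumeMessage(batchSize, pollingSeconds);
    return res.code === 200 ? res.body : [];
  } catch (e) {
    // An empty poll is reported as an error whose Code contains "MessageNotExist".
    if (e && typeof e.Code === 'string' && e.Code.indexOf('MessageNotExist') > -1) {
      return [];
    }
    throw e; // Any other error is a real failure.
  }
}
```

A consumer loop built on this helper can treat an empty array as "nothing to do" and reserve its error handling for genuine failures.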
Key API methods
| Method | Description |
|---|---|
| `new MQClient(endpoint, accessKeyId, accessKeySecret)` | Creates an HTTP client connected to the specified endpoint. |
| `client.getProducer(instanceId, topic)` | Returns a producer bound to the given instance and topic. |
| `client.getConsumer(instanceId, topic, groupId)` | Returns a consumer bound to the given instance, topic, and consumer group. |
| `producer.publishMessage(body, tag, properties)` | Publishes a message with the specified body, tag, and optional properties. The response contains `MessageId` and `MessageBodyMD5`. |
| `consumer.consumeMessage(batchSize, pollingSeconds)` | Fetches up to `batchSize` messages (max 16). Blocks for up to `pollingSeconds` seconds (max 30) if no messages are available. |
| `consumer.ackMessage(handles)` | Acknowledges messages by their receipt handles. Returns status code 204 on success. |
Message acknowledgment
After processing a message, acknowledge it by calling ackMessage with the message's receipt handle. If the broker does not receive an acknowledgment before the NextConsumeTime deadline, the message is re-delivered.
Each time a message is consumed, the broker assigns a fresh receipt handle with a unique timestamp. Always use the latest handle when acknowledging.
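When a batch is acknowledged, some handles may succeed while others fail. The sketch below separates the two groups so that failed messages can be logged or left for redelivery. `ackAndPartition` is a hypothetical helper; the response shape it reads (`code`, and `body` entries with `ReceiptHandle`, `ErrorCode`, and `ErrorMessage`) is taken from the consumer sample on this page.

```javascript
// Hypothetical helper (not part of the SDK): acknowledges a batch and reports
// which receipt handles succeeded and which failed. `consumer` is any object
// exposing ackMessage(handles) as used in consumer.js.
async function ackAndPartition(consumer, handles) {
  const ackRes = await consumer.ackMessage(handles);
  if (ackRes.code === 204) {
    // 204 means every handle in the batch was acknowledged.
    return { succeeded: handles.slice(), failed: [] };
  }
  // Otherwise the body lists one entry per failed handle.
  const failed = ackRes.body.map((error) => ({
    handle: error.ReceiptHandle,
    code: error.ErrorCode,
    reason: error.ErrorMessage,
  }));
  const failedHandles = new Set(failed.map((entry) => entry.handle));
  const succeeded = handles.filter((handle) => !failedHandles.has(handle));
  return { succeeded, failed };
}
```

Messages whose handles appear in `failed` are delivered again with a new receipt handle, so the old handle should not be reused.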
Common ACK failures:
| Scenario | Symptom | Resolution |
|---|---|---|
| Handle expired | ACK returns an error with an expired-handle error code | Process and acknowledge messages faster to avoid handle expiration. |
| Network timeout | ACK call throws an exception | Retry the ACK with the same handle before it expires. |
| Duplicate delivery | ConsumedTimes on the message is greater than 1 | Design idempotent message processing to handle re-deliveries safely. |
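Two of the resolutions in the table above can be sketched in code: retrying an ACK call that throws, and skipping messages that were already processed. Both helpers are hypothetical and not part of the SDK. The in-memory `Set` is an assumption for illustration only, since it is lost on restart and not shared between consumer processes, and keying on `MessageId` is likewise an assumption (a business-level key may suit some applications better).

```javascript
// Hypothetical helper: wraps a handler so that a message whose key was already
// processed is skipped. By default the key is the message's MessageId.
function makeIdempotent(handler, keyOf = (message) => message.MessageId, seen = new Set()) {
  return async function handleOnce(message) {
    const key = keyOf(message);
    if (seen.has(key)) {
      return false; // Duplicate delivery: already processed, safe to ACK again.
    }
    await handler(message);
    seen.add(key); // Record the key only after the handler succeeds.
    return true;
  };
}

// Hypothetical helper: retries ackMessage with the same handles when the call
// throws (for example on a network timeout). maxAttempts and delayMs are
// assumed defaults; keep the total retry time shorter than the handle lifetime.
async function ackWithRetry(consumer, handles, maxAttempts = 3, delayMs = 100) {
  for (let attempt = 1; ; attempt++) {
    try {
      return await consumer.ackMessage(handles);
    } catch (e) {
      if (attempt >= maxAttempts) {
        throw e; // Give up: the message will be delivered again later.
      }
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
```

The two helpers complement each other: if every ACK retry fails, the message is delivered again, and the idempotent wrapper keeps that redelivery from repeating the work.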