All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive normal messages

Last Updated:Mar 10, 2026

This topic provides Java examples for sending and receiving normal messages through the ApsaraMQ for RocketMQ HTTP client SDK. Normal messages are the default message type -- they carry no special delivery semantics, unlike scheduled, delayed, ordered, and transactional messages, and are delivered to consumers as quickly as possible.

Prerequisites

Before you begin, make sure that you have:

  • Installed the Java SDK. For more information, see Prepare the environment

  • Created the required resources in the ApsaraMQ for RocketMQ console: an instance, a topic (set to the Normal message type), and a consumer group. For more information, see Create resources

  • Obtained your AccessKey pair (AccessKey ID and AccessKey secret). For more information, see Create an AccessKey pair

How it works

Both examples follow the same workflow:

  1. Create an MQClient with the HTTP endpoint and AccessKey credentials.

  2. Bind a producer or consumer to a specific instance and topic.

  3. Send or poll for messages, then close the client when finished.

Each topic supports only one message type. A topic created for normal messages cannot send or receive scheduled, delayed, ordered, or transactional messages.

Send a message

The following example sends four normal messages to a topic in a loop using synchronous publishing. If publishMessage returns without throwing an exception, the message was sent successfully.

Replace the placeholders with your actual values:

PlaceholderDescriptionWhere to find it
<your-http-endpoint>HTTP endpoint of the instanceInstance Details page > HTTP Endpoint section
<your-topic>Topic nameApsaraMQ for RocketMQ console
<your-instance-id>Instance ID. Set to null or "" if the instance has no namespaceInstance Details page
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                "<your-http-endpoint>",
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );

        final String topic = "<your-topic>";
        final String instanceId = "<your-instance-id>";

        // Get a producer for this topic and instance.
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg = new TopicMessage(
                        "hello mq!".getBytes(),  // Message body
                        "A"                       // Message tag for filtering
                );
                // Custom property
                pubMsg.getProperties().put("a", String.valueOf(i));
                // Business key for message tracing (use an order ID, user ID, or similar identifier)
                pubMsg.setMessageKey("MessageKey");

                // Publish synchronously. No exception means success.
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                System.out.println(new Date() + " Send mq message success. Topic is:" + topic
                        + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

Key points:

  • Credentials from environment variables: ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are read from environment variables. Set these variables before you run the code.

  • Message tag: Tags let subscribers filter messages within a topic. In this example, the tag is "A".

  • Message key: Set the message key to a business identifier such as an order ID or user ID. This makes it easier to query and trace messages in the ApsaraMQ for RocketMQ console.

Receive and acknowledge messages

The following example consumes messages from a topic in a long-polling loop. After processing each batch, it sends an acknowledgment (ACK) back to the broker. If the broker does not receive an ACK before the delivery retry interval elapses, it redelivers the message.

Replace the placeholders with your actual values:

PlaceholderDescriptionWhere to find it
<your-http-endpoint>HTTP endpoint of the instanceInstance Details page > HTTP Endpoint section
<your-topic>Topic nameApsaraMQ for RocketMQ console
<your-instance-id>Instance ID. Set to null or "" if the instance has no namespaceInstance Details page
<your-group-id>Consumer group IDApsaraMQ for RocketMQ console
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;

import java.util.ArrayList;
import java.util.List;

public class Consumer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                "<your-http-endpoint>",
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );

        final String topic = "<your-topic>";
        final String groupId = "<your-group-id>";
        final String instanceId = "<your-instance-id>";

        // Get a consumer for this topic, group, and instance.
        final MQConsumer consumer;
        if (instanceId != null && instanceId != "") {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }

        // Poll for messages in a loop.
        // In production, use multiple threads for concurrent consumption.
        do {
            List<Message> messages = null;

            try {
                // Long polling: if no message is available, the request hangs
                // on the broker for up to the specified number of seconds.
                messages = consumer.consumeMessage(
                        3,  // Max messages per batch (up to 16)
                        3   // Long-polling timeout in seconds (up to 30)
                );
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }

            if (messages == null || messages.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                continue;
            }

            // --- Process messages ---
            for (Message message : messages) {
                System.out.println("Receive message: " + message);
            }

            // --- Acknowledge messages ---
            // Each message has a unique receipt handle that expires after
            // the delivery retry interval. ACK before it expires to prevent
            // redelivery.
            {
                List<String> handles = new ArrayList<String>();
                for (Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }

                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    if (e instanceof AckMessageException) {
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:" + errors.getRequestId()
                                + ", fail handles:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle : errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle
                                        + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
    }
}

Key points:

  • Long polling: The consumer holds the connection open on the broker for the specified timeout (3 seconds in this example). If a message arrives during that window, the broker responds immediately instead of waiting for the next poll cycle. The maximum timeout is 30 seconds.

  • Batch size: Each consumeMessage call returns up to the specified number of messages (3 in this example, maximum 16).

  • Receipt handle: Every time a message is delivered, it gets a new receipt handle with a unique timestamp. Use this handle to ACK the message. If the handle expires before the ACK reaches the broker, the message is redelivered.

  • Concurrency: This example uses a single thread. For higher throughput, consume with multiple threads in production.

Usage notes

  • One type per topic: A topic created for normal messages cannot send or receive other message types (scheduled, delayed, ordered, or transactional). Create separate topics for each message type.

  • Set meaningful message keys: Use business identifiers (such as order IDs or user IDs) as the message key. This makes it faster to query and trace messages in the console.

  • Handle ACK failures gracefully: If ackMessage throws an AckMessageException, log the failed handles and error codes. The broker redelivers unacknowledged messages automatically, so avoid treating ACK failures as fatal errors.

  • Clean up resources: Call mqClient.close() when the producer or consumer is no longer needed to release HTTP connections.

What's next