All Products
Search
Document Center

ApsaraMQ for RocketMQ:Sample code for RocketMQ 3.x/4.x SDK for Java

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ 5.x instances support clients built with the RocketMQ 3.x and 4.x SDKs. This page provides Java examples for sending and receiving normal, ordered, scheduled, delayed, and transactional messages using these legacy SDKs.

Important

The latest RocketMQ 5.x SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and offer additional features. Use the 5.x SDKs for new projects. For details, see Release notes. The 3.x, 4.x, and TCP client SDKs are maintained for existing workloads only.

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for RocketMQ 5.x instance

  • A topic and consumer group created in the ApsaraMQ for RocketMQ console

  • The RocketMQ 3.x or 4.x SDK for Java added to your project

  • The instance access point, in the format rmq-cn-XXXX.rmq.aliyuncs.com:8080 (available in the console)

For full setup instructions, see Preparations.

Shared configuration

All examples on this page share the same authentication and connection setup. The following blocks define the shared configuration so that each message-type example can focus on the sending or receiving logic.

Authentication

ApsaraMQ for RocketMQ supports two access methods. Choose based on your network environment:

  • Public endpoint -- Configure RPCHook with the instance username and password. Get these credentials from the Intelligent Authentication tab on the Access Control page in the ApsaraMQ for RocketMQ console. Do not use your Alibaba Cloud account AccessKey pair.

  • VPC endpoint -- No credentials required. When the client runs on an Elastic Compute Service (ECS) instance within a Virtual Private Cloud (VPC), the broker automatically obtains credentials from the VPC configuration.

For serverless instances, credentials are required for public endpoint access. If you enable the authentication-free in VPCs feature, VPC access does not require credentials.

RPCHook helper

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;

/**
 * Returns an RPCHook configured with instance credentials.
 * Required for public endpoint access and serverless instances over the Internet.
 * Not required for VPC access on standard instances.
 */
private static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(
        "<instance-username>",   // From the Intelligent Authentication tab
        "<instance-password>"    // From the Intelligent Authentication tab
    ));
}

Producer initialization

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

// --- Public endpoint: pass RPCHook ---
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());

// --- VPC endpoint: omit RPCHook ---
// DefaultMQProducer producer = new DefaultMQProducer();

// Group ID created in the console
producer.setProducerGroup("<your-group-id>");

// Required for message trace. Omit to disable tracing.
producer.setAccessChannel(AccessChannel.CLOUD);

// Access point from the console. Use the domain and port only -- no http:// prefix, no resolved IP.
producer.setNamesrvAddr("<your-access-point>");

producer.start();

Consumer initialization

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

// --- Public endpoint: pass RPCHook ---
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());

// --- VPC endpoint: omit RPCHook ---
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

consumer.setConsumerGroup("<your-group-id>");
consumer.setAccessChannel(AccessChannel.CLOUD);
consumer.setNamesrvAddr("<your-access-point>");
consumer.subscribe("<your-topic>", "*");

Placeholder reference

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
<instance-username>Instance username from the Intelligent Authentication tabMjY1NTI5MD...
<instance-password>Instance password from the Intelligent Authentication tabOTk2QjFBMz...
<your-group-id>Consumer group ID created in the consoleGID_test_group
<your-access-point>Instance access point (domain:port) from the consolermq-cn-hangzhou.rmq.aliyuncs.com:8080
<your-topic>Topic created in the consoletest_topic

Normal messages

Normal messages support three sending modes: synchronous, asynchronous, and one-way. For conceptual details, see Normal messages.

Send normal messages synchronously

Synchronous sending blocks until the broker acknowledges each message. Use this mode when delivery confirmation is required.

import java.util.Date;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// After producer initialization (see Shared configuration)

for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        // Handle send failure: retry or persist the message
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}

// Shut down the producer before exiting.
// Skip shutdown if your application sends messages continuously.
producer.shutdown();

Send normal messages asynchronously

Asynchronous sending returns immediately and delivers results through a callback. Use this mode for latency-sensitive producers.

import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// After producer initialization (see Shared configuration)

for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println("send message success. msgId= " + result.getMsgId());
            }

            @Override
            public void onException(Throwable throwable) {
                // Handle send failure: retry or persist the message
                System.out.println("send message failed.");
                throwable.printStackTrace();
            }
        });
    } catch (Exception e) {
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}

// Wait for async callbacks to complete before shutting down
TimeUnit.SECONDS.sleep(3);
producer.shutdown();

Send normal messages in one-way mode

One-way sending fires a message without waiting for a broker response. Use this mode for high-throughput workloads where occasional message loss is acceptable, such as log collection.

import java.util.Date;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// After producer initialization (see Shared configuration)

for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

        producer.sendOneway(msg);
    } catch (Exception e) {
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}

producer.shutdown();

Receive normal messages

Subscribe to normal messages with a concurrent listener. Each message is processed independently, and ordering is not guaranteed.

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

// After consumer initialization (see Shared configuration)

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

Ordered messages

Ordered messages guarantee first-in-first-out (FIFO) delivery within the same message queue. Messages with the same sharding key are routed to the same queue. For conceptual details, see Ordered messages.

Send ordered messages

Unlike normal messages, ordered messages require a MessageQueueSelector to route messages with the same sharding key to the same queue.

import java.util.List;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// After producer initialization (see Shared configuration)

for (int i = 0; i < 128; i++) {
    try {
        int orderId = i % 10;
        Message msg = new Message(
            "<your-order-topic>",
            "<your-message-tag>",
            "OrderID188",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // Set the sharding key to ensure ordered delivery.
        // For ApsaraMQ for RocketMQ 5.x instances, an alternative is:
        // msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
        msg.putUserProperty("__SHARDINGKEY", orderId + "");

        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // Route messages with the same orderId to the same queue
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
producer.shutdown();

Receive ordered messages

Use MessageListenerOrderly instead of MessageListenerConcurrently. The broker delivers messages in order within each queue.

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

// After consumer initialization (see Shared configuration)
// Subscribe to the ordered topic instead of a normal topic:
// consumer.subscribe("<your-order-topic>", "*");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
        System.out.printf("%s Receive New Messages: %s %n",
            Thread.currentThread().getName(), msgs);

        // Return SUSPEND_CURRENT_QUEUE_A_MOMENT on failure to retry
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

consumer.start();
System.out.printf("Consumer Started.%n");

Scheduled and delayed messages

Scheduled messages are delivered at a specified time. Delayed messages are delivered after a specified delay. Both use the __STARTDELIVERTIME property. For conceptual details, see Scheduled and delayed messages.

Send scheduled or delayed messages

import java.util.Date;
import java.text.SimpleDateFormat;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// After producer initialization (see Shared configuration)

for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // Option 1: Delayed delivery -- deliver 3 seconds from now
        long delayTime = System.currentTimeMillis() + 3000;
        msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

        // Option 2: Scheduled delivery -- deliver at a specific time
        // Format: yyyy-MM-dd HH:mm:ss
        // If the time is in the past, the message is delivered immediately.
        // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        //     .parse("2021-08-10 18:45:00").getTime();
        // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));

        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}

producer.shutdown();

Receive scheduled or delayed messages

The consumer code is identical to Receive normal messages. No special configuration is required on the consumer side.

Transactional messages

Transactional messages provide distributed transaction support through a two-phase commit: the producer sends a half message, runs a local transaction, and then commits or rolls back the message based on the transaction result. For conceptual details, see Transactional messages.

Important

Transactional messages require a dedicated consumer group. Do not share the group with other message types.

Send transactional messages

This example sends a half message and runs a local transaction. The broker holds the half message until the producer commits or rolls it back.

import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// --- Public endpoint: pass RPCHook and a dedicated transaction group ID ---
TransactionMQProducer transactionMQProducer = new TransactionMQProducer(
    "<your-transaction-group-id>", getAclRPCHook());

// --- VPC endpoint: omit RPCHook ---
// TransactionMQProducer transactionMQProducer = new TransactionMQProducer(
//     "<your-transaction-group-id>");

transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
transactionMQProducer.setNamesrvAddr("<your-access-point>");

// Register the transaction status checker (see next section)
transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
transactionMQProducer.start();

for (int i = 0; i < 10; i++) {
    try {
        Message message = new Message(
            "<your-transaction-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

        SendResult sendResult = transactionMQProducer.sendMessageInTransaction(
            message,
            new LocalTransactionExecuter() {
                @Override
                public LocalTransactionState executeLocalTransactionBranch(
                        Message msg, Object arg) {
                    System.out.println("Start executing the local transaction: " + msg);
                    // Run your local transaction logic here.
                    // Return COMMIT_MESSAGE, ROLLBACK_MESSAGE, or UNKNOW.
                    return LocalTransactionState.UNKNOW;
                }
            },
            null);

        assert sendResult != null;
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Check transaction status

When the producer returns UNKNOW, the broker periodically calls checkLocalTransactionState to resolve the transaction. Implement the TransactionCheckListener interface to query your local transaction state and return the final result.

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;

public class LocalTransactionCheckerImpl implements TransactionCheckListener {
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.println("Transaction status check received. MsgId: " + msg.getMsgId());
        // Query your local data store to determine the transaction outcome.
        // Return COMMIT_MESSAGE, ROLLBACK_MESSAGE, or UNKNOW.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Receive transactional messages

The consumer code is identical to Receive normal messages. No special configuration is required on the consumer side.