ApsaraMQ for RocketMQ 5.x instances support clients built with the RocketMQ 3.x and 4.x SDKs. This page provides Java examples for sending and receiving normal, ordered, scheduled, delayed, and transactional messages using these legacy SDKs.
The latest RocketMQ 5.x SDKs are fully compatible with ApsaraMQ for RocketMQ 5.x brokers and offer additional features. Use the 5.x SDKs for new projects. For details, see Release notes. The 3.x, 4.x, and TCP client SDKs are maintained for existing workloads only.
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for RocketMQ 5.x instance
A topic and consumer group created in the ApsaraMQ for RocketMQ console
The RocketMQ 3.x or 4.x SDK for Java added to your project
The instance access point, in the format rmq-cn-XXXX.rmq.aliyuncs.com:8080 (available in the console)
For full setup instructions, see Preparations.
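The access point is easy to get wrong when the value is copied from a browser or a DNS lookup. The sketch below checks the shape that the examples on this page expect: a domain and port, with no http:// prefix and no resolved IP address. AccessPointCheck and isValid are hypothetical names, not part of the RocketMQ SDK. The check is a local sanity test only and does not confirm that the endpoint exists.

```java
import java.util.regex.Pattern;

/** Hypothetical helper, not part of the RocketMQ SDK. */
final class AccessPointCheck {
    // Dotted-quad IPv4 literal such as 192.168.0.1
    private static final Pattern IPV4 = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3}");

    /** Returns true when the value looks like "domain:port" with no scheme and no IP literal. */
    static boolean isValid(String accessPoint) {
        if (accessPoint == null || accessPoint.contains("://")) {
            return false; // null, or carries a prefix such as http://
        }
        int colon = accessPoint.lastIndexOf(':');
        if (colon <= 0 || colon == accessPoint.length() - 1) {
            return false; // missing host or missing port
        }
        String host = accessPoint.substring(0, colon);
        String port = accessPoint.substring(colon + 1);
        if (host.contains(":") || IPV4.matcher(host).matches()) {
            return false; // an IP literal was used instead of the domain
        }
        if (!port.matches("\\d{1,5}")) {
            return false; // port must be numeric
        }
        int portNumber = Integer.parseInt(port);
        return portNumber >= 1 && portNumber <= 65535;
    }

    public static void main(String[] args) {
        System.out.println(isValid("rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
        System.out.println(isValid("http://rmq-cn-XXXX.rmq.aliyuncs.com:8080"));
    }
}
```

Running a check like this before calling setNamesrvAddr turns a malformed value into an immediate, readable error instead of a connection failure later.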
Shared configuration
All examples on this page share the same authentication and connection setup. The following blocks define the shared configuration so that each message-type example can focus on the sending or receiving logic.
Authentication
ApsaraMQ for RocketMQ supports two access methods. Choose based on your network environment:
Public endpoint -- Configure RPCHook with the instance username and password. Get these credentials from the Intelligent Authentication tab on the Access Control page in the ApsaraMQ for RocketMQ console. Do not use your Alibaba Cloud account AccessKey pair.
VPC endpoint -- No credentials required. When the client runs on an Elastic Compute Service (ECS) instance within a Virtual Private Cloud (VPC), the broker automatically obtains credentials from the VPC configuration.
For serverless instances, credentials are required for public endpoint access. If you enable the authentication-free in VPCs feature, VPC access does not require credentials.
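The access rules above can be condensed into a single decision. The sketch below is one reading of them, not SDK code: CredentialRule, Network, and needsCredentials are hypothetical names. It also assumes that a serverless instance accessed over a VPC needs credentials unless the authentication-free in VPCs feature is enabled, which the text implies but does not state outright.

```java
/** Hypothetical helper that encodes the access rules described above. */
final class CredentialRule {
    enum Network { PUBLIC, VPC }

    /**
     * Returns true when the client should be constructed with an RPCHook
     * that carries the instance username and password.
     */
    static boolean needsCredentials(Network network, boolean serverless, boolean authFreeInVpc) {
        if (network == Network.PUBLIC) {
            return true; // public endpoint access always uses instance credentials
        }
        // VPC access: standard instances need no credentials. Serverless instances
        // skip credentials only when authentication-free in VPCs is enabled
        // (assumed reading of the rule).
        return serverless && !authFreeInVpc;
    }

    public static void main(String[] args) {
        System.out.println(needsCredentials(Network.PUBLIC, false, false));
        System.out.println(needsCredentials(Network.VPC, false, false));
        System.out.println(needsCredentials(Network.VPC, true, true));
    }
}
```

A client factory could use the result to choose between the constructor that takes an RPCHook and the no-argument constructor.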
RPCHook helper
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
/**
 * Returns an RPCHook configured with instance credentials.
 * Required for public endpoint access and serverless instances over the Internet.
 * Not required for VPC access on standard instances.
 */
private static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(
        "<instance-username>", // From the Intelligent Authentication tab
        "<instance-password>"  // From the Intelligent Authentication tab
    ));
}
Producer initialization
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
// --- Public endpoint: pass RPCHook ---
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// --- VPC endpoint: omit RPCHook ---
// DefaultMQProducer producer = new DefaultMQProducer();
// Group ID created in the console
producer.setProducerGroup("<your-group-id>");
// Required for message trace. Omit to disable tracing.
producer.setAccessChannel(AccessChannel.CLOUD);
// Access point from the console. Use the domain and port only -- no http:// prefix, no resolved IP.
producer.setNamesrvAddr("<your-access-point>");
producer.start();
Consumer initialization
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
// --- Public endpoint: pass RPCHook ---
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
// --- VPC endpoint: omit RPCHook ---
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("<your-group-id>");
consumer.setAccessChannel(AccessChannel.CLOUD);
consumer.setNamesrvAddr("<your-access-point>");
consumer.subscribe("<your-topic>", "*");
Placeholder reference
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <instance-username> | Instance username from the Intelligent Authentication tab | MjY1NTI5MD... |
| <instance-password> | Instance password from the Intelligent Authentication tab | OTk2QjFBMz... |
| <your-group-id> | Consumer group ID created in the console | GID_test_group |
| <your-access-point> | Instance access point (domain:port) from the console | rmq-cn-hangzhou.rmq.aliyuncs.com:8080 |
| <your-topic> | Topic created in the console | test_topic |
Normal messages
Normal messages support three sending modes: synchronous, asynchronous, and one-way. For conceptual details, see Normal messages.
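The send examples in this section leave failure handling as a comment ("retry or persist the message"). As one possible shape for the retry half, the sketch below wraps any send call in a bounded retry with linear backoff. SendRetry and withRetry are hypothetical names, the attempt count and backoff are illustrative values, and the simulated action stands in for a call such as producer.send(msg).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical application-level retry wrapper, not part of the RocketMQ SDK. */
final class SendRetry {
    /** Calls the action up to maxAttempts times and rethrows the last failure. */
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                throw e;
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMillis * attempt); // linear backoff between attempts
            }
        }
        throw last; // every attempt failed: the caller should persist the message
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated send that fails twice before succeeding.
        String result = withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "SEND_OK";
        }, 5, 10L);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

When the final attempt also fails, the exception reaches the caller, which is the natural place to persist the message for later delivery.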
Send normal messages synchronously
Synchronous sending blocks until the broker acknowledges each message. Use this mode when delivery confirmation is required.
import java.util.Date;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
// After producer initialization (see Shared configuration)
for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        // Handle send failure: retry or persist the message
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}
// Shut down the producer before exiting.
// Skip shutdown if your application sends messages continuously.
producer.shutdown();
Send normal messages asynchronously
Asynchronous sending returns immediately and delivers results through a callback. Use this mode for latency-sensitive producers.
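Because results arrive on a callback, the sending thread needs some way to know when all callbacks have fired before it shuts the producer down. One common approach, sketched here with plain JDK classes, is a CountDownLatch that each callback decrements. The thread pool below is only a stand-in for the SDK's callback threads, and AsyncCompletionWait and sendAllAndWait are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of waiting for async callbacks; no RocketMQ classes involved. */
final class AsyncCompletionWait {
    /** Runs count simulated async sends and waits until every callback has fired. */
    static int sendAllAndWait(int count, long timeoutSeconds) throws InterruptedException {
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        ExecutorService callbackPool = Executors.newFixedThreadPool(4); // stand-in for SDK callback threads
        try {
            for (int i = 0; i < count; i++) {
                callbackPool.execute(() -> {
                    try {
                        succeeded.incrementAndGet(); // the onSuccess body would go here
                    } finally {
                        pending.countDown(); // count down in both onSuccess and onException
                    }
                });
            }
            // Block until all callbacks are done, or give up after the timeout.
            if (!pending.await(timeoutSeconds, TimeUnit.SECONDS)) {
                System.out.println("Timed out with " + pending.getCount() + " callbacks outstanding");
            }
        } finally {
            callbackPool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed callbacks: " + sendAllAndWait(128, 5));
    }
}
```

The SDK example that follows uses a fixed three-second sleep instead, which is simpler but can shut down too early under load or wait longer than necessary.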
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
// After producer initialization (see Shared configuration)
for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println("send message success. msgId= " + result.getMsgId());
            }
            @Override
            public void onException(Throwable throwable) {
                // Handle send failure: retry or persist the message
                System.out.println("send message failed.");
                throwable.printStackTrace();
            }
        });
    } catch (Exception e) {
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}
// Wait for async callbacks to complete before shutting down
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
Send normal messages in one-way mode
One-way sending fires a message without waiting for a broker response. Use this mode for high-throughput workloads where occasional message loss is acceptable, such as log collection.
import java.util.Date;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
// After producer initialization (see Shared configuration)
for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendOneway(msg);
    } catch (Exception e) {
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}
producer.shutdown();
Receive normal messages
Subscribe to normal messages with a concurrent listener. Each message is processed independently, and ordering is not guaranteed.
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
// After consumer initialization (see Shared configuration)
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
Ordered messages
Ordered messages guarantee first-in-first-out (FIFO) delivery within the same message queue. Messages with the same sharding key are routed to the same queue. For conceptual details, see Ordered messages.
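The routing rule above comes down to a deterministic mapping from sharding key to queue index. The sketch below shows that mapping in isolation, with no SDK classes: ShardingRoute and queueIndex are hypothetical names, and the queue count of 8 is an arbitrary illustration. It uses Math.floorMod rather than the % operator so that a negative ID or hash code still yields a valid index.

```java
/** Hypothetical illustration of sharding-key routing; not part of the RocketMQ SDK. */
final class ShardingRoute {
    /** Maps a numeric ID to a queue index in the range [0, queueCount). */
    static int queueIndex(int orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(orderId, queueCount); // never negative, unlike %
    }

    /** Maps a string sharding key to a queue index by way of its hash code. */
    static int queueIndex(String shardingKey, int queueCount) {
        return queueIndex(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 8; // illustrative queue count
        for (int i = 0; i < 20; i++) {
            int orderId = i % 10;
            // The same orderId always prints the same queue index.
            System.out.printf("orderId=%d -> queue %d%n", orderId, queueIndex(orderId, queueCount));
        }
    }
}
```

The mapping stays stable only while the queue count is unchanged. If the number of queues changes, keys can move to different queues.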
Send ordered messages
Unlike normal messages, ordered messages require a MessageQueueSelector to route messages with the same sharding key to the same queue.
import java.util.List;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
// After producer initialization (see Shared configuration)
for (int i = 0; i < 128; i++) {
    try {
        int orderId = i % 10;
        Message msg = new Message(
            "<your-order-topic>",
            "<your-message-tag>",
            "OrderID188",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Set the sharding key to ensure ordered delivery.
        // For ApsaraMQ for RocketMQ 5.x instances, an alternative is:
        // msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
        msg.putUserProperty("__SHARDINGKEY", orderId + "");
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // Route messages with the same orderId to the same queue
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
producer.shutdown();
Receive ordered messages
Use MessageListenerOrderly instead of MessageListenerConcurrently. The broker delivers messages in order within each queue.
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
// After consumer initialization (see Shared configuration)
// Subscribe to the ordered topic instead of a normal topic:
// consumer.subscribe("<your-order-topic>", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
        System.out.printf("%s Receive New Messages: %s %n",
            Thread.currentThread().getName(), msgs);
        // Return SUSPEND_CURRENT_QUEUE_A_MOMENT on failure to retry
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");
Scheduled and delayed messages
Scheduled messages are delivered at a specified time. Delayed messages are delivered after a specified delay. Both use the __STARTDELIVERTIME property, whose value is the target delivery time as a Unix timestamp in milliseconds, passed as a string. For conceptual details, see Scheduled and delayed messages.
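Both message types reduce to computing one millisecond timestamp and passing it as a string. The sketch below shows the two calculations with java.time, under a few assumptions: DeliverTime, delayed, and scheduled are hypothetical names, and the time zone is passed in explicitly so that the result does not depend on the JVM default zone.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that builds __STARTDELIVERTIME values; not part of the RocketMQ SDK. */
final class DeliverTime {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Delayed delivery: current time on the given clock plus the delay, in epoch milliseconds. */
    static String delayed(Clock clock, Duration delay) {
        return String.valueOf(clock.millis() + delay.toMillis());
    }

    /** Scheduled delivery: a wall-clock time in the given zone, converted to epoch milliseconds. */
    static String scheduled(String localTime, ZoneId zone) {
        long epochMillis = LocalDateTime.parse(localTime, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
        return String.valueOf(epochMillis);
    }

    public static void main(String[] args) {
        // Deliver 3 seconds from now.
        System.out.println(delayed(Clock.systemUTC(), Duration.ofSeconds(3)));
        // Deliver at a fixed wall-clock time; UTC is an illustrative choice of zone.
        System.out.println(scheduled("2021-08-10 18:45:00", ZoneId.of("UTC")));
    }
}
```

Either return value can be passed to msg.putUserProperty("__STARTDELIVERTIME", ...). Taking a Clock parameter also makes the delay calculation easy to test with a fixed clock.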
Send scheduled or delayed messages
import java.util.Date;
import java.text.SimpleDateFormat;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
// After producer initialization (see Shared configuration)
for (int i = 0; i < 128; i++) {
    try {
        Message msg = new Message(
            "<your-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Option 1: Delayed delivery -- deliver 3 seconds from now
        long delayTime = System.currentTimeMillis() + 3000;
        msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
        // Option 2: Scheduled delivery -- deliver at a specific time
        // Format: yyyy-MM-dd HH:mm:ss
        // If the time is in the past, the message is delivered immediately.
        // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        //     .parse("2021-08-10 18:45:00").getTime();
        // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        System.out.println(new Date() + " Send mq message failed.");
        e.printStackTrace();
    }
}
producer.shutdown();
Receive scheduled or delayed messages
The consumer code is identical to Receive normal messages. No special configuration is required on the consumer side.
Transactional messages
Transactional messages provide distributed transaction support through a two-phase commit: the producer sends a half message, runs a local transaction, and then commits or rolls back the message based on the transaction result. For conceptual details, see Transactional messages.
Transactional messages require a dedicated group ID, which the producer example in this section passes to TransactionMQProducer. Do not share the group with other message types.
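For the commit, rollback, and status-check steps to agree, the producer needs a record of how each local transaction ended. The sketch below illustrates that idea with an in-memory map, under clearly stated assumptions: LocalTxStateStore and its methods are hypothetical, the State enum is a local stand-in that mirrors the constant names COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOW, and a real application would read the outcome from its own durable data store instead of a map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory record of local transaction outcomes; not part of the RocketMQ SDK. */
final class LocalTxStateStore {
    /** Local stand-in that mirrors the constant names used in the examples on this page. */
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private final Map<String, State> outcomes = new ConcurrentHashMap<>();

    /** Called once the local transaction has finished, successfully or not. */
    void record(String businessKey, boolean succeeded) {
        outcomes.put(businessKey, succeeded ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE);
    }

    /** Called from a status check: no recorded outcome yet means the result is still unknown. */
    State check(String businessKey) {
        return outcomes.getOrDefault(businessKey, State.UNKNOW);
    }

    public static void main(String[] args) {
        LocalTxStateStore store = new LocalTxStateStore();
        System.out.println(store.check("order-1")); // nothing recorded yet
        store.record("order-1", true);
        System.out.println(store.check("order-1")); // outcome now known
    }
}
```

In this design the local transaction writes the outcome and the status check only reads it, so a check that arrives before the transaction finishes reports UNKNOW and is asked again later.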
Send transactional messages
This example sends a half message and runs a local transaction. The broker holds the half message until the producer commits or rolls it back.
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
// --- Public endpoint: pass RPCHook and a dedicated transaction group ID ---
TransactionMQProducer transactionMQProducer = new TransactionMQProducer(
    "<your-transaction-group-id>", getAclRPCHook());
// --- VPC endpoint: omit RPCHook ---
// TransactionMQProducer transactionMQProducer = new TransactionMQProducer(
//     "<your-transaction-group-id>");
transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
transactionMQProducer.setNamesrvAddr("<your-access-point>");
// Register the transaction status checker (see next section)
transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
transactionMQProducer.start();
for (int i = 0; i < 10; i++) {
    try {
        Message message = new Message(
            "<your-transaction-topic>",
            "<your-message-tag>",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = transactionMQProducer.sendMessageInTransaction(
            message,
            new LocalTransactionExecuter() {
                @Override
                public LocalTransactionState executeLocalTransactionBranch(
                        Message msg, Object arg) {
                    System.out.println("Start executing the local transaction: " + msg);
                    // Run your local transaction logic here.
                    // Return COMMIT_MESSAGE, ROLLBACK_MESSAGE, or UNKNOW.
                    return LocalTransactionState.UNKNOW;
                }
            },
            null);
        assert sendResult != null;
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Check transaction status
When the local transaction returns UNKNOW, the broker periodically sends a status check to the producer, and the SDK calls checkLocalTransactionState on the registered listener to resolve the transaction. Implement the TransactionCheckListener interface to query your local transaction state and return the final result.
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
public class LocalTransactionCheckerImpl implements TransactionCheckListener {
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.println("Transaction status check received. MsgId: " + msg.getMsgId());
        // Query your local data store to determine the transaction outcome.
        // Return COMMIT_MESSAGE, ROLLBACK_MESSAGE, or UNKNOW.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
Receive transactional messages
The consumer code is identical to Receive normal messages. No special configuration is required on the consumer side.