All Products
Search
Document Center

ApsaraMQ for RocketMQ:Sample code

Last Updated:Mar 11, 2026

Send and receive messages with ApsaraMQ for RocketMQ using the Apache RocketMQ Java SDK. This page covers both the gRPC protocol SDK (rocketmq-client-java) and the Remoting protocol SDK (rocketmq-client).

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for RocketMQ instance with topics and group IDs created in the ApsaraMQ for RocketMQ console

  • The instance endpoint (for example, rmq-cn-XXXX.rmq.aliyuncs.com:8080)

  • The required SDK dependency added to your project. For version details, see Version guide

Important

If you use a Serverless instance, check the SDK version requirements for public network access before proceeding.

gRPC protocol SDK

The rocketmq-client-java SDK communicates over the gRPC protocol. The following tables list sample code for each message type, hosted in the Apache RocketMQ clients repository.

For Spring Boot integration, see rocketmq-v5-client-spring-boot-samples.

Important

When sending transactional messages with the gRPC protocol SDK, set a topic when you start the producer. Without a topic, transaction checks are delayed. If the message is not sent within four hours, the half-transactional message may be discarded.

Send messages

Message typeSample code
Normal message (synchronous)ProducerNormalMessageExample.java
Normal message (asynchronous)AsyncProducerExample.java
Ordered messageProducerFifoMessageExample.java
Scheduled or delayed messageProducerDelayMessageExample.java
Transactional messageProducerTransactionMessageExample.java
Lightweight messageLiteProducerExample.java

Consume messages

Consumer typeSample code
Push consumerPushConsumerExample.java
Simple consumer (synchronous)SimpleConsumerExample.java
Simple consumer (asynchronous)AsyncSimpleConsumerExample.java
Lightweight push consumerLitePushConsumerExample.java

For more information about push consumers and simple consumers, see Consumer types.

Remoting protocol SDK

The rocketmq-client SDK communicates over the Remoting protocol. This section provides inline code for each message type.

For Spring Boot integration, see rocketmq-spring-boot-samples.

Common setup

All Remoting protocol examples share the same authentication and connection setup. Review this section first, then refer to the message-type-specific code below.

Authentication

How you initialize the producer or consumer depends on the access method:

  • Public endpoint -- Pass an RPCHook with the instance username and password: Get the instance username and password from the Intelligent Identity Recognition tab of the Resource Access Management console. Do not use your Alibaba Cloud account's AccessKey ID and AccessKey secret.

      private static RPCHook getAclRPCHook() {
          return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
      }
    
      // Pass the RPCHook when creating the producer
      DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
  • VPC endpoint -- No RPCHook required. The server authenticates based on the VPC:

      DefaultMQProducer producer = new DefaultMQProducer();
  • Serverless instance -- Pass an RPCHook for public network access. If password-free access over the internal network is enabled, no RPCHook is required.

Connection

// Group ID created in the ApsaraMQ for RocketMQ console
producer.setProducerGroup("<your-group-id>");

// Endpoint from the ApsaraMQ for RocketMQ console
// Use the domain name and port as shown. Do not add http:// or https://.
// Do not use a resolved IP address.
producer.setNamesrvAddr("<your-access-point>");

Message traces (optional)

To enable cloud message tracing, set the access channel and enable traces:

producer.setAccessChannel(AccessChannel.CLOUD);

// Required for SDK v5.3.0 and later, in addition to setAccessChannel
producer.setEnableTrace(true);

Placeholder reference

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
<instance-username>Instance username from the console--
<instance-password>Instance password from the console--
<your-group-id>Group ID created in the consoleGID_example
<your-access-point>Endpoint from the consolermq-cn-XXXX.rmq.aliyuncs.com:8080
<your-topic>Topic created in the consoletopic_example
<your-order-topic>Topic for ordered messagesorder_topic_example
<your-transaction-topic>Topic for transactional messagestransaction_topic_example
<your-transaction-group-id>Dedicated group ID for transactional messagesGID_transaction_example
<your-message-tag>Message tag for filteringTagA

Normal messages

Normal messages suit most use cases where ordering and transactional guarantees are not required.

Send a normal message (synchronous)

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        producer.setProducerGroup("<your-group-id>");
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setEnableTrace(true);
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("<your-topic>",
                        "<your-message-tag>",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before the application exits.
        // Note: Destroying the producer object saves system memory.
        // To send messages continuously, keep the producer running.
        producer.shutdown();
    }
}

Send a normal message (asynchronous)

The asynchronous send returns immediately and invokes a callback when the broker responds.

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class RocketMQAsyncProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        producer.setProducerGroup("<your-group-id>");
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setEnableTrace(true);
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("<your-topic>",
                        "<your-message-tag>",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }
        // Wait 3 seconds for the asynchronous callbacks to complete
        TimeUnit.SECONDS.sleep(3);

        producer.shutdown();
    }
}

Send a normal message (one-way)

One-way sends do not wait for a broker response. Use this mode for high-throughput scenarios where occasional message loss is acceptable, such as log collection.

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQOnewayProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        producer.setProducerGroup("<your-group-id>");
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setEnableTrace(true);
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("<your-topic>",
                        "<your-message-tag>",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // sendOneway does not return a result or throw an exception on failure
                producer.sendOneway(msg);
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }
}

Consume normal messages

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        consumer.setConsumerGroup("<your-group-id>");
        consumer.setAccessChannel(AccessChannel.CLOUD);
        consumer.setEnableTrace(true);
        consumer.setNamesrvAddr("<your-access-point>");

        // Subscribe to a topic. Use "*" to receive all tags, or specify a tag expression.
        consumer.subscribe("<your-topic>", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

Ordered messages

Ordered messages guarantee FIFO delivery within the same sharding key. Use them for scenarios such as sequential event processing or real-time data synchronization.

Send an ordered message

Messages with the same sharding key are delivered to the same queue in order. Set the __SHARDINGKEY property so that the broker routes related messages to the same queue.

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class RocketMQOrderProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        producer.setProducerGroup("<your-group-id>");
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setEnableTrace(true);
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("<your-order-topic>",
                        "<your-message-tag>",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // Set the sharding key to route related messages to the same queue.
                // In v5.x, you can replace the following line with:
                // msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Select a queue based on the orderId
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

Consume ordered messages

Use MessageListenerOrderly instead of MessageListenerConcurrently to preserve consumption order within each queue.

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        consumer.setConsumerGroup("<your-group-id>");
        consumer.setAccessChannel(AccessChannel.CLOUD);
        consumer.setEnableTrace(true);
        consumer.setNamesrvAddr("<your-access-point>");
        consumer.subscribe("<your-order-topic>", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Return SUSPEND_CURRENT_QUEUE_A_MOMENT on failure to retry
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Scheduled and delayed messages

Scheduled messages are delivered at a specific time. Delayed messages are delivered after a configurable delay. Both use the __STARTDELIVERTIME user property.

Send a scheduled or delayed message

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.Date;

public class RocketMQDelayProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        producer.setProducerGroup("<your-group-id>");
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setEnableTrace(true);
        producer.setNamesrvAddr("<your-access-point>");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("<your-topic>",
                        "<your-message-tag>",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                // Delayed message: deliver after 3 seconds
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // Scheduled message: deliver at a specific time (format: yyyy-MM-dd HH:mm:ss)
                // If the specified time is earlier than the current time, the message is delivered immediately.
                // long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }
}

Consume scheduled and delayed messages

Consume scheduled and delayed messages the same way as normal messages. No additional configuration is required.

Transactional messages

Transactional messages use a two-phase commit protocol. The producer sends a half-transactional message, runs a local transaction, and then commits or rolls back. The broker periodically checks uncommitted transactions by calling checkLocalTransaction.

Important

The group ID for transactional messages cannot be shared with other message types. Create a dedicated group ID for transaction producers.

Send a transactional message

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQTransactionProducer {

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("<instance-username>", "<instance-password>"));
    }

    public static void main(String[] args) throws MQClientException {
        // Use a dedicated group ID for transactional messages
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("<your-transaction-group-id>", getAclRPCHook());
        transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
        transactionMQProducer.setEnableTrace(true);
        transactionMQProducer.setNamesrvAddr("<your-access-point>");

        transactionMQProducer.setTransactionListener(new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Run your local transaction logic here
                System.out.println("Start to execute the local transaction: " + msg);
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Return the local transaction status when the broker checks back
                System.out.println("Received a transaction check request, MsgId: " + msg.getMsgId());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        transactionMQProducer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message message = new Message("<your-transaction-topic>",
                        "<your-message-tag>",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
                assert sendResult != null;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Consume transactional messages

Consume transactional messages the same way as normal messages. No additional configuration is required.

Serverless instance public network access

To access a Serverless instance over the public network, your SDK must meet a minimum version requirement. Add the namespace configuration shown below.

Note

Replace InstanceId with your actual instance ID.

Remoting protocol SDK (rocketmq-client >= 5.2.0)

Add the namespace to the producer or consumer:

// Producer
producer.setNamespaceV2("InstanceId");

// Consumer
consumer.setNamespaceV2("InstanceId");

gRPC protocol SDK (rocketmq-client-java >= 5.0.6)

Set the namespace in the ClientConfiguration:

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();