All Products
Search
Document Center

ApsaraMQ for RocketMQ:Sample code for the RocketMQ 1.x TCP client SDK for Java

Last Updated:Mar 11, 2026

ApsaraMQ for RocketMQ 5.x instances support the RocketMQ 1.x TCP client SDK for Java. The following sample code covers four message types: normal, ordered, scheduled and delayed, and transactional.

Important

The latest RocketMQ 5.x SDKs are fully compatible with 5.x brokers and offer more features. Use them for all new projects. For more information, see Release notes. Alibaba Cloud maintains only the RocketMQ 3.x, 4.x, and TCP client SDKs. Use them only for existing workloads.

Code index

Use the following table to jump to the code you need.

Message typeSendReceive
NormalSynchronousPush
NormalAsynchronousBatch push
NormalOne-wayPull
NormalMulti-threaded
OrderedSendSubscribe
ScheduledSendSame as normal
DelayedSendSame as normal
TransactionalSendSame as normal

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for RocketMQ 5.x instance with the endpoint, topics, and consumer groups created in the console

  • The RocketMQ 1.x TCP client SDK for Java added to your project. For more information, see Preparations

Common configuration

All examples share the same connection and authentication setup shown below. Each subsequent code block omits this boilerplate and focuses on the message-specific logic.

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;

Properties properties = new Properties();

// Instance credentials
// Obtain the username and password on the Intelligent Authentication tab
// of the Access Control page for your instance in the ApsaraMQ for RocketMQ console.
// Do not use your Alibaba Cloud account AccessKey pair.
properties.put(PropertyKeyConst.AccessKey, "<instance-username>");
properties.put(PropertyKeyConst.SecretKey, "<instance-password>");

// Endpoint
// Enter the domain name and port from the console (e.g., rmq-cn-XXXX.rmq.aliyuncs.com:8080).
// Do not add an http:// or https:// prefix. Do not use a resolved IP address.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "<endpoint>");

// Send timeout in milliseconds
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");

Replace the following placeholders with actual values:

PlaceholderDescriptionExample
<instance-username>Instance username from the consoleLTAI5tXxx
<instance-password>Instance password from the consolexXxXxXx
<endpoint>Instance endpoint from the consolermq-cn-XXXX.rmq.aliyuncs.com:8080

Authentication rules

Access methodCredentials required?
Public endpointYes. Set AccessKey and SecretKey
Virtual Private Cloud (VPC) from an Elastic Compute Service (ECS) instanceNo. The broker retrieves credentials from VPC information automatically
Serverless instance over the InternetYes
Serverless instance in a VPC (authentication-free enabled)No
Do not set the instance ID when using a TCP client SDK to connect to a 5.x instance. Setting it causes connection failures.

Serverless instances over the Internet

If you access a serverless instance over the Internet, use SDK version 1.9.0.Final or later, and add the following property:

properties.setProperty(PropertyKeyConst.Namespace, "<instance-id>");

Replace <instance-id> with your ApsaraMQ for RocketMQ instance ID.

Send and receive normal messages

Send normal messages synchronously

Synchronous sending blocks until the broker acknowledges the message. Use this mode when delivery confirmation matters.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Date;
import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // ... common configuration (see above) ...

        Producer producer = ONSFactory.createProducer(properties);
        // Call start() once before sending any messages.
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message msg = new Message(
                    "TopicTestMQ",           // Topic created in the console.
                                             // A normal message topic cannot send or receive other message types.
                    "TagA",                  // Tag for consumer-side filtering.
                    "Hello MQ".getBytes());  // Message body (binary). Producer and consumer must
                                             // agree on serialization and deserialization methods.

            // Business key. Should be globally unique when possible.
            // Use it to look up messages in the console if delivery issues occur.
            msg.setKey("ORDERID_" + i);

            try {
                SendResult sendResult = producer.send(msg);
                // No exception means the message was sent successfully.
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            } catch (Exception e) {
                // Implement retry or persistence logic here.
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        // Shut down the producer before exiting the application.
        // Skip this if the application sends messages continuously.
        producer.shutdown();
    }
}

Send normal messages asynchronously

Asynchronous sending returns immediately and delivers the result through a callback. Use this mode for higher throughput when the calling thread should not block.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class ProducerTest {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        // ... common configuration (see above) ...

        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        Message msg = new Message(
                "TopicTestMQ",
                "TagA",
                "Hello MQ".getBytes());

        msg.setKey("ORDERID_100");

        // sendAsync returns immediately. The result is delivered through the callback.
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // Implement retry or persistence logic here.
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });

        // Wait for the async callback before shutting down.
        TimeUnit.SECONDS.sleep(3);

        producer.shutdown();
    }
}

Send normal messages in one-way mode

One-way sending fires the message without waiting for a broker response. This mode gives the highest throughput but provides no delivery guarantee. Use it only for scenarios that tolerate occasional message loss, such as log collection.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // ... common configuration (see above) ...

        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message msg = new Message(
                    "TopicTestMQ",
                    "TagA",
                    "Hello MQ".getBytes());

            msg.setKey("ORDERID_" + i);

            // sendOneway does not wait for a broker response.
            // If delivery confirmation matters, use synchronous or asynchronous sending instead.
            producer.sendOneway(msg);
        }

        producer.shutdown();
    }
}

Send normal messages from multiple threads

The producer is thread-safe. Share a single producer instance across threads instead of creating one per thread.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // ... common configuration (see above) ...

        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        // Both threads share the same producer instance.
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = new Message(
                        "TopicTestMQ",
                        "TagA",
                        "Hello MQ".getBytes());
                try {
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
        });
        thread.start();

        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                try {
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();

        // (Optional) Shut down the producer when it is no longer needed.
        // producer.shutdown();
    }
}

Subscribe to normal messages in push mode

In push mode, the broker delivers messages to consumers as they arrive. This is the most common consumption pattern.

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // ... common configuration (see above) ...

        // Consumption mode (optional):
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);    // Default
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);

        // Subscribe to multiple tags with "||".
        consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        // Subscribe to another topic. Use "*" to receive all tags.
        // To unsubscribe, remove the subscription code and restart the consumer.
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

Subscribe to normal messages in batch push mode

Batch push mode delivers multiple messages per callback invocation, reducing per-message overhead.

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // ... common configuration (see above) ...

        // Maximum number of messages per batch. Valid values: 1 to 1024. Default: 32.
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // Maximum wait time between batches in seconds. Valid values: 0 to 450. Default: 0.
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
                // Process multiple messages at a time.
                return Action.CommitMessage;
            }
        });

        batchConsumer.start();
        System.out.println("Consumer start success.");

        // Keep the process alive.
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Subscribe to normal messages in pull mode

In pull mode, the consumer controls when to fetch messages. Use this mode when you need precise control over consumption pacing.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class PullConsumerClient {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
        // ... common configuration (see above) ...

        PullConsumer consumer = ONSFactory.createPullConsumer(properties);
        consumer.start();

        // Get all partitions for the topic.
        Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
        // Assign partitions to pull from.
        consumer.assign(topicPartitions);

        while (true) {
            // Poll with a 3000 ms timeout.
            List<Message> messages = consumer.poll(3000);
            System.out.printf("Received message: %s %n", messages);
        }
    }
}

Send and receive ordered messages

Ordered messages guarantee that messages with the same sharding key are consumed in the order they were sent. Use them for scenarios that require strict sequencing, such as order status updates.

Send ordered messages

Use OrderProducer and pass a sharding key to route related messages to the same partition.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;

public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // ... common configuration (see above) ...

        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        producer.start();

        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    "Order_global_topic",              // Topic for ordered messages.
                    "TagA",
                    "send order global msg".getBytes()
            );
            msg.setKey(orderId);

            // Sharding key determines partition routing.
            // Messages with the same sharding key are delivered to the same partition
            // and consumed in order.
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            } catch (Exception e) {
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }
}

Subscribe to ordered messages

Use OrderConsumer with a MessageOrderListener. Return OrderAction.Suspend if consumption fails so that the broker retries the message without breaking order.

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;

public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // ... common configuration (see above) ...

        // Retry interval (ms) if consumption fails. Valid values: 10 to 30000.
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // Maximum retry count for failed messages.
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");

        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                "Order_global_topic",
                // Tag filter:
                // "*"           -> subscribe to all tags
                // "TagA||TagB"  -> subscribe to TagA or TagB
                "*",
                new MessageOrderListener() {
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        // Return OrderAction.Suspend if consumption fails or an exception occurs.
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}

Send and receive scheduled messages

Scheduled messages specify an absolute delivery time instead of being sent immediately. Use them for tasks that must happen at a precise moment, such as sending a reminder 24 hours after user registration.

Send scheduled messages

Set startDeliverTime to a Unix timestamp in milliseconds. The broker delivers the message at this time. If the timestamp is in the past, the broker delivers the message immediately.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // ... common configuration (see above) ...

        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        Message msg = new Message(
                "Topic",
                "tag",
                "Hello MQ".getBytes());
        msg.setKey("ORDERID_100");

        try {
            // Deliver at a specific time. Replace with your target timestamp.
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
            msg.setStartDeliverTime(timeStamp);

            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        } catch (Exception e) {
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        producer.shutdown();
    }
}

Subscribe to scheduled messages

Subscribe to scheduled messages the same way as normal messages. See Subscribe to normal messages in push mode.

Send and receive delayed messages

Delayed messages specify a relative delay from the current time, rather than an absolute delivery time. Use them for scenarios like retrying a failed operation after a cooldown period.

Send delayed messages

Set startDeliverTime to System.currentTimeMillis() plus the desired delay in milliseconds. The maximum delay is 40 days (3,456,000,000 ms).

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // ... common configuration (see above) ...

        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        Message msg = new Message(
                "Topic",
                "tag",
                "Hello MQ".getBytes());
        msg.setKey("ORDERID_100");

        try {
            // Deliver 3 seconds from now.
            long delayTime = System.currentTimeMillis() + 3000;
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            if (sendResult != null) {
                System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
            }
        } catch (Exception e) {
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        producer.shutdown();
    }
}

Subscribe to delayed messages

Subscribe to delayed messages the same way as normal messages. See Subscribe to normal messages in push mode.

Send and receive transactional messages

Transactional messages tie a local transaction and message delivery together so they succeed or fail as a unit. The workflow is:

  1. The broker sends a half-message.

  2. Your application runs the local transaction.

  3. Based on the transaction result, the broker commits or rolls back the message.

If the broker does not receive a commit or rollback in time, it calls a checker callback to query the transaction status.

Transactional messages require a dedicated consumer group. Do not share the group with other message types.

Send transactional messages

Implement two interfaces:

  • LocalTransactionExecuter: Runs your local transaction after the broker accepts the half-message.

  • LocalTransactionChecker: Responds to broker status checks when the transaction outcome is unknown.

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // Transactional messages require a dedicated group ID.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // ... common configuration (see above) ...

        // Register the transaction status checker before creating the producer.
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try {
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        // Run the local transaction here.
                        System.out.println("Execute the local transaction and commit the transaction status.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            } catch (ONSClientException e) {
                System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Send transaction message success.");
    }
}

// Transaction status checker.
// The broker calls check() when it does not receive a commit or rollback.
class LocalTransactionCheckerImpl implements LocalTransactionChecker {

    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("The request to check the transaction status of the message is received. MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

Subscribe to transactional messages

Subscribe to transactional messages the same way as normal messages. See Subscribe to normal messages in push mode.

What's next

  • Set up client-side logging to help troubleshoot message delivery issues. See Log configurations.

  • Migrate to the latest RocketMQ 5.x SDKs for more features and better compatibility. See Release notes.