ApsaraMQ for RocketMQ: Send messages by using multiple threads

Last Updated: Mar 11, 2026

Producer and consumer objects in ApsaraMQ for RocketMQ are thread-safe. Share a single instance across threads instead of creating one per thread.

Why share instances

You can deploy multiple producer and consumer instances on one or more brokers. You can also run multiple threads to send or receive messages in a producer or consumer instance. Creating an instance per thread wastes resources and can degrade throughput.

Sharing a single producer or consumer across multiple threads:

  • Reduces resource overhead on both the client and the broker

  • Improves transactions per second (TPS) for sending and receiving messages

Usage notes

Important

Do not create a producer or consumer instance for each thread. Create one instance and share it across threads.

Important

Avoid sending ordered messages from multiple threads. The broker determines message order based on the sequence in which it receives messages from a single producer. When multiple threads send concurrently, the arrival order at the broker may differ from the sending order in your application logic.
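One way to respect this constraint while still handing off work asynchronously is to route every ordered send through a single dedicated thread. The following is a minimal sketch under stated assumptions: `OrderedSendSketch` and `sendInOrder` are hypothetical names, and `sendFn` is a stand-in for the real send call on the shared producer. It relies only on the JDK guarantee that a single-thread executor runs tasks sequentially in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of the ApsaraMQ for RocketMQ SDK.
final class OrderedSendSketch {

    // Runs every send on one dedicated thread so that the send order
    // matches the submission order. sendFn stands in for the real send call.
    static void sendInOrder(List<String> bodies, Consumer<String> sendFn)
            throws InterruptedException {
        ExecutorService sender = Executors.newSingleThreadExecutor();
        for (String body : bodies) {
            sender.execute(() -> sendFn.accept(body));
        }
        // Stop accepting new tasks, then wait for the queued sends to finish.
        sender.shutdown();
        if (!sender.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Ordered sends did not finish in time");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The list records the order in which the stand-in send call ran.
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        sendInOrder(List.of("step-1", "step-2", "step-3"), sent::add);
        System.out.println(sent);
    }
}
```

The order is only as reliable as the order of submission: if several application threads submit to the executor concurrently, their relative order is again undefined.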

Shared vs. per-thread producer

Correct -- create the producer once, share it across threads:

Producer producer = ONSFactory.createProducer(properties);
producer.start();

// Thread 1 and Thread 2 share the same producer
Thread t1 = new Thread(() -> producer.send(msg1));
Thread t2 = new Thread(() -> producer.send(msg2));
t1.start();
t2.start();

Incorrect -- create a producer per thread:

// Each thread creates its own producer -- wastes resources
Thread t1 = new Thread(() -> {
    Producer p = ONSFactory.createProducer(properties);
    p.start();
    p.send(msg);
    p.shutdown();
});

Sample code

The following example creates a shared producer and sends messages from two threads concurrently.

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the consumer group created in the ApsaraMQ for RocketMQ console.
        properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
        // Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
        // and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Message send timeout in milliseconds.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // TCP endpoint. Get this value from the TCP endpoint section on the Instance Details
        // page in the ApsaraMQ for RocketMQ console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");

        final Producer producer = ONSFactory.createProducer(properties);
        // Call start() once before sending any messages.
        producer.start();

        // Both threads share the same producer instance.
        Thread thread1 = new Thread(() -> {
            try {
                Message msg = new Message(
                    "TopicTestMQ",   // Topic for normal messages
                    "TagA",          // Message tag for consumer-side filtering
                    "Hello MQ".getBytes()  // Message body in binary format
                );
                SendResult result = producer.send(msg);
                if (result != null) {
                    System.out.println("Sent successfully. Message ID: " + result.getMessageId());
                }
            } catch (Exception e) {
                // Handle failures: retry or persist the message for later processing.
                e.printStackTrace();
            }
        });

        Thread thread2 = new Thread(() -> {
            try {
                Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                SendResult result = producer.send(msg);
                if (result != null) {
                    System.out.println("Sent successfully. Message ID: " + result.getMessageId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();

        // Call shutdown() when the producer is no longer needed to release resources.
        // producer.shutdown();
    }
}

Replace the following placeholders with your actual values:

Placeholder | Description | Where to find
<your-group-id> | Consumer group ID | ApsaraMQ for RocketMQ console
<your-tcp-endpoint> | TCP endpoint | The TCP endpoint section on the Instance Details page
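The sample code catches send failures and suggests retrying or persisting the message. A bounded retry could look roughly like the sketch below. It is not an SDK feature: `RetrySendSketch` and `sendWithRetry` are hypothetical names, `sendFn` stands in for the send call on the shared producer, and the sketch assumes failures surface as unchecked exceptions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of the ApsaraMQ for RocketMQ SDK.
final class RetrySendSketch {

    // Calls sendFn up to maxAttempts times and returns true on the first
    // attempt that completes without throwing. Returns false if every
    // attempt fails, so the caller can persist the message for later.
    static boolean sendWithRetry(String body, Consumer<String> sendFn, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sendFn.accept(body);
                return true;
            } catch (RuntimeException e) {
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in send call that fails twice before succeeding.
        AtomicInteger calls = new AtomicInteger();
        boolean delivered = sendWithRetry("Hello MQ", body -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated send failure");
            }
        }, 5);
        System.out.println("Delivered: " + delivered + " after " + calls.get() + " attempts");
    }
}
```

Because the helper holds no state of its own, any number of threads can call it with the same shared producer.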

Key points:

  • The send() method uses synchronous transmission. If no exception is thrown, the message is sent successfully.

  • A message tag is a label (similar to a Gmail label) that consumers use to filter messages on the broker.

  • The message body is binary data. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on serialization and deserialization methods.

  • A topic used for normal messages cannot be used for other message types.
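Because the body is opaque bytes, both sides need the same encoding. The sample above calls `getBytes()` with no argument, which relies on the JVM's default charset. The following sketch assumes the producer and consumer agree on UTF-8 text; `BodyCodecSketch` is a hypothetical helper name, and other formats such as JSON or Protobuf would follow the same pattern.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the ApsaraMQ for RocketMQ SDK.
final class BodyCodecSketch {

    // Producer side: turn text into the bytes that go into the message body.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the body with the same charset the producer used.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello MQ");
        System.out.println(decode(body));
    }
}
```

Naming the charset explicitly keeps the round trip stable when the producer and consumer run on hosts with different default encodings.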

Scale beyond a single instance

For higher throughput, deploy multiple producer or consumer instances across one or more brokers. Each instance is shared by a pool of threads -- do not create an instance per thread regardless of scale.

Scale | Approach
A few threads | Create dedicated threads sharing one producer (as shown above)
Many concurrent senders | Use a thread pool sharing one producer
Very high throughput | Deploy multiple producer instances, each shared by a thread pool
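The thread pool approach can be sketched with a fixed-size `ExecutorService` whose tasks all call the same send function. This is an illustration under stated assumptions rather than a reference implementation: `PooledSendSketch` and `sendAll` are hypothetical names, and `sendFn` stands in for the send call on the one shared producer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of the ApsaraMQ for RocketMQ SDK.
final class PooledSendSketch {

    // Submits `count` send tasks to a fixed pool of `poolSize` threads.
    // Every task calls the same sendFn, which stands in for the send call
    // on a single shared producer.
    static void sendAll(int count, int poolSize, Consumer<String> sendFn)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < count; i++) {
            String body = "message-" + i;
            pool.execute(() -> {
                try {
                    sendFn.accept(body);
                } catch (RuntimeException e) {
                    // One failed send must not stop the remaining tasks.
                    System.err.println("Send failed for " + body + ": " + e.getMessage());
                }
            });
        }
        // Stop accepting new tasks, then wait for in-flight sends to finish.
        pool.shutdown();
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Sends did not finish in time");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The counter replaces a real producer so the sketch runs on its own.
        AtomicInteger sent = new AtomicInteger();
        sendAll(100, 4, body -> sent.incrementAndGet());
        System.out.println("Sent " + sent.get() + " messages");
    }
}
```

With a real producer, call `start()` once before `sendAll` and `shutdown()` once after it returns, so that no send is still in flight when the producer releases its resources.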