Producer and consumer objects in ApsaraMQ for RocketMQ are thread-safe. Share a single instance across threads instead of creating one per thread.
Why share instances
You can deploy multiple producer and consumer instances on one or more brokers. You can also run multiple threads to send or receive messages in a producer or consumer instance. Creating an instance per thread wastes resources and can degrade throughput.
Sharing a single producer or consumer across multiple threads:
Reduces resource overhead on both the client and the broker
Improves transactions per second (TPS) for sending and receiving messages
Usage notes
Do not create a producer or consumer instance for each thread. Create one instance and share it across threads.
Avoid sending ordered messages from multiple threads. The broker determines message order based on the sequence in which it receives messages from a single producer. When multiple threads send concurrently, the arrival order at the broker may differ from the sending order in your application logic.
Shared vs. per-thread producer
Correct -- create the producer once, share it across threads:
Producer producer = ONSFactory.createProducer(properties);
producer.start();
// Thread 1 and Thread 2 share the same producer
Thread t1 = new Thread(() -> producer.send(msg1));
Thread t2 = new Thread(() -> producer.send(msg2));
t1.start();
t2.start();Incorrect -- create a producer per thread:
// Each thread creates its own producer -- wastes resources
Thread t1 = new Thread(() -> {
Producer p = ONSFactory.createProducer(properties);
p.start();
p.send(msg);
p.shutdown();
});Sample code
The following example creates a shared producer and sends messages from two threads concurrently.
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the consumer group created in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID, "<your-group-id>");
// Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
// and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Message send timeout in milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// TCP endpoint. Get this value from the TCP endpoint section on the Instance Details
// page in the ApsaraMQ for RocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "<your-tcp-endpoint>");
final Producer producer = ONSFactory.createProducer(properties);
// Call start() once before sending any messages.
producer.start();
// Both threads share the same producer instance.
Thread thread1 = new Thread(() -> {
try {
Message msg = new Message(
"TopicTestMQ", // Topic for normal messages
"TagA", // Message tag for consumer-side filtering
"Hello MQ".getBytes() // Message body in binary format
);
SendResult result = producer.send(msg);
if (result != null) {
System.out.println("Sent successfully. Message ID: " + result.getMessageId());
}
} catch (Exception e) {
// Handle failures: retry or persist the message for later processing.
e.printStackTrace();
}
});
Thread thread2 = new Thread(() -> {
try {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
SendResult result = producer.send(msg);
if (result != null) {
System.out.println("Sent successfully. Message ID: " + result.getMessageId());
}
} catch (Exception e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
// Call shutdown() when the producer is no longer needed to release resources.
// producer.shutdown();
}
}Replace the following placeholders with your actual values:
| Placeholder | Description | Where to find |
|---|---|---|
<your-group-id> | Consumer group ID | ApsaraMQ for RocketMQ console |
<your-tcp-endpoint> | TCP endpoint | The TCP endpoint section on the Instance Details page |
Key points:
The
send()method uses synchronous transmission. If no exception is thrown, the message is sent successfully.A message tag is a label (similar to a Gmail tag) that consumers use to filter messages on the broker.
The message body is binary data. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on serialization and deserialization methods.
A topic used for normal messages cannot be used for other message types.
Scale beyond a single instance
For higher throughput, deploy multiple producer or consumer instances across one or more brokers. Each instance is shared by a pool of threads -- do not create an instance per thread regardless of scale.
| Scale | Approach |
|---|---|
| A few threads | Create dedicated threads sharing one producer (as shown above) |
| Many concurrent senders | Use a thread pool sharing one producer |
| Very high throughput | Deploy multiple producer instances, each shared by a thread pool |