The consumer and producer objects of Message Queue for Apache RocketMQ are thread-safe and can be shared among threads.
You can deploy multiple producer and consumer instances on one or more ECS instances. A producer or consumer instance can also run multiple threads to send or receive messages, improving the message sending or receiving TPS. Do not create a client instance for every thread.
The sample code for sharing a producer among threads is as follows:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// Initialize the producer configuration.
Properties properties = new Properties();
// The group ID you created in the console.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// The AccessKey ID you created in the Alibaba Cloud console for identity authentication.
properties.put(PropertyKeyConst.AccessKey,"XXX");
// The AccessKey secret you created in the Alibaba Cloud console for identity authentication.
properties.put(PropertyKeyConst.SecretKey, "XXX");
// The message transmission timeout interval, in milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// The TCP endpoint. Go to the Instances page in the Message Queue for Apache RocketMQ console, and view the endpoint in the Endpoint Information section.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
final Producer producer = ONSFactory.createProducer(properties);
// Before sending a message, call the start method once to start the producer.
producer.start();
// The created producer and consumer objects are thread-secure and can be shared among threads. Do not create a client instance for every thread.
// Two threads share the producer object and concurrently send messages to Message Queue for Apache RocketMQ.
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message( //
// The topic of the message.
"TopicTestMQ",
// The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
"TagA",
// The message body in any binary format. Message Queue for Apache RocketMQ does not process the message body.
// The producer and consumer must negotiate the consistent serialization and deserialization methods.
"Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// The synchronous message transmission result, which is successful if no exception occurs.
if (sendResult ! = null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// The synchronous message transmission result, which is successful if no exception occurs.
if (sendResult ! = null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// The message failed to be sent and must be resent. The system can resend the message or store message data persistently.
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
anotherThread.start();
// If the producer instance is no longer used, disable it to release resources.
// producer.shutdown();
}
}