The consumer and producer objects of Message Queue for Apache RocketMQ are thread-safe and can be shared among threads.

You can deploy multiple producer and consumer instances on one or more cloud servers. A producer or consumer instance can also run multiple threads to send or receive messages. Both approaches increase the number of messages sent or received per second, measured in transactions per second (TPS).
Notice
  • Do not create a producer instance or consumer instance for each thread.
  • We recommend that you do not use multiple threads to send ordered messages.

    The Message Queue for Apache RocketMQ broker determines the order of messages based on the order in which the sender sends them one after another from a single thread through a single producer. If the sender uses multiple producers or multiple threads to send messages concurrently, the message order is determined by the order in which the broker receives the messages. This order may differ from the sending order on the business side.
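The single-thread recommendation for ordered messages can be illustrated with a minimal sketch that uses only the Java standard library. The class OrderedSendSketch and the method sendInOrder are hypothetical names that are not part of the SDK, and the sendFn parameter merely stands in for a call such as producer.send(msg) on one shared producer. The sketch assumes that funneling every send through one dedicated thread is acceptable for your throughput:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class OrderedSendSketch {
    // Hypothetical helper: hands every body to sendFn from one dedicated thread,
    // in list order. sendFn stands in for producer.send(msg) on a single producer.
    // Failure handling is omitted: an exception in one send does not stop later sends.
    public static void sendInOrder(List<String> bodies, Consumer<String> sendFn)
            throws InterruptedException {
        // A single-threaded executor runs submitted tasks one at a time, in submission order.
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (String body : bodies) {
            single.submit(() -> sendFn.accept(body));
        }
        single.shutdown();
        if (!single.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Sends did not finish in time");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The list plays the role of the broker and records the arrival order.
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        sendInOrder(Arrays.asList("create", "pay", "ship"), received::add);
        System.out.println(received); // prints [create, pay, ship]
    }
}
```

Because only one thread ever calls sendFn, the arrival order matches the business order. If the same three sends were submitted to a pool with several threads, the arrival order would no longer be guaranteed.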

The following sample code shows how to share a producer among threads:

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // Initialize the producer configuration. 
        Properties properties = new Properties();
        // The ID of the group that you created in the Message Queue for Apache RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // The AccessKey ID that you created in the Alibaba Cloud Management Console. The AccessKey ID is used for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console. The AccessKey secret is used for identity authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The timeout period for sending a message. Unit: milliseconds.  
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page that appears, go to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        final Producer producer = ONSFactory.createProducer(properties);
        // Before you send a message, call the start() method only once to start the producer. 
        producer.start();

        // The created producer and consumer objects are thread-safe and can be shared among threads. Do not create a producer instance or consumer instance for each thread. 

        // Share the producer object in two threads. The two threads can concurrently send messages to Message Queue for Apache RocketMQ. 
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                    // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                    "TopicTestMQ",
                    // The message tag that is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                    "TagA",
                    // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. 
                    // The producer and consumer must agree on the message serialization and deserialization methods. 
                    "Hello MQ".getBytes());
                    // Send the message in synchronous mode. If no exception is thrown, the message is sent. 
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed. Topic is:TopicTestMQ");
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    // Send the message in synchronous mode. If no exception is thrown, the message is sent. 
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed. Topic is:TopicTestMQ");
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // Optional. If the producer instance is no longer used, shut down the producer and release the allocated resources. 
        // producer.shutdown();
    }
}
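
The sample above starts two threads by hand and leaves producer.shutdown() commented out. The following sketch shows one possible way to share a single sender among the threads of a pool and to release it only after all worker threads have finished, so that shutdown() is not called while sends are still in progress. It uses only the Java standard library. The Sender interface is a hypothetical stand-in for the SDK producer and is not part of the SDK. The class name, method name, and timeout are assumptions made for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedSenderPoolSketch {
    // Hypothetical stand-in for the SDK producer: only the two calls used here.
    public interface Sender {
        void send(String body);
        void shutdown();
    }

    // Sends perThread messages from each of `threads` workers through ONE shared sender,
    // waits for the workers to finish, then shuts the sender down.
    // Returns the number of sends that completed without an exception.
    public static int sendAll(Sender sender, int threads, int perThread)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger sent = new AtomicInteger();
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    try {
                        sender.send("Hello MQ");
                        sent.incrementAndGet();
                    } catch (RuntimeException e) {
                        // Placeholder for resend or persist logic.
                        e.printStackTrace();
                    }
                }
            });
        }
        pool.shutdown(); // Accept no new tasks; already submitted tasks keep running.
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Assumed timeout; interrupt workers that are still running.
            }
        } finally {
            sender.shutdown(); // Release the shared sender once, after the workers are done.
        }
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Sender noop = new Sender() {
            @Override public void send(String body) { /* A real sender would call the broker. */ }
            @Override public void shutdown() { System.out.println("sender shut down"); }
        };
        System.out.println("sent " + sendAll(noop, 4, 5));
    }
}
```

With the real SDK, the send call inside the loop would correspond to producer.send(msg) and the final call to producer.shutdown(). The sender is created once and passed to every worker, in line with the notice against creating one producer instance per thread.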