All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send messages by using multiple threads

Last Updated:Aug 18, 2023

The consumer and producer objects of ApsaraMQ for RocketMQ are thread-safe and can be shared among multiple threads.

You can deploy multiple producer and consumer instances on one or more brokers. You can also run multiple threads to send or receive messages in a producer or consumer instance. This improves the transactions per second (TPS) for sending or receiving messages.

Important
  • Do not create a producer instance or consumer instance for each thread.

  • We recommend that you do not use multiple threads to send ordered messages.

    An ApsaraMQ for RocketMQ broker determines the order in which messages are produced based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.

The following sample code shows how to share a producer among multiple threads:

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // Initialize the producer configurations. 
        Properties properties = new Properties();
        // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The timeout period for sending messages. Unit: milliseconds.  
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
        // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        final Producer producer = ONSFactory.createProducer(properties);
        // Before you send the message, call the start() function only once to start the producer. 
        producer.start();

        // The created producer and consumer objects are thread-safe and can be shared among multiple threads. Do not create a producer instance or consumer instance for each thread. 

        // Two threads share the producer object and concurrently send messages to ApsaraMQ for RocketMQ. 
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                    // The topic in which normal messages are produced. A topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. 
                    "TopicTestMQ",
                    // The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. 
                    "TagA",
                    // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. 
                    // The producer and consumer must agree on the methods to serialize and deserialize a message body. 
                    "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. 
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. 
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // Optional. If the producer instance is no longer used, terminate the producer and release the allocated resources. 
        // producer.shutdown();
    }
}