Message Queue for Apache RocketMQ allows you to send messages in synchronous, asynchronous, or one-way transmission modes. This topic describes the principles and scenarios of the three transmission modes and provides sample code. This topic also compares the three modes.

Prerequisites

Before you use SDK for Java to send normal messages, make sure that the following operations are completed:
  • SDK for Java is downloaded. For more information about release notes of SDK for Java, see Release notes.
  • The environment is prepared. For more information, see Prepare the environment.
  • Optional:The log settings are configured. For more information, see Log configuration.

Synchronous transmission

  • How it works
    In synchronous transmission mode, the sender sends another message only after it receives a response from the Message Queue for Apache RocketMQ broker for the previous message. sync
  • Scenarios

    This mode is applicable to various scenarios, such as important notification emails, short message service (SMS) notifications for registration results, and SMS marketing systems.

  • Sample code
    import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
                properties.put(PropertyKeyConst.AccessKey,"XXX");
                // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // The timeout period for sending a message. Unit: milliseconds. 
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // The TCP endpoint. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
                Producer producer = ONSFactory.createProducer(properties);
                // Before you send a message, call the start() method only once to start the producer. 
                producer.start();
    
                // Cyclically send messages. 
                for (int i = 0; i < 100; i++){
                    Message msg = new Message( 
                        // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                        "TopicTestMQ",
                        // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                        "TagA",
                        // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. 
                        // The producer and consumer must agree on the serialization and deserialization methods. 
                        "Hello MQ".getBytes());
                    // The key of the message. The key is the business-specific attribute of the message and must be globally unique whenever possible. 
                    // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
                    // Note: Messages can be sent and received even if you do not specify the message key. 
                    msg.setKey("ORDERID_" + i);
    
                    try {
                        SendResult sendResult = producer.send(msg);
                        // Send the message in synchronous mode. If no error occurs, the message is sent. 
                        if (sendResult != null) {
                            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                        }
                    }
                    catch (Exception e) {
                        // Specify the logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                        e.printStackTrace();
                    }
                }
    
                // Before you exit the application, shut down the producer object. 
                // Note: You can choose not to shut down the producer object. 
                producer.shutdown();
            }
        }               

Asynchronous transmission

  • How it works

    In asynchronous transmission mode, the sender sends another message after it sends a message, without waiting for a response from the Message Queue for Apache RocketMQ broker. The asynchronous transmission mode of Message Queue for Apache RocketMQ requires implementation of the SendCallback method. The sender sends another message immediately after it sends a message, without waiting for a response from the Message Queue for Apache RocketMQ broker. The sender calls the SendCallback method to receive responses from the Message Queue for Apache RocketMQ broker and processes the responses.

    async
  • Scenarios

    This mode is used for time-consuming processes in business scenarios that are sensitive to the response time. For example, after you upload a video, a callback is fired to enable transcoding. After the video is transcoded, a callback is fired to push transcoding results.

  • Sample code
    import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.OnExceptionContext;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendCallback;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
                properties.put(PropertyKeyConst.AccessKey, "XXX");
                // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // The timeout period for sending a message. Unit: milliseconds. 
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // The TCP endpoint. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // Before you send a message, call the start() method only once to start the producer. 
                producer.start();
    
                Message msg = new Message(
                        // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                        "TopicTestMQ",
                        // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                        "TagA",
                        // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
                        "Hello MQ".getBytes());
    
                // The key of the message. The key is the business-specific attribute of the message and must be globally unique whenever possible.  // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
                // Note: Messages can be sent and received even if you do not specify the message key. 
                msg.setKey("ORDERID_100");
    
                // Send the message in asynchronous mode. The result is returned to the producer after the producer calls the SendCallback method. 
                producer.sendAsync(msg, new SendCallback() {
                    @Override
                    public void onSuccess(final SendResult sendResult) {
                        // The message is sent. 
                        System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        // Specify the logic to resend or persist the message if the message fails to be sent and needs to be resent. 
                        System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                    }
                });
    
                // Obtain the value of the msgId parameter before the SendCallback method returns the result. 
                System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
    
                // Before you exit the application, shut down the producer object. Note: You can choose not to shut down the producer object. 
                producer.shutdown();
            }
        }                

One-way transmission

  • How it works

    In one-way transmission mode, the producer only sends messages. This means that the producer sends messages without waiting for responses from the Message Queue for Apache RocketMQ broker or triggering the SendCallback method. In this mode, a message can be sent within microseconds.

    oneway
  • Scenarios

    This mode is applicable to scenarios where message transmission takes a short time and has no demanding reliability requirements. For example, this mode can be used for log collection.

  • Sample code
    import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
                properties.put(PropertyKeyConst.AccessKey, "XXX");
                // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // The timeout period for sending a message. Unit: milliseconds. 
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // The TCP endpoint. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
                properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // Before you send a message, call the start() method only once to start the producer. 
                producer.start();
                // Cyclically send messages. 
                for (int i = 0; i < 100; i++){
                    Message msg = new Message(
                            // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                            "TopicTestMQ",
                            // Message Tag,
                            // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                            "TagA",
                            // Message Body
                            // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
                            "Hello MQ".getBytes());
    
                    // The key of the message. The key is the business-specific attribute of the message and must be globally unique whenever possible. 
                    // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
                    // Note: Messages can be sent and received even if you do not specify the message key. 
                    msg.setKey("ORDERID_" + i);
    
                    // In one-way transmission mode, the producer does not wait for responses from the Message Queue for Apache RocketMQ broker. Therefore, data loss occurs if messages that fail to be sent are not resent. If data loss is not acceptable, we recommend that you use the reliable synchronous or asynchronous transmission mode. 
                    producer.sendOneway(msg);
                }
    
                // Before you exit the application, shut down the producer object. 
                // Note: You can choose not to shut down the producer object. 
                producer.shutdown();
            }
        }

Comparison of the three transmission modes

The following table summarizes the features of and major differences among the three modes.

Transmission mode Transactions per second (TPS) for message sending Response Reliability
Synchronous transmission High Supported No message loss
Asynchronous transmission High Supported No message loss
One-way transmission Highest Not supported Possible message loss

Subscribe to normal messages

For more information about the sample code for subscribing to normal messages, see Subscribe to messages.