Message Queue for Apache RocketMQ allows you to send messages in synchronous, asynchronous, or one-way transmission modes. This topic describes the principles and scenarios of the three transmission modes and provides sample code. This topic also compares the three modes.

Prerequisites

Before you use the SDK for Java to send normal messages, make sure that the following operations are complete:
  • The SDK for Java is downloaded. For the release notes of the SDK for Java, see Release notes.
  • An environment is set up. For more information, see Prepare the environment.
  • Optional: Logging settings are configured. For more information, see Logging settings.

Synchronous transmission

  • How synchronous transmission works
    In synchronous transmission mode, the sender sends the next message only after it receives a response from the Message Queue for Apache RocketMQ broker for the previous message.
  • Scenarios

    This mode is applicable to various scenarios, such as important notification emails, text message notifications for registration results, and promotional message systems.

  • Sample code
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Date;
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity verification. 
                properties.put(PropertyKeyConst.AccessKey,"XXX");
                // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity verification. 
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // The timeout period for sending a message. Unit: milliseconds. 
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // The TCP endpoint. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
                Producer producer = ONSFactory.createProducer(properties);
                // Before you send a message, call the start() method only once to start the producer. 
                producer.start();
    
                // Cyclically send messages. 
                for (int i = 0; i < 100; i++){
                    Message msg = new Message( 
                        // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                        "TopicTestMQ",
                        // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                        // For information about the format of the tag and how to specify the tag, see Best practices of topics and tags. 
                        "TagA",
                        // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. 
                        // The producer and consumer must agree on the message serialization and deserialization methods. 
                        "Hello MQ".getBytes());
                    // The key of the message. The key is the business-specific attribute of the message and must be globally unique. 
                    // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
                    // Note: Messages can be sent and received even if you do not specify the message key. 
                    msg.setKey("ORDERID_" + i);
    
                    try {
                        // Send the message in synchronous mode. If no exception is thrown, the message is sent. 
                        SendResult sendResult = producer.send(msg);
                        if (sendResult != null) {
                            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                        }
                    }
                    catch (Exception e) {
                        // Specify the logic to resend or persist the message if the message fails to be sent and needs to be re-sent. 
                        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                        e.printStackTrace();
                    }
                }
    
                // Before you exit the application, shut down the producer. 
                // Note: Shutting down the producer is optional. 
                producer.shutdown();
            }
        }               
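
The catch block in the sample above leaves the resend logic to you. The following is a minimal sketch of one possible approach, a bounded retry with a growing pause between attempts. It uses only the Java standard library. The class name SendRetrySketch, the method sendWithRetry, and its parameters are hypothetical and are not part of the SDK for Java. The failing send in main is simulated.

```java
import java.util.concurrent.Callable;

public class SendRetrySketch {

    /**
     * Runs sendAction until it succeeds or maxAttempts is reached.
     * After the n-th failed attempt, the method sleeps for backoffMillis * n.
     * The last failure is rethrown if every attempt fails.
     */
    public static <T> T sendWithRetry(Callable<T> sendAction, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendAction.call();
            } catch (InterruptedException e) {
                // Do not retry if the thread was interrupted.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        // Simulated send action that fails twice and then succeeds.
        int[] calls = {0};
        String msgId = sendWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "MSG-" + calls[0];
        }, 5, 10L);
        // Prints: sent after 3 attempts, msgId=MSG-3
        System.out.println("sent after " + calls[0] + " attempts, msgId=" + msgId);
    }
}
```

With the producer from the sample above, the send action could be passed as () -> producer.send(msg). Whether to retry, persist the message, or do both depends on your business requirements.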

Asynchronous transmission

  • How asynchronous transmission works

    In asynchronous transmission mode, the sender sends the next message immediately after it sends a message, without waiting for a response from the Message Queue for Apache RocketMQ broker. To use this mode, you must implement the SendCallback interface. The response from the Message Queue for Apache RocketMQ broker is delivered to the sender through the SendCallback callback, in which the sender processes the response.

  • Scenarios

    This mode is used for time-consuming processes in business scenarios that are sensitive to the response time. For example, after you upload a video, a callback is used to enable transcoding. After the video is transcoded, a callback is used to push transcoding results.

  • Sample code
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.OnExceptionContext;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendCallback;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
        import java.util.concurrent.TimeUnit;
    
        public class ProducerTest {
            public static void main(String[] args) throws InterruptedException {
                Properties properties = new Properties();
                // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity verification. 
                properties.put(PropertyKeyConst.AccessKey, "XXX");
                // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity verification. 
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // The timeout period for sending a message. Unit: milliseconds. 
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // The TCP endpoint. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // Before you send a message, call the start() method only once to start the producer. 
                producer.start();
    
                Message msg = new Message(
                        // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                        "TopicTestMQ",
                        // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                        "TagA",
                        // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
                        "Hello MQ".getBytes());
    
                // The key of the message. The key is the business-specific attribute of the message and must be globally unique. 
                // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
                // Note: Messages can be sent and received even if you do not specify the message key. 
                msg.setKey("ORDERID_100");
    
                // Send the message in asynchronous mode. The result is returned to the producer through the SendCallback callback. 
                producer.sendAsync(msg, new SendCallback() {
                    @Override
                    public void onSuccess(final SendResult sendResult) {
                        // The message is sent. 
                        System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        // Specify the logic to resend or persist the message if the message fails to be sent and needs to be re-sent. 
                        System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                    }
                });
    
                // Block the current thread for 3 seconds and wait for the asynchronous result to return. 
                TimeUnit.SECONDS.sleep(3);
    
                // Before you exit the application, shut down the producer. Note: You can choose not to shut down the producer object. 
                producer.shutdown();
            }
        }                
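
The sample above waits a fixed 3 seconds for the asynchronous result. When several messages are in flight, one alternative is to count the callbacks and wait only until all of them have fired. The following is a minimal sketch of that idea that uses only the Java standard library. The Callback interface and the fakeSendAsync method are hypothetical stand-ins for SendCallback and producer.sendAsync and are not part of the SDK for Java. An empty message body is used to simulate a failed send.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncWaitSketch {

    /** Hypothetical stand-in for the SDK callback; not the real SendCallback interface. */
    interface Callback {
        void onSuccess(String msgId);
        void onException(Throwable cause);
    }

    /** Hypothetical stand-in for an asynchronous send: the callback fires on another thread. */
    static void fakeSendAsync(ExecutorService broker, String body, Callback callback) {
        broker.execute(() -> {
            if (body.isEmpty()) {
                callback.onException(new IllegalArgumentException("empty body"));
            } else {
                callback.onSuccess("MSG-" + body);
            }
        });
    }

    /**
     * Sends every body asynchronously, then waits until each callback has fired
     * or the timeout elapses. Returns the number of successful sends observed.
     */
    public static int sendAllAndWait(String[] bodies, long timeoutMillis) throws InterruptedException {
        ExecutorService broker = Executors.newFixedThreadPool(2);
        CountDownLatch pending = new CountDownLatch(bodies.length);
        AtomicInteger succeeded = new AtomicInteger();
        try {
            for (String body : bodies) {
                fakeSendAsync(broker, body, new Callback() {
                    @Override
                    public void onSuccess(String msgId) {
                        succeeded.incrementAndGet();
                        pending.countDown();
                    }

                    @Override
                    public void onException(Throwable cause) {
                        // A real application would resend or persist the message here.
                        pending.countDown();
                    }
                });
            }
            // Returns as soon as all callbacks have fired, or after the timeout.
            pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return succeeded.get();
        } finally {
            broker.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int ok = sendAllAndWait(new String[] {"a", "", "c"}, 3000L);
        System.out.println("callbacks reporting success: " + ok);
    }
}
```

In the sample above, the same pattern would mean calling countDown() inside onSuccess and onException and awaiting the latch before producer.shutdown().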

One-way transmission

  • How one-way transmission works

    In one-way transmission mode, the producer only sends messages. This means that the producer sends messages without waiting for responses from the Message Queue for Apache RocketMQ broker or triggering the callback function. In this mode, a message can be sent within microseconds.

  • Scenarios

    This mode is applicable to scenarios that require very short send times and do not have demanding requirements for reliability. For example, this mode can be used for log collection.

  • Sample code
        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity verification. 
                properties.put(PropertyKeyConst.AccessKey, "XXX");
                // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity verification. 
                properties.put(PropertyKeyConst.SecretKey, "XXX");
                // The timeout period for sending a message. Unit: milliseconds. 
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // The TCP endpoint. To obtain the TCP endpoint, log on to the Message Queue for Apache RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // Before you send a message, call the start() method only once to start the producer. 
                producer.start();
                // Cyclically send messages. 
                for (int i = 0; i < 100; i++){
                    Message msg = new Message(
                            // The topic of the normal messages. The topic that is used to send and subscribe to normal messages cannot be used to send and subscribe to other types of messages. 
                            "TopicTestMQ",
                            // The message tag, which is similar to a Gmail tag. The message tag is used to sort messages and filter messages for the consumer on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                            "TagA",
                            // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
                            "Hello MQ".getBytes());
    
                    // The key of the message. The key is the business-specific attribute of the message and must be globally unique. 
                    // A unique key helps you query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
                    // Note: Messages can be sent and received even if you do not specify the message key. 
                    msg.setKey("ORDERID_" + i);
    
                    // In one-way transmission mode, the producer does not wait for responses from the Message Queue for Apache RocketMQ broker. Therefore, data loss occurs if messages that fail to be sent are not re-sent. If data loss is not acceptable, we recommend that you use the reliable synchronous or asynchronous transmission mode. 
                    producer.sendOneway(msg);
                }
    
                // Before you exit the application, shut down the producer. 
                // Note: Shutting down the producer is optional. 
                producer.shutdown();
            }
        }

Comparison of the three transmission modes

The following table summarizes the features of and major differences among the three modes.

Transmission mode          TPS      Response   Reliability
Synchronous transmission   High     Supported  No message loss
Asynchronous transmission  High     Supported  No message loss
One-way transmission       Highest  None       Possible message loss
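
The difference in the Response column can be illustrated with a small simulation that uses only the Java standard library. This is a sketch under stated assumptions, not the SDK for Java: FakeBroker, sendSync, sendAsync, sendOneway, and demo are hypothetical names, and the broker is an in-memory stand-in that acknowledges every message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class TransmissionModesSketch {

    /** Hypothetical in-memory broker that acknowledges each message on its own thread. */
    static final class FakeBroker implements AutoCloseable {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        CompletableFuture<String> accept(String body) {
            return CompletableFuture.supplyAsync(() -> "ACK:" + body, worker);
        }

        @Override
        public void close() {
            worker.shutdown();
        }
    }

    /** Synchronous: the caller blocks until the acknowledgment arrives. */
    static String sendSync(FakeBroker broker, String body) throws Exception {
        return broker.accept(body).get(3, TimeUnit.SECONDS);
    }

    /** Asynchronous: the caller returns at once; the acknowledgment reaches the callback later. */
    static CompletableFuture<Void> sendAsync(FakeBroker broker, String body, Consumer<String> onAck) {
        return broker.accept(body).thenAccept(onAck);
    }

    /** One-way: the caller hands off the message and never looks at the acknowledgment. */
    static void sendOneway(FakeBroker broker, String body) {
        broker.accept(body);
    }

    /** Sends one message in each mode and returns the acknowledgments that the sender observed. */
    public static String demo() throws Exception {
        try (FakeBroker broker = new FakeBroker()) {
            String syncAck = sendSync(broker, "m1");

            AtomicReference<String> asyncAck = new AtomicReference<>();
            CompletableFuture<Void> callbackDone = sendAsync(broker, "m2", asyncAck::set);

            // No acknowledgment is available to the sender for this message.
            sendOneway(broker, "m3");

            // Wait for the asynchronous callback before reading its result.
            callbackDone.get(3, TimeUnit.SECONDS);
            return syncAck + "," + asyncAck.get();
        }
    }

    public static void main(String[] args) throws Exception {
        // Prints: ACK:m1,ACK:m2
        System.out.println(demo());
    }
}
```

Only the synchronous and asynchronous sends give the sender an acknowledgment to act on, which is why the one-way mode cannot detect a lost message.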

Subscribe to normal messages

For sample code that subscribes to normal messages, see Subscribe to messages.