Send normal messages (in three modes)

Last Updated: Mar 29, 2019

RocketMQ can send normal messages in three modes: reliable synchronous mode, reliable asynchronous mode, and one-way mode. This topic describes the working principles, scenarios, and differences of the three modes, and provides sample code for reference.

Note: Ordered messages can only be sent in reliable synchronous mode.

  • Reliable synchronous transmission

Principle: In synchronous mode, the sender sends a message and waits for the broker's response to it before sending the next message.

Scenario: This mode suits a wide range of scenarios, such as email notifications, registration SMS notifications, and marketing SMS messages.

  • Reliable asynchronous transmission

Principle: In asynchronous mode, the sender sends the next message without waiting for the broker's response to the previous one. To send asynchronously, the sender passes a SendCallback implementation along with the message. When the broker's response arrives, the callback is invoked with either the send result or the exception, and the sender processes the outcome there.

Scenario: This mode suits long-running call chains in which the caller is sensitive to response time (RT). For example, after a user uploads a video, a notification is sent to start the encoding service. After encoding is complete, another notification is sent to return the encoding result.

  • One-way transmission

Principle: In one-way transmission mode, the sender only sends messages. It neither waits for a broker response nor registers a callback. This mode has the lowest send latency: a send call completes within microseconds.

Scenario: This mode suits scenarios that need very low send latency and have low reliability requirements, for example, log collection.

The following table summarizes the features of the three transmission modes and the differences between them.

Transmission mode           TPS       Broker response   Reliability
Synchronous transmission    High      Yes               No message loss
Asynchronous transmission   High      Yes               No message loss
One-way transmission        Highest   None              Possible message loss
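
The difference between the three modes can be shown without the RocketMQ SDK. The following is a minimal sketch, not the SDK's implementation: `FakeBroker`, `sendSync`, `sendAsync`, and `sendOneway` are hypothetical names invented for this illustration, and the "broker" is simply a worker thread that acknowledges each message with a generated ID.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class SendModesSketch {

    /** Hypothetical stand-in for a broker: acknowledges each message on a worker thread. */
    static final class FakeBroker implements AutoCloseable {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();
        private final AtomicInteger nextId = new AtomicInteger();

        /** Accepts a message and later completes the future with a generated message ID. */
        CompletableFuture<String> accept(String body) {
            return CompletableFuture.supplyAsync(() -> "MSG-" + nextId.incrementAndGet(), worker);
        }

        @Override
        public void close() {
            // Already queued messages are still acknowledged before the worker stops.
            worker.shutdown();
        }
    }

    /** Synchronous: block until the broker acknowledges, then return the message ID. */
    static String sendSync(FakeBroker broker, String body) throws Exception {
        return broker.accept(body).get(3, TimeUnit.SECONDS);
    }

    /** Asynchronous: return immediately; the callback later receives the ID or the error. */
    static void sendAsync(FakeBroker broker, String body, BiConsumer<String, Throwable> callback) {
        broker.accept(body).whenComplete(callback);
    }

    /** One-way: hand the message over and ignore the outcome entirely. */
    static void sendOneway(FakeBroker broker, String body) {
        broker.accept(body);
    }

    public static void main(String[] args) throws Exception {
        try (FakeBroker broker = new FakeBroker()) {
            System.out.println("sync ack: " + sendSync(broker, "a"));

            CountDownLatch done = new CountDownLatch(1);
            sendAsync(broker, "b", (id, err) -> {
                System.out.println(err == null ? "async ack: " + id : "async failed: " + err);
                done.countDown();
            });

            sendOneway(broker, "c"); // no acknowledgement is ever observed by the sender
            done.await(3, TimeUnit.SECONDS);
        }
    }
}
```

In this sketch only `sendSync` and `sendAsync` give the sender a way to learn that a message failed, which is why the table lists possible message loss for one-way transmission alone.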

Sample code

Synchronous transmission

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendResult;

    import java.util.Date;
    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set the message transmission timeout period (in milliseconds).
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // Set the TCP endpoint: go to the Instances page in the RocketMQ console,
            // and view the endpoint in the Endpoint Information area.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before sending a message, call the start() method once to start the producer.
            producer.start();

            // Send messages in a loop.
            for (int i = 0; i < 100; i++) {
                Message msg = new Message(
                        // The topic of the message.
                        "TopicTestMQ",
                        // The message tag, which is similar to a Gmail tag. It categorizes messages so that
                        // the consumer can filter them on the RocketMQ broker based on the specified criteria.
                        "TagA",
                        // The message body in any binary format. RocketMQ does not process the message body.
                        // The producer and consumer must agree on the serialization and deserialization methods.
                        "Hello MQ".getBytes());
                // Set the message key, a business-level identifier for the message. Keep it globally unique where possible.
                // A unique key enables you to query a message and resend it in the Alibaba Cloud console
                // if you fail to receive the message.
                // Note: Messages can still be sent and received if you do not set this attribute.
                msg.setKey("ORDERID_" + i);
                try {
                    // Send the message in synchronous mode. The message is sent if no exception is thrown.
                    SendResult sendResult = producer.send(msg);
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic()
                                + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // The message failed to be sent and requires a retry.
                    // The system can resend the message or store the message data persistently.
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }

            // Destroy the producer object before exiting from the application.
            // Note: You can choose not to destroy the producer object.
            producer.shutdown();
        }
    }
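
The comment in the catch block says that a failed send requires a retry but does not show one. The following is a minimal sketch of a bounded retry with a growing pause between attempts. `RetrySketch` and `sendWithRetry` are hypothetical names that are not part of the RocketMQ SDK, and the attempt count and backoff values are arbitrary assumptions. The send action is passed as a `Callable` so that the sketch runs without the SDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    /**
     * Runs sendAction until it succeeds or maxAttempts is reached.
     * Sleeps backoffMillis * attemptNumber between attempts and rethrows the last failure.
     */
    static <T> T sendWithRetry(Callable<T> sendAction, int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendAction.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Linear backoff: wait longer after each failed attempt.
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        // All attempts failed; the caller can now persist the message or raise an alert.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated send that fails twice before succeeding (illustration only).
        AtomicInteger calls = new AtomicInteger();
        String result = sendWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated transient failure");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

With the sample above, the call inside the loop could plausibly become `sendWithRetry(() -> producer.send(msg), 3, 200L)`. Because a retry can deliver the same message more than once, consumers should be prepared to handle duplicates.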

Asynchronous transmission

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set the message transmission timeout period (in milliseconds).
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // Set the TCP endpoint: go to the Instances page in the RocketMQ console,
            // and view the endpoint in the Endpoint Information area.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before sending a message, call the start() method once to start the producer.
            producer.start();

            Message msg = new Message(
                    // The topic of the message.
                    "TopicTestMQ",
                    // The message tag, which is similar to a Gmail tag. It categorizes messages so that
                    // the consumer can filter them on the RocketMQ broker based on the specified criteria.
                    "TagA",
                    // The message body in any binary format. RocketMQ does not process the message body.
                    // The producer and consumer must agree on the serialization and deserialization methods.
                    "Hello MQ".getBytes());
            // Set the message key, a business-level identifier for the message. Keep it globally unique where possible.
            // A unique key enables you to query a message and resend it in the console if you fail to receive the message.
            // Note: Messages can still be sent and received if you do not set this attribute.
            msg.setKey("ORDERID_100");

            // Send the message in asynchronous mode. The result is returned to the client through the callback.
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // The broker accepted the message.
                    System.out.println("send message success. topic=" + sendResult.getTopic()
                            + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // The message failed to be sent and requires a retry.
                    // The system can resend the message or store the message data persistently.
                    System.out.println("send message failed. topic=" + context.getTopic()
                            + ", msgId=" + context.getMessageId());
                }
            });

            // The message ID can be obtained before the callback returns the result.
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // Destroy the producer object before exiting from the application.
            // Note: You can choose not to destroy the producer object.
            // Shutting down immediately may prevent a pending callback from running.
            producer.shutdown();
        }
    }
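
The asynchronous sample shuts the producer down right after `sendAsync` returns, so the callback may not have run yet. One way to handle this is to count outstanding sends and wait for them before shutting down. The following is a minimal sketch under that assumption. `InFlightTracker` is a hypothetical helper that is not part of the RocketMQ SDK, and in `main` plain threads stand in for the SDK's callback threads.

```java
public class InFlightTracker {
    private int inFlight = 0;

    /** Call immediately before each asynchronous send. */
    public synchronized void register() {
        inFlight++;
    }

    /** Call from both the success callback and the failure callback. */
    public synchronized void complete() {
        if (inFlight > 0) {
            inFlight--;
        }
        if (inFlight == 0) {
            notifyAll();
        }
    }

    /** Blocks until all registered sends have completed; returns false on timeout. */
    public synchronized boolean awaitAll(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (inFlight > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        InFlightTracker tracker = new InFlightTracker();
        for (int i = 0; i < 3; i++) {
            tracker.register();
            // Stand-in for a callback that fires later on another thread.
            new Thread(tracker::complete).start();
        }
        boolean drained = tracker.awaitAll(2000);
        System.out.println(drained ? "all callbacks finished" : "timed out waiting for callbacks");
    }
}
```

In the sample above, this would mean calling `register()` before `producer.sendAsync(...)`, calling `complete()` at the end of both `onSuccess` and `onException`, and calling `awaitAll(...)` before `producer.shutdown()`.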

One-way transmission

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set the message transmission timeout period (in milliseconds).
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // Set the TCP endpoint: go to the Instances page in the RocketMQ console,
            // and view the endpoint in the Endpoint Information area.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before sending a message, call the start() method once to start the producer.
            producer.start();

            // Send messages in a loop.
            for (int i = 0; i < 100; i++) {
                Message msg = new Message(
                        // The topic of the message.
                        "TopicTestMQ",
                        // The message tag, which is similar to a Gmail tag. It categorizes messages so that
                        // the consumer can filter them on the RocketMQ broker based on the specified criteria.
                        "TagA",
                        // The message body in any binary format. RocketMQ does not process the message body.
                        // The producer and consumer must agree on the serialization and deserialization methods.
                        "Hello MQ".getBytes());
                // Set the message key, a business-level identifier for the message. Keep it globally unique where possible.
                // A unique key enables you to query a message and resend it in the Alibaba Cloud console
                // if you fail to receive the message.
                // Note: Messages can still be sent and received if you do not set this attribute.
                msg.setKey("ORDERID_" + i);

                // In one-way mode, the sender does not wait for the response from the broker, so a message
                // that fails to be delivered is not retransmitted and its data is lost. If data loss is not
                // acceptable, use the reliable synchronous or asynchronous transmission mode instead.
                producer.sendOneway(msg);
            }

            // Destroy the producer object before exiting from the application.
            // Note: You can choose not to destroy the producer object.
            producer.shutdown();
        }
    }