Send and receive transactional messages

Last Updated: Mar 29, 2019

Interaction process

The following figure shows the transactional message interaction among RocketMQ components.

[Figure: Transactional message interaction among RocketMQ components]

Send transactional messages

Follow these steps to send a transactional message:

  1. Send a half message and execute a local transaction. The sample code is as follows:
    package com.alibaba.webx.TryHsf.app1;

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    import java.util.Date;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    // Also import Logger and ClientLogger from the logging classes of your SDK version, or substitute your own logger.

    public class TransactionProducerClient {
        // Set your own logger to facilitate troubleshooting.
        private static final Logger log = ClientLogger.getLog();

        public static void main(String[] args) throws InterruptedException {
            // Local business logic (BusinessService is your own class).
            final BusinessService businessService = new BusinessService();
            Properties properties = new Properties();
            // The group ID you created in the console. Note: Transactional messages cannot share a group ID with other types of messages.
            properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set the TCP endpoint: Go to the **Instances** page in the RocketMQ console, and view the endpoint in the **Endpoint Information** area.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

            TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
                    new LocalTransactionCheckerImpl());
            producer.start();

            Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
            try {
                SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        // The message ID. Two messages may have the same message body but different message IDs.
                        // The current message is a half message, so its message ID cannot be queried in the console.
                        String msgId = msg.getMsgID();
                        // Compute a digest of the message body by using CRC32, MD5, or another algorithm.
                        long crc32Id = HashUtil.crc32Code(msg.getBody());
                        // The message ID and CRC32 ID are used to prevent duplicate processing of messages.
                        // If the business logic is already idempotent, you do not need them. Otherwise, use one of them as the deduplication key.
                        // The parameter object of the business. This is an example. Set it based on your business.
                        Object businessServiceArgs = new Object();
                        TransactionStatus transactionStatus = TransactionStatus.Unknow;
                        try {
                            boolean isCommit = businessService.execbusinessService(businessServiceArgs);
                            if (isCommit) {
                                // Commit the message if the local transaction succeeds.
                                transactionStatus = TransactionStatus.CommitTransaction;
                            } else {
                                // Roll back the message if the local transaction fails.
                                transactionStatus = TransactionStatus.RollbackTransaction;
                            }
                        } catch (Exception e) {
                            // The status stays Unknow, so the broker checks the transaction status later.
                            log.error("Message Id:{}", msgId, e);
                        }
                        System.out.println(msg.getMsgID());
                        log.warn("Message Id:{}, transactionStatus:{}", msgId, transactionStatus.name());
                        return transactionStatus;
                    }
                }, null);
            } catch (Exception e) {
                // The message failed to be sent and requires a retry. The system can resend the message or store the message data persistently.
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
            // Keeps the demo process from exiting (not necessary in practical use).
            TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
        }
    }
  2. Submit the transactional message status.

After the execution of a local transaction (successful or failed), the broker must be notified of the transaction status of the current message. Two notification modes are supported:

  • Submit the status after executing the local transaction.

  • Wait until the broker requests to check the transaction status of the message.

A transaction may be in one of the following states:

  • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer can consume the message.

  • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.

  • TransactionStatus.Unknow: The transaction status is unknown, and the sender waits for the RocketMQ broker to check the transaction status of the message.
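
As a rough illustration of how the three states affect a half message, the sketch below models a toy in-memory broker. It is not the RocketMQ broker or the SDK: `ToyBroker`, `Status`, and every method name are hypothetical stand-ins, and the `Status` enum only mirrors the meaning of the three `TransactionStatus` values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the three transaction states; not the SDK enum.
enum Status { COMMIT, ROLLBACK, UNKNOWN }

// Toy in-memory model of half-message handling. An assumption for illustration only.
class ToyBroker {
    // Half messages are stored but hidden from consumers (message ID -> body).
    private final Map<String, String> halfMessages = new HashMap<>();
    // Bodies of committed messages, which consumers are allowed to see.
    private final List<String> deliverable = new ArrayList<>();

    void storeHalf(String msgId, String body) {
        halfMessages.put(msgId, body);
    }

    // Applies the status reported by the sender for one half message.
    void report(String msgId, Status status) {
        switch (status) {
            case COMMIT: {
                String body = halfMessages.remove(msgId);
                if (body != null) {
                    deliverable.add(body); // now visible to consumers
                }
                break;
            }
            case ROLLBACK:
                halfMessages.remove(msgId); // discarded, never consumed
                break;
            default:
                // UNKNOWN: the half message stays pending until a later check resolves it.
                break;
        }
    }

    boolean isPending(String msgId) {
        return halfMessages.containsKey(msgId);
    }

    List<String> deliverable() {
        return deliverable;
    }
}
```

In this model, a message that is reported as unknown stays in the pending map, which is the situation the transaction check mechanism exists to resolve.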

The following sample implements LocalTransactionChecker, which the broker calls back to check the transaction status of a half message:

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    // Also import Logger and ClientLogger from the logging classes of your SDK version, or substitute your own logger.

    public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
        private static final Logger log = ClientLogger.getLog();
        final BusinessService businessService = new BusinessService();

        @Override
        public TransactionStatus check(Message msg) {
            // The message ID. Two messages may have the same message body but different message IDs.
            // The current message is a half message, so its message ID cannot be queried in the console.
            String msgId = msg.getMsgID();
            // Compute a digest of the message body by using CRC32, MD5, or another algorithm.
            long crc32Id = HashUtil.crc32Code(msg.getBody());
            // The message ID and CRC32 ID are used to prevent duplicate processing of messages.
            // If the business logic is already idempotent, you do not need them. Otherwise, use one of them as the deduplication key.
            // The parameter object of the business. This is an example. Set it based on your business.
            Object businessServiceArgs = new Object();
            TransactionStatus transactionStatus = TransactionStatus.Unknow;
            try {
                boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
                if (isCommit) {
                    // Commit the message if the local transaction succeeded.
                    transactionStatus = TransactionStatus.CommitTransaction;
                } else {
                    // Roll back the message if the local transaction failed.
                    transactionStatus = TransactionStatus.RollbackTransaction;
                }
            } catch (Exception e) {
                // The status stays Unknow, so the broker checks again later.
                log.error("Message Id:{}", msgId, e);
            }
            log.warn("Message Id:{}, transactionStatus:{}", msgId, transactionStatus.name());
            return transactionStatus;
        }
    }

Utility class

    import java.util.zip.CRC32;

    public class HashUtil {
        // Returns the CRC32 checksum of the given bytes.
        public static long crc32Code(byte[] bytes) {
            CRC32 crc32 = new CRC32();
            crc32.update(bytes);
            return crc32.getValue();
        }
    }
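
The comments in the samples suggest using the CRC32 value of the message body as a deduplication key. A minimal sketch of that idea follows, assuming an in-memory set is enough for the demonstration. `Crc32Deduplicator` and `firstTime` are hypothetical names that are not part of the SDK, and a real service would more likely keep the keys in persistent storage.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.zip.CRC32;

// Hypothetical helper: remembers the CRC32 values of message bodies that were already handled.
class Crc32Deduplicator {
    private final Set<Long> seen = new HashSet<>();

    // Same computation as HashUtil.crc32Code, repeated here to keep the sketch self-contained.
    static long crc32(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes);
        return crc.getValue();
    }

    // Returns true the first time a body is seen and false for every repeat.
    synchronized boolean firstTime(byte[] body) {
        return seen.add(crc32(body));
    }
}
```

A caller would run the business logic only when `firstTime` returns true. CRC32 is a 32-bit checksum, so two different bodies can collide. If a false duplicate would be costly, a longer digest such as MD5 (also mentioned in the sample comments) or the message ID is the safer key.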

Transaction check mechanism

  • Why is the check mechanism required for transactional message delivery?

If the half message is sent in step 1 but TransactionStatus.Unknow is returned, or if no status is submitted because the application exits, the broker does not know the status of the half message. Therefore, the broker periodically asks the sender to check and report the status of the half message.

  • What does the business logic do when the check method is called back?

The check method for transactional messages must contain the logic that verifies transaction consistency. After a transactional message is sent, the sender must implement the LocalTransactionChecker interface to respond to the broker's requests for the local transaction status. The check method therefore needs to complete the following tasks:

(1) Check the status of the local transaction that corresponds to the half message (committed or rolled back).

(2) Submit the status of the local transaction to the broker.
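
One way to picture these two tasks is a checker that looks up the recorded outcome of the local transaction and maps it to a status. The sketch below assumes the business logic records each outcome under a business key. `LocalTxLog`, `ToyChecker`, `TxStatus`, and `LocalTxState` are hypothetical names used only for illustration, and `TxStatus` stands in for the SDK's `TransactionStatus`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the status reported to the broker; not the SDK enum.
enum TxStatus { COMMIT, ROLLBACK, UNKNOWN }

// Outcome that the local business logic records for each transaction.
enum LocalTxState { COMMITTED, ROLLED_BACK }

// Hypothetical record of local transaction outcomes, keyed by a business key such as an order ID.
class LocalTxLog {
    private final Map<String, LocalTxState> states = new ConcurrentHashMap<>();

    void record(String businessKey, LocalTxState state) {
        states.put(businessKey, state);
    }

    LocalTxState lookup(String businessKey) {
        return states.get(businessKey);
    }
}

// Sketch of the check logic: task (1) is the lookup, task (2) is the returned status.
class ToyChecker {
    private final LocalTxLog log;

    ToyChecker(LocalTxLog log) {
        this.log = log;
    }

    TxStatus check(String businessKey) {
        LocalTxState state = log.lookup(businessKey);
        if (state == null) {
            // No outcome recorded yet, so report unknown and let the broker ask again later.
            return TxStatus.UNKNOWN;
        }
        return state == LocalTxState.COMMITTED ? TxStatus.COMMIT : TxStatus.ROLLBACK;
    }
}
```

In a real LocalTransactionChecker implementation, the business key would typically be derived from the message (for example from its body), and the returned value would be one of the `TransactionStatus` constants.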

Subscribe to transactional messages

The method for subscribing to transactional messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.