edit-icon download-icon

Send and receive transactional messages

Last Updated: Oct 09, 2018

Interaction Process

The interaction process of MQ transactional messages is shown below:

MQ Transactional Message Interaction Process

Send Transactional Messages

Sending a transactional message consists of the following two steps:

  1. Send a half message and execute local transaction. The sample code is as follows:

    1. package com.alibaba.webx.TryHsf.app1;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    4. import com.aliyun.openservices.ons.api.SendResult;
    5. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    6. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    7. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    8. import java.util.Properties;
    9. import java.util.concurrent.TimeUnit;
    10. public class TransactionProducerClient {
    11. private final static Logger log = ClientLogger.getLog(); // Set your own log to facilitate troubleshooting.
    12. public static void main(String[] args) throws InterruptedException {
    13. final BusinessService businessService = new BusinessService(); // Local service
    14. Properties properties = new Properties();
    15. // The producer ID you created on the console. Note: Producer IDs of transactional messages can not be shared with producer IDs of other types of messages.
    16. properties.put(PropertyKeyConst.ProducerId, "");
    17. // Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
    18. properties.put(PropertyKeyConst.AccessKey, "");
    19. // Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
    20. properties.put(PropertyKeyConst.SecretKey, "");
    21. // Set a TCP access domain name (the following uses public cloud production environment as an example)
    22. properties.put(PropertyKeyConst.ONSAddr,
    23. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
    24. TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
    25. new LocalTransactionCheckerImpl());
    26. producer.start();
    27. Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
    28. try {
    29. SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
    30. @Override
    31. public TransactionStatus execute(Message msg, Object arg) {
    32. // Message ID (It is possible that the message bodies are the same, while the message IDs are different. The current message ID cannot be queried on the console).
    33. String msgId = msg.getMsgID();
    34. // Perform crc32 or MD5 on the message body content.
    35. long crc32Id = HashUtil.crc32Code(msg.getBody());
    36. // Message ID and crc32id are mainly used to prevent duplicated messages.
    37. // If the service itself is idempotent, it can be ignored. Otherwise idempotence has to be implemented through msgId or crc32Id.
    38. // If it is required that there are no duplicated messages, it is recommended to prevent duplicated messages through performing crc32 or md5 on the message bodies.
    39. Object businessServiceArgs = new Object();
    40. TransactionStatus transactionStatus = TransactionStatus.Unknow;
    41. try {
    42. boolean isCommit =
    43. businessService.execbusinessService(businessServiceArgs);
    44. if (isCommit) {
    45. // If the local transaction succeeds, submit the message.
    46. transactionStatus = TransactionStatus.CommitTransaction;
    47. } else {
    48. // If the local transaction fails, rollback the message.
    49. transactionStatus = TransactionStatus.RollbackTransaction;
    50. }
    51. } catch (Exception e) {
    52. log.error("Message Id:{}", msgId, e);
    53. }
    54. System.out.println(msg.getMsgID());
    55. log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
    56. return transactionStatus;
    57. }
    58. }, null);
    59. }
    60. catch (Exception e) {
    61. // If the message sending fails and a retry is needed, the message can be re-sent or persisted for compensated processing.
    62. System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
    63. e.printStackTrace();
    64. }
    65. // Demo example, which prevents process quit (not necessary in acutal use)
    66. TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
    67. }
    68. }
  2. Submit transactional message status

    When the local transaction execution is completed (succeeded or failed), it is required to notify the server of the transaction status of the current message. There are two notification methods:

    • Submit when the local transaction execution is completed;
    • Do not submit the status when the local transaction execution is completed, and wait for the server to check the transaction status of the message.

    There are 3 statuses for transactions:

    • TransactionStatus.CommitTransaction: Submit a transaction to allow the consumer to consume the message.
    • TransactionStatus.RollbackTransaction: Rolls back a transaction, a message will be discarded and not be allowed to be consumed.
    • TransactionStatus.Unknown: The status is unknown, and the MQ broker is expected to check with the sender for the local transaction status.
  1. public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
  2. private final static Logger log = ClientLogger.getLog();
  3. final BusinessService businessService = new BusinessService();
  4. @Override
  5. public TransactionStatus check(Message msg) {
  6. // Message ID (It is possible that the message bodies are the same, while the message IDs are different. The current message ID cannot be queried on the console).
  7. String msgId = msg.getMsgID();
  8. // Perform crc32 or MD5 on the message body content.
  9. long crc32Id = HashUtil.crc32Code(msg.getBody());
  10. // Message ID and crc32id are mainly used to prevent duplicated messages.
  11. // If the service itself is idempotent, it can be ignored. Otherwise idempotence has to be implemented through msgId or crc32Id.
  12. // If it is required that there are no duplicated messages, it is recommended to prevent duplicated messages through performing crc32 or md5 on the message bodies.
  13. // A parameter object of the service, which is an example here, and you should handle it as needed
  14. Object businessServiceArgs = new Object();
  15. TransactionStatus transactionStatus = TransactionStatus.Unknow;
  16. try {
  17. boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
  18. if (isCommit) {
  19. // If the local transaction succeeds, submit the message.
  20. transactionStatus = TransactionStatus.CommitTransaction;
  21. } else {
  22. // If the local transaction fails, roll back the message.
  23. transactionStatus = TransactionStatus.RollbackTransaction;
  24. }
  25. } catch (Exception e) {
  26. log.error("Message Id:{}", msgId, e);
  27. }
  28. log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
  29. return transactionStatus;
  30. }
  31. }

Tool Class

  1. import java.util.zip.CRC32;
  2. public class HashUtil {
  3. public static long crc32Code(byte[] bytes) {
  4. CRC32 crc32 = new CRC32();
  5. crc32.update(bytes);
  6. return crc32.getValue();
  7. }
  8. }

Transaction Message Status Check Mechanism

  • Why message status check mechanism must be implemented when sending transactional messages?

    When half message is sent in step 1, but the returned status of local transaction is TransactionStatus.Unknown, or no status is submitted due to the application quitting, from the perspective of MQ Broker, the status of the half message is unknown. The MQ broker then requires the sender to check the status of the half message regularly, and report the final status.

  • When the check has been called back, what does the service logic need to do?

    In the check method of MQ transactional messages, logic for checking the transaction consistency should be added. LocalTransactionChecker has to be implemented for MQ to process the status check requests actively initiated by MQ Broker. Therefore, two steps need to be completed in the check method of transactional messages:

    (1) Check the status of the local transaction related to the half message (committed or rollback);

    (2) Submit the status of the local transaction to the MQ broker.

Subscribe to Transactional Messages

The subscription of transactional messages is the same as that of normal messages, see Subscribe to messages.

Thank you! We've received your feedback.