
Send and receive transactional messages

Last Updated: Jun 20, 2018

Transactional messages are currently supported in the following regions: Internet, East China 1, East China 2, North China 2, and South China 1.

Interaction Process

The interaction process of MQ transactional messages is shown in the figure below:

Figure: MQ Transactional Message Interaction Process

Send Transactional Messages

Sending a transactional message consists of the following two steps:

  1. Send a half message and execute the local transaction. The sample code is as follows:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Runtime.InteropServices;
    using ons;

    namespace ons
    {
        public class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            public MyLocalTransactionExecuter()
            {
            }

            ~MyLocalTransactionExecuter()
            {
            }

            public override TransactionStatus execute(Message value)
            {
                Console.WriteLine("execute Topic: {0}, Tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                    value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
                // Message ID. Two messages can have the same body but different message IDs.
                // The current message ID cannot be queried on the console.
                string msgId = value.getMsgID();
                // The message ID and a crc32 or MD5 digest of the message body are mainly used to detect duplicated messages.
                // If duplicated messages must be avoided, computing crc32 or MD5 on the message body is recommended.
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try
                {
                    // Placeholder: replace with the execution result of your local transaction.
                    bool isCommit = true;
                    if (isCommit)
                    {
                        // If the local transaction succeeds, commit the message.
                        transactionStatus = TransactionStatus.CommitTransaction;
                    }
                    else
                    {
                        // If the local transaction fails, roll back the message.
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                }
                catch (Exception e)
                {
                    // Handle the exception. The status stays Unknow, so the broker checks it later.
                }
                return transactionStatus;
            }
        }

        class onscsharp
        {
            static void Main(string[] args)
            {
                ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
                factoryInfo.setFactoryProperty(factoryInfo.ProducerId, "");     // The producer ID you created in the MQ console
                factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "");  // The topic you created in the MQ console
                factoryInfo.setFactoryProperty(factoryInfo.MsgContent, "");     // The message body
                factoryInfo.setFactoryProperty(factoryInfo.AccessKey, "");      // AccessKey for Alibaba Cloud identity verification, created in the Alibaba Cloud Management Console
                factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "");      // SecretKey for Alibaba Cloud identity verification, created in the Alibaba Cloud Management Console

                // Create the transaction producer.
                ONSFactory onsfactory = new ONSFactory();
                LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
                TransactionProducer pProducer = onsfactory.getInstance().createTransactionProducer(factoryInfo, ref myChecker);

                // Call start() once before sending messages. After that, multiple threads can send messages concurrently.
                pProducer.start();

                Message msg = new Message(
                    // Message topic
                    factoryInfo.getPublishTopics(),
                    // Message tag
                    "TagA",
                    // Message body
                    factoryInfo.getMessageContent()
                );

                // The key is the business identifier of the message, so keep it globally unique where possible.
                // If a message is not received as expected, you can query it by key and resend it in the MQ console.
                // Note: Sending and receiving still work if no key is set.
                msg.setKey("ORDERID_100");

                // If no exception is thrown, the message was sent successfully.
                try
                {
                    LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                    SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
                }
                catch (ONSClientException e)
                {
                    Console.WriteLine("\nexception of sendmsg:{0}", e.what());
                }

                // Shut down the producer before the application exits. Otherwise, problems such as memory leaks may occur.
                // The producer cannot be restarted after shutdown.
                pProducer.shutdown();
            }
        }
    }

  2. Submit transactional message status

    When the local transaction finishes, whether it succeeded or failed, the sender must notify the server of the transaction status of the current message. There are two ways to do this:

    • Submit the status as soon as the local transaction finishes;
    • Do not submit the status when the local transaction finishes, and wait for the server to check the transaction status of the message.

    A transaction has one of three statuses:

    • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer is allowed to consume the message;
    • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed;
    • TransactionStatus.Unknow: The status is unknown, and the MQ broker is expected to check the local transaction status with the sender.

    The sample code for the status check implementation, MyLocalTransactionChecker, is as follows:

    using System;
    using ons;

    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }

        ~MyLocalTransactionChecker()
        {
        }

        public override TransactionStatus check(Message value)
        {
            Console.WriteLine("check Topic: {0}, Tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
            // Message ID. Two messages can have the same body but different message IDs.
            // The current message ID cannot be queried on the console.
            string msgId = value.getMsgID();
            // The message ID and a crc32 or MD5 digest of the message body are mainly used to detect duplicated messages.
            // If the service itself is idempotent, duplicates can be ignored. Otherwise, implement idempotence through msgId or crc32Id.
            // If duplicated messages must be avoided, computing crc32 or MD5 on the message body is recommended.
            TransactionStatus transactionStatus = TransactionStatus.Unknow;
            try
            {
                // Placeholder: replace with the execution result of your local transaction.
                bool isCommit = true;
                if (isCommit)
                {
                    // If the local transaction succeeded, commit the message.
                    transactionStatus = TransactionStatus.CommitTransaction;
                }
                else
                {
                    // If the local transaction failed, roll back the message.
                    transactionStatus = TransactionStatus.RollbackTransaction;
                }
            }
            catch (Exception e)
            {
                // Handle the exception. The status stays Unknow, so the broker checks again later.
            }
            return transactionStatus;
        }
    }

Transactional Message Status Check Mechanism

  • Why must the message status check mechanism be implemented when sending transactional messages?

    If the half message is sent in step 1 but the local transaction returns TransactionStatus.Unknow, or no status is submitted because the application quits, the status of the half message is unknown from the perspective of the MQ broker. The MQ broker then periodically requires the sender to check the status of the half message and report its final status.

  • When the check method is called back, what does the service logic need to do?

    The check method of MQ transactional messages should contain the logic that verifies transaction consistency. The sender must implement LocalTransactionChecker so that it can handle the status check requests initiated by the MQ broker. The check method therefore needs to complete two steps:

    (1) Check the status of the local transaction related to the half message (committed or rolled back);

    (2) Submit the status of the local transaction to the MQ broker.

  • How do the different statuses of the local transaction affect the half message?

    • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer is allowed to consume the message;

    • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed;

    • TransactionStatus.Unknow: The status is unknown, and the MQ broker is expected to check the local transaction status with the sender.

    For detailed code, see the implementation of MyLocalTransactionChecker.
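The two steps of the check method (look up the local transaction, then report its status) can be sketched without the MQ SDK. Everything in this sketch is a hypothetical stand-in: the CheckStatus and LocalTxState enums, the OrderStore dictionary, and the Resolve method are not part of the SDK. It also assumes that a missing record means the outcome is not yet known. In a real checker, the same lookup would sit inside check(Message value) and return the matching TransactionStatus value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the three statuses a checker can report.
public enum CheckStatus { Commit, Rollback, Unknown }

// Hypothetical states that the local transaction records for each business key.
public enum LocalTxState { Pending, Committed, RolledBack }

public static class TransactionCheckSketch
{
    // Hypothetical local record, keyed by the message key (for example, an order ID).
    public static readonly Dictionary<string, LocalTxState> OrderStore =
        new Dictionary<string, LocalTxState>();

    // Step (1): look up the local transaction. Step (2): map it to the status to report.
    public static CheckStatus Resolve(string messageKey)
    {
        LocalTxState state;
        if (!OrderStore.TryGetValue(messageKey, out state))
        {
            // Assumption: no record means the outcome is not known yet,
            // so the broker should check again later.
            return CheckStatus.Unknown;
        }

        switch (state)
        {
            case LocalTxState.Committed:
                return CheckStatus.Commit;
            case LocalTxState.RolledBack:
                return CheckStatus.Rollback;
            default:
                // The local transaction is still in progress.
                return CheckStatus.Unknown;
        }
    }

    public static void Main()
    {
        OrderStore["ORDERID_100"] = LocalTxState.Committed;
        Console.WriteLine(Resolve("ORDERID_100")); // prints "Commit"
        Console.WriteLine(Resolve("ORDERID_404")); // prints "Unknown"
    }
}
```

Reporting an unknown status for a missing record is a design choice in this sketch. A service that always writes its record before sending the half message might treat a missing record as a rollback instead.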

If the service itself is idempotent, duplicated messages can be ignored. Otherwise, idempotence has to be implemented through msgId or crc32Id.

For instructions and sample code of subscribing to transactional messages, see Subscribe to messages.
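As an illustration of detecting duplicates by hashing the message body, the following minimal sketch uses MD5 from the .NET standard library. It is not part of the MQ SDK: the BodyDeduplicator class and its method names are hypothetical. The in-memory set is an assumption made for brevity, since a real service would usually need a persistent, shared store.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper that remembers which message bodies were already processed.
public class BodyDeduplicator
{
    // Assumption: an in-memory set is enough for this sketch. It is neither
    // persistent nor thread-safe.
    private readonly HashSet<string> seen = new HashSet<string>();

    // Computes a lowercase hexadecimal MD5 digest of the UTF-8 encoded body.
    public static string Fingerprint(string body)
    {
        using (MD5 md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(body));
            return BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant();
        }
    }

    // Returns true the first time a body is seen, and false for every repeat.
    public bool TryMarkProcessed(string body)
    {
        return seen.Add(Fingerprint(body));
    }
}

public static class DedupDemo
{
    public static void Main()
    {
        var dedup = new BodyDeduplicator();
        Console.WriteLine(dedup.TryMarkProcessed("order 100 paid")); // prints "True"
        Console.WriteLine(dedup.TryMarkProcessed("order 100 paid")); // prints "False"
    }
}
```

A consumer would call TryMarkProcessed on the body of each received message and skip the business logic whenever it returns false.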
