Send and receive transactional messages

Last Updated: Mar 29, 2019

Transactional messages are currently supported in the following regions: Internet, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen).

Interaction process

The following figure shows the interaction process of RocketMQ transactional messages.

[Figure: interaction process of RocketMQ transactional messages]

Send transactional messages

Follow these steps to send a transactional message:

  1. Send a half message and execute a local transaction. The sample code is as follows:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Runtime.InteropServices;
    using ons;

    namespace ons
    {
        public class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            public MyLocalTransactionExecuter()
            {
            }

            ~MyLocalTransactionExecuter()
            {
            }

            public override TransactionStatus execute(Message value)
            {
                Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3}, msgbody:{4}, userProperty:{5}",
                    value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
                // The ID of the message. Two messages can have the same message body but different message IDs. Currently, message IDs cannot be queried in the console.
                string msgId = value.getMsgID();
                // To detect duplicate messages, compute a digest of the message body with CRC32, MD5, or a similar algorithm,
                // and use it together with the message ID.
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try
                {
                    // Placeholder: replace this call with the execution result of your local transaction.
                    bool isCommit = ExecuteLocalTransaction(value);
                    if (isCommit)
                    {
                        // Commit the message if the local transaction succeeds.
                        transactionStatus = TransactionStatus.CommitTransaction;
                    }
                    else
                    {
                        // Roll back the message if the local transaction fails.
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                }
                catch (Exception e)
                {
                    // The status stays Unknow, so the broker checks the transaction status later.
                    Console.WriteLine("exception in local transaction: {0}", e.Message);
                }
                return transactionStatus;
            }

            // Hypothetical placeholder for the business logic of the local transaction.
            private bool ExecuteLocalTransaction(Message value)
            {
                return true;
            }
        }

        class onscsharp
        {
            static void Main(string[] args)
            {
                ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
                // The TCP endpoint. Go to the Instances page in the RocketMQ console, and view the endpoint in the Endpoint Information area.
                factoryInfo.setFactoryProperty(factoryInfo.NAMESRV_ADDR, "XXX");
                // The group ID you created in the console.
                factoryInfo.setFactoryProperty(factoryInfo.ProducerId, "");
                // The topic you created in the console.
                factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "");
                // The message body.
                factoryInfo.setFactoryProperty(factoryInfo.MsgContent, "");
                // The AccessKeyId you created in the Alibaba Cloud console for identity authentication.
                factoryInfo.setFactoryProperty(factoryInfo.AccessKey, "");
                // The AccessKeySecret you created in the Alibaba Cloud console for identity authentication.
                factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "");

                // Create the transaction producer. MyLocalTransactionChecker is defined in step 2.
                ONSFactory onsfactory = new ONSFactory();
                LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
                TransactionProducer pProducer = onsfactory.getInstance().createTransactionProducer(factoryInfo, ref myChecker);

                // Before sending messages, call the start method once to start the producer.
                // After the producer is started, messages can be sent concurrently from multiple threads.
                pProducer.start();

                Message msg = new Message(
                    // The message topic.
                    factoryInfo.getPublishTopics(),
                    // The message tag.
                    "TagA",
                    // The message body.
                    factoryInfo.getMessageContent()
                );

                // Set the message key, a business-level identifier of the message. Keep it globally unique where possible.
                // With a unique key, you can query the message and resend it in the console if it is not received.
                // Note: Messages can still be sent and received if you do not set this attribute.
                msg.setKey("ORDERID_100");

                // The message is sent if no exception is thrown.
                try
                {
                    LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
                    SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
                }
                catch (ONSClientException e)
                {
                    Console.WriteLine("\nexception of sendmsg:{0}", e.what());
                }

                // Shut down the producer before exiting the application. Otherwise, memory leaks may occur.
                // The producer cannot be started again after shutdown.
                pProducer.shutdown();
            }
        }
    }
  2. Submit the transactional message status.

After a local transaction is executed, whether it succeeds or fails, the broker must be notified of the transaction status of the current message. Two notification modes are supported:

  • Submit the status immediately after the local transaction is executed, as the return value of the execute method.

  • Wait until the broker requests a check of the transaction status, and return the status from the check method.

A transaction may be in one of the following states:

  • TransactionStatus.CommitTransaction: The transaction is submitted, and the consumer can consume the message.

  • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.

  • TransactionStatus.Unknow: The transaction is in an unknown status, and the broker is expected to query the status of the local transaction that corresponds to the message from the message sender.

The sample code for the transaction status checker is as follows:

    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }

        ~MyLocalTransactionChecker()
        {
        }

        public override TransactionStatus check(Message value)
        {
            Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3}, msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
            // The ID of the message. Two messages can have the same message body but different message IDs. Currently, message IDs cannot be queried in the console.
            string msgId = value.getMsgID();
            // To detect duplicate messages, compute a digest of the message body with CRC32, MD5, or a similar algorithm,
            // and use it together with the message ID.
            // This is not needed if the business logic is already idempotent.
            TransactionStatus transactionStatus = TransactionStatus.Unknow;
            try
            {
                // Placeholder: replace this call with a lookup of the execution result of your local transaction.
                bool isCommit = QueryLocalTransaction(value);
                if (isCommit)
                {
                    // Commit the message if the local transaction succeeded.
                    transactionStatus = TransactionStatus.CommitTransaction;
                }
                else
                {
                    // Roll back the message if the local transaction failed.
                    transactionStatus = TransactionStatus.RollbackTransaction;
                }
            }
            catch (Exception e)
            {
                // The status stays Unknow, so the broker checks the transaction status again later.
                Console.WriteLine("exception in transaction check: {0}", e.Message);
            }
            return transactionStatus;
        }

        // Hypothetical placeholder for looking up the result of the local transaction.
        private bool QueryLocalTransaction(Message value)
        {
            return true;
        }
    }
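
The comments in the sample code recommend computing a CRC32 or MD5 value of the message body to detect duplicate messages. The following is a minimal sketch of that idea using MD5, assuming the message body is a UTF-8 string. The names BodyDigest and DuplicateFilter are hypothetical helpers and are not part of the ons SDK, and the in-memory set is only for illustration. A production system would more likely keep the digests in durable storage.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of the ons SDK.
public static class BodyDigest
{
    // Returns the MD5 digest of the message body as a lowercase hex string.
    public static string Md5Hex(string body)
    {
        using (MD5 md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(body));
            StringBuilder sb = new StringBuilder(hash.Length * 2);
            foreach (byte b in hash)
            {
                sb.Append(b.ToString("x2"));
            }
            return sb.ToString();
        }
    }
}

// Hypothetical in-memory record of the message bodies that were already processed.
public class DuplicateFilter
{
    private readonly HashSet<string> seen = new HashSet<string>();

    // Returns true the first time a body is seen and false for every repeat.
    public bool TryMarkProcessed(string body)
    {
        return seen.Add(BodyDigest.Md5Hex(body));
    }
}
```

In the execute or check method, the body passed to these helpers would be the value returned by value.getBody(). Processing can be skipped when TryMarkProcessed returns false.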

Transaction check mechanism

  • Why must the transaction status check mechanism be implemented when transactional messages are sent?

    If a half message is sent in step 1 but the local transaction returns TransactionStatus.Unknow, or no status is submitted because the application exits, the RocketMQ broker does not know the status of the half message. Therefore, the broker periodically asks the message sender to check the status of the half message and report its final status.

  • What does the business logic do when the check method is called back?

    The check method must contain the logic that verifies transaction consistency. After a transactional message is sent, the producer uses its LocalTransactionChecker implementation to respond to the broker's requests for the local transaction status. Therefore, the check method needs to complete the following tasks:

    (1) Check the status of the local transaction that corresponds to the half message (committed or rolled back).

    (2) Submit the status of the local transaction to the broker.

  • What is the impact of different local transaction statuses on the half message?

    • TransactionStatus.CommitTransaction: The transaction is submitted, and the consumer can consume the message.

    • TransactionStatus.RollbackTransaction: The transaction is rolled back, and the message is discarded and cannot be consumed.

    • TransactionStatus.Unknow: The transaction is in an unknown status, and the broker is expected to query the status of the local transaction that corresponds to the message from the message sender.

      For sample code, see the implementation of MyLocalTransactionChecker in step 2.
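
The two tasks of the check method described in this section (look up the local transaction, then report its status) can be sketched as follows. This is only an illustration under stated assumptions: the business records the outcome of each local transaction under the message key, and LocalTransactionLog and LocalTxOutcome are hypothetical names. The TransactionStatus enum below is a stand-in for ons.TransactionStatus so that the sketch compiles without the SDK.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for ons.TransactionStatus so that the sketch compiles without the SDK.
public enum TransactionStatus { CommitTransaction, RollbackTransaction, Unknow }

// Hypothetical outcome of a local transaction, as recorded by the business logic.
public enum LocalTxOutcome { Committed, RolledBack }

// Hypothetical record of local transaction outcomes, keyed by message key.
public class LocalTransactionLog
{
    private readonly Dictionary<string, LocalTxOutcome> outcomes =
        new Dictionary<string, LocalTxOutcome>();

    // Called by the business logic once the local transaction has finished.
    public void Record(string messageKey, LocalTxOutcome outcome)
    {
        outcomes[messageKey] = outcome;
    }

    // Task (1): look up the local transaction for the half message.
    // Task (2): map it to the status that is reported to the broker.
    public TransactionStatus Resolve(string messageKey)
    {
        LocalTxOutcome outcome;
        if (!outcomes.TryGetValue(messageKey, out outcome))
        {
            // No outcome recorded yet: report Unknow so that the broker checks again later.
            return TransactionStatus.Unknow;
        }
        return outcome == LocalTxOutcome.Committed
            ? TransactionStatus.CommitTransaction
            : TransactionStatus.RollbackTransaction;
    }
}
```

Under these assumptions, a check(Message value) implementation would return the result of Resolve(value.getKey()). An unrecorded key maps to Unknow rather than to a rollback, so a transaction that is still in progress is not discarded prematurely.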

Subscribe to transactional messages

Transactional messages are subscribed to in the same way as normal messages. For instructions and sample code, see Subscribe to messages.