Send and receive transactional messages

Last Updated: Mar 29, 2019

The currently supported regions include Internet, China (Hangzhou), China (Beijing), China (Shanghai), and China (Shenzhen).

Interaction process

The interaction process of transactional messages is shown in the following figure.

[Figure: interaction process of transactional messages]

Send transactional messages

Follow these steps to send a transactional message:

  1. Send a half message and execute a local transaction. The sample code is as follows:
    #include <string>
    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;

    class MyLocalTransactionExecuter : public LocalTransactionExecuter
    {
    public:
        MyLocalTransactionExecuter()
        {
        }
        ~MyLocalTransactionExecuter()
        {
        }
        virtual TransactionStatus execute(Message &value)
        {
            // The message ID. Two messages may have the same message body but different message IDs. Currently, you cannot query message IDs in the console.
            std::string msgId = value.getMsgID();
            // If the business logic is not idempotent by itself, deduplicate messages by using the message ID
            // or a digest of the message body computed with CRC32, MD5, or another algorithm.
            TransactionStatus transactionStatus = Unknow;
            try {
                // Placeholder: replace this value with the execution result of the local transaction.
                bool isCommit = true;
                if (isCommit) {
                    // The local transaction succeeded. Commit the message.
                    transactionStatus = CommitTransaction;
                } else {
                    // The local transaction failed. Roll back the message.
                    transactionStatus = RollbackTransaction;
                }
            } catch (...) {
                // Handle the exception. The status remains Unknow, so the broker checks it later.
            }
            return transactionStatus;
        }
    };

    int main(int argc, char* argv[])
    {
        // The information that is required to create a producer and to send messages.
        ONSFactoryProperty factoryInfo;
        // The group ID that you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
        // The TCP endpoint. To view it, go to the Instances page in the RocketMQ console and check the Endpoint Information area.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
        // The topic that you created in the console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX");
        // The message content.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
        // The AccessKeyId that was created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxx");
        // The AccessKeySecret that was created in the Alibaba Cloud console for identity authentication.
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx");

        // Create a producer. RocketMQ does not release pChecker, so the caller must release it.
        // MyLocalTransactionChecker is defined in step 2 and must be declared before main().
        MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
        TransactionProducer *pProducer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, pChecker);

        // Before sending a message, call the start method once to start the producer.
        pProducer->start();

        Message msg(
            // The message topic.
            factoryInfo.getPublishTopics(),
            // The message tag, which is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the RocketMQ broker based on the specified criteria.
            "TagA",
            // The message body, which cannot be empty. RocketMQ does not interpret it. The producer and the consumer must agree on compatible serialization and deserialization methods.
            factoryInfo.getMessageContent()
        );

        // Set the message key, a business property that identifies the message. Keep it globally unique where possible.
        // With a unique key, you can query the message and resend it in the console if the message is not received.
        // Note: Messages can still be sent and received if you do not set this attribute.
        msg.setKey("ORDERID_100");

        // RocketMQ does not release pExecuter, so the caller must release it.
        MyLocalTransactionExecuter *pExecuter = new MyLocalTransactionExecuter();

        // The message is sent if no exception is thrown.
        try
        {
            SendResultONS sendResult = pProducer->send(msg, pExecuter);
        }
        catch (ONSClientException &e)
        {
            // Customize the exception handling details.
        }

        // Shut down the producer before exiting the application. Otherwise, memory leaks may occur.
        pProducer->shutdown();

        // Release the executer and the checker, which RocketMQ does not release.
        delete pExecuter;
        delete pChecker;
        return 0;
    }
  2. Submit the status of the transactional message.

After the execution of a local transaction (successful or failed), the broker must be notified of the transaction status of the current message. Two notification modes are supported:

  • Submit the status after executing the local transaction.

  • Wait until the broker requests to check the transaction status of the message.

A transaction may be in one of the following states:

  • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer can consume the message.

  • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed.

  • TransactionStatus.Unknow: The transaction status is unknown. The broker is expected to query the message sender again for the status of the local transaction that corresponds to the message.

    class MyLocalTransactionChecker : public LocalTransactionChecker
    {
    public:
        MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        virtual TransactionStatus check(Message &value)
        {
            // The ID of the message. Two messages may have the same body but different IDs. Currently, you cannot query message IDs in the console.
            std::string msgId = value.getMsgID();
            // If the business logic is not idempotent by itself, deduplicate messages by using the message ID
            // or a digest of the message body computed with CRC32, MD5, or another algorithm.
            TransactionStatus transactionStatus = Unknow;
            try {
                // Placeholder: replace this value with the execution result of the local transaction.
                bool isCommit = true;
                if (isCommit) {
                    // The local transaction succeeded. Commit the message.
                    transactionStatus = CommitTransaction;
                } else {
                    // The local transaction failed. Roll back the message.
                    transactionStatus = RollbackTransaction;
                }
            } catch (...) {
                // Handle the exception. The status remains Unknow, so the broker checks it again later.
            }
            return transactionStatus;
        }
    };
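The comments in the sample code suggest deduplicating messages by message ID or by a digest of the message body when the business logic is not idempotent by itself. The following is a minimal sketch of that idea and is not part of the RocketMQ SDK: crc32 and DedupFilter are hypothetical helpers, and the in-memory set is an assumption that stands in for whatever persistent store your service uses.

```cpp
#include <cstdint>
#include <string>
#include <unordered_set>

// Hypothetical helper: bitwise CRC-32 (reflected polynomial 0xEDB88320)
// computed over the bytes of the message body.
std::uint32_t crc32(const std::string &data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right by one bit and apply the polynomial when the low bit is set.
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Hypothetical helper: remembers which (message key, body digest) pairs
// have already been handled. A real service would persist this state.
class DedupFilter {
public:
    // Returns true the first time a pair is seen and false for repeats.
    bool firstSeen(const std::string &msgKey, const std::string &body) {
        const std::string id = msgKey + ":" + std::to_string(crc32(body));
        return seen_.insert(id).second;
    }

private:
    std::unordered_set<std::string> seen_;
};
```

A handler could call firstSeen with the message key and body before running its business logic, and skip the message when the call returns false. CRC32 is not collision-resistant, so pairing the digest with the message key, as done here, reduces the chance of treating two different messages as duplicates.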

Transaction check mechanism

  • Why must the transaction status check mechanism be implemented when transactional messages are sent?

    When a half message is sent in step 1 but the local transaction returns TransactionStatus.Unknow, or no status is submitted because the application exits, the RocketMQ broker does not know the status of the half message. Therefore, the broker periodically asks the message sender to check the local transaction and report the final status of the half message.

  • What does the business logic do when the check method is called back?

    The check method for transactional messages must contain the logic for checking transaction consistency. After a transactional message is sent, the message sender must implement the LocalTransactionChecker interface to respond to the broker's requests for the local transaction status. Therefore, the check method must complete the following tasks:

    (1) Check the status of the local transaction that corresponds to the half message (committed or rolled back).

    (2) Submit the status of the local transaction to the broker.

  • What is the impact of different local transaction statuses on the half message?

    • TransactionStatus.CommitTransaction: The transaction is committed, and the consumer can consume the message.

    • TransactionStatus.RollbackTransaction: The transaction is rolled back. The message is discarded and cannot be consumed.

    • TransactionStatus.Unknow: The transaction status is unknown. The broker is expected to query the message sender again for the status of the local transaction that corresponds to the message.

For more information about the code, see the implementation of MyLocalTransactionChecker.
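The check mechanism described in this section can be sketched as a small simulation that runs without the SDK. Everything in it is an assumption made for illustration: Status stands in for the SDK's TransactionStatus, TxLog is a hypothetical record of local transaction outcomes keyed by message key, and resolveHalfMessage with its maxChecks parameter only imitates a broker that repeats the check. It does not describe how the RocketMQ broker is actually implemented.

```cpp
#include <functional>
#include <string>
#include <unordered_map>

// Stand-in for the SDK's TransactionStatus (assumption: the same three states).
enum class Status { Commit, Rollback, Unknown };

// Hypothetical record of local transaction outcomes, keyed by message key.
// true means the local transaction committed; false means it rolled back.
using TxLog = std::unordered_map<std::string, bool>;

// The two tasks of the check method: look up the outcome of the local
// transaction for this message, then report it as a status.
Status checkLocalTransaction(const TxLog &log, const std::string &msgKey) {
    auto it = log.find(msgKey);
    if (it == log.end()) {
        // No outcome recorded yet, so the status is still unknown.
        return Status::Unknown;
    }
    return it->second ? Status::Commit : Status::Rollback;
}

// Simulated broker side: repeat the check until a final status is reported
// or the illustrative attempt budget (maxChecks) is used up.
Status resolveHalfMessage(const std::function<Status()> &check, int maxChecks) {
    for (int attempt = 0; attempt < maxChecks; ++attempt) {
        Status status = check();
        if (status != Status::Unknown) {
            return status;
        }
    }
    return Status::Unknown;
}

// Only a committed half message becomes visible to consumers.
bool isDeliverable(Status status) {
    return status == Status::Commit;
}
```

For example, if the outcome for ORDERID_100 is written to the log only before the third check, resolveHalfMessage returns Status::Unknown internally twice and then Status::Commit, at which point isDeliverable returns true. Keeping the outcome in a store that the checker can read is what lets the sender answer a check even after the sending process has restarted.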

Subscribe to transactional messages

Transactional messages are subscribed to in the same way as normal messages. For instructions and sample code, see Subscribe to messages.