
Send and receive transactional messages

Last Updated: Jun 19, 2018

The currently supported regions include Internet, East China 1, East China 2, North China 2, and South China 1.

Interaction Process

The interaction process of MQ transactional messages is shown in the figure below:

MQ Transactional Message Interaction Process

Send Transactional Messages

Sending a transactional message involves the following two steps:

  1. Send a half message and execute the local transaction. The sample code is as follows:

    #include <string>
    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;

    class MyLocalTransactionExecuter : public LocalTransactionExecuter
    {
    public:
        MyLocalTransactionExecuter()
        {
        }
        ~MyLocalTransactionExecuter()
        {
        }
        virtual TransactionStatus execute(Message &value)
        {
            // Message ID (message bodies can be identical while the message IDs differ. The current message ID cannot be queried on the console).
            std::string msgId = value.getMsgID();
            // Perform crc32 or MD5 on the message body content.
            // The message ID and crc32id are mainly used to prevent duplicated messages.
            // If the service itself is idempotent, this can be ignored. Otherwise idempotence has to be implemented through msgId or crc32Id.
            // If duplicated messages must be avoided, it is recommended to detect them by performing crc32 or md5 on the message bodies.
            TransactionStatus transactionStatus = Unknow;
            try {
                // Placeholder: replace with the execution result of your local transaction.
                bool isCommit = true;
                if (isCommit) {
                    // If the local transaction succeeds, commit the message.
                    transactionStatus = CommitTransaction;
                } else {
                    // If the local transaction fails, roll back the message.
                    transactionStatus = RollbackTransaction;
                }
            } catch (...) {
                // Handle the exception. The status stays Unknow.
            }
            return transactionStatus;
        }
    };

    int main(int argc, char* argv[])
    {
        ONSFactoryProperty factoryInfo;
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX"); // The producer ID you have created on the console
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX"); // The topic you have created on the console
        factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX"); // Message content
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxxxxxxxx"); // Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxxxxxxxxxxxxxxxxxxx"); // Alibaba Cloud ID verification, which is created on Alibaba Cloud Management Console
        // Create the producer. MyLocalTransactionChecker is the checker class shown later in this topic.
        // MQ is not responsible for releasing pChecker, and the service provider needs to release the resource by itself.
        MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
        TransactionProducer *pProducer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo, pChecker);
        // Before sending messages, the start method must be called once to start the producer.
        pProducer->start();
        Message msg(
            // Message topic
            factoryInfo.getPublishTopics(),
            // Message tag, which is similar to a tag in Gmail and is used to classify messages. Consumers can then set filtering conditions so that messages are filtered in the MQ broker.
            "TagA",
            // Message body, which cannot be null. Serialization and deserialization methods need to be negotiated and remain consistent between the producer and the consumer.
            factoryInfo.getMessageContent()
        );
        // The key represents the key service property of the message, so please set it as globally unique as possible.
        // You can query a message and resend it through the MQ console when you cannot receive the message properly.
        // Note: Normal message sending and receiving will not be affected if the message key is not configured.
        msg.setKey("ORDERID_100");
        // MQ is not responsible for the release of pExecuter, and the service provider needs to release the resource by itself.
        MyLocalTransactionExecuter *pExecuter = new MyLocalTransactionExecuter();
        // If no exceptions are thrown, then the message is sent successfully.
        try
        {
            SendResultONS sendResult = pProducer->send(msg, pExecuter);
        }
        catch (ONSClientException &e)
        {
            // Customize the details for processing exceptions
        }
        // The producer must be shut down before exiting the application. Otherwise there will be memory leakage.
        pProducer->shutdown();
        // Release the objects that MQ does not release.
        delete pExecuter;
        delete pChecker;
        return 0;
    }

  2. Submit transactional message status

    When the local transaction completes (whether it succeeds or fails), the server must be notified of the transaction status of the current message. There are two ways to do this:

    • Submit the status as soon as the local transaction completes.
    • Do not submit the status when the local transaction completes, and wait for the server to check the transaction status of the message.

A transaction has three statuses:

  • TransactionStatus.CommitTransaction: Commit the transaction and allow the consumer to consume the message.
  • TransactionStatus.RollbackTransaction: Roll back the transaction. The message is discarded and cannot be consumed.
  • TransactionStatus.Unknown: The status is unknown, and the MQ broker is expected to check with the sender for the local transaction status.

    The sample code of the transaction status checker is as follows:

    // Place this class before main() in the same source file as the sender sample.
    class MyLocalTransactionChecker : public LocalTransactionChecker
    {
    public:
        MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        virtual TransactionStatus check(Message &value)
        {
            // Message ID (message bodies can be identical while the message IDs differ. The current message ID cannot be queried on the console).
            std::string msgId = value.getMsgID();
            // Perform crc32 or MD5 on the message body content.
            // The message ID and crc32id are mainly used to prevent duplicated messages.
            // If the service itself is idempotent, this can be ignored. Otherwise idempotence has to be implemented through msgId or crc32Id.
            // If duplicated messages must be avoided, it is recommended to detect them by performing crc32 or md5 on the message bodies.
            TransactionStatus transactionStatus = Unknow;
            try {
                // Placeholder: replace with the execution result of your local transaction.
                bool isCommit = true;
                if (isCommit) {
                    // If the local transaction succeeded, commit the message.
                    transactionStatus = CommitTransaction;
                } else {
                    // If the local transaction failed, roll back the message.
                    transactionStatus = RollbackTransaction;
                }
            } catch (...) {
                // Handle the exception. The status stays Unknow.
            }
            return transactionStatus;
        }
    };
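
The comments in the sample code recommend computing crc32 or MD5 on the message body to detect duplicated messages. The following is a minimal sketch of the crc32 variant using only the C++ standard library. The names crc32Of, DuplicateFilter, and runDuplicateFilterDemo are hypothetical and are not part of the MQ SDK, and the in-memory set is an assumption made for illustration; a real service would normally keep this record in persistent storage.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_set>

// Bitwise CRC-32 with the reflected polynomial 0xEDB88320 (the variant used by zlib and PNG).
std::uint32_t crc32Of(const std::string &data)
{
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right by one bit; XOR in the polynomial when the dropped bit was 1.
            crc = (crc & 1u) ? ((crc >> 1) ^ 0xEDB88320u) : (crc >> 1);
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Hypothetical helper: remembers the checksums of message bodies already processed.
class DuplicateFilter
{
public:
    // Returns true the first time a body is seen and false for a repeat.
    bool markIfNew(const std::string &body)
    {
        return seen_.insert(crc32Of(body)).second;
    }

private:
    std::unordered_set<std::uint32_t> seen_;
};

// Small self-check that shows the intended use.
void runDuplicateFilterDemo()
{
    DuplicateFilter filter;
    assert(filter.markIfNew("order-100 paid"));   // first delivery: process it
    assert(!filter.markIfNew("order-100 paid"));  // redelivery of the same body: skip it
}
```

A 32-bit checksum can collide for different bodies, so a service that cannot tolerate a false duplicate may prefer to combine the checksum with the message key or to use a longer digest.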

Transactional Message Status Check Mechanism

  • Why must the message status check mechanism be implemented when sending transactional messages?

    If the half message is sent in step 1 but the local transaction returns TransactionStatus.Unknown, or no status is submitted because the application exits, the MQ broker cannot tell the status of the half message. The MQ broker then regularly requires the sender to check the status of the half message and report the final status.

  • When the check method is called back, what does the service logic need to do?

    The check method of MQ transactional messages should contain the logic for checking transaction consistency. The application must implement LocalTransactionChecker so that it can process the status check requests actively initiated by the MQ broker. Two steps therefore need to be completed in the check method:

    (1) Check the status of the local transaction related to the half message (committed or rolled back);

    (2) Submit the status of the local transaction to the MQ broker.

  • What effect do the different statuses of local transactions have on the half message?

    • TransactionStatus.CommitTransaction: Commit the transaction and allow the consumer to consume the message.

    • TransactionStatus.RollbackTransaction: Roll back the transaction. The message is discarded and cannot be consumed.

    • TransactionStatus.Unknown: The status is unknown, and the MQ broker is expected to check with the sender for the local transaction status.

    For detailed code, see the implementation of MyLocalTransactionChecker.
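
The two steps of the check method can also be sketched without the SDK. The example below is a minimal sketch under stated assumptions: the TransactionStatus enum is a local stand-in that only mirrors the status names used in the sample code, and LocalOutcome, TransactionLog, checkStatus, and runCheckDemo are hypothetical names. The in-memory map stands in for whatever store the service uses to record the result of its local transactions.

```cpp
#include <cassert>
#include <map>
#include <string>

// Local stand-in that mirrors the status names in the sample code.
// It is NOT the SDK's own declaration.
enum TransactionStatus { CommitTransaction, RollbackTransaction, Unknow };

// Hypothetical result of a local transaction as recorded by the service.
enum class LocalOutcome { Pending, Committed, RolledBack };

// Hypothetical transaction log keyed by the message key (for example "ORDERID_100").
class TransactionLog
{
public:
    void record(const std::string &key, LocalOutcome outcome)
    {
        outcomes_[key] = outcome;
    }

    // Step (1): look up the status of the local transaction for this message.
    LocalOutcome lookup(const std::string &key) const
    {
        std::map<std::string, LocalOutcome>::const_iterator it = outcomes_.find(key);
        return it == outcomes_.end() ? LocalOutcome::Pending : it->second;
    }

private:
    std::map<std::string, LocalOutcome> outcomes_;
};

// Step (2): translate the local status into the value reported to the broker.
TransactionStatus checkStatus(const TransactionLog &log, const std::string &key)
{
    switch (log.lookup(key)) {
    case LocalOutcome::Committed:
        return CommitTransaction;
    case LocalOutcome::RolledBack:
        return RollbackTransaction;
    default:
        return Unknow;  // not decided yet, so the status stays unknown
    }
}

// Small self-check that shows the intended use.
void runCheckDemo()
{
    TransactionLog log;
    assert(checkStatus(log, "ORDERID_100") == Unknow);  // nothing recorded yet
    log.record("ORDERID_100", LocalOutcome::Committed);
    assert(checkStatus(log, "ORDERID_100") == CommitTransaction);
}
```

In this sketch a key with no record is reported as unknown rather than rolled back. That is a design choice for illustration: it avoids discarding a message whose local transaction may still be in progress, at the cost of further status checks.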

Subscribe to Transactional Messages

Transactional messages are subscribed to in the same way as normal messages. For instructions and sample code, see Subscribe to messages.
