Transaction Message

Last Updated: Jun 07, 2017

Context

In some cases, local operations must be consistent with message sending transactions. That is, if a message is successfully sent, the local operation succeeds; if a message fails to be sent, the local operation fails (the message needs to be rolled back even if it is successfully sent). This prevents situations where an operation succeeds but the message fails to be sent, or an operation fails but the message is successfully sent.

In addition, a message must be successfully processed once on the consumer end to avoid manual reset of consumption progress as a result of failed message processing caused by consumer program crash.

Solution

You can use the message delay function of Message Service to implement transaction messages.

Preparation

Create two queues:

  1. Transaction message queue

The message validity period is shorter than the message delay time. That is, if the producer does not actively modify (submit) the message visibility time, the message is invisible to the consumer.

  1. Operation log queue

The operation log queue records operations on transaction messages. The message delay time is the timeout time for the transaction operation. After a message in the log queue is confirmed (deleted), the message is invisible to the consumer.

Procedure

  1. The producer sends a transaction preparation message to the transaction message queue.

  2. The producer writes an operation log message to the operation log queue. The log contains the handle of the message in Step 1.

  3. The producer performs a local transaction operation.

  4. If the operation in Step 3 succeeds, the producer submits the message (which is visible to the consumer). Otherwise, the producer rolls back the message.

  5. The producer confirms the operation log in Step 2 (and deletes the log message).

  6. After Step 4 is complete, the consumer can receive the transaction message.

  7. The consumer processes the message.

  8. The consumer confirms and deletes the message.

Refer to the following figure.

Exception analysis:

Exceptions of the producer (for example: process restart)

A. Read the log that is not confirmed due to operation log queue timeout.

B. Check the transaction result.

C. If the check result indicates that the transaction is successful, submit the message. (Repeated message submission does not affect the system. Messages with the same handle can only be successfully submitted once.)

D. Confirm the operation log.

Exceptions of the consumer (for example: process restart)

A message must be processed on the consumer at least once. If Step 8 fails, the message is visible to the consumer after a period of time, and can be processed by the current or another consumer.

Unreachable Message Service (for example: network disconnection)

Message sending and receiving status and operation logs are stored in Message Service, which features high reliability and high availability. After the network connection is recovered, transactions can be continuously implemented, ensuring the consistency between operations and message sending transactions.

Transaction Message

Code implementation:

TransactionQueue of the latest Message Service Java SDK (1.1.5) supports the preceding transaction message solution. You can easily implement transaction messages by adding the service operation and check logic to TransactionOperations and TransactionChecker.

Demo code

  1. public class TransactionMessageDemo{
  2. public class MyTransactionChecker implements TransactionChecker
  3. {
  4. public boolean checkTransactionStatus(Message message)
  5. {
  6. boolean checkResult = false;
  7. String messageHandler = message.getReceiptHandle();
  8. try{
  9. //TODO: check if the messageHandler related transaction is success.
  10. checkResult = true;
  11. }catch(Exception e)
  12. {
  13. checkResult = false;
  14. }
  15. return checkResult;
  16. }
  17. }
  18. public class MyTransactionOperations implements TransactionOperations
  19. {
  20. public boolean doTransaction(Message message)
  21. {
  22. boolean transactionResult = false;
  23. String messageHandler = message.getReceiptHandle();
  24. String messageBody = message.getMessageBody();
  25. try{
  26. //TODO: do your local transaction according to the messageHandler and messageBody here.
  27. transactionResult = true;
  28. }catch(Exception e)
  29. {
  30. transactionResult = false;
  31. }
  32. return transactionResult;
  33. }
  34. }
  35. public static void main(String[] args) {
  36. System.out.println("Start TransactionMessageDemo");
  37. String transQueueName = "transQueueName";
  38. String accessKeyId = ServiceSettings.getMNSAccessKeyId();
  39. String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
  40. String endpoint = ServiceSettings.getMNSAccountEndpoint();
  41. CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
  42. MNSClient client = account.getMNSClient(); //this client need only initialize once
  43. // create queue for transaction queue.
  44. QueueMeta queueMeta = new QueueMeta();
  45. queueMeta.setQueueName(transQueueName);
  46. queueMeta.setPollingWaitSeconds(15);
  47. TransactionMessageDemo demo = new TransactionMessageDemo();
  48. TransactionChecker transChecker = demo.new MyTransactionChecker();
  49. TransactionOperations transOperations = demo.new MyTransactionOperations();
  50. TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);
  51. // do transaction.
  52. Message msg = new Message();
  53. String messageBody = "TransactionMessageDemo";
  54. msg.setMessageBody(messageBody);
  55. transQueue.sendTransMessage(msg, transOperations);
  56. // delete queue and close client if we won't use them.
  57. transQueue.delete();
  58. // close the client at the end.
  59. client.close();
  60. System.out.println("End TransactionMessageDemo");
  61. }
  62. }
Thank you! We've received your feedback.