This article describes how to use the message delay feature of MNS to achieve transactional consistency between on-premises operations and message sending.

Prerequisites

The following queues are created:

  • Transactional message queue
    The message validity period is shorter than the message delay in the queue.
    • If the producer modifies the validity period of a message after the message is sent and the transaction is implemented, the message becomes visible to consumers.
    • If the producer does not modify the validity period of a message because the transaction failed after the message was sent, the message remains invisible to consumers.
  • Operations log queue

    The operations log queue stores the operations logs of transactional messages. The message delay of this queue is the timeout period of a transactional operation. After a log entry in the operations log queue is confirmed, it is no longer visible to consumers.
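
The interplay between the message delay and the validity period can be illustrated with a small model. The following is a minimal sketch based only on the description above, not on the MNS implementation: the class DelayedVisibilitySketch, the method isVisible, and all numeric values are hypothetical, and the sketch assumes that a message can be received only after its delay has elapsed and before its validity period has ended.

```java
public class DelayedVisibilitySketch {

    /**
     * Simplified model: a message can be received only after its delay has
     * elapsed and before its validity period has ended.
     * All values are seconds measured from the moment the message is sent.
     */
    static boolean isVisible(long delaySeconds, long validitySeconds, long elapsedSeconds) {
        boolean delayElapsed = elapsedSeconds >= delaySeconds;
        boolean stillValid = elapsedSeconds < validitySeconds;
        return delayElapsed && stillValid;
    }

    public static void main(String[] args) {
        long delay = 60;              // hypothetical message delay
        long initialValidity = 30;    // shorter than the delay, as the prerequisites require
        long extendedValidity = 3600; // hypothetical value set after the transaction succeeds

        // Transaction failed: the validity period is never extended, so the message expires first.
        System.out.println(isVisible(delay, initialValidity, 90));  // prints "false"
        // Transaction succeeded: the validity period was extended, so the message appears after the delay.
        System.out.println(isVisible(delay, extendedValidity, 90)); // prints "true"
    }
}
```

Under this model, a message whose validity period is left unchanged never reaches a consumer, which is why an unmodified message behaves like a rolled-back send.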

How it works

In some business scenarios, you must ensure transactional consistency between on-premises operations and message sending. If the on-premises operation succeeds, the message is delivered to consumers. If the message is sent but the on-premises operation fails, the message is rolled back and never becomes visible to consumers.

The following figure shows the process.

Figure: Transactional messages

If a message is sent and an on-premises operation succeeds, the following process is used:

  1. The producer sends a transactional message to the transactional message queue.
  2. The producer sends log entries to the operations log queue. The log entries include the receipt handle of the transactional message created in Step 1.
  3. The producer implements the on-premises transaction.
  4. The producer modifies the validity period of the transactional message. The message becomes visible to consumers.
  5. The producer confirms and deletes the log entries in the operations log queue.
  6. The consumer consumes the transactional message from the transactional message queue.
  7. The consumer processes the transactional message.
  8. The consumer requests to delete the transactional message.

If a message is sent and an on-premises operation fails, the following process is used:

  1. The producer sends a transactional message to the transactional message queue.
  2. The producer sends log entries to the operations log queue. The log entries include the receipt handle of the transactional message created in Step 1.
  3. The producer fails to implement the on-premises transaction.
  4. After the log entries time out, the producer reads the unconfirmed log entries from the operations log queue.
  5. The producer checks the transaction result and finds that the on-premises operation failed.
  6. The producer does not modify the validity period of the transactional message. The message expires and is deleted without becoming visible to consumers.
  7. The producer confirms and deletes the log entries in the operations log queue.
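
The two flows above can be sketched with in-memory stand-ins for the two queues. This is an illustrative simulation only and does not call MNS: the class TransactionalSendSketch, its fields, and its methods are hypothetical names, receipt handles are generated locally, and expiry of an unmodified message is modeled by simply dropping it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class TransactionalSendSketch {
    // Stand-in for the transactional message queue: receipt handle -> body, not yet visible.
    private final Map<String, String> invisibleMessages = new LinkedHashMap<>();
    // Messages that consumers could receive.
    private final List<String> visibleMessages = new ArrayList<>();
    // Stand-in for the operations log queue: unconfirmed entries keyed by receipt handle.
    private final Map<String, String> unconfirmedLog = new LinkedHashMap<>();
    private int nextHandle = 0;

    /** Steps 1 to 5 of the success flow, or steps 1 to 3 of the failure flow. */
    boolean send(String body, Predicate<String> localTransaction) {
        String handle = "handle-" + (nextHandle++);
        invisibleMessages.put(handle, body);   // step 1: send the transactional message
        unconfirmedLog.put(handle, body);      // step 2: log the receipt handle
        boolean succeeded;
        try {
            succeeded = localTransaction.test(body); // step 3: run the on-premises transaction
        } catch (RuntimeException e) {
            succeeded = false;
        }
        if (succeeded) {
            commit(handle);                    // steps 4 and 5
        }
        return succeeded;
    }

    /** Makes the message visible and confirms its log entry. */
    private void commit(String handle) {
        String body = invisibleMessages.remove(handle);
        if (body != null) {
            visibleMessages.add(body);
        }
        unconfirmedLog.remove(handle);
    }

    /** Steps 4 to 7 of the failure flow: process log entries that have timed out. */
    void handleTimedOutLogEntries(Predicate<String> transactionSucceeded) {
        for (String handle : new ArrayList<>(unconfirmedLog.keySet())) {
            if (transactionSucceeded.test(handle)) {
                commit(handle);
            } else {
                invisibleMessages.remove(handle); // the unmodified message expires
                unconfirmedLog.remove(handle);    // confirm the log entry
            }
        }
    }

    List<String> visibleMessages() { return visibleMessages; }
    int unconfirmedLogSize() { return unconfirmedLog.size(); }

    public static void main(String[] args) {
        TransactionalSendSketch producer = new TransactionalSendSketch();
        producer.send("order-1", body -> true);   // on-premises operation succeeds
        producer.send("order-2", body -> false);  // on-premises operation fails
        producer.handleTimedOutLogEntries(handle -> false);
        System.out.println(producer.visibleMessages()); // prints "[order-1]"
    }
}
```

Only the message whose on-premises operation succeeded reaches the visible list. The failed message is removed when its log entry is processed, so consumers never see it.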

Sample code

You can use the TransactionQueue class provided by Java SDK 1.1.8 to maintain transactional consistency between on-premises operations and message sending. To do this, implement two interfaces: TransactionOperations, which runs the on-premises transaction, and TransactionChecker, which contains the exception-handling logic that checks the transaction status.

The following code is used as an example:

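import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.client.TransactionChecker;
import com.aliyun.mns.client.TransactionOperations;
import com.aliyun.mns.client.TransactionQueue;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;

public class TransactionMessageDemo{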
    public class MyTransactionChecker implements TransactionChecker
    {
        public boolean checkTransactionStatus(Message message)
        {
            boolean checkResult = false;
            String messageHandler = message.getReceiptHandle();
            try{
                // Check whether the transaction identified by messageHandler succeeded.
                // Placeholder: replace this assignment with a real lookup of the transaction result.
                checkResult = true;
            }catch(Exception e)
            {
                checkResult = false;
            }
            return checkResult;
        }
    }

    public class MyTransactionOperations implements TransactionOperations
    {
        public boolean doTransaction(Message message)
        {
            boolean transactionResult = false;
            String messageHandler = message.getReceiptHandle();
            String messageBody = message.getMessageBody();
            try{
                // Implement the on-premises transaction based on messageHandler and messageBody.
                // Placeholder: replace this assignment with the actual transaction logic.
                transactionResult = true;
            }catch(Exception e)
            {
                transactionResult = false;
            }
            return transactionResult;
        }
    }

    public static void main(String[] args) {
        System.out.println("Start TransactionMessageDemo");
        String transQueueName = "transQueueName";
        String accessKeyId = ServiceSettings.getMNSAccessKeyId();
        String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
        String endpoint = ServiceSettings.getMNSAccountEndpoint();

        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient(); // Initialize the client.

        // Create a queue for the transaction.
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName(transQueueName);
        queueMeta.setPollingWaitSeconds(15);

        TransactionMessageDemo demo = new TransactionMessageDemo();
        TransactionChecker transChecker = demo.new MyTransactionChecker();
        TransactionOperations transOperations = demo.new MyTransactionOperations();

        TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);

        // Implement the transaction.
        Message msg = new Message();
        String messageBody = "TransactionMessageDemo";
        msg.setMessageBody(messageBody);
        transQueue.sendTransMessage(msg, transOperations);

        // If you no longer perform operations on the queue, delete the queue and shut down the client.
        transQueue.delete();

        // Shut down the client.
        client.close();
        System.out.println("End TransactionMessageDemo");
    }

}

Troubleshoot exceptions

  • Producer-side exceptions (for example, a process restart)
    1. Read the unconfirmed log entries that have timed out from the operations log queue.
    2. Check whether the transaction is implemented.
    3. If the transaction is implemented, submit the message to the transactional message queue.
      Note: Repeated submissions do not affect the whole process, because a message with a given receipt handle can be submitted only once.
    4. Confirm the log entries.
  • Consumer-side exceptions (for example, a process restart)

    MNS ensures that a message is consumed at least once. If the consumer fails to delete the transactional message, the message remains visible for a period of time and can be processed by the current consumer or other consumers.

  • Inaccessible MNS services (for example, a network disconnection)

    The MNS server stores the status and log entries of the message. MNS features high reliability and high availability, which allows you to continue the transaction after the network recovers. If the transaction succeeds, consumers can retrieve and process the message. Otherwise, consumers cannot receive the message.
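
The producer-side recovery steps above can be sketched as follows. This is a hedged, in-memory illustration rather than SDK code: the class ProducerRecoverySketch and its methods are hypothetical, the set of implemented transactions stands in for whatever check your application performs, and the "submitted only once" guarantee is modeled with a local set of receipt handles.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProducerRecoverySketch {
    // Receipt handles that have already been submitted (models "submitted only once").
    private final Set<String> submittedHandles = new HashSet<>();
    // Message bodies that consumers could receive.
    private final List<String> deliverable = new ArrayList<>();

    /** Submits a message; a repeated submission with the same receipt handle is a no-op. */
    boolean submit(String receiptHandle, String body) {
        if (!submittedHandles.add(receiptHandle)) {
            return false; // already submitted, nothing changes
        }
        deliverable.add(body);
        return true;
    }

    /**
     * Replays timed-out log entries after a restart and confirms each one.
     * Returns the number of messages that were newly submitted.
     */
    int recover(Map<String, String> timedOutLog, Set<String> implementedTransactions) {
        int newlySubmitted = 0;
        Iterator<Map.Entry<String, String>> entries = timedOutLog.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, String> entry = entries.next();          // step 1: read the entry
            if (implementedTransactions.contains(entry.getKey())       // step 2: check the transaction
                    && submit(entry.getKey(), entry.getValue())) {     // step 3: submit if implemented
                newlySubmitted++;
            }
            entries.remove();                                          // step 4: confirm the entry
        }
        return newlySubmitted;
    }

    List<String> deliverable() { return deliverable; }

    public static void main(String[] args) {
        ProducerRecoverySketch producer = new ProducerRecoverySketch();
        // The producer submitted handle-1 and then restarted before confirming its log entry.
        producer.submit("handle-1", "order-1");

        Map<String, String> timedOutLog = new LinkedHashMap<>();
        timedOutLog.put("handle-1", "order-1");
        timedOutLog.put("handle-2", "order-2"); // its transaction was never implemented

        int newlySubmitted = producer.recover(timedOutLog, Collections.singleton("handle-1"));
        // prints "0 [order-1] true"
        System.out.println(newlySubmitted + " " + producer.deliverable() + " " + timedOutLog.isEmpty());
    }
}
```

Replaying handle-1 does not produce a duplicate because the repeated submission is ignored, and the entry for handle-2 is confirmed without its message ever being submitted.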