This article describes how to use the message delay feature of MNS to achieve transactional consistency between on-premises operations and message sending.

Prerequisites

The following queues are created. For more information, see Create a queue.

  • Transactional message queue
    The message validity period is shorter than the message delay in the queue, so a message expires before its delay elapses unless the producer intervenes.
    • If the transaction succeeds, the producer modifies the validity period of the message, and the message becomes visible to consumers.
    • If the transaction fails, the producer does not modify the validity period of the message, and the message remains invisible to consumers.
  • Operation log queue

    The operation log queue stores the operation logs of transactional messages. The message delay is the timeout period of a transactional operation. If messages in the operation log queue are confirmed, they are invisible to consumers.
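The relationship between the validity period and the message delay can be illustrated with a small in-memory model. This is only a sketch: `PendingMessage`, `commit`, and `isVisibleAt` are hypothetical names and not part of the MNS SDK, and the model assumes that a committed message becomes visible immediately.

```java
/** Hypothetical in-memory model of one transactional message; not an MNS SDK class. */
final class PendingMessage {
    private final long sentAt;          // send time, in seconds
    private final long delaySeconds;    // message delay of the queue
    private final long validitySeconds; // message validity period
    private boolean committed;          // set when the producer commits the message

    PendingMessage(long sentAt, long delaySeconds, long validitySeconds) {
        // The prerequisite from the text: validity period shorter than the delay.
        if (validitySeconds >= delaySeconds) {
            throw new IllegalArgumentException("validity period must be shorter than the delay");
        }
        this.sentAt = sentAt;
        this.delaySeconds = delaySeconds;
        this.validitySeconds = validitySeconds;
    }

    /** Assumption: committing makes the message visible right away. */
    void commit() {
        committed = true;
    }

    boolean isVisibleAt(long now) {
        if (committed) {
            return true;
        }
        long age = now - sentAt;
        // An uncommitted message would need to outlive its delay, but it expires first,
        // so this condition can never hold while validitySeconds < delaySeconds.
        return age >= delaySeconds && age < validitySeconds;
    }
}
```

With a delay of 60 seconds and a validity period of 30 seconds, `isVisibleAt` returns false at every point in time until `commit` is called.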

Process

In some business scenarios, you must ensure transactional consistency between on-premises operations and message sending. If the on-premises operation succeeds, the message is delivered to consumers. If the on-premises operation fails, the message is rolled back and remains invisible to consumers. The following figure shows the process.

Handle transactional messages

If a message is sent and an on-premises operation succeeds, the following process is used:

  1. The producer sends a transactional message to the transactional message queue.
  2. The producer sends log entries to the operation log queue. The log entries include the receipt handle of the transactional message created in Step 1.
  3. The producer implements the on-premises transaction.
  4. The producer modifies the validity period of the transactional message so that the message becomes visible to consumers.
  5. The producer confirms and deletes the log entries in the operation log queue.
  6. The consumer consumes the transactional message from the transactional message queue.
  7. The consumer processes the transactional message.
  8. The consumer requests to delete the transactional message.
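Steps 1 to 5 on the producer side can be sketched with in-memory collections standing in for the two queues. Every name here (`CommitPathSketch`, `sendTransactional`, the collection fields) is a hypothetical illustration and not the MNS SDK. The sketch also assumes that a failed transaction simply leaves its log entry in place for a later timeout check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Hypothetical in-memory stand-in for the two queues; not the MNS SDK. */
final class CommitPathSketch {
    // Receipt handle -> body of messages that consumers cannot see yet.
    final Map<String, String> pendingMessages = new HashMap<>();
    // Bodies of messages that consumers can receive.
    final List<String> visibleMessages = new ArrayList<>();
    // Receipt handles of operations that are not confirmed yet.
    final List<String> operationLog = new ArrayList<>();
    private int nextHandle = 0;

    boolean sendTransactional(String body, BooleanSupplier localTransaction) {
        String receiptHandle = "handle-" + (nextHandle++);
        pendingMessages.put(receiptHandle, body);   // Step 1: send the transactional message
        operationLog.add(receiptHandle);            // Step 2: log the receipt handle
        boolean succeeded;
        try {
            succeeded = localTransaction.getAsBoolean(); // Step 3: run the on-premises transaction
        } catch (RuntimeException e) {
            succeeded = false;
        }
        if (succeeded) {
            // Step 4: make the message visible to consumers.
            visibleMessages.add(pendingMessages.remove(receiptHandle));
            // Step 5: confirm and delete the log entry.
            operationLog.remove(receiptHandle);
        }
        return succeeded;
    }
}
```

The log entry is written before the transaction runs, so an interrupted producer always leaves a trace that can be checked later.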

If a message is sent and an on-premises operation fails, the following process is used:

  1. The producer sends a transactional message to the transactional message queue.
  2. The producer sends log entries to the operation log queue. The log entries include the receipt handle of the transactional message created in Step 1.
  3. The producer fails to implement the on-premises transaction.
  4. The producer reads unconfirmed log entries that have timed out from the operation log queue. This step corresponds to Step A in the preceding figure.
  5. The producer checks the transaction result and finds that the on-premises operation failed. This step corresponds to Step B in the preceding figure.
  6. The producer sends a request to roll back the message. The message delay is not modified, so the message remains invisible to consumers. This step corresponds to Step C in the preceding figure.
  7. The producer confirms and deletes the log entries in the operation log queue. This step corresponds to Step D in the preceding figure.
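The timeout check in Steps A to D can be sketched as a single pass over the operation log. `TimeoutCheckSketch`, `LogEntry`, and `processTimedOut` are hypothetical names used for illustration only. The `transactionSucceeded` predicate stands in for whatever status lookup the producer uses.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of the timeout check; not the MNS SDK. */
final class TimeoutCheckSketch {
    static final class LogEntry {
        final String receiptHandle;
        final long timeoutAt; // time at which the transactional operation times out

        LogEntry(String receiptHandle, long timeoutAt) {
            this.receiptHandle = receiptHandle;
            this.timeoutAt = timeoutAt;
        }
    }

    /**
     * Handles every timed-out entry in the (mutable) operation log and returns
     * the receipt handles of the messages that should be committed.
     */
    static List<String> processTimedOut(List<LogEntry> operationLog, long now,
                                        Predicate<String> transactionSucceeded) {
        List<String> toCommit = new ArrayList<>();
        Iterator<LogEntry> it = operationLog.iterator();
        while (it.hasNext()) {
            LogEntry entry = it.next();
            if (entry.timeoutAt > now) {
                continue; // Step A: only timed-out, unconfirmed entries are read
            }
            if (transactionSucceeded.test(entry.receiptHandle)) { // Step B: check the result
                toCommit.add(entry.receiptHandle);
            }
            // Step C: on failure nothing is modified, so the message stays invisible.
            it.remove(); // Step D: confirm and delete the log entry
        }
        return toCommit;
    }
}
```

Entries that have not timed out yet stay in the log, so a transaction that is still running is not rolled back early.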

Sample code

MNS provides the TransactionQueue class in the SDK for Java 1.1.8 to keep transactional consistency between on-premises operations and message sending. To use it, implement the on-premises transaction in the TransactionOperations interface and the transaction status check used for exception handling in the TransactionChecker interface.

The following code is used as an example:

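import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.client.TransactionChecker;
import com.aliyun.mns.client.TransactionOperations;
import com.aliyun.mns.client.TransactionQueue;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;

public class TransactionMessageDemo {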

    public class MyTransactionChecker implements TransactionChecker {
        @Override
        public boolean checkTransactionStatus(Message message) {
            boolean checkResult = false;
            String messageHandler = message.getReceiptHandle();
            try {
                // Check whether the messageHandler transaction is successful. 
                checkResult = true;
            } catch (Exception e) {
                checkResult = false;
            }
            return checkResult;
        }
    }

    public class MyTransactionOperations implements TransactionOperations {
        @Override
        public boolean doTransaction(Message message) {
            boolean transactionResult = false;
            String messageHandler = message.getReceiptHandle();
            String messageBody = message.getMessageBody();
            try {
                // Implement the on-premises transaction based on messageHandler and messageBody. 
                transactionResult = true;
            } catch (Exception e) {
                transactionResult = false;
            }
            return transactionResult;
        }
    }

    public static void main(String[] args) {
        System.out.println("Start TransactionMessageDemo");
        String transQueueName = "transQueueName";
        String accessKeyId = ServiceSettings.getMNSAccessKeyId();
        String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
        String endpoint = ServiceSettings.getMNSAccountEndpoint();

        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient(); // Initialize the client. 

        // Create a queue for the transaction. 
        QueueMeta queueMeta = new QueueMeta();
        queueMeta.setQueueName(transQueueName);
        queueMeta.setPollingWaitSeconds(15);

        TransactionMessageDemo demo = new TransactionMessageDemo();
        TransactionChecker transChecker = demo.new MyTransactionChecker();
        TransactionOperations transOperations = demo.new MyTransactionOperations();

        TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);

        // Implement the transaction. 
        Message msg = new Message();
        String messageBody = "TransactionMessageDemo";
        msg.setMessageBody(messageBody);
        transQueue.sendTransMessage(msg, transOperations);

        // If you no longer need the queue, delete it. 
        transQueue.delete();

        // Shut down the client. 
        client.close();
        System.out.println("End TransactionMessageDemo");
    }

}
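The placeholders in doTransaction and checkTransactionStatus need some way to look up whether a given transaction succeeded. One possible approach, sketched here with the hypothetical class `TransactionStatusStore`, is to record the outcome under the receipt handle when the transaction runs and to read it back during the check. The sketch keeps the status in memory for brevity. A real producer would presumably need durable storage so that the status survives a restart.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical status store keyed by receipt handle; not part of the MNS SDK. */
final class TransactionStatusStore {
    enum Status { SUCCEEDED, FAILED }

    private final Map<String, Status> statusByHandle = new ConcurrentHashMap<>();

    /** Runs the on-premises transaction and records its outcome. */
    boolean runAndRecord(String receiptHandle, Runnable localTransaction) {
        try {
            localTransaction.run();
            statusByHandle.put(receiptHandle, Status.SUCCEEDED);
            return true;
        } catch (RuntimeException e) {
            statusByHandle.put(receiptHandle, Status.FAILED);
            return false;
        }
    }

    /** Returns true only if a successful outcome was recorded for the handle. */
    boolean succeeded(String receiptHandle) {
        // An unknown handle is treated as not committed, which leads to a rollback.
        return statusByHandle.get(receiptHandle) == Status.SUCCEEDED;
    }
}
```

In this arrangement, doTransaction could delegate to `runAndRecord` and checkTransactionStatus could delegate to `succeeded`, both using message.getReceiptHandle() as the key.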

Troubleshoot exceptions

  • Producer-side exceptions (for example, process restart exceptions)
    1. The producer reads unconfirmed log entries that have timed out from the operation log queue.
    2. The producer checks whether the transaction was implemented.
    3. If the transaction was implemented, the producer submits the message to the transactional message queue.
      Note: Repeated submissions do not affect the whole process. A message with a given receipt handle can be submitted only once.
    4. The producer confirms the log entries.
  • Consumer-side exceptions (for example, process restart exceptions)

    MNS ensures that a message is consumed at least once. If the consumer does not delete the transactional message, the message becomes visible after the invisibility period ends and can be processed by the current consumer or other consumers.

  • Inaccessible MNS services (for example, disconnected network)

    The MNS server stores the status and log entries of the message. MNS features high reliability and high availability, which allows you to continue the transaction after the network recovers. If the transaction succeeds, consumers can retrieve and process the message. Otherwise, consumers cannot receive the message.
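Because a message is consumed at least once, a consumer may receive the same transactional message more than once, for example after a restart. The following sketch shows one common way to tolerate this: skip messages whose ID has already been processed. `IdempotentConsumerSketch` and its methods are hypothetical names, and the in-memory set is an assumption made for brevity.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical deduplicating consumer wrapper; not part of the MNS SDK. */
final class IdempotentConsumerSketch {
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<String> handler;

    IdempotentConsumerSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String messageId, String body) {
        if (processedIds.contains(messageId)) {
            return false; // already processed: safe to delete the message again
        }
        handler.accept(body);
        // Record the ID only after the handler succeeds, so that a failed
        // attempt is retried when the message becomes visible again.
        processedIds.add(messageId);
        return true;
    }
}
```

If the handler throws an exception, the message ID is not recorded, and a later redelivery of the same message is processed normally.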