If the consumption of duplicate messages affects your business logic, you should ensure that your messages are idempotent. This topic describes the concept of message idempotence, scenarios that require it, and methods for implementation.
What is message idempotence
In mathematics and computer science, an idempotent operation is an operation that produces the same result regardless of whether it is executed once or multiple times. In messaging, idempotence means that when a consumer repeatedly consumes a message, the outcome is the same as consuming it once. The repeated consumption does not have any negative impact on the business system.
For example, in a payment scenario, a consumer processes a message to deduct a payment for an order. The deduction amount is 100 USD. If the deduction message is repeatedly delivered due to factors such as network instability, the consumer may process the message multiple times. However, the final business outcome is that the payment of 100 USD is deducted only once. The user's transaction history for the order contains only one deduction record, and the fee is not charged multiple times. In this case, the deduction operation is performed as expected, and the entire message processing procedure is idempotent.
Scenarios
In Internet applications, you may receive duplicate messages from ApsaraMQ for RabbitMQ, especially when the network is unstable. If this duplication affects your business logic, you must ensure that your messages are idempotent. Messages may be duplicated for the following reasons:
-
Duplicate messages during sending
After a message is sent to the server and persisted, a transient disconnection or client breakdown may occur. This can prevent the server from sending an acknowledgement to the producer. If the producer assumes that the message failed to send and resends it, the consumer receives two identical messages with the same content and message ID.
-
Duplicate messages during delivery
A message is delivered to a consumer and the business logic is processed. A transient disconnection then occurs when the client sends an acknowledgement to the server. To ensure that the message is consumed at least once, the ApsaraMQ for RabbitMQ server redelivers the message after the network recovers. The consumer then receives a duplicate message with the same content and message ID.
-
Duplicate messages during rebalancing
When the ApsaraMQ for RabbitMQ server or client restarts, scales out, or scales in, rebalancing is triggered. This process can cause a consumer to receive duplicate messages. Rebalancing can also be caused by events such as network jitter, server-side restarts, and consumer application restarts.
Methods
Follow these steps to implement message idempotence using the message ID as the idempotence key:
-
Create a table in your database that uses the unique message ID as a unique key.
-
Set a unique message ID for each message in the producer client.
The following sample code shows how to set a unique message ID:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("Message Body" + i).getBytes(StandardCharsets.UTF_8));For more information about message IDs, see How to set a message ID.
-
In the consumer client, process messages for idempotence based on their unique message IDs.
The following sample code shows how to process messages for idempotence based on their unique message IDs:
channel.basicConsume(Producer.QueueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1. Obtain the data for the unique business index. try{ String messageId = properties.getMessageId(); // The message ID or other information that can be used as a unique key. // 2. Start a database transaction. idempTable.insert(messageId); // 3. Process the business logic for the received message. // 4. Commit or roll back the transaction. Send an acknowledgement (ACK) only if the processing is successful. channel.basicAck(envelope.getDeliveryTag(), false); } catch (DatabasePrimaryKeyConflictException e){ // This is a duplicate message. Directly acknowledge it. channel.basicAck(envelope.getDeliveryTag(), false); } } } );