重複メッセージの消費がビジネスロジックに影響を与える場合は、メッセージがべき等であることを保証する必要があります。このトピックでは、メッセージのべき等性の概念、それを必要とするシナリオ、および実装方法について説明します。
メッセージのべき等性とは
数学およびコンピューターサイエンスにおいて、べき等な操作とは、一度実行しても複数回実行しても同じ結果を生み出す操作のことです。メッセージングにおいて、べき等性とは、コンシューマーがメッセージを繰り返し消費しても、その結果が一度だけ消費した場合と同じであることを意味します。繰り返しの消費は、業務システムに悪影響を与えません。
たとえば、支払いシナリオでは、コンシューマーが注文の支払いを差し引くためのメッセージを処理します。差し引き金額は100 USDです。ネットワークの不安定さなどの要因により差し引きメッセージが繰り返し配信された場合、コンシューマーはメッセージを複数回処理する可能性があります。ただし、最終的なビジネス成果としては、100 USDの支払いが1回だけ差し引かれます。ユーザーの注文に対するトランザクション履歴には差し引きレコードが1つしか含まれず、手数料も複数回請求されません。この場合、差し引き操作は期待通りに実行され、メッセージ処理プロシージャ全体がべき等となります。
シナリオ
インターネット アプリケーションでは、特にネットワークが不安定な場合に、ApsaraMQ for RabbitMQ から重複したメッセージを受信する場合があります。この重複がビジネスロジックに影響を与える場合は、メッセージがべき等であることを確実にする必要があります。メッセージは、以下の理由により重複することがあります。
-
送信中のメッセージの重複
メッセージがサーバーに送信されて永続化された後、瞬断やクライアントの機能停止が発生する可能性があります。これにより、サーバーがプロデューサーに確認応答を送信できなくなることがあります。プロデューサーがメッセージの送信に失敗したと判断して再送すると、コンシューマーは同じ内容とメッセージ ID を持つ 2 つの同一メッセージを受信します。
-
配信中のメッセージの重複
メッセージがコンシューマーに配信され、ビジネスロジックが処理されます。その後、クライアントがサーバーに確認応答を送信する際に瞬断が発生します。メッセージが少なくとも 1 回は確実に消費されるようにするために、ApsaraMQ for RabbitMQ サーバーはネットワークが回復した後にメッセージを再配信します。すると、コンシューマーは同じ内容とメッセージ ID を持つ重複メッセージを受信します。
-
リバランス中のメッセージの重複
ApsaraMQ for RabbitMQ サーバーまたはクライアントが再起動、スケールアウト、またはスケールインすると、リバランスがトリガーされます。このプロセスにより、コンシューマーが重複したメッセージを受信する可能性があります。また、リバランスは、ネットワークジッター、サーバー側の再起動、コンシューマーアプリケーションの再起動などのイベントが原因で発生することもあります。
実装方法
メッセージ ID をべき等キーとして使用してメッセージのべき等性を実装するには、次の手順に従います:
-
データベースに、一意のメッセージ ID を一意キーとして使用するテーブルを作成します。
-
プロデューサーのクライアントで、各メッセージに一意のメッセージ ID を設定します。
次のサンプルコードは、一意のメッセージ ID を設定する方法を示しています:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("Message Body" + i).getBytes(StandardCharsets.UTF_8));メッセージ ID の詳細については、「メッセージ ID の設定方法」をご参照ください。
-
コンシューマーのクライアントで、一意のメッセージ ID に基づいてメッセージのべき等性処理を行います。
次のサンプルコードは、一意のメッセージ ID に基づいてメッセージのべき等性処理を行う方法を示しています:
channel.basicConsume(Producer.QueueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1. 一意のビジネスインデックスのデータを取得します。 try{ String messageId = properties.getMessageId(); // メッセージ ID または一意キーとして使用できるその他の情報。 // 2. データベーストランザクションを開始します。 idempTable.insert(messageId); // 3. 受信したメッセージのビジネスロジックを処理します。 // 4. トランザクションをコミットまたはロールバックします。処理が成功した場合にのみ、確認応答 (ACK) を送信します。 channel.basicAck(envelope.getDeliveryTag(), false); } catch (DatabasePrimaryKeyConflictException e){ // これは重複メッセージです。直接確認応答します。 channel.basicAck(envelope.getDeliveryTag(), false); } } } );