メッセージの重複消費がビジネスに影響を与える場合は、メッセージに対して冪等処理を実行できます。このトピックでは、メッセージの冪等性の概念、シナリオ、および処理方法について説明します。
メッセージの冪等性とは
数学およびコンピューターサイエンスでは、冪等演算は、最初の適用以降の結果を変更することなく複数回適用できます。メッセージキューサービスでは、冪等性は、同じメッセージの重複消費を処理するために使用されます。コンシューマーがメッセージを繰り返し消費する場合、最終的な消費結果は最初の消費結果と同じであり、重複消費はビジネスシステムに悪影響を与えません。
たとえば、コンシューマーは、支払控除メッセージに基づいて注文の支払いを控除します。支払額は 100 米ドルです。ネットワークの問題により、メッセージがコンシューマーに繰り返し配信されます。その結果、メッセージが繰り返し消費されます。ただし、支払いは 1 回だけ控除され、注文に対して 100 米ドルの控除レコードが 1 つだけ記録されます。この例では、メッセージの冪等性はメッセージ消費プロセスで実装され、支払控除はビジネス要件を満たしています。
シナリオ
インターネットアプリケーション、特にネットワークが不安定な場合、ApsaraMQ for RabbitMQ メッセージが繰り返し消費される可能性があります。メッセージの重複消費がビジネスに影響を与える場合は、メッセージに対して冪等処理を実行できます。メッセージは、以下のシナリオで繰り返し消費される可能性があります。
プロデューサーがApsaraMQ for RabbitMQ ブローカーにメッセージを繰り返し送信する。
メッセージがブローカーに送信されて永続化された後、ネットワークが切断されるか、クライアントが停止します。ブローカーはクライアントに応答できません。この場合、プロデューサーはブローカーがメッセージを受信していないと判断し、メッセージを再送信します。その結果、コンシューマーは同じコンテンツとメッセージ ID を持つ 2 つのメッセージを受信します。
ApsaraMQ for RabbitMQ ブローカーがコンシューマーにメッセージを繰り返し配信する。
メッセージがコンシューマーに配信された後、ネットワークが切断され、コンシューマークライアントはブローカーに ACK 応答を返すことができません。この場合、ブローカーはメッセージが消費されたかどうかを認識しません。メッセージが少なくとも 1 回消費されるように、ブローカーはネットワークが回復した後、メッセージを再配信します。その結果、コンシューマーは同じコンテンツとメッセージ ID を持つ 2 つのメッセージを受信します。
負荷分散が原因で、メッセージがコンシューマーに繰り返し配信される。負荷分散をトリガーする要因には、ネットワークのジッター、ブローカーの再起動、コンシューマーアプリケーションの再起動などが含まれますが、これらに限定されません。
ApsaraMQ for RabbitMQ ブローカーまたはクライアントを再起動またはスケーリングすると、負荷分散がトリガーされます。負荷分散中、コンシューマーは重複メッセージを受信する可能性があります。
冪等処理方法
メッセージ ID を冪等キーとして使用して、メッセージに対して冪等処理を実行するには、次の手順を実行します。
メッセージ ID フィールドをテーブルの一意キーとして使用するテーブルをデータベースに作成します。
プロデューサークライアントで各メッセージに一意のメッセージ ID を設定します。
サンプルコード:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("messageBody" + i).getBytes(StandardCharsets.UTF_8));メッセージ ID の詳細については、「メッセージ ID を指定するにはどうすればよいですか?」をご参照ください。
コンシューマークライアントで、メッセージ ID に基づいてメッセージに対して冪等処理を実行します。
サンプルコード:
channel.basicConsume(Producer.QueueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1. ビジネスの一意のキーデータを取得します。 try{ String messageId = properties.getMessageId(); // メッセージ ID または一意のキーとして使用されるその他の情報。 // 2. データベース トランザクションを開始します。 idempTable.insert(messageId); // 3. ビジネス ロジックに基づいて受信したメッセージを処理します。 // 4. トランザクションをコミットまたはロールバックします。// 処理が成功した場合、コンシューマー クライアントはメッセージを確認応答します。そうでない場合、コンシューマー クライアントはメッセージを確認応答しません。 channel.basicAck(envelope.getDeliveryTag(), false); } catch (PrimaryKeyException e){ // 重複メッセージを直接確認応答します。 channel.basicAck(envelope.getDeliveryTag(), false); } } } );