すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RabbitMQ:メッセージの冪等性

最終更新日:Jun 05, 2025

メッセージの重複消費がビジネスに影響を与える場合は、メッセージに対して冪等処理を実行できます。このトピックでは、メッセージの冪等性の概念、シナリオ、および処理方法について説明します。

メッセージの冪等性とは

数学およびコンピューターサイエンスでは、冪等演算は、最初の適用以降の結果を変更することなく複数回適用できます。メッセージキューサービスでは、冪等性は、同じメッセージの重複消費を処理するために使用されます。コンシューマーがメッセージを繰り返し消費する場合、最終的な消費結果は最初の消費結果と同じであり、重複消費はビジネスシステムに悪影響を与えません。

たとえば、コンシューマーは、支払控除メッセージに基づいて注文の支払いを控除します。支払額は 100 米ドルです。ネットワークの問題により、メッセージがコンシューマーに繰り返し配信されます。その結果、メッセージが繰り返し消費されます。ただし、支払いは 1 回だけ控除され、注文に対して 100 米ドルの控除レコードが 1 つだけ記録されます。この例では、メッセージの冪等性はメッセージ消費プロセスで実装され、支払控除はビジネス要件を満たしています。

シナリオ

インターネットアプリケーション、特にネットワークが不安定な場合、ApsaraMQ for RabbitMQ メッセージが繰り返し消費される可能性があります。メッセージの重複消費がビジネスに影響を与える場合は、メッセージに対して冪等処理を実行できます。メッセージは、以下のシナリオで繰り返し消費される可能性があります。

  • プロデューサーがApsaraMQ for RabbitMQ ブローカーにメッセージを繰り返し送信する。

    メッセージがブローカーに送信されて永続化された後、ネットワークが切断されるか、クライアントが停止します。ブローカーはクライアントに応答できません。この場合、プロデューサーはブローカーがメッセージを受信していないと判断し、メッセージを再送信します。その結果、コンシューマーは同じコンテンツとメッセージ ID を持つ 2 つのメッセージを受信します。

  • ApsaraMQ for RabbitMQ ブローカーがコンシューマーにメッセージを繰り返し配信する。

    メッセージがコンシューマーに配信された後、ネットワークが切断され、コンシューマークライアントはブローカーに ACK 応答を返すことができません。この場合、ブローカーはメッセージが消費されたかどうかを認識しません。メッセージが少なくとも 1 回消費されるように、ブローカーはネットワークが回復した後、メッセージを再配信します。その結果、コンシューマーは同じコンテンツとメッセージ ID を持つ 2 つのメッセージを受信します。

  • 負荷分散が原因で、メッセージがコンシューマーに繰り返し配信される。負荷分散をトリガーする要因には、ネットワークのジッター、ブローカーの再起動、コンシューマーアプリケーションの再起動などが含まれますが、これらに限定されません。

    ApsaraMQ for RabbitMQ ブローカーまたはクライアントを再起動またはスケーリングすると、負荷分散がトリガーされます。負荷分散中、コンシューマーは重複メッセージを受信する可能性があります。

冪等処理方法

メッセージ ID を冪等キーとして使用して、メッセージに対して冪等処理を実行するには、次の手順を実行します。

  1. メッセージ ID フィールドをテーブルの一意キーとして使用するテーブルをデータベースに作成します。

  2. プロデューサークライアントで各メッセージに一意のメッセージ ID を設定します。

    サンプルコード:

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
    channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("messageBody" + i).getBytes(StandardCharsets.UTF_8));

    メッセージ ID の詳細については、「メッセージ ID を指定するにはどうすればよいですか?」をご参照ください。

  3. コンシューマークライアントで、メッセージ ID に基づいてメッセージに対して冪等処理を実行します。

    サンプルコード:

    channel.basicConsume(Producer.QueueName, false, new DefaultConsumer(channel) {
    
        @Override public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 1. ビジネスの一意のキーデータを取得します。
            try{
                String messageId = properties.getMessageId();
                // メッセージ ID または一意のキーとして使用されるその他の情報。
                // 2. データベース トランザクションを開始します。
                idempTable.insert(messageId);
                // 3. ビジネス ロジックに基づいて受信したメッセージを処理します。
                // 4. トランザクションをコミットまたはロールバックします。// 処理が成功した場合、コンシューマー クライアントはメッセージを確認応答します。そうでない場合、コンシューマー クライアントはメッセージを確認応答しません。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
            catch (PrimaryKeyException e){
                // 重複メッセージを直接確認応答します。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    }
    );