すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:コンシューマー側のべき等性

最終更新日:Mar 11, 2026

ApsaraMQ for RocketMQ はメッセージを少なくとも 1 回配信します。つまり、コンシューマーが同じメッセージを複数回受信する可能性があります。支払いの引き落とし、在庫調整、注文作成など、重複に敏感なビジネスロジックの場合、べき等性のあるコンシューマー処理(べき等消費)を実装してください。べき等消費により、同じメッセージを複数回処理しても、1 回処理した場合と同じ結果が得られます。

たとえば、コンシューマーが 100 米ドルの注文に対する支払い引き落としメッセージを処理するとします。ネットワークの問題により、そのメッセージが 2 回配信された場合でも、べき等消費を実装していれば、支払いは 1 回のみ引き落とされ、その注文に対して 100 米ドルの引き落としレコードが 1 件のみ生成されます。

重複メッセージが発生する理由

重複メッセージは以下の 3 つのシナリオで発生します。

プロデューサーのリトライ

プロデューサーがメッセージを送信し、ApsaraMQ for RocketMQ のブローカーがそれを永続化します。しかし、一時的なネットワーク障害やプロデューサーのクラッシュにより、ブローカーがプロデューサーへの応答に失敗することがあります。プロデューサーはこれを送信失敗とみなし、再送信(リトライ)します。その結果、コンシューマーは内容が同じだがメッセージ ID が異なる 2 つのメッセージを受信します。

ブローカーによる再配信

コンシューマーがメッセージを受信・処理したものの、一時的なネットワーク障害によりブローカーへの応答(ACK)に失敗します。ブローカーはメッセージが正常に消費されたかどうかを確認できないため、「少なくとも 1 回」の保証を守るためにネットワークが回復後に再度メッセージを配信します。この場合、コンシューマーは内容もメッセージ ID も同じ 2 つのメッセージを受信します。

負荷分散

ネットワークジッター、ブローカーの再起動、コンシューマーアプリケーションの再起動などのイベントにより負荷分散がトリガーされます。リバランス中に、すでに配信済みのメッセージを再度受信することがあります。

メッセージ ID ではなくビジネスキーを使用する

最初に思いつくのはメッセージ ID を使って重複排除を行うことですが、これは信頼できません。プロデューサーのリトライのシナリオで説明したように、プロデューサーが送信をリトライすると、同じ論理メッセージであっても異なるメッセージ ID で到着することがあります。メッセージ ID に基づいて重複排除を行うと、このような重複を完全に見逃してしまいます。

代わりに、メッセージキーとして一意なビジネス識別子を割り当ててください。たとえば、注文 ID や支払いトランザクション ID、またはビジネス操作を一意に識別できる任意の値を使用します。このキーは、メッセージが何回送信・再配信されても一貫性を保ちます。これはメッセージングシステムによって生成されるものではなく、ビジネスロジックから導出されるものです。

このアプローチにより、予測可能な繰り返し性が確保されます。障害が発生してメッセージが再送された場合でも、同じビジネスコンテキストからは常に同じキーが生成されるため、信頼性の高い重複排除が可能になります。

べき等消費の実装

ステップ 1:プロデューサー側でメッセージキーを設定する

メッセージ送信時に、一意なビジネス識別子をメッセージキーとして設定します。

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

ORDERID_100 は、実際の注文 ID やトランザクション ID などの一意なビジネス識別子に置き換えてください。

ステップ 2:コンシューマー側でメッセージキーを取得する

コンシューマーのコールバック内でメッセージキーを取得し、べき等処理を実施します。

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // ビジネスを一意に識別するメッセージキーに基づいて、べき等処理を実行します。
    }
});

ステップ 3:重複排除ストアを使用してべき等性を保証する

メッセージキーだけでは重複処理を防げません。アプリケーション側で、そのキーがすでに処理済みかどうかをチェックする必要があります。一般的なパターンとして、一意制約付きのカラムを持つリレーショナルデータベースを使用する方法があります。

  1. 処理前に、メッセージキーを一意制約付きの重複排除テーブルに挿入します。

  2. 挿入が成功した場合は、メッセージを処理します。

  3. プライマリキーまたは一意制約違反により挿入に失敗した場合は、そのメッセージはすでに処理済みです。スキップします。

「挿入してからチェック」のパターンを使用し、「チェックしてから挿入」は避けてください。「チェックしてから挿入」の場合、2 つのスレッドがどちらも挿入前にチェックを通過してしまう可能性があり、重複処理が発生します。データベースの一意制約違反に依存することで、アトミックかつ競合状態に安全な処理が実現できます。

重複排除テーブルの例 (MySQL):

CREATE TABLE message_dedup (
    message_key VARCHAR(255) NOT NULL,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (message_key)
);

べき等コンシューマーのロジック例:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey();

        try {
            // メッセージキーを挿入します。すでに処理済みの場合は失敗します。
            insertMessageKey(key);
        } catch (DuplicateKeyException e) {
            // すでに処理済みです。スキップします。
            return Action.CommitMessage;
        }

        // ビジネスロジックを処理します。
        processOrder(key);

        return Action.CommitMessage;
    }
});

insertMessageKey および processOrder は、実際のデータベースおよびビジネスロジックのメソッドに置き換えてください。

リレーショナルデータベースがボトルネックとなるような高スループットのシナリオでは、代わりに Redis の SETNX (SET if Not eXists) を使用したアプローチを採用してください。

ベストプラクティス

プラクティス詳細
重複排除とビジネスロジックを単一のトランザクションでラップする重複排除レコードの挿入後にビジネスロジックが失敗した場合、重複排除レコードもロールバックされ、次回の配信時にメッセージを再試行できます。トランザクションを使用しないと、ビジネス操作が失敗しても重複排除レコードが残ってしまい、メッセージが再処理されなくなります。
重複排除ストアを定期的にクリーンアップする重複排除テーブルは時間とともに肥大化します。保持期間(例:7 日間)を設定し、古いレコードをバッチで削除して、ストレージの無限増加を防いでください。
スループットに適した重複排除ストアを選択する中程度のスループットには一意制約付きのリレーショナルデータベースを使用します。高スループットのシナリオでは、保持期間に合わせた TTL を持つ Redis の SETNX を使用し、自動クリーンアップも同時に実現します。