すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:バッチ消費

最終更新日:Mar 12, 2026

バッチ消費は、メッセージを一度に 1 つずつではなく、単一のディスパッチで複数のメッセージをコンシューマースレッドに配信します。これにより、ダウンストリームシステムへのリモートプロシージャコール (RPC) のオーバーヘッドが削減され、メッセージのスループットが向上します。

仕組み

プッシュコンシューマーは、2 つの段階でバッチ消費を処理します:

  1. プルとキャッシュ:メッセージプルスレッドは、ロングポーリングを使用して ApsaraMQ for RocketMQ からメッセージを取得し、ローカルにキャッシュします。

  2. ディスパッチ:キャッシュされたメッセージがバッチサイズのしきい値または待機時間のしきい値に達すると (どちらか早い方)、プッシュコンシューマーはバッチをコンシューマースレッドに送信して処理します。

batch_consume
説明 ApsaraMQ for RocketMQ は、プッシュコンシューマーとプルコンシューマーの両方をサポートしています。バッチ消費はプッシュコンシューマーにのみ適用されます。詳細については、「用語」をご参照ください。

利用シーン

バッチ消費は、ダウンストリームシステムがバッチ処理からメリットを得られる場合に最も効果的です。並列処理の向上のみが目的の場合は、まずコンシューマーインスタンスの追加やスレッドプールサイズの調整など、より簡単な代替手段を検討してください。

  • 一括インデックス作成:アップストリームの注文システムがログメッセージを公開し、ダウンストリームの Elasticsearch クラスターがそれをインデックス化します。各メッセージは 1 つの RPC リクエスト (約 10 ms) をトリガーします。10 件のメッセージを個別に処理すると 100 ms かかりますが、それらを単一の一括インデックス作成呼び出しにまとめることで、合計時間は約 10 ms に短縮されます。

  • データベースへの一括挿入:アプリケーションが高い更新頻度で一度に 1 つずつレコードをデータベースに挿入すると、高い負荷が発生します。挿入ごとに 10 レコードをバッチ処理し、5 秒ごとにフラッシュすることで、接続のオーバーヘッドと書き込み増幅が削減されます。

制限事項

  • バッチ消費は TCP 経由でのみサポートされます。TCP クライアント SDK for Java の商用版、バージョン 1.8.7.3.Final 以降を使用してください。リリースノートとダウンロード手順については、「リリースノート」をご参照ください。

  • 最大バッチサイズ:1,024 メッセージ

  • バッチ間の最大待機時間:450 秒

パラメーター

バッチがいつディスパッチされるかは、2 つのパラメーターでコントロールします。ディスパッチは、いずれかの条件が最初に満たされたときに発生します。

パラメータータイプデフォルト値有効値説明
ConsumeMessageBatchMaxSizeString321~1,024バッチあたりの最大メッセージ数。キャッシュされたメッセージの数がこの値に達すると、SDK はただちにバッチをコンシューマースレッドにディスパッチします。
BatchConsumeMaxAwaitDurationInSecondsString00~450最大待機時間 (秒)。この間隔が経過すると、バッチサイズのしきい値に達していなくても、SDK は蓄積されたメッセージをディスパッチします。

サンプルコード

ONSFactory.createBatchConsumer() に渡される Properties を通じてバッチ消費を設定します。BatchMessageListener コールバックは、最大で ConsumeMessageBatchMaxSize 個のメッセージを含む List<Message> を受け取ります。

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
        consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
        consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);

        // バッチあたりの最大メッセージ数を設定します。
        // デフォルト値:32。有効値:1~1024。
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // バッチ間の最大待機時間 (秒) を設定します。
        // デフォルト値:0。有効値:0~450。
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
                // メッセージをバッチで処理します。
                return Action.CommitMessage;
            }
        });
        // BatchConsumer を開始します。
        batchConsumer.start();
        System.out.println("Consumer start success.") ;

        // プロセスが終了しないように、一定期間待機します。
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
説明

ベストプラクティス

バッチサイズと待機時間の同時調整

ディスパッチは、バッチサイズまたは待機時間のいずれかのしきい値に達したときにトリガーされます。ワークロードに合わせて両方のパラメーターを設定してください:

  • 高スループットのシナリオConsumeMessageBatchMaxSize を大きな値 (例:128 または 256) に設定し、BatchConsumeMaxAwaitDurationInSeconds を短い間隔 (例:1~5 秒) に設定します。これにより、完全なバッチを待たずに頻繁にバッチがディスパッチされます。

  • 低スループットのシナリオ:非常に小さなバッチのディスパッチを避けるために、中程度のバッチサイズ (例:32) と長めの待機時間 (例:10~30 秒) を設定します。

例:ConsumeMessageBatchMaxSize を 128 に、BatchConsumeMaxAwaitDurationInSeconds を 1 に設定した場合、蓄積されたメッセージが 128 未満であっても、1 秒後にバッチがディスパッチされます。この場合、コールバック内の messages.size() は 128 未満の値を返します。

消費のべき等性の実装

より良いバッチ消費を実現するために、コンシューマークライアントでメッセージのべき等性を実装し、メッセージが一度だけ処理されるようにしてください。詳細については、「消費のべき等性」をご参照ください。

参考文献