ApsaraMQ for RocketMQ provides the batch consumption feature. You can use this feature to efficiently process messages or reduce the number of API calls made by downstream resources. This topic describes the definition, benefits, scenarios, limits, and sample code of the batch consumption feature.

What is batch consumption?

  • Definition

    Batch consumption enables push consumers to submit messages to consumer threads in batches. The consumer threads then consume each batch in a single callback.

    Note: ApsaraMQ for RocketMQ provides push consumers and pull consumers, which differ in whether they obtain messages in push mode or pull mode. For more information, see Terms.
  • How it works
    Batch consumption is divided into the following two stages:
    1. Producers publish messages to ApsaraMQ for RocketMQ. Then, the message pull threads of the push consumer pull the messages by using the long polling policy and cache them to the backend of ApsaraMQ for RocketMQ.
    2. The push consumer determines whether to submit messages to consumer threads for batch consumption based on whether the cached messages meet one of the specified conditions for batch consumption.
    The following figure shows the process of batch consumption.
    [Figure: batch_consume]
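The two stages above can be modeled with a small in-memory buffer. The following is a minimal sketch, not the SDK's internal implementation: the class `BatchBufferSketch` and its methods are hypothetical, timestamps are passed in explicitly so that the behavior is deterministic, and the wait time is assumed to be measured from the moment the first message of the pending batch is cached.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the two batch triggers. Not part of the SDK. */
final class BatchBufferSketch {
    private final int maxBatchSize;      // plays the role of ConsumeMessageBatchMaxSize
    private final long maxAwaitMillis;   // plays the role of the maximum wait time
    private final List<String> cached = new ArrayList<>();
    private long firstCachedAtMillis = -1;

    BatchBufferSketch(int maxBatchSize, long maxAwaitMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxAwaitMillis = maxAwaitMillis;
    }

    /** Stage 1: cache a pulled message and remember when the pending batch started. */
    void add(String message, long nowMillis) {
        if (cached.isEmpty()) {
            firstCachedAtMillis = nowMillis;
        }
        cached.add(message);
    }

    /** Stage 2: return a batch if either condition is met, otherwise an empty list. */
    List<String> pollBatch(long nowMillis) {
        if (cached.isEmpty()) {
            return List.of();
        }
        boolean sizeReached = cached.size() >= maxBatchSize;
        boolean waitElapsed = nowMillis - firstCachedAtMillis >= maxAwaitMillis;
        if (!sizeReached && !waitElapsed) {
            return List.of();
        }
        int n = Math.min(cached.size(), maxBatchSize);
        List<String> batch = new ArrayList<>(cached.subList(0, n));
        cached.subList(0, n).clear();
        // Assumption: any leftover messages start a new wait window now.
        firstCachedAtMillis = cached.isEmpty() ? -1 : nowMillis;
        return batch;
    }

    public static void main(String[] args) {
        BatchBufferSketch buffer = new BatchBufferSketch(3, 1000);
        buffer.add("m1", 0);
        buffer.add("m2", 100);
        System.out.println(buffer.pollBatch(200).size());  // neither condition met: 0
        System.out.println(buffer.pollBatch(1000).size()); // wait time elapsed: 2
    }
}
```

In this model, a wait time of 0 means that cached messages are submitted on the next poll regardless of how many have accumulated.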

Limits

  • Batch consumption is supported only over TCP. Make sure that you use the commercial edition of TCP client SDK for Java whose version is 1.8.7.3.Final or later. For information about the release notes of the SDK and how to obtain the SDK, see Release notes.
  • A maximum of 1,024 messages can be submitted in one batch. The maximum wait time between batches is 450 seconds.
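To catch out-of-range settings before a consumer is created, the limits above can be checked in a small helper. This is a sketch under assumptions: `BatchSettingsSketch` is a hypothetical class, the lower bounds (1 message, 0 seconds) are taken from the comments in the sample code in this topic, and the string property keys are placeholders that reuse the parameter names from this topic. In real client code, use the `PropertyKeyConst` constants shown in the sample code instead.

```java
import java.util.Properties;

/** Hypothetical helper that validates batch settings against the documented limits. */
final class BatchSettingsSketch {
    static final int MAX_BATCH_SIZE = 1024;     // maximum messages per batch
    static final int MAX_AWAIT_SECONDS = 450;   // maximum wait time between batches

    static Properties batchProperties(int batchSize, int awaitSeconds) {
        if (batchSize < 1 || batchSize > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException("batchSize must be in [1, 1024]: " + batchSize);
        }
        if (awaitSeconds < 0 || awaitSeconds > MAX_AWAIT_SECONDS) {
            throw new IllegalArgumentException("awaitSeconds must be in [0, 450]: " + awaitSeconds);
        }
        Properties properties = new Properties();
        // Placeholder keys for illustration only.
        properties.setProperty("ConsumeMessageBatchMaxSize", String.valueOf(batchSize));
        properties.setProperty("BatchConsumeMaxAwaitDurationInSeconds", String.valueOf(awaitSeconds));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(batchProperties(128, 10));
    }
}
```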

Benefits and scenarios

The following information describes the benefits and scenarios of the batch consumption feature:

  • Benefit 1: Improve message throughput and processing efficiency

    Scenario: ApsaraMQ for RocketMQ decouples the upstream order system from the downstream Elasticsearch system. The Elasticsearch system consumes 10 log messages from the upstream order system. For the Elasticsearch system, each message corresponds to one remote procedure call (RPC) request. Assume that one RPC request takes 10 milliseconds. Without batch consumption, the 10 log messages require 10 RPC requests and a total of 100 milliseconds. With batch consumption, the 10 messages are consumed in one batch with a single RPC request, and the consumption time is reduced to 10 milliseconds.

  • Benefit 2: Reduce the number of API calls made by downstream resources

    Scenario: Assume that you want to insert data into a database. If you run an insert job for each new piece of data and need to frequently update data, the database may be under high pressure. You can set related parameters to insert 10 pieces of data in one batch and perform insert operations every 5 seconds to reduce system pressure.
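The arithmetic in the two scenarios above can be reproduced with a simulated downstream service. This is an illustrative sketch only: `DownstreamCallSketch` is a hypothetical class, no real RPC or database call is made, and it assumes, as the first scenario does, that one batched request costs the same 10 milliseconds as a single-message request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical downstream stub that counts calls and accumulates simulated latency. */
final class DownstreamCallSketch {
    static final long RPC_MILLIS = 10; // assumed cost of one request, batched or not
    int calls;
    long simulatedMillis;

    void call(List<String> payload) {
        calls++;
        simulatedMillis += RPC_MILLIS;
    }

    /** One downstream request per message. */
    static DownstreamCallSketch consumeOneByOne(List<String> messages) {
        DownstreamCallSketch downstream = new DownstreamCallSketch();
        for (String message : messages) {
            downstream.call(List.of(message));
        }
        return downstream;
    }

    /** One downstream request per batch of up to batchSize messages. */
    static DownstreamCallSketch consumeInBatches(List<String> messages, int batchSize) {
        DownstreamCallSketch downstream = new DownstreamCallSketch();
        for (int from = 0; from < messages.size(); from += batchSize) {
            int to = Math.min(from + batchSize, messages.size());
            downstream.call(messages.subList(from, to));
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<String> logs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            logs.add("log-" + i);
        }
        DownstreamCallSketch single = consumeOneByOne(logs);
        DownstreamCallSketch batched = consumeInBatches(logs, 10);
        System.out.printf("one by one: calls=%d, millis=%d%n", single.calls, single.simulatedMillis);
        System.out.printf("batched:    calls=%d, millis=%d%n", batched.calls, batched.simulatedMillis);
    }
}
```

With 10 messages and a batch size of 10, the sketch reports 10 calls and 100 milliseconds for one-by-one consumption, and 1 call and 10 milliseconds for batch consumption.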

Sample code

The following sample code provides an example of batch consumption:

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
        consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
        consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);

        // Set the maximum number of messages to be consumed in a batch. When the number of messages accumulated under a specified topic reaches this value, the SDK immediately calls the callback method to consume these messages. Default value: 32. Valid values: 1 to 1024. In this example, the value is set to 128.
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // Set the maximum wait time between batches. The SDK immediately calls the callback method to consume messages after the specified wait time. Default value: 0. Valid values: 0 to 450. Unit: seconds. In this example, the value is set to 10 seconds.
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

            @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
                // Process messages in batches.
                return Action.CommitMessage;
            }
        });
        // Start BatchConsumer.
        batchConsumer.start();
        System.out.println("Consumer start success.");

        // Wait for a specified period of time to prevent the process from exiting.
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The following table describes the parameters.

Parameter | Type | Required | Description
ConsumeMessageBatchMaxSize | String | No | The maximum number of messages to be consumed in a batch. If the number of cached messages reaches the value of this parameter, the SDK of the push consumer client submits the messages to consumer threads at a time for batch consumption. Valid values: 1 to 1024. Default value: 32.
BatchConsumeMaxAwaitDurationInSeconds | String | No | The maximum wait time between batches. When the specified wait time elapses, the SDK submits the cached messages to consumer threads as a batch, even if fewer than ConsumeMessageBatchMaxSize messages are cached. Valid values: 0 to 450. Default value: 0. Unit: seconds.

Note: If no parameter value is specified, the default value is used.

Best practices

Set the values of the ConsumeMessageBatchMaxSize and BatchConsumeMaxAwaitDurationInSeconds parameters based on your needs. Batch consumption is triggered as soon as the condition specified by either parameter is met. For example, assume that the ConsumeMessageBatchMaxSize parameter is set to 128 and the BatchConsumeMaxAwaitDurationInSeconds parameter is set to 1. If fewer than 128 messages are cached within 1 second, batch consumption is still triggered when the 1 second elapses. In this case, the Batch-size value printed by the sample code is less than 128.

To achieve better batch consumption, we recommend that you implement message idempotence on your consumer client to ensure that a message is processed only once. For more information about message idempotence, see Consumption idempotence.
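One common way to make a batch handler idempotent is to skip messages whose business key has already been processed. The following is a minimal sketch, not the approach prescribed by the SDK: `IdempotentBatchSketch` and its nested `Msg` class are hypothetical, and the in-memory set is for illustration only. A real consumer would typically record processed keys in durable storage, such as a database table with a unique constraint.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical batch handler that applies each business key at most once. */
final class IdempotentBatchSketch {

    /** Stand-in for a message: a unique business key plus a body. */
    static final class Msg {
        final String key;
        final String body;

        Msg(String key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    // Thread-safe set, because several consumer threads may handle batches concurrently.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Processes a batch and returns how many messages were applied for the first time. */
    int handleBatch(List<Msg> batch) {
        int applied = 0;
        for (Msg msg : batch) {
            // Set.add returns false if the key is already present, so duplicates are skipped.
            if (processedKeys.add(msg.key)) {
                // Apply the business logic for msg.body here.
                applied++;
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        IdempotentBatchSketch handler = new IdempotentBatchSketch();
        List<Msg> batch = List.of(new Msg("order-1", "a"), new Msg("order-2", "b"), new Msg("order-1", "a"));
        System.out.println(handler.handleBatch(batch)); // 2: the duplicate order-1 is skipped
        System.out.println(handler.handleBatch(batch)); // 0: the whole batch was already processed
    }
}
```

With this pattern, receiving the same batch a second time does not repeat the business operation.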

References

Commercial edition of TCP client SDK for Java