ApsaraMQ for Kafka:Best practices for consumers

Last Updated:Jan 15, 2024

This topic describes the best practices for ApsaraMQ for Kafka consumers to help you reduce message consumption errors.

Process of message consumption

ApsaraMQ for Kafka consumers consume messages in a loop: they poll data, execute the consumption logic, and then poll again.

Load balancing

Each group in ApsaraMQ for Kafka consists of multiple consumer instances. You can start multiple consumers and set the group.id parameter of the consumers to the same value. Consumers in the same group consume messages from topics to which the group subscribes in load balancing mode.

For example, Group A subscribes to Topic A, and consumers C1, C2, and C3 are started in the group. In this case, each message that Topic A receives is delivered only to one of C1, C2, and C3. By default, ApsaraMQ for Kafka evenly delivers messages to consumers to balance the consumption load.

To implement load balancing in consumption, ApsaraMQ for Kafka evenly distributes the partitions of the subscribed topics to each consumer. The number of consumers cannot be greater than the number of partitions. Otherwise, specific consumers may not be assigned partitions and may enter the idle state. Load balancing is triggered when consumers are first started, restarted, added, or removed.
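As a sketch of this distribution, the following snippet spreads partitions across consumers round-robin. The names and the assignment function are illustrative only; the real Kafka client uses a pluggable partition.assignment.strategy, so the exact assignment may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Illustrative round-robin spread of partitions across consumers in a group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // Each partition goes to exactly one consumer in the group.
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 12 partitions spread across C1, C2, C3: 4 partitions each.
        Map<String, List<Integer>> a = assign(Arrays.asList("C1", "C2", "C3"), 12);
        System.out.println(a.get("C1").size()); // 4
        // With 4 consumers but only 3 partitions, one consumer stays idle.
        Map<String, List<Integer>> b = assign(Arrays.asList("C1", "C2", "C3", "C4"), 3);
        System.out.println(b.get("C4").isEmpty()); // true
    }
}
```

The second case shows why the number of consumers should not exceed the number of partitions: the extra consumer receives no partition and does no work.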

Frequent rebalances on consumer clients

Rebalances occur on consumer clients if the heartbeats of the clients time out. To prevent rebalances, you can change the related parameters or increase the consumption rate. For more information, see Why do rebalances frequently occur on my consumer client?

Partitions

The number of partitions affects the number of concurrent consumers.

Messages in one partition can be consumed by only one consumer in the same group. The number of consumers cannot be greater than the number of partitions. Otherwise, specific consumers may not be assigned partitions and may enter the idle state.

By default, the number of partitions is set to 12 in the ApsaraMQ for Kafka console. This can meet business requirements in most scenarios. You can increase the value based on your business requirements. We recommend that you set the number of partitions to a value within the range of 12 to 100. A value less than 12 may affect message production and consumption performance. A value greater than 100 may trigger rebalances on consumer clients.

Important

You cannot reduce the number of partitions after you increase it. Therefore, we recommend that you increase the number of partitions in small increments.

Subscription modes

ApsaraMQ for Kafka supports the following subscription modes:

  • One group subscribes to multiple topics

    One group can subscribe to multiple topics. In this subscription mode, messages from multiple topics are evenly consumed by consumers in the group. For example, Group A subscribes to Topic A, Topic B, and Topic C. Messages from the three topics are evenly consumed by consumers in Group A.

    Sample code:

    List<String> subscribedTopics = new ArrayList<>();
    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    for (String topic : topics) {
        subscribedTopics.add(topic.trim());
    }
    consumer.subscribe(subscribedTopics);
  • Multiple groups subscribe to one topic

    Multiple groups can subscribe to the same topic. In this subscription mode, each group separately consumes all messages from the topic. For example, Group A and Group B subscribe to Topic A. Each message that Topic A receives is delivered to consumers in Group A and Group B. The delivery processes are independent of each other without mutual impacts.

One group for one application

We recommend that you configure one group for each application. This way, each application consumes messages with its own code. If you need to run different pieces of consumption code in the same application, prepare a separate configuration file for each of them, such as kafka1.properties and kafka2.properties.

Consumer offsets

In ApsaraMQ for Kafka, each topic has multiple partitions. Each partition records the total number of messages that it has received. This value is known as the maximum offset.

An ApsaraMQ for Kafka consumer consumes the messages in a partition in order and records the position up to which it has consumed. This position is known as the consumer offset.

The number of unconsumed messages, which is the number of accumulated messages, is calculated by subtracting the consumer offset from the maximum offset.
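The arithmetic can be sketched as follows; the class and method names are illustrative:

```java
public class LagSketch {
    // Accumulated (unconsumed) messages = maximum offset - consumer offset.
    static long lag(long maxOffset, long consumerOffset) {
        return maxOffset - consumerOffset;
    }

    public static void main(String[] args) {
        // A partition has received 1000 messages; the consumer has consumed 950.
        System.out.println(lag(1000L, 950L)); // 50 accumulated messages
    }
}
```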

Commit consumer offsets

ApsaraMQ for Kafka provides the following parameters for consumers to commit consumer offsets:

  • enable.auto.commit: specifies whether to enable automatic commits. Default value: true.

  • auto.commit.interval.ms: the interval at which consumer offsets are automatically committed. Default value: 1000. Unit: milliseconds.

After you configure the preceding parameters, the client checks the time when the consumer offset was last committed before each poll. If the interval between the time when the offset was last committed and the current time exceeds the interval that is specified by the auto.commit.interval.ms parameter, the client commits a consumer offset.

If you set the enable.auto.commit parameter to true, you must make sure that all data that was last polled is consumed before each poll. Otherwise, unconsumed messages may be skipped.
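The before-each-poll check described above can be modeled as plain logic. The method and parameter names here are illustrative, not the client's internals:

```java
public class AutoCommitSketch {
    // Mirrors the check the client performs before each poll: commit only if
    // at least auto.commit.interval.ms has elapsed since the last commit.
    static boolean shouldCommit(long nowMs, long lastCommitMs, long autoCommitIntervalMs) {
        return nowMs - lastCommitMs >= autoCommitIntervalMs;
    }

    public static void main(String[] args) {
        long interval = 1000L; // default auto.commit.interval.ms
        System.out.println(shouldCommit(5_000L, 4_500L, interval)); // false: only 500 ms elapsed
        System.out.println(shouldCommit(5_600L, 4_500L, interval)); // true: 1100 ms elapsed
    }
}
```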

To manually commit consumer offsets, set the enable.auto.commit parameter to false and call the commit(offsets) function.

Reset consumer offsets

Consumer offsets are reset in the following scenarios:

  • No offset is committed to the broker. For example, the consumer is first connected to the broker.

  • Messages are pulled from an invalid offset. For example, the maximum offset in a partition is 10, but consumers start consumption from Offset 11.

On Java clients, you can set the auto.offset.reset parameter to one of the following values to specify how to reset a consumer offset:

  • latest: resets the consumer offset to the maximum offset.

  • earliest: resets the consumer offset to the minimum offset.

  • none: does not reset the consumer offset.

Note
  • We recommend that you set this parameter to latest instead of earliest. This prevents consumers from consuming messages from the very beginning due to an invalid offset.

  • If you commit the offset by calling the commit(offsets) function, you can set this parameter to none.
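The reset behavior can be modeled with an illustrative helper. The helper and its names are assumptions for illustration only; the real Java client does not expose such a function and instead raises an exception (for example, NoOffsetForPartitionException) when the policy is none:

```java
public class OffsetResetSketch {
    // Given an invalid requested offset, return the offset that consumption
    // restarts from under each auto.offset.reset policy.
    static long reset(String policy, long minOffset, long maxOffset) {
        switch (policy) {
            case "latest":   return maxOffset;   // skip to the newest position
            case "earliest": return minOffset;   // replay from the beginning
            case "none":     throw new IllegalStateException("no valid offset and reset policy is none");
            default:         throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // The partition holds offsets 0..10; a committed offset of 11 is invalid.
        System.out.println(reset("latest", 0L, 10L));   // 10
        System.out.println(reset("earliest", 0L, 10L)); // 0
    }
}
```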

Pull large messages

During message consumption, clients proactively pull messages from the broker. If you want to pull large messages, you can control the pulling rate by configuring the following parameters:

  • max.poll.records: the maximum number of messages that is returned for a single call to the poll method. If each message exceeds 1 MB in size, we recommend that you set this parameter to 1.

  • fetch.max.bytes: the maximum amount of data that is returned for a fetch request. Set this parameter to a value that is slightly greater than the size of a single message.

  • max.partition.fetch.bytes: the maximum amount of data that is returned for a partition for a fetch request. Set this parameter to a value that is slightly greater than the size of a single message.

Clients pull large messages one by one.
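Assuming messages of up to about 1 MB each, a sketch of such a configuration might look as follows. The 2 MB byte limits are illustrative values chosen to be slightly larger than the assumed message size, not recommended defaults:

```java
import java.util.Properties;

public class LargeMessageConfig {
    // Illustrative settings for pulling messages of up to about 1 MB each.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("max.poll.records", "1"); // return one message per poll
        props.put("fetch.max.bytes", String.valueOf(2 * 1024 * 1024)); // 2 MB per fetch request
        props.put("max.partition.fetch.bytes", String.valueOf(2 * 1024 * 1024)); // 2 MB per partition
        return props;
    }

    public static void main(String[] args) {
        // Merge these settings into the Properties object that you pass to the
        // KafkaConsumer constructor together with your other settings.
        System.out.println(buildProps().getProperty("fetch.max.bytes")); // 2097152
    }
}
```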

Message duplication and consumption idempotence

The delivery semantics in ApsaraMQ for Kafka is at least once. This means that a message is delivered at least once to ensure that the message is not lost. However, this does not ensure that the message is not duplicated. When a network error occurs or the client restarts, a small number of messages may be repeatedly delivered. If the consumer is sensitive to message duplication, such as in the online transaction scenario, consumption idempotence must be implemented.

If your application is a database application, you can perform the following operations to implement an idempotence check:

  • When you send a message, specify a key as a unique transaction ID.

  • When you consume a message, check whether its key has already been consumed. If it has, skip the message. If it has not, consume the message and record the key.

If your application is not sensitive to the duplication of a few messages, an idempotence check is not required.
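A minimal in-memory sketch of this check follows. In production, the set of processed keys would live in a database rather than in memory, and the names here are illustrative:

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotenceSketch {
    // Transaction IDs that have already been processed. In production this
    // would be a database table, not an in-memory set.
    private final Set<String> processedKeys = new HashSet<>();
    int processedCount = 0;

    // Returns true if the message was processed, false if it was a duplicate.
    boolean consume(String transactionId) {
        if (!processedKeys.add(transactionId)) {
            return false; // key already consumed: skip the message
        }
        processedCount++; // execute the real business logic here
        return true;
    }

    public static void main(String[] args) {
        IdempotenceSketch consumer = new IdempotenceSketch();
        consumer.consume("tx-1001");
        consumer.consume("tx-1002");
        consumer.consume("tx-1001"); // redelivered duplicate, skipped
        System.out.println(consumer.processedCount); // 2
    }
}
```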

Consumption failures

Messages in ApsaraMQ for Kafka are consumed one by one in a partition. If a consumer fails to execute the consumption logic after it receives a message, for example, because dirty data on the application server prevents the message from being processed, you can handle the failure in one of the following ways:

  • Keep trying to execute the consumption logic upon failure. This method may block the consumption thread at the current message and cause message accumulation.

  • ApsaraMQ for Kafka does not specify the logic to process failed messages. You can export failed messages or store the messages in a service. For example, you can create a topic that is dedicated to storing failed messages. Then, you can regularly check the failed messages, analyze the causes, and take appropriate measures.
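The dedicated-topic pattern can be sketched as follows. Here an in-memory list stands in for producing failed messages to the dedicated topic, and the class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FailedMessageSketch {
    // Stand-in for the dedicated failed-message topic; in practice you would
    // produce failed messages to that topic or store them in another service.
    final List<String> deadLetters = new ArrayList<>();

    void handle(String message) {
        try {
            process(message);
        } catch (RuntimeException e) {
            // Do not block the partition: record the failure and move on.
            deadLetters.add(message);
        }
    }

    private void process(String message) {
        if (message.contains("dirty")) {
            throw new RuntimeException("dirty data");
        }
        // Normal business logic goes here.
    }

    public static void main(String[] args) {
        FailedMessageSketch consumer = new FailedMessageSketch();
        for (String message : Arrays.asList("ok-1", "dirty-2", "ok-3")) {
            consumer.handle(message);
        }
        System.out.println(consumer.deadLetters); // [dirty-2]
    }
}
```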

Consumption latency

In ApsaraMQ for Kafka, clients proactively pull messages from the broker. If clients can promptly consume data, the latency is low. If the latency is high, check whether messages are accumulated and then increase the consumption rate based on your business requirements.

Consumption blocking and message accumulation

Message accumulation is the most common issue on consumer clients. The issue may be caused by the following reasons:

  • The consumption rate is lower than the production rate. In this case, you must increase the consumption rate. For more information, see Increase consumption rate.

  • The consumer thread is blocked.

After a consumer receives a message, the consumer initiates remote calls to execute the consumption logic. If the consumer waits for the call result during this process, the consumer may keep waiting. This causes the consumption process to be suspended.

A consumer must try to prevent the consumption thread from being blocked. If a consumer needs to wait for the call result, we recommend that you specify a timeout period for waiting. This way, if no result is returned after the timeout period elapses, the consumption is considered failed and the subsequent messages can continue to be consumed.
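A minimal sketch of such a timeout-bounded wait, using a Future around a simulated slow remote call (the names and timings are illustrative):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {
    // Runs a simulated remote call that takes workMs and waits at most
    // timeoutMs for the result. Returns "failed" when the call is too slow,
    // so the consumption thread can move on to subsequent messages.
    static String callWithTimeout(long workMs, long timeoutMs) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> result = executor.submit(() -> {
            Thread.sleep(workMs); // simulated slow downstream service
            return "response";
        });
        try {
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            result.cancel(true); // give up: treat as a consumption failure
            return "failed";
        } catch (ExecutionException e) {
            return "failed";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(callWithTimeout(500, 100)); // failed: call too slow
        System.out.println(callWithTimeout(10, 1000)); // response
    }
}
```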

Increase consumption rate

You can use one of the following methods to increase the consumption rate:

  • Add consumers

    You can add consumers in a process and make sure that each consumer corresponds to one thread. Alternatively, you can deploy multiple consumer processes. If the number of consumers exceeds the number of partitions, the consumption rate cannot be increased and specific consumers become idle.

  • Add consumption threads

    Adding a consumer essentially adds a consumption thread. Therefore, adding consumption threads is an important way to improve consumption performance. You can perform the following steps to add consumption threads:

    1. Define a thread pool.

    2. Poll data.

    3. Submit data to the thread pool for concurrent processing.

    4. After the concurrent processing result is returned, poll data again.
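The steps above can be sketched as follows, with a fixed list standing in for the records returned by consumer.poll(...); the class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolConsumeSketch {
    // Steps 3 and 4: submit a polled batch to the thread pool and wait for
    // every record to finish before the next poll. invokeAll blocks until
    // all tasks complete.
    static List<String> processBatch(ExecutorService pool, List<String> records)
            throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String record : records) {
            tasks.add(() -> record.toUpperCase()); // business logic goes here
        }
        List<String> results = new ArrayList<>();
        for (Future<String> future : pool.invokeAll(tasks)) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Step 1: define a thread pool.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Step 2: poll data. A fixed list stands in for consumer.poll(...).
        List<String> records = Arrays.asList("m1", "m2", "m3");
        System.out.println(processBatch(pool, records)); // [M1, M2, M3]
        // ...then the loop would poll again.
        pool.shutdown();
    }
}
```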

Filter messages

ApsaraMQ for Kafka does not provide semantics for message filtering. You can use one of the following methods to filter messages:

  • If you want to filter only a few types of messages, you can use multiple topics.

  • If you want to filter many types of messages, we recommend that you filter the messages by business on the client.

You can use one of the methods or integrate both methods based on your business requirements.

Broadcast messages

ApsaraMQ for Kafka does not provide semantics for message broadcasting. You can simulate message broadcasting by creating different groups.

Subscriptions

To facilitate troubleshooting, we recommend that consumers in the same group subscribe to the same topics.