This topic describes the best practices for Message Queue for Apache Kafka consumers to help you reduce message consumption errors.

Basic process of message consumption

The following process describes how Message Queue for Apache Kafka consumers consume messages:

  1. Poll data.
  2. Execute the consumption logic.
  3. Poll data again.

Load balancing

Each group can contain multiple consumers: you can start multiple Message Queue for Apache Kafka consumers and set their group.id parameter to the same value. Consumers in the same group share the consumption of the subscribed topics in load balancing mode.

For example, Consumer Group A subscribes to Topic A, and Consumers C1, C2, and C3 are started in Consumer Group A. In this case, each message destined for Topic A is transferred only to one of C1, C2, and C3. By default, Message Queue for Apache Kafka evenly transfers messages to different consumers to balance the consumption loads.

To implement load balancing in consumption, Message Queue for Apache Kafka evenly distributes the partitions of topics to the consumers. Therefore, the number of consumers should not exceed the number of partitions. Otherwise, some consumers are assigned no partitions and remain idle. Load balancing is triggered when consumers are started for the first time or when consumers are restarted, added, or removed.
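
The effect of having more consumers than partitions can be illustrated with a small sketch. It is a simplified round-robin model written for this topic, not the assignment strategy that the Kafka client actually runs, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration only: this is NOT the assignor used by Kafka clients.
class PartitionAssignmentSketch {

    // Distributes partitions 0..partitionCount-1 across the consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three consumers but only two partitions: C3 is assigned nothing and stays idle.
        System.out.println(assign(List.of("C1", "C2", "C3"), 2)); // prints {C1=[0], C2=[1], C3=[]}
    }
}
```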

Frequent rebalancing at the consumer side

Rebalancing occurs at the consumer side if the heartbeat of a consumer times out. To reduce rebalancing, you can adjust the related parameters or improve the consumption speed of the consumers. For more information, see Why does rebalancing frequently occur on the client?

Number of partitions

The number of partitions affects the number of concurrent consumers.

Messages in one partition can be consumed by only one consumer within the same group. Therefore, the number of consumers should not exceed the number of partitions. Otherwise, some consumers are assigned no partitions and remain idle.

By default, the number of partitions is set to 12 in the Message Queue for Apache Kafka console. This can meet the requirements in most scenarios. You can increase the value based on your business needs. We recommend that you set the number of partitions to a value in the range from 12 to 100. A value less than 12 may affect the message consumption and production performance. A value greater than 100 may trigger rebalancing at the consumer side.
Notice: The number of partitions cannot be reduced. Therefore, we recommend that you increase the number of partitions in small increments.

Subscription modes

Message Queue for Apache Kafka supports the following subscription modes:

  • One group subscribes to multiple topics.

    One group can subscribe to multiple topics. Messages from multiple topics are evenly consumed by consumers in the group. For example, Group A subscribes to Topic A, Topic B, and Topic C. Messages from the three topics are evenly consumed by consumers in Group A.

    The following code shows an example in which one group subscribes to multiple topics:

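    // Requires java.util.ArrayList and java.util.List.
    List<String> subscribedTopics = new ArrayList<>();
    // The "topic" property holds a comma-separated list of topic names.
    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    for (String topic : topics) {
        subscribedTopics.add(topic.trim());
    }
    // Subscribe the consumer to all listed topics at once.
    consumer.subscribe(subscribedTopics);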
  • Multiple groups subscribe to one topic.

    Multiple groups can subscribe to the same topic, and each group separately consumes all messages from the topic. For example, Group A and Group B subscribe to Topic A. Each message destined for Topic A is transferred to the consumers in Group A and the consumers in Group B. The two groups consume independently and do not affect each other.

One group for a single application

We recommend that you configure one group for one application, so that different applications use different groups. If you need to run several pieces of consumption code with different groups in the same application, prepare a separate kafka.properties file for each of them, such as kafka1.properties and kafka2.properties.

Consumer offsets

Each topic has multiple partitions, and each partition counts the total number of current messages, which is the latest offset.

In Message Queue for Apache Kafka, a consumer consumes messages one by one in a partition and records the number of consumed messages, which is the consumer offset.

The number of unconsumed messages is calculated by subtracting the consumer offset from the latest offset. This number indicates the message accumulation status.
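
The subtraction above can be written out as a short sketch. The class name, method names, and sample offsets are hypothetical and only illustrate the arithmetic for one partition and for a whole topic.

```java
class ConsumerLagSketch {

    // Unconsumed messages in one partition: latest offset minus consumer offset.
    static long lag(long latestOffset, long consumerOffset) {
        return latestOffset - consumerOffset;
    }

    // Sums the per-partition lag; index i in both arrays refers to partition i.
    static long totalLag(long[] latestOffsets, long[] consumerOffsets) {
        if (latestOffsets.length != consumerOffsets.length) {
            throw new IllegalArgumentException("Both arrays must cover the same partitions");
        }
        long total = 0;
        for (int partition = 0; partition < latestOffsets.length; partition++) {
            total += lag(latestOffsets[partition], consumerOffsets[partition]);
        }
        return total;
    }

    public static void main(String[] args) {
        long[] latest = {100, 250, 80};   // made-up latest offsets of three partitions
        long[] consumed = {100, 200, 75}; // made-up consumer offsets of the same partitions
        System.out.println(totalLag(latest, consumed)); // prints 55
    }
}
```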

Commit consumer offsets

Message Queue for Apache Kafka provides the following parameters for consumers to commit consumer offsets:

  • enable.auto.commit: specifies whether to enable automatic commits. The default value is true.
  • auto.commit.interval.ms: the interval at which consumer offsets are automatically committed. The default value in the Apache Kafka Java client is 5000, which indicates 5 seconds.

With these two parameters set, the consumer checks before each poll when the consumer offset was last committed. If the time elapsed since then exceeds the interval specified by the auto.commit.interval.ms parameter, the consumer commits the consumer offset.

Therefore, if the enable.auto.commit parameter is set to true, you must make sure that all the data polled last time has been consumed before each poll. Otherwise, unconsumed messages may be skipped.

To manually commit consumer offsets, set the enable.auto.commit parameter to false and call the commit(offsets) function. On the Java client, this function is commitSync(offsets) or commitAsync(offsets).
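
A minimal configuration sketch for manual commits might look as follows. It builds only a java.util.Properties object and does not connect to a broker. The class and method names are hypothetical. The helper offsetToCommit reflects the Kafka convention that a committed offset points at the next message to read.

```java
import java.util.Properties;

class ManualCommitConfigSketch {

    // Consumer settings that turn off automatic commits for the given group.
    static Properties manualCommitProperties(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    // The offset to commit is the offset of the next message to read,
    // that is, the offset of the last processed message plus one.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        System.out.println(manualCommitProperties("demo-group").getProperty("enable.auto.commit")); // prints false
        System.out.println(offsetToCommit(41)); // prints 42
    }
}
```

With the Java client, the value returned by offsetToCommit would be passed to commitSync or commitAsync only after the polled messages have been processed.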

Reset consumer offsets

A consumer offset is reset in the following scenarios:

  • No offset has been committed to the broker, such as when the consumer is brought online for the first time.
  • A message is pulled from an invalid offset. For example, the latest offset in a partition is 10, but consumers start consumption from Offset 11.

On the Java client, you can set the auto.offset.reset parameter to one of the following values to specify how a consumer offset is reset:

  • latest: resets the consumer offset to the latest offset.
  • earliest: resets the consumer offset to the earliest offset.
  • none: does not reset the consumer offset.

Note
  • We recommend that you set this parameter to latest instead of earliest. This prevents consumers from consuming messages from the very beginning due to an invalid offset. This way, the consumers do not need to repeatedly consume messages.
  • If you commit the offset by calling the commit(offsets) function, you can set this parameter to none.
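
The reset rules can be modeled in a few lines. The sketch below is a simplified model written for this topic, not client code, and its names are hypothetical. It treats a committed offset as valid only if it lies between the earliest and latest offsets. For none, it throws an exception, which mirrors how the Java client reports a missing or invalid offset under that setting.

```java
class OffsetResetSketch {

    // Returns the offset from which consumption starts.
    // committed is null when no offset has been committed to the broker.
    static long startOffset(Long committed, long earliest, long latest, String policy) {
        boolean valid = committed != null && committed >= earliest && committed <= latest;
        if (valid) {
            return committed; // a valid committed offset is never reset
        }
        switch (policy) {
            case "latest":
                return latest;
            case "earliest":
                return earliest;
            case "none":
                throw new IllegalStateException("No valid offset and auto.offset.reset is none");
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(null, 0, 10, "latest"));  // first start: prints 10
        System.out.println(startOffset(11L, 0, 10, "earliest")); // invalid offset 11: prints 0
        System.out.println(startOffset(5L, 0, 10, "latest"));    // valid offset kept: prints 5
    }
}
```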

Pull large messages

Consumers actively pull messages from the broker. When consumers pull large messages, you need to control the pulling speed by modifying the following parameters:

  • max.poll.records: the maximum number of messages that can be returned by a single poll. If the size of a message exceeds 1 MB, we recommend that you set this parameter to 1.
  • fetch.max.bytes: the maximum number of bytes allowed for a single fetch operation. Set this parameter to a value that is slightly greater than the size of a single message.
  • max.partition.fetch.bytes: the maximum number of bytes in a partition allowed for a single fetch operation. Set this parameter to a value that is slightly greater than the size of a single message.

Consumers pull large messages one by one.
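
As a sketch, the three parameters could be derived from an assumed maximum message size. The 10% headroom, the class name, and the method name are illustrative choices made for this example, not values recommended by the service.

```java
import java.util.Properties;

class LargeMessageConfigSketch {

    // Builds consumer settings for pulling large messages one by one.
    // maxMessageBytes is the size of the largest message that you expect.
    static Properties largeMessageProperties(int maxMessageBytes) {
        // Assumption: 10% headroom counts as "slightly greater" than one message.
        int fetchBytes = Math.addExact(maxMessageBytes, maxMessageBytes / 10);
        Properties props = new Properties();
        props.setProperty("max.poll.records", "1");
        props.setProperty("fetch.max.bytes", Integer.toString(fetchBytes));
        props.setProperty("max.partition.fetch.bytes", Integer.toString(fetchBytes));
        return props;
    }

    public static void main(String[] args) {
        // A 2 MB message leads to fetch limits of 2306867 bytes.
        System.out.println(largeMessageProperties(2 * 1024 * 1024).getProperty("fetch.max.bytes")); // prints 2306867
    }
}
```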

Message duplication and consumption idempotence

In Message Queue for Apache Kafka, the delivery semantics is at least once. This means that a message is delivered at least once to ensure that the message is not lost. However, this does not ensure that messages are not duplicated. When a network error occurs or the client restarts, a small number of messages may be repeatedly delivered. If the consumer is sensitive to message duplication (such as in order transactions), consumption idempotence must be implemented.

For example, if your application is a database application, you can perform the following operations for idempotence check:

  • When you send a message, pass in a key as a unique transaction ID.
  • When you consume a message, check whether the key has been consumed. If yes, skip the message. Otherwise, consume the message once.

If the application is not sensitive to the duplication of a few messages, the idempotence check is not required.
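
The key check described above can be sketched as follows. The in-memory set is only a stand-in: a real database application would record the key in durable storage, ideally in the same transaction as the business update. All names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

class IdempotentConsumerSketch {

    // Stand-in for a durable store of transaction IDs that were already consumed.
    private final Set<String> processedKeys = new HashSet<>();
    private final Consumer<String> handler;

    IdempotentConsumerSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the message was consumed, false if it was skipped as a duplicate.
    boolean consume(String key, String value) {
        if (processedKeys.contains(key)) {
            return false;
        }
        handler.accept(value);
        // Record the key only after the handler has succeeded.
        processedKeys.add(key);
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(System.out::println);
        System.out.println(consumer.consume("tx-1", "pay order 1")); // prints the value, then true
        System.out.println(consumer.consume("tx-1", "pay order 1")); // duplicate: prints false
    }
}
```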

Consumption failure

Message Queue for Apache Kafka messages are consumed one by one in a partition. A consumer may fail to execute the consumption logic after it receives a message, for example, when dirty data on the application server prevents the message from being processed. In this case, you can use one of the following methods to handle the failure:

  • Make the system keep trying to execute the consumption logic upon failure. This method may block the consumption thread at the current message, resulting in message accumulation.
  • Export failed messages or store them in a service, because Message Queue for Apache Kafka is not designed to process failed messages. For example, you can create a topic that is dedicated to storing failed messages. Then, you can regularly check the failed messages, analyze the causes, and take appropriate measures.
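
One possible way to combine the two methods is to retry a bounded number of times and then set the message aside. The sketch below assumes this combination, which the text above does not prescribe. It uses an in-memory list as a stand-in for a dedicated failure topic, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FailureHandlingSketch {

    // Stand-in for a topic that is dedicated to storing failed messages.
    final List<String> failedMessages = new ArrayList<>();

    // Runs the handler up to maxAttempts times; stores the message if every attempt fails.
    boolean consume(String message, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Fall through and retry until the attempts are used up.
            }
        }
        failedMessages.add(message);
        return false;
    }

    public static void main(String[] args) {
        FailureHandlingSketch sketch = new FailureHandlingSketch();
        Consumer<String> handler = m -> {
            if (m.equals("bad")) {
                throw new IllegalArgumentException("dirty data");
            }
        };
        sketch.consume("ok", handler, 3);
        sketch.consume("bad", handler, 3);
        System.out.println(sketch.failedMessages); // prints [bad]
    }
}
```

Bounding the retries keeps the consumption thread from being blocked at one message, while the stored messages remain available for later analysis.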

Consumption latency

In Message Queue for Apache Kafka, consumers proactively pull messages from the broker. The latency is low if consumers can promptly consume the data. If the latency is high, check whether messages have accumulated, and then increase the consumption speed.

Consumption blocking and accumulation

Consumption accumulation is the most common issue on the consumer side. It may be caused by the following reasons:

  • Consumption is slower than production. In this case, you need to increase the consumption speed. For more information, see Increase the consumption speed.
  • The consumer is blocked.

After the consumer receives a message, the consumer executes the consumption logic and usually makes remote calls. If the consumer waits for the call result at this time, the consumer may keep waiting, causing the consumption process to suspend.

Avoid blocking the consumption thread. If the consumer must wait for a call result, we recommend that you set a timeout period for waiting. This way, the consumption is considered failed if no result is returned before the timeout period ends.
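
A timeout on a remote call might be sketched with the standard CompletableFuture class. The remote call is represented by a hypothetical Supplier, and the class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class RemoteCallTimeoutSketch {

    // Returns true if the call completed successfully within timeoutMs, false otherwise.
    static boolean callWithTimeout(Supplier<String> remoteCall, long timeoutMs) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(remoteCall);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Marks the future as cancelled; this does not interrupt the thread running the call.
            future.cancel(true);
            return false; // treat the consumption as failed
        } catch (ExecutionException e) {
            return false; // the call itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(() -> "ok", 1000)); // prints true
    }
}
```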

Increase the consumption speed

You can increase the consumption speed in one of the following ways:

  • Add consumers.

    You can add consumers in a process and make sure that each consumer corresponds to one thread. Alternatively, you can deploy multiple consumer processes. If the number of consumers exceeds the number of partitions, the speed cannot be increased, and some consumers become idle.

  • Add consumption threads.

    Adding a consumer increases the speed in essentially the same way as adding a consumption thread. Therefore, a more direct way to improve performance is to add consumption threads within a consumer. You can perform the following steps to achieve this objective:
    1. Define a thread pool.
    2. Poll data.
    3. Submit data to the thread pool for concurrent processing.
    4. Poll data again after the concurrent processing result is returned.
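
The four steps above can be sketched with a standard thread pool. The polled batches are hard-coded lists that stand in for the results of consumer polls, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ThreadPoolConsumptionSketch {

    // Step 3: processes one polled batch concurrently and returns the results in batch order.
    static <T, R> List<R> processBatch(ExecutorService pool, List<T> batch, Function<T, R> logic) {
        List<Callable<R>> tasks = new ArrayList<>();
        for (T message : batch) {
            tasks.add(() -> logic.apply(message));
        }
        List<R> results = new ArrayList<>();
        try {
            // invokeAll blocks until every task of the batch has completed.
            for (Future<R> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing the batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Consumption logic failed", e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // step 1: define a thread pool
        // Hypothetical batches standing in for the data returned by successive polls.
        List<List<String>> polls = List.of(List.of("a", "b", "c"), List.of("d", "e"));
        try {
            for (List<String> batch : polls) { // steps 2 and 4: poll, then poll again
                System.out.println(processBatch(pool, batch, String::toUpperCase));
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Because processBatch returns only after the whole batch is done, offsets committed after it returns do not skip unprocessed messages.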

Filter messages

Message Queue for Apache Kafka does not provide semantics for message filtering. You can use one of the following methods to filter messages:

  • If you need to filter a few types of messages, you can use multiple topics.
  • If you need to filter many types of messages, we recommend that you filter the messages by business on the client.

You can select one of the methods as required or integrate both methods.

Broadcast messages

Message Queue for Apache Kafka does not provide semantics for message broadcasting. You can simulate message broadcasting by creating different groups.

Subscriptions

To facilitate troubleshooting, we recommend that all consumers in the same group subscribe to the same topics.