This topic describes best practices for Message Queue for Apache Kafka subscribers to help you reduce the possibility of message consumption errors.

Basic process of message consumption

Message Queue for Apache Kafka subscribers use the following message consumption process:

  1. Poll data.
  2. Execute the consumption logic.
  3. Poll data again.
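
The following minimal sketch shows this loop with the Java client. The endpoint, consumer group, topic name, and deserializer settings are placeholders for illustration.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BasicConsumeLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder endpoint
            props.put("group.id", "demo-group");                 // placeholder consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic

            while (true) {
                // 1. Poll data.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // 2. Execute the consumption logic.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // 3. Poll data again in the next loop iteration.
            }
        }
    }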

Load balancing

Each consumer group can contain multiple consumer instances. Specifically, you can enable multiple Message Queue for Apache Kafka consumers and set the group.id parameter to the same value for the consumers. Consumer instances in the same consumer group consume the subscribed topics in load balancing mode.
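
For example, a minimal sketch of this setting, reusing the properties from the basic example above; the group name and topic are placeholders:

    // Every instance that belongs to the same consumer group sets the same group.id.
    props.put("group.id", "demo-group");                          // identical value on every instance
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("topicA"));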

For example, consumer group A has subscribed to topic A and started consumer instances C1, C2, and C3. In this case, each message sent to topic A is delivered to only one of C1, C2, and C3. By default, Message Queue for Apache Kafka evenly distributes messages among the consumer instances to balance the consumption load.

To achieve load balancing during consumption, Message Queue for Apache Kafka evenly distributes the partitions of the subscribed topics among the consumer instances. Therefore, the number of consumer instances cannot exceed the number of partitions. Otherwise, some consumer instances are not assigned any partitions and remain idle. In addition, rebalancing is triggered not only at the first startup, but also whenever a consumer instance is restarted, added, or removed.

Each Message Queue for Apache Kafka topic contains 16 partitions by default, which is sufficient for most scenarios. In addition, the number of partitions is adjusted for cloud service instances based on their capacity.

Multiple subscriptions

Message Queue for Apache Kafka supports the following modes:

  • A consumer group subscribes to multiple topics.

    A consumer group can subscribe to multiple topics. Messages from multiple topics are evenly consumed by consumers in the consumer group. For example, consumer group A has subscribed to topic A, topic B, and topic C, so the messages from the three topics are evenly consumed by consumers in consumer group A.

    The following sample code shows how a consumer group subscribes to multiple topics:

    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    // Collect the topic names into a list and subscribe to all of them.
    List<String> subscribedTopics = new ArrayList<>();
    for (String topic : topics) {
        subscribedTopics.add(topic.trim());
    }
    consumer.subscribe(subscribedTopics);
  • Multiple consumer groups subscribe to a topic.

    Multiple consumer groups can subscribe to the same topic, and each consumer group independently consumes all messages in the topic. For example, consumer groups A and B have both subscribed to topic A. In this case, each message sent to topic A is delivered to the consumer instances in both consumer group A and consumer group B. The two consumption processes are independent of each other and do not affect each other.

One consumer group for a single application

We recommend that you configure one consumer group for each application, because different applications usually run different pieces of consumption code. If you need to run different pieces of consumption code in the same application, you must prepare a separate configuration file for each consumer group, such as kafka1.properties and kafka2.properties.
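
For example, the following sketch loads two separate configuration files in the same application. The file names kafka1.properties and kafka2.properties come from this topic; MyApp is a hypothetical application class and the imports are omitted.

    // Each consumer group in the application has its own configuration file.
    // kafka1.properties and kafka2.properties each contain their own group.id and other settings.
    Properties kafka1Properties = new Properties();
    try (InputStream in = MyApp.class.getClassLoader().getResourceAsStream("kafka1.properties")) {
        kafka1Properties.load(in);
    }
    Properties kafka2Properties = new Properties();
    try (InputStream in = MyApp.class.getClassLoader().getResourceAsStream("kafka2.properties")) {
        kafka2Properties.load(in);
    }
    // Create one consumer for each consumer group.
    KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(kafka1Properties);
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(kafka2Properties);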

Consumer offset

Each topic contains multiple partitions, and each partition records the total number of messages that it currently holds. This value is the maximum offset, MaxOffset.

In Message Queue for Apache Kafka, a consumer consumes the messages in a partition in sequence and records the number of messages that it has consumed. This value is the consumer offset, ConsumerOffset.

Number of unconsumed messages (accumulated messages) = MaxOffset - ConsumerOffset
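
The following sketch shows one way to compute this value per partition with the Java client (2.4 or later), using endOffsets for MaxOffset and the committed offset for ConsumerOffset. It builds on the consumer created earlier; the topic and partition are placeholders and imports are omitted.

    // Accumulated messages = MaxOffset - ConsumerOffset, computed per partition.
    TopicPartition partition = new TopicPartition("demo-topic", 0);          // placeholder topic and partition
    Map<TopicPartition, Long> endOffsets =
            consumer.endOffsets(Collections.singleton(partition));           // MaxOffset
    Map<TopicPartition, OffsetAndMetadata> committed =
            consumer.committed(Collections.singleton(partition));            // ConsumerOffset
    long maxOffset = endOffsets.get(partition);
    OffsetAndMetadata committedOffset = committed.get(partition);
    long consumerOffset = committedOffset == null ? 0L : committedOffset.offset();
    System.out.println("Accumulated messages: " + (maxOffset - consumerOffset));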

Consumer offset committing

Message Queue for Apache Kafka provides the following consumer offset committing parameters for consumers:

  • enable.auto.commit: The default value is true.
  • auto.commit.interval.ms: The default value is 1000, indicating 1 second.

When these two parameters are set, the consumer checks the time of the last offset commit before each poll. If the interval between that time and the current time exceeds the value of the auto.commit.interval.ms parameter, the consumer commits the consumer offset.

Therefore, if the enable.auto.commit parameter is set to true, you must ensure that all the data polled last time has been consumed before each poll. Otherwise, unconsumed messages may be skipped.

To control offset committing yourself, set the enable.auto.commit parameter to false and call the commitSync(offsets) or commitAsync(offsets) method to commit offsets.
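
A minimal sketch of manual committing with the Java client, building on the consumer setup above; process is a placeholder for your consumption logic:

    // Set before the consumer is created.
    props.put("enable.auto.commit", "false");

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            process(record);                    // placeholder for your consumption logic
        }
        // Commit only after all polled records have been processed,
        // so that unconsumed messages are not skipped.
        consumer.commitSync();
    }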

Consumer offset resetting

The consumer offset is reset in the following scenarios:

  • No offset has been committed to the broker, for example, when the consumer is brought online for the first time.
  • A message is pulled from an invalid offset. For example, the maximum offset in a partition is 10, but the consumer starts consumption from offset 11.

On the Java client, you can configure the following resetting policies by using the auto.offset.reset parameter:

  • latest: Reset the consumer offset to the maximum offset.
  • earliest: Reset the consumer offset to the minimum offset.
  • none: Do not reset the consumer offset.

Note
  • We recommend that you set this parameter to latest instead of earliest to prevent heavily repetitive consumption when consumption starts from the beginning due to an invalid offset.
  • If you manage the offset, you can set the parameter to none.
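
For example, the recommended setting looks as follows in the consumer properties; the commented-out line shows the alternative for the case where you manage offsets yourself:

    // Recommended: start from the latest offset when no valid consumer offset exists.
    props.put("auto.offset.reset", "latest");
    // If you manage offsets yourself, use "none" instead:
    // props.put("auto.offset.reset", "none");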

Large message pulling

During consumption, the consumer actively pulls messages from the broker. When the consumer pulls large messages, you need to control the pulling speed by modifying the following parameters:

  • max.poll.records: If the size of a message exceeds 1 MB, we recommend that you set this parameter to 1.
  • fetch.max.bytes: Set this parameter to a value that is slightly larger than the size of a single message.
  • max.partition.fetch.bytes: Set this parameter to a value that is slightly larger than the size of a single message.

Large messages are pulled one by one.
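
For example, for messages of up to about 1 MB, the consumer properties could be set as follows; the 2 MB limit is an assumed example value that is slightly larger than the largest expected message:

    // Pull large messages one by one.
    props.put("max.poll.records", "1");
    props.put("fetch.max.bytes", String.valueOf(2 * 1024 * 1024));
    props.put("max.partition.fetch.bytes", String.valueOf(2 * 1024 * 1024));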

Message duplication and consumption idempotence

In Message Queue for Apache Kafka, the consumption semantics is "at least once": a message is delivered at least once to ensure that it is not lost, but this does not guarantee that messages are never duplicated. When a network error occurs or the client restarts, a small number of messages may be duplicated. If the consuming application is sensitive to message duplication (for example, order transactions), message consumption must be idempotent.

The following common practices are for database applications:

  • When you send a message, pass in a key as a unique sequence ID.
  • When you consume a message, check whether the key has been consumed. If yes, skip the message. If no, consume the message once.

If the application is not sensitive to the duplication of a small number of messages, the idempotence check is not required.
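
The following sketch illustrates the check described above with an in-memory key set. In a real database application the lookup would go against persistent storage; process is a placeholder for your consumption logic.

    // In production, check against a persistent store (for example, a database) instead of an in-memory set.
    Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        String key = record.key();               // the unique sequence ID passed in by the producer
        if (!processedKeys.add(key)) {
            continue;                            // the key has been consumed, skip the message
        }
        process(record);                         // consume the message once (placeholder logic)
    }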

Consumption failure

Message Queue for Apache Kafka messages are consumed one by one in a partition. If the consumer fails to execute the consumption logic after it receives a message, for example, a message fails to be processed due to dirty data on the application server, you can use the following methods to handle this issue:

  • Make the system keep trying to execute the consumption logic upon failure. This method may block the consumption thread at the current message, resulting in message accumulation.
  • Message Queue for Apache Kafka is not designed to handle failed messages. Therefore, you can print failed messages or store them in a separate service. For example, you can create a topic dedicated to storing failed messages, check the failed messages regularly, analyze the causes, and take appropriate measures, as shown in the sketch after this list.
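
A sketch of the second approach is shown below; failedMessageProducer is an assumed KafkaProducer, demo-topic-failed is a placeholder topic that stores failed messages, and process stands for your consumption logic.

    for (ConsumerRecord<String, String> record : records) {
        try {
            process(record);                                 // placeholder for your consumption logic
        } catch (Exception e) {
            // Print the failure and store the message in a topic dedicated to failed messages.
            System.err.println("Failed to consume offset " + record.offset() + ": " + e.getMessage());
            failedMessageProducer.send(
                    new ProducerRecord<>("demo-topic-failed", record.key(), record.value()));
        }
    }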

Consumption latency

In Message Queue for Apache Kafka, the consumer automatically pulls messages from the broker to consume. Therefore, if the consumer can consume the data promptly, the latency is low. If the latency is high, first check whether any messages are accumulated, and then increase the consumption speed.

Consumption blocking and accumulation

The most common issue on a consumer is consumption accumulation. The most common causes for accumulation are as follows:

  • Consumption is slower than production. In this case, you need to increase the consumption speed. For more information, see Consumption speed increase.
  • The consumer is blocked.

After receiving a message, the consumer executes the consumption logic, which usually involves remote calls. If the consumer waits synchronously for the call results, it may wait indefinitely, causing the consumption process to stall.

The consumer must try to prevent the consumption thread from being blocked. If the consumer waits for a call result, we recommend that you set a timeout period so that the call is treated as failed if no result is returned within the timeout period.
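
For example, the following sketch bounds a remote call with a timeout so that the consumption thread does not block indefinitely. The executor, the callRemoteService method, and the 5-second timeout are placeholders.

    ExecutorService remoteCallExecutor = Executors.newSingleThreadExecutor();

    for (ConsumerRecord<String, String> record : records) {
        // callRemoteService is a placeholder for the remote call made in the consumption logic.
        Future<String> result = remoteCallExecutor.submit(() -> callRemoteService(record));
        try {
            String response = result.get(5, TimeUnit.SECONDS);   // wait at most 5 seconds
            // Handle the response here.
        } catch (TimeoutException e) {
            result.cancel(true);
            // Treat the message as failed, as described in the Consumption failure section.
        } catch (InterruptedException | ExecutionException e) {
            // Treat the message as failed as well.
        }
    }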

Consumption speed increase

You can increase the consumption speed in either of the following ways:

  • Add consumer instances.

    You can add consumer instances in a process and ensure that each instance corresponds to one thread. Alternatively, you can deploy multiple consumer instance processes. When the number of instances exceeds the number of partitions, the speed cannot be increased and some consumer instances become idle.

  • Add consumption threads.
    In essence, adding consumer instances increases the consumption speed by adding consumption threads. Therefore, adding consumption threads is the key to improving performance. You can perform the following basic steps (see the sketch after these steps):
    1. Define a thread pool.
    2. Poll data.
    3. Submit data to the thread pool for concurrent processing.
    4. Poll data again after the concurrent processing result is returned.
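
A minimal sketch of these steps, building on the consumer created earlier; the pool size and the process method are placeholders.

    // 1. Define a thread pool.
    ExecutorService workerPool = Executors.newFixedThreadPool(8);   // pool size is an example value

    while (true) {
        // 2. Poll data.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

        // 3. Submit data to the thread pool for concurrent processing.
        List<Future<?>> futures = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            futures.add(workerPool.submit(() -> process(record)));  // placeholder consumption logic
        }

        // 4. Poll data again after the concurrent processing completes.
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (Exception e) {
                // Handle the failed message as described in the Consumption failure section.
            }
        }
    }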

Message filtering

Message Queue for Apache Kafka does not provide any semantics for filtering messages. You can use either of the following methods to filter messages:

  • If a few types of messages need to be filtered, you can use multiple topics to filter them.
  • If many types of messages need to be filtered, we recommend that you filter the messages by services on the client.

You can select either method as required or combine both methods.

Message broadcasting

Message Queue for Apache Kafka does not provide semantics for broadcasting a message. You can simulate message broadcasting by creating different consumer groups.

Subscription relationship

To facilitate troubleshooting, we recommend that consumer instances in the same consumer group subscribe to the same topics.