Message accumulation, also known as consumer lag, is a common monitoring metric when using Kafka. Understanding and handling message accumulation is key to ensuring system stability, real-time performance, and data consistency.
What is Kafka message accumulation
Kafka message accumulation occurs when a consumer cannot keep up with the messages written by a producer. This causes unconsumed messages to build up in a partition.
Total message accumulation = Latest offset (all partitions) - Consumer offset (all partitions)
The higher the total message accumulation, the more severe the issue.
A total message accumulation near zero means the consumer is keeping up with the production rate.
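The formula can be expressed as a small Python function. This is a minimal sketch: partitions and offsets are passed in as plain dictionaries, the names `total_lag`, `latest`, and `committed` are illustrative, and reading real offsets from a cluster is out of scope.

```python
from typing import Mapping


def total_lag(latest: Mapping[int, int], committed: Mapping[int, int]) -> int:
    """Total message accumulation across all partitions of a topic.

    latest[p] is the latest offset of partition p and committed[p] is the
    consumer offset of partition p. Both are plain integers supplied by the
    caller (illustrative inputs, not fetched from a cluster).
    """
    return sum(latest[p] - committed[p] for p in latest)


# Partition 0 holds 7 messages and partition 1 holds 6; the consumer
# offset is 3 on both, as in the example below.
print(total_lag({0: 7, 1: 6}, {0: 3, 1: 3}))  # prints 7
```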
Topic: test (Partition 0)
+----+----+----+----+----+----+----+
| M1 | M2 | M3 | M4 | M5 | M6 | M7 | ← 7 messages written
+----+----+----+----+----+----+----+
            ↑                   ↑
  Consumer offset (M3)  Latest offset (M7)
Topic: test (Partition 1)
+----+----+----+----+----+----+
| M1 | M2 | M3 | M4 | M5 | M6 | ← 6 messages written
+----+----+----+----+----+----+
            ↑              ↑
  Consumer offset (M3) Latest offset (M6)
Current total message accumulation = (7 - 3) + (6 - 3) = 4 + 3 = 7
The 7 unconsumed messages (M4 to M7 in partition 0 and M4 to M6 in partition 1) are the 7 accumulated messages.
To clear certain alerts, you can reset the consumer offset of a topic partition to 0 in ApsaraMQ for Kafka. When the consumer offset is 0, the accumulation is reported as 0.
If a consumer offset does not exist (because the consumer has never committed an offset, or the offset expired and was cleared) and at least one consumer thread in the Group is online, the accumulation is calculated as:
Total message accumulation = Latest offset (all partitions) - Earliest offset (all partitions)
If all consumer threads in the Group are offline, the accumulation is 0.
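The rules above can be folded into one function. This sketch follows only the rules as stated here and assumes that the "no consumer offset" case applies to the whole Group (represented by `committed=None`); the function and parameter names are hypothetical and are not part of any ApsaraMQ for Kafka API.

```python
from typing import Mapping, Optional


def reported_lag(
    latest: Mapping[int, int],
    earliest: Mapping[int, int],
    committed: Optional[Mapping[int, int]],
    consumers_online: bool,
) -> int:
    """Total accumulation under the rules described above (illustrative model).

    committed is None when the Group has no consumer offset (never committed,
    or the offset expired and was cleared). This Group-wide treatment is an
    assumption of the sketch.
    """
    if committed is not None:
        # Normal case: latest offset minus consumer offset, summed over partitions.
        return sum(latest[p] - committed[p] for p in latest)
    if consumers_online:
        # No consumer offset, but a consumer thread is online:
        # count every message still retained in each partition.
        return sum(latest[p] - earliest[p] for p in latest)
    # No consumer offset and every consumer thread in the Group is offline.
    return 0


print(reported_lag({0: 7, 1: 6}, {0: 0, 1: 2}, {0: 3, 1: 3}, True))  # 7
print(reported_lag({0: 7, 1: 6}, {0: 0, 1: 2}, None, True))          # 11
print(reported_lag({0: 7, 1: 6}, {0: 0, 1: 2}, None, False))         # 0
```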
Root causes of message accumulation
Insufficient consumer processing capacity: Complex processing logic, slow I/O, or CPU/memory bottlenecks.
Sudden surge in production rate: Traffic peaks or batch imports.
Frequent consumer breakdowns or restarts: Crashes, long garbage collection (GC) pauses, or deployment updates.
Frequent rebalancing: Consumers frequently joining or leaving the Group, heartbeat timeouts, or session timeouts.
Issues in the consumer code: Infinite loops, uncaught exceptions, or long intervals between poll() calls.
Consumer rate limiting: The consumption rate reaches the reserved or elastic limit of the instance.
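Delayed or failed offset commits: The committed offset falls behind the messages that were actually processed. This causes repeated pulls after a restart or rebalance and makes the reported accumulation higher than the real backlog.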
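Whatever the root cause, accumulation grows whenever messages are produced faster than they are consumed, and it shrinks only while consumption outpaces production. The toy model below assumes constant per-second rates, which real traffic rarely has; `simulate_lag` is a hypothetical helper for illustration only.

```python
from typing import List


def simulate_lag(produce_rate: int, consume_rate: int, seconds: int,
                 initial_lag: int = 0) -> List[int]:
    """Lag at the end of each second for constant rates (messages per second).

    A consumer cannot consume messages that do not exist yet, so the lag is
    clamped at zero.
    """
    lag = initial_lag
    history = []
    for _ in range(seconds):
        lag = max(0, lag + produce_rate - consume_rate)
        history.append(lag)
    return history


# Traffic peak: 1000 msg/s produced, 800 msg/s consumed, so lag grows by 200 per second.
print(simulate_lag(1000, 800, 5))                   # [200, 400, 600, 800, 1000]
# After the peak: 500 msg/s produced, 800 msg/s consumed, so the backlog drains by 300 per second.
print(simulate_lag(500, 800, 4, initial_lag=1000))  # [700, 400, 100, 0]
```

In this model, an existing backlog drains in about backlog / (consumption rate - production rate) seconds, and it never drains if the consumption rate does not exceed the production rate.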
How to view message accumulation
For details about the accumulation metrics, see the documentation for your instance type:
Subscription/Pay-as-you-go: Prometheus monitoring
Serverless: Dashboard
Impact of message accumulation
Reduced real-time performance: Increased data processing latency affects business decisions.
Slower system response: Blocked consumer threads can cause timeouts and trigger circuit breakers.
Increased risk of rebalancing: Consumer processing delays lead to heartbeat timeouts, which trigger partition rebalancing. Frequent rebalancing extends the periods during which consumption is paused, increases the probability of repeated pulls, and further worsens consumption latency. This creates a vicious cycle.
Risk of out-of-memory (OOM) errors: If a consumer does not process messages promptly after calling poll(), many messages accumulate in the client's memory buffer, which can cause a heap memory overflow.
How to resolve and optimize message accumulation
Increase consumer throughput capacity:
Add consumer instances: Add more consumers to the same Group. Within a Group, each partition is consumed by at most one consumer, so consumers beyond the number of partitions stay idle. Keep Number of partitions ≥ Number of consumers.
Increase the number of partitions: This raises the upper limit on consumer parallelism.
Use asynchronous processing: Make time-consuming operations asynchronous to speed up the poll loop.
Use batch processing: Process multiple messages at once.
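Asynchronous and batch processing can be combined in the poll loop: hand each polled batch to a thread pool, wait for the whole batch to finish, and only then commit its offsets. The sketch below uses Python's standard `concurrent.futures`; the list of batches stands in for successive poll() results, and `handle_record` and the commit counter are hypothetical placeholders, not a Kafka client API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple


def handle_record(record: str) -> str:
    # Hypothetical stand-in for a slow operation such as a database write.
    return record.upper()


def process_batch(pool: ThreadPoolExecutor, batch: Sequence[str],
                  handler: Callable[[str], str]) -> List[str]:
    # pool.map runs handler on the records concurrently and yields results in
    # input order; list() blocks until every record has been handled and
    # re-raises the exception of a failed record, if any.
    return list(pool.map(handler, batch))


def consume(batches: Sequence[Sequence[str]]) -> Tuple[List[str], int]:
    results: List[str] = []
    committed_batches = 0
    with ThreadPoolExecutor(max_workers=8) as pool:
        for batch in batches:  # each item stands in for one poll() result
            results.extend(process_batch(pool, batch, handle_record))
            # Reached only if the whole batch succeeded, so this placeholder
            # commit never marks unprocessed messages as consumed.
            committed_batches += 1
    return results, committed_batches


print(consume([["a", "b", "c"], ["d"]]))  # (['A', 'B', 'C', 'D'], 2)
```

Processing a batch in parallel gives up per-partition ordering inside that batch; if ordering matters, group records by key and process each group sequentially.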
Adjust consumer parameters:
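Parameter | Recommended value | Description
--- | --- | ---
max.poll.records | 1 to 500 | Return fewer records from each poll() so that each batch is processed quickly and poll() is called again before max.poll.interval.ms expires, which avoids rebalancing.
fetch.min.bytes | 1 KB to 1 MB | Increase throughput and reduce empty fetches by having the broker wait until at least this much data is available.
fetch.max.wait.ms | 500 ms | Maximum time the broker waits for fetch.min.bytes of data, so that more data is returned in one response.
session.timeout.ms | 30 s | Avoid incorrectly flagging a consumer as down.
heartbeat.interval.ms | ≤ session.timeout.ms / 3 | Maintain regular heartbeats.
enable.auto.commit | true | Autocommit is recommended.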
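The recommended values can be collected in one place and sanity-checked before the consumer starts. This sketch keys a plain dict by the Java client's property names (other clients may name or support these settings differently) and assumes Kafka's default max.poll.interval.ms of 300000 ms; `check_config` and the per-record processing time are hypothetical inputs for illustration.

```python
from typing import Dict, List, Union

# Values taken from the table; fetch.min.bytes uses the low end of the range.
CONSUMER_CONFIG: Dict[str, Union[int, bool]] = {
    "max.poll.records": 500,
    "fetch.min.bytes": 1024,
    "fetch.max.wait.ms": 500,
    "session.timeout.ms": 30_000,
    "heartbeat.interval.ms": 10_000,  # exactly session.timeout.ms / 3
    "enable.auto.commit": True,
}

MAX_POLL_INTERVAL_MS = 300_000  # Kafka's default max.poll.interval.ms (5 minutes)


def check_config(config: Dict[str, Union[int, bool]], per_record_ms: float) -> List[str]:
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    if config["heartbeat.interval.ms"] > config["session.timeout.ms"] / 3:
        problems.append("heartbeat.interval.ms is greater than session.timeout.ms / 3")
    # Worst-case time to process one full batch returned by poll().
    batch_ms = config["max.poll.records"] * per_record_ms
    if batch_ms >= MAX_POLL_INTERVAL_MS:
        problems.append(
            f"one batch takes about {batch_ms:.0f} ms, which reaches max.poll.interval.ms; "
            "lower max.poll.records or speed up processing"
        )
    return problems


print(check_config(CONSUMER_CONFIG, per_record_ms=10))    # []
print(check_config(CONSUMER_CONFIG, per_record_ms=1000))  # one problem: 500000 ms per batch
```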
Take temporary emergency measures:
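If the accumulation is too large to be processed quickly, you can reset the consumer offset to the latest offset. The messages between the old consumer offset and the latest offset are then skipped and are not consumed by the Group.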
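Resetting to the latest offset brings the accumulation to zero by skipping the backlog rather than processing it. The sketch below, with a hypothetical `reset_to_latest` helper and plain-dict offsets, only models the effect; the actual reset is performed through your instance's tooling, not by this code.

```python
from typing import Dict, Mapping, Tuple


def reset_to_latest(latest: Mapping[int, int],
                    committed: Mapping[int, int]) -> Tuple[Dict[int, int], Dict[int, int]]:
    """Model a reset of the consumer offset to the latest offset.

    Returns (new consumer offsets, number of messages skipped per partition).
    """
    skipped = {p: latest[p] - committed[p] for p in latest}
    new_committed = dict(latest)  # every partition jumps to its latest offset
    return new_committed, skipped


new_offsets, skipped = reset_to_latest({0: 7, 1: 6}, {0: 3, 1: 3})
print(new_offsets)            # {0: 7, 1: 6}, so the accumulation is now 0
print(sum(skipped.values()))  # 7 messages are skipped by this Group
```

Recording the old consumer offsets before the reset makes it possible to reprocess the skipped range later, as long as those messages are still within the topic's retention period.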