This topic answers frequently asked questions about message accumulation and latency when you use the ApsaraMQ for RocketMQ SDK for Java client over TCP. Understanding how the ApsaraMQ for RocketMQ client consumes messages and why messages accumulate helps you plan resources and configure settings before you deploy your business. It also helps you adjust the business logic promptly during O&M so that message accumulation and latency do not affect business operations.
Background information
Pay special attention to message accumulation and latency in the following scenarios:
- Messages accumulate continuously because the processing capabilities of the upstream and downstream business systems do not match, and message consumption cannot recover on its own.
- The business system has strict requirements on real-time message consumption and cannot tolerate even the latency caused by brief accumulation.
How does the client consume messages?
- Phase 1: Pull messages. The SDK client pulls messages from the ApsaraMQ for RocketMQ broker in batches by using the long polling mechanism. The pulled messages are cached in the local buffer queue.
This phase delivers high throughput in most intranet environments. For example, on a machine with 4 cores and 8 GB of memory, a single thread that pulls from a single partition can reach tens of thousands of transactions per second (TPS), and pulling from multiple partitions can reach hundreds of thousands of TPS. Therefore, the pull phase is generally not the bottleneck that causes message accumulation.
- Phase 2: Submit messages to the consumption thread. The SDK client submits the locally cached messages to the consumption thread and the thread uses the business consumption logic to process the messages.
The consumption capability of the client depends on the complexity of the business logic (consumption time) and the consumption concurrency. If the business processing logic is complex and it takes a long time to process a single message, the overall message throughput is low. As a result, the local buffer queue on the client reaches its upper limit, and the client stops pulling messages from the broker.
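The two phases above can be sketched with a bounded queue. This is a minimal illustration, not the SDK's actual implementation: the class `LocalBufferSketch`, its methods, and the buffer capacity are hypothetical names and values chosen for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a client with a bounded local buffer queue.
final class LocalBufferSketch {
    private final BlockingQueue<String> buffer;

    LocalBufferSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Phase 1: cache a pulled batch locally. Returns how many messages were cached.
    // Once the buffer is full, pulling stops; in this sketch the rest stays on the broker.
    int pull(List<String> batch) {
        int cached = 0;
        for (String message : batch) {
            if (!buffer.offer(message)) {
                break; // buffer reached its upper limit
            }
            cached++;
        }
        return cached;
    }

    // Phase 2: hand one cached message to the business consumption logic.
    // Returns false if the buffer is empty.
    boolean consumeOne(Consumer<String> businessLogic) {
        String message = buffer.poll();
        if (message == null) {
            return false;
        }
        businessLogic.accept(message);
        return true;
    }

    public static void main(String[] args) {
        LocalBufferSketch client = new LocalBufferSketch(3); // illustrative capacity
        System.out.println(client.pull(Arrays.asList("m1", "m2", "m3", "m4", "m5")));
        client.consumeOne(m -> System.out.println("consumed " + m));
        System.out.println(client.pull(Arrays.asList("m4")));
    }
}
```

In this sketch, slow business logic in `consumeOne` frees buffer slots slowly, so `pull` caches fewer messages per call. That is the sense in which the consumption phase, not the pull phase, limits throughput.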
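Consumption time
The consumption time is typically dominated by external I/O operations in the consumption logic, such as the following:
- Data reads and writes on external databases, such as MySQL databases.
- Data reads and writes on external cache systems, such as Redis.
- Downstream system calls, such as Dubbo calls and downstream HTTP interface calls.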
Assume that your consumption logic writes data to a database and that consuming a single message takes 1 millisecond. In most cases no problems occur because the message volume is small. During a large promotion, however, the TPS of write operations on the database surges and reaches the capacity limit of the database. As a result, the time to consume a single message increases to 100 milliseconds, and the consumption speed drops sharply. You cannot solve this problem only by adjusting the consumption concurrency of the ApsaraMQ for RocketMQ SDK client. To fundamentally improve the consumption capability of the client, you must upgrade the database capacity.
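The effect of the 1 ms to 100 ms slowdown can be estimated with simple arithmetic. The sketch below assumes that each thread processes one message at a time. The thread count of 20 is a hypothetical value, and `ConsumptionTimeSketch` is a name introduced only for this example.

```java
// Back-of-the-envelope throughput estimate; not part of any SDK.
final class ConsumptionTimeSketch {

    // Upper bound on messages per second for a given thread count and per-message time.
    static double maxTps(int threads, double consumeMillis) {
        return threads * 1000.0 / consumeMillis;
    }

    // Threads needed to sustain a target TPS at a given per-message time.
    static int threadsNeeded(double targetTps, double consumeMillis) {
        return (int) Math.ceil(targetTps * consumeMillis / 1000.0);
    }

    public static void main(String[] args) {
        int threads = 20; // hypothetical consumption thread count
        System.out.println(maxTps(threads, 1.0));          // per-message time of 1 ms
        System.out.println(maxTps(threads, 100.0));        // per-message time of 100 ms
        System.out.println(threadsNeeded(20000.0, 100.0)); // threads to keep the original rate
    }
}
```

With these numbers, the same 20 threads handle 100 times fewer messages per second, and keeping the original rate would take roughly 100 times as many threads, all writing to a database that is already at its limit. This illustrates why raising concurrency alone does not help in this scenario.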
Consumption concurrency
Message type | Consumption concurrency |
---|---|
Normal message | Number of threads per node × Number of nodes |
Scheduled message and delayed message | Number of threads per node × Number of nodes |
Transactional message | Number of threads per node × Number of nodes |
Ordered message | Min(Number of threads per node × Number of nodes, Number of partitions) |
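The table can be expressed as a small helper. This is an illustrative sketch; the enum, the method name, and the sample values are assumptions made for this example and are not part of the SDK.

```java
// Computes the consumption concurrency from the table above.
final class ConcurrencySketch {

    enum MessageType { NORMAL, SCHEDULED_OR_DELAYED, TRANSACTIONAL, ORDERED }

    static int concurrency(MessageType type, int threadsPerNode, int nodes, int partitions) {
        int totalThreads = threadsPerNode * nodes;
        // For ordered messages, the number of partitions caps the effective concurrency.
        return type == MessageType.ORDERED ? Math.min(totalThreads, partitions) : totalThreads;
    }

    public static void main(String[] args) {
        // Hypothetical deployment: 20 threads per node, 3 nodes, 8 partitions.
        System.out.println(concurrency(MessageType.NORMAL, 20, 3, 8));
        System.out.println(concurrency(MessageType.ORDERED, 20, 3, 8));
    }
}
```

For ordered messages, adding threads or nodes beyond the number of partitions does not increase concurrency in this model.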
The optimal number of threads for a single node can be estimated under the following assumptions:
- The number of vCPUs of a single node is C.
- The time consumed for thread switching is ignored, and I/O operations do not consume CPU resources.
- Each thread always has enough messages waiting to be processed, and the memory is sufficient.
- In the consumption logic, the CPU time per message is T1 and the external I/O operation time per message is T2.
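Under the assumptions above, a rough estimate can be derived as follows. This is a sketch that follows only from the stated assumptions, not a guaranteed sizing rule, and the symbols N and TPS_max are introduced here for illustration. Each thread keeps a vCPU busy for a fraction T1/(T1 + T2) of the time it spends on a message, so C vCPUs are saturated by about N threads, at which point the node peaks at about C/T1 messages per unit of time:

$$
N = C \times \frac{T_1 + T_2}{T_1}, \qquad \mathrm{TPS}_{\max} = \frac{N}{T_1 + T_2} = \frac{C}{T_1}
$$

For example, with C = 4, T1 = 1 ms, and T2 = 9 ms, the estimate gives N = 40 threads and a peak of about 4,000 messages per second for the node. In practice, thread switching is not free, so treat the result as a starting point for stress tests.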
How can I avoid message accumulation and latency?
To avoid unexpected message accumulation and latency, review the entire business logic in the early stage of design. Establish a performance baseline for normal business operations so that you can locate the blocking points when faults occur. The major task is to review the message consumption time and the consumption concurrency.
- Sort out the message consumption time: Obtain the message consumption time by performing stress tests and analyze the code logic of time-consuming operations. For more information about how to query the consumption time, see Query the message consumption time. Take note of the following information when you sort out the message consumption time:
- Check whether the computing complexity of the message consumption logic is too high, and whether the code has defects, such as infinite loops or unbounded recursion.
- Check whether I/O operations, such as external calls and read/write storage, in the message consumption logic are required. In addition, check whether solutions such as local cache can be used to avoid these operations.
- Check whether complex and time-consuming operations in the consumption logic can be processed asynchronously. If they can, check whether the asynchronous processing introduces logic errors. For example, the message may be marked as consumed before the asynchronous operation is complete.
- Set the message consumption concurrency
- Gradually increase the number of threads on a single node. Then, you can observe the metrics of the node to obtain the optimal number of consumption threads and maximum message throughput for the node.
- After you obtain the optimal number of threads and the message throughput for a single node, calculate the number of required nodes based on the peak traffic of the upstream and downstream links by using the following formula: Number of nodes = Peak traffic/Message throughput of a single node.
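The node-count calculation can be sketched as follows, assuming that the divisor is the measured message throughput of one node and that the result is rounded up to a whole node. The class name and the sample traffic figures are hypothetical.

```java
// Estimates how many consumer nodes are needed for a given peak traffic.
final class NodeCountSketch {

    static int requiredNodes(double peakTps, double perNodeTps) {
        if (perNodeTps <= 0) {
            throw new IllegalArgumentException("perNodeTps must be positive");
        }
        // Round up: a fractional node still requires a whole node.
        return (int) Math.ceil(peakTps / perNodeTps);
    }

    public static void main(String[] args) {
        // Hypothetical figures: peak traffic of 50,000 TPS, 8,000 TPS measured per node.
        System.out.println(requiredNodes(50000.0, 8000.0));
    }
}
```

You may want to add headroom on top of this estimate, because the per-node figure comes from a stress test and the peak traffic is a forecast.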
How do I resolve message accumulation and latency?
To avoid the impacts of message accumulation or latency on your business, you can set alert rules by using the monitoring and alerting feature provided by ApsaraMQ for RocketMQ. This way, the system sends you alerts on message accumulation. You can also use event tracking to monitor message accumulation and handle issues immediately after they occur. For more information about how to set alert rules, see Monitoring and alerting.
For more information about how to handle message accumulation alerts, see How can I handle accumulated messages?