Details and Best Practices of RocketMQ Client Load Balancing Mechanism

Preface

This article introduces the RocketMQ load balancing mechanism, mainly involving the timing of load balancing, the impact of client load balancing on consumption (message accumulation/consumption burr, etc.), and gives some recommendations on best practices.

Meaning of load balancing

The above figure shows the message storage model of RocketMQ: messages are stored in an orderly manner by queue partition. RocketMQ's queue model enables producers, consumers, and read/write queues to be many to many mapping relationships, which can be expanded infinitely. It has great advantages over traditional message queues, such as RabbitMQ. Especially in the streaming processing scenario, it has a natural advantage to ensure that messages in the same queue are processed by the same consumers, which is more friendly for batch processing and aggregation processing.

A consumer consuming a message on a topic is equivalent to consuming messages on all queues on the topic (in the figure above, Consumer A1 consumes queue 1, Consumer A2 consumes queues 2 and 3).

Therefore, it is necessary to ensure that the load of each consumer is balanced as far as possible, that is, to allocate the same number of queues to these consumers, and to ensure that queues can be migrated between different consumers in exceptional circumstances (such as client downtime).

Analysis of load balancing mechanism

Load balancing timing

Load balancing is a process in which the client and the server cooperate with each other. First, we should integrate the responsibilities of the server and the client to answer the first question: When will load balancing occur.

• Client active load balancing

The above figure shows the structure of relevant classes of RocketMQ client. MQClientInstance is responsible for the interaction with the server and the coordination of underlying services, including load balancing.

There are two related methods in MQClientInstance: rebalanceImmediately and doRebalance. When we analyze load balancing, we can find out when to call these two methods:

1. Perform load balancing immediately upon startup;

2. Load balancing is performed once at a fixed time (20s by default).

• Server notifies load balancing

The server notifies the client to perform load balancing through the MQClientInstance # rebalanceImmediately method. We also look for related calls in the server code.

After analyzing the above methods, we can draw a conclusion that in the following scenarios, the server will actively notify the client to trigger load balancing:

1. Client online and offline

• Go online

a. New client sends heartbeat to server

• Offline

b. The client sends the offline request to the server

c. Bottom layer connection exception: responding to the IDLE/CLOSE/EXCEPTION event of the netty channel

2. Subscription relationship changes: subscribe to new topics or no longer subscribe to old topics

Load balancing policy

The previous article has described that load balancing is actually to change the number of queues that consumers are responsible for processing. Here, the number of queues that need to be changed each time and the number of affected clients are determined by the load balancing policy.

Let's analyze the common load balancing policies:

• Average distribution

AllocateMessageQueueAveragely is the default load balancing policy:

If we have 4 clients and 24 queues, when the second client is offline:

Take the default load balancing policy (AllocateMessageQueueAveragely) as an example, and the number of reallocated queues is 8.

The default load balancing policy can distribute queues to each client as evenly as possible, but the number of queues reallocated each time by load balancing is large, especially in scenarios with a large number of clients.

• Consistent Hash

The load balancing policy (AllocateMessageQueueConsistentHash) based on the consistent hash algorithm will reallocate as few queues as possible each time the load balancing, but the load may be uneven.

The impact of load balancing on consumption

Let's take a real online scene as an example:

In the figure below, the green line represents the sending of tps, and the yellow line represents the consumption of tps. It is easy to find that there is a consumption burr around 21:00 and 21:50.

These two time points are in the process of application release. According to our analysis above, it takes a certain time for other consumers in the same group to perceive this change after a consumer goes offline, resulting in second level consumption delay. After the release, consumers quickly handled the accumulated news, and we can find that the consumption speed has increased significantly.

This example shows that when offline, due to the short message processing delay caused by load balancing, new consumers will obtain the consumption progress from the server before continuing the consumption. If the consumer goes down abnormally or does not call shutdown to gracefully offline, and does not upload his/her latest consumption point, the newly allocated consumer will consume repeatedly.

Here we summarize the impact of load balancing on consumption. When a client triggers load balancing:

1. The newly allocated queue may consume repeatedly, which is also the reason why the government requires consumption to be idempotent;

2. The queue that is no longer responsible for consumption will stop for a short time. If the original consumption TPS is very high or the production peak happens, it will cause consumption burr.

Best Practices

Avoid frequent online and offline

In order to avoid the impact of load balancing, we should try to reduce the online and offline consumption of clients, and do a good job of consuming idempotent.

At the same time, the shutdown method should be called before an application is restarted or offline, so that the server will notify the client to trigger load balancing in a timely manner after receiving the offline request from the client to reduce consumption delay.

Select an appropriate load balancing policy

It is necessary to flexibly select load balancing policies according to business needs:

• It is necessary to ensure that the load on the client is balanced as much as possible: select the default average distribution strategy;

• Need to reduce the consumption delay caused by application restart: choose the allocation strategy of consistent hashing.

Of course, other load balancing strategies are not introduced one by one due to the time relationship, which is left to the reader to explore.

Ensure consistent client subscriptions

The load balancing of RocketMQ is calculated independently by each client, so it is necessary to ensure that the load balancing algorithm of each client is consistent with the subscription statement.

• Inconsistent load balancing policies will cause multiple clients to be assigned to the same queue or some clients cannot be assigned to the queue;

• Inconsistent subscription statements may result in failure to consume messages.

RocketMQ 5.0 message level load balancing

To completely solve the problem of repeated consumption and consumption delay caused by client load balancing, RocketMQ 5.0 proposes a message level load balancing mechanism.

Messages in the same queue can be consumed by multiple consumers. The server will ensure that messages are consumed by the client without duplication or leakage:

The load balancing mechanism of message granularity is implemented based on the internal single message confirmation semantics. After a consumer obtains a message, the server will lock the message to ensure that the message is invisible to other consumers until the message is successfully consumed or the consumption timeout occurs. Therefore, even if multiple consumers consume messages of the same queue at the same time, the server can ensure that messages will not be consumed repeatedly by multiple consumers.

In 4. x clients, the implementation of sequential consumption strongly depends on queue allocation. RocketMQ 5.0 also implements the semantics of sequential consumption on the basis of message dimension load balancing: when different consumers process messages in the same message group, they will lock the message status in strict order to ensure the serial consumption of messages in the same message group.

As shown in the above figure, there are four sequential messages in queue Queue1. The four messages belong to the same message group G1, and the storage order is from M1 to M4. In the consumption process, when the previous messages M1 and M2 are processed by consumer A1, as long as the consumption status has not been submitted, Consumer A2 cannot consume the subsequent M3 and M4 messages in parallel, and must wait for the previous message to submit the consumption status before consuming the subsequent messages.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us