RocketMQ message integration


Over more than 10 years of refinement under large-scale production workloads, Apache RocketMQ has come to serve 100% of Alibaba Group's internal businesses and tens of thousands of Alibaba Cloud enterprise customers. As a finance-grade, reliable business messaging solution, RocketMQ has focused since its inception on building asynchronous communication capabilities for business integration.

This article continues the business message integration series and introduces RocketMQ's timed message feature from four angles: usage scenarios, an application case, implementation principles, and best practices.


Concept: What is a scheduled message

In business message integration, a timed message is one that the producer does not want consumed immediately after it is sent to the message queue. Instead, the consumer should be able to consume it only at a specified time.

A delayed message is another way of expressing a timed message: the producer expects the message to be held for a certain period before the consumer can consume it. It can be understood as a timed message whose target time is the current time plus the delay.
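The relationship between a delay and an absolute delivery time can be written down in a few lines. The following is a minimal sketch in plain Java; the class and method names are hypothetical and are not part of any RocketMQ API.

```java
import java.time.Duration;
import java.time.Instant;

class DeliveryTime {
    // A delayed message is a timed message whose target is "now + delay".
    // Returns the absolute delivery time as epoch milliseconds.
    static long fromDelay(Instant now, Duration delay) {
        return now.plus(delay).toEpochMilli();
    }

    public static void main(String[] args) {
        long deliverAt = fromDelay(Instant.now(), Duration.ofMinutes(30));
        System.out.println("deliver at epoch millis: " + deliverAt);
    }
}
```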

Compare the flow of timed messages with that of ordinary messages. An ordinary message goes through roughly three stages: sending, storage, and consumption. Once it is sent to a topic, it is immediately available for consumers.

A timed or delayed message can be understood as an ordinary message with scheduled delivery added on top. After the producer sends a timed message, it does not enter the user's real topic right away. RocketMQ stores it temporarily in a system topic and, when the set time is reached, delivers it to the real topic so that consumers can consume it.

Scenario: Why do I need to use timed messages

Scenarios such as distributed scheduled triggering and task timeout handling need accurate and reliable timed event triggering. Such triggering usually comes with the following requirements:

• High throughput: a large number of events must be triggered without a performance bottleneck.

• High reliability with retries: triggered events must not be lost.

• Distributed scalability: scheduling cannot run on a single machine; it must be able to spread the load across multiple service nodes.

Traditional scheduling schemes are often built on database task-table scanning. Tasks that need to be triggered at a given time are written to the database, and the microservice application scans the database at a fixed interval to retrieve and process them.

Although such schemes do achieve scheduling, they have several shortcomings:

• Repeated scanning: in a distributed microservice architecture, every service node scans the database, so the same tasks are picked up redundantly and must be de-duplicated.

• Imprecise timing: a mechanism based on fixed-interval scanning cannot schedule delays with arbitrary time precision.

• Poor horizontal scalability: to avoid repeated scanning, table-scanning schemes often split tables by service node, but each table can then be processed by only a single node, which becomes a performance bottleneck.

In these scenarios, RocketMQ timed messages simplify the development of scheduled tasks and provide high-performance, scalable, and highly reliable timed triggering.

• High precision and low development cost: message-based notification has no fixed step interval, so events can be triggered with arbitrary precision, and the business does not need to de-duplicate.

• High performance and scalability: database scanning is more complex and requires frequent scan calls, which easily causes performance bottlenecks. RocketMQ timed messages support high concurrency and horizontal scaling.

Case: Use timed messages to meet the financial payment timeout requirement

With timed messages, an operation can be performed after a given period without the business system having to manage timer state. A typical case is financial payment timeout: an order system wants to check an order's status 30 minutes after the user places it and automatically cancel the order if the user has not paid.

With RocketMQ timed messages, we can send a message timed to 30 minutes after the user places the order and set the order ID as the MessageKey. When the order system receives the message 30 minutes later, it uses the order ID to check the order's status. If the user still has not paid, the order is closed automatically.
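The consumer-side check in this case can be sketched as follows. This is an illustration only: the in-memory map stands in for the order database, and the class, enum, and method names are hypothetical. The handler is written so that a duplicate delivery of the same timeout message has no further effect.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OrderTimeoutHandler {
    enum Status { UNPAID, PAID, CANCELLED }

    // Hypothetical in-memory order store, keyed by order ID.
    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void placeOrder(String orderId) { orders.put(orderId, Status.UNPAID); }

    // Marks the order as paid only if it is still unpaid.
    void pay(String orderId) { orders.replace(orderId, Status.UNPAID, Status.PAID); }

    Status statusOf(String orderId) { return orders.get(orderId); }

    // Called when the timed message arrives; messageKey carries the order ID.
    // Cancels the order only if it is still unpaid and returns true in that case.
    boolean onTimeoutMessage(String messageKey) {
        return orders.replace(messageKey, Status.UNPAID, Status.CANCELLED);
    }

    public static void main(String[] args) {
        OrderTimeoutHandler handler = new OrderTimeoutHandler();
        handler.placeOrder("order-1");
        System.out.println("cancelled: " + handler.onTimeoutMessage("order-1"));
    }
}
```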

Principle: How to implement RocketMQ timing messages

Fixed interval timing message

As mentioned earlier, the core problem of timed messages is how to move a message from the system timing topic to the user's topic at a specific time.

In Apache RocketMQ 4.x, timed messages are placed into different queues of the system topic SCHEDULE_TOPIC_XXXX according to their DelayLevel. A scheduled task is started for each queue; it pulls messages periodically and transfers them to the user's topic. This is simple to implement, but it means only a fixed set of DelayLevel values is supported.

A pull request (PR) that supports timed messages with a precision of any second has since been proposed to the community. The following briefly introduces its basic implementation principles.
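To illustrate the limitation, the sketch below maps a desired delay to the nearest level that is not shorter than it. It assumes the default 4.x level table (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, numbered from 1); a broker may be configured with a different table. The class and method names are hypothetical.

```java
class DelayLevels {
    // Assumed default 4.x delay levels, in seconds; level numbers start at 1.
    static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360,
        420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // Returns the smallest level whose delay is at least delaySeconds,
    // or -1 if the requested delay exceeds the largest level.
    static int levelFor(long delaySeconds) {
        for (int i = 0; i < LEVEL_SECONDS.length; i++) {
            if (LEVEL_SECONDS[i] >= delaySeconds) {
                return i + 1;
            }
        }
        return -1;
    }

    // Returns the delay that a given level actually provides.
    static long actualDelaySeconds(int level) {
        return LEVEL_SECONDS[level - 1];
    }

    public static void main(String[] args) {
        int level = levelFor(45); // 45 seconds is not a level, so it is rounded up
        System.out.println("level " + level + " = " + actualDelaySeconds(level) + "s");
    }
}
```

A 45-second delay cannot be expressed exactly here and has to be rounded to a neighbouring level, which is the gap that any-second timing is meant to close.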

Time wheel algorithm

Before describing the implementation, we first introduce the classic time wheel algorithm, which is at the core of the timed message implementation.

As shown above, this is a time wheel with a period of 7 seconds and a minimum precision of one second. A pointer on the wheel points to the current time and advances to the next scale at a regular interval.

Suppose we want to schedule something for 1 second later: we put the data in scale "1". If multiple items are scheduled for the same time, they are appended in the form of a linked list. When the wheel turns to scale "1", the items are read from the list and dequeued.

What if the target time exceeds one round of the wheel? For example, we want to schedule 14 seconds ahead. Since one round takes 7 seconds, we put the item in scale "6". When the wheel reaches "6" in the first round, the current time is still earlier than the expected time, so the item is ignored. When the wheel reaches "6" in the second round, the expected 14 seconds have been reached.
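The algorithm described above can be sketched in plain Java as follows. This is a minimal single-threaded illustration, not RocketMQ's implementation. It assumes 0-based slots chosen as the expiry time modulo the wheel size, so the slot index for a given delay may differ from the scale label used in the figure.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class SimpleTimeWheel {
    private static final class Task {
        final String name;
        final long expireAt; // absolute tick at which the task is due

        Task(String name, long expireAt) {
            this.name = name;
            this.expireAt = expireAt;
        }
    }

    private final List<LinkedList<Task>> slots = new ArrayList<>();
    private long now = 0; // current tick, in seconds

    SimpleTimeWheel(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new LinkedList<>());
        }
    }

    // Places a task in the slot for (now + delay) modulo the wheel size.
    // Tasks that share a slot are kept in a linked list.
    void schedule(String name, long delaySeconds) {
        if (delaySeconds < 1) {
            throw new IllegalArgumentException("delay must be at least 1 second");
        }
        long expireAt = now + delaySeconds;
        slots.get((int) (expireAt % slots.size())).add(new Task(name, expireAt));
    }

    // Advances the pointer by one tick and returns the tasks that are due.
    // Tasks that belong to a later round stay in the slot and are skipped.
    List<String> tick() {
        now++;
        List<String> due = new ArrayList<>();
        Iterator<Task> it = slots.get((int) (now % slots.size())).iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (task.expireAt <= now) {
                due.add(task.name);
                it.remove();
            }
        }
        return due;
    }

    public static void main(String[] args) {
        SimpleTimeWheel wheel = new SimpleTimeWheel(7);
        wheel.schedule("one-second", 1);
        wheel.schedule("fourteen-seconds", 14);
        for (int second = 1; second <= 14; second++) {
            System.out.println(second + "s: " + wheel.tick());
        }
    }
}
```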

Any second timing message

In RocketMQ, TimerWheel describes and stores the time wheel, and an append-only TimerLog records all the messages that correspond to each scale on the wheel.

TimerLog records important metadata about each timed message, which is used to transfer the message to the user's topic once its scheduled time arrives. Several important attributes are as follows:

TimerWheel can be thought of abstractly as a fixed-length array in which each cell represents one "scale" on the time wheel. A scale in TimerWheel has the following properties.

The relationship between TimerWheel and TimerLog is shown in the following figure:

Each cell in TimerWheel represents one time scale. It holds a firstPos, pointing to the address of the first TimerLog record among all timed messages under this scale, and a lastPos, pointing to the address of the last one. The TimerLog records of messages in the same scale are chained into a linked list through prevPos.

To add a record, say "1-4", point the new record's prevPos to the current lastPos (that is, "1-3") and then update lastPos to point to "1-4". In this way, all the TimerLog records in the same scale are chained together.
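The firstPos, lastPos, and prevPos bookkeeping can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions: a Java list index stands in for a file address in the append-only log, -1 means "no record", and the class and field layout is hypothetical rather than RocketMQ's actual on-disk format.

```java
import java.util.ArrayList;
import java.util.List;

class TimerLogSketch {
    // One scale on the wheel: positions of its first and last log records.
    static final class Slot {
        int firstPos = -1;
        int lastPos = -1;
    }

    // One append-only log record, linked backwards to the previous record
    // of the same scale through prevPos.
    static final class LogRecord {
        final String message;
        final int prevPos;

        LogRecord(String message, int prevPos) {
            this.message = message;
            this.prevPos = prevPos;
        }
    }

    private final Slot[] wheel;
    private final List<LogRecord> timerLog = new ArrayList<>();

    TimerLogSketch(int slotCount) {
        wheel = new Slot[slotCount];
        for (int i = 0; i < slotCount; i++) {
            wheel[i] = new Slot();
        }
    }

    // Appends a record: its prevPos points to the slot's current lastPos,
    // then lastPos is moved to the new record.
    int append(int slotIndex, String message) {
        Slot slot = wheel[slotIndex];
        int pos = timerLog.size();
        timerLog.add(new LogRecord(message, slot.lastPos));
        if (slot.firstPos == -1) {
            slot.firstPos = pos;
        }
        slot.lastPos = pos;
        return pos;
    }

    // Walks the chain from lastPos back through prevPos (newest first).
    List<String> readSlot(int slotIndex) {
        List<String> messages = new ArrayList<>();
        int pos = wheel[slotIndex].lastPos;
        while (pos != -1) {
            LogRecord record = timerLog.get(pos);
            messages.add(record.message);
            pos = record.prevPos;
        }
        return messages;
    }

    public static void main(String[] args) {
        TimerLogSketch sketch = new TimerLogSketch(7);
        sketch.append(1, "1-1");
        sketch.append(1, "1-2");
        System.out.println(sketch.readSlot(1));
    }
}
```

Because records for different scales are interleaved in one log, the backward chain is what lets a scale's records be found without scanning the whole log.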

With TimerWheel and TimerLog in place, let's see how a timed message is finally delivered to the user after it is sent to RocketMQ.

First, when RocketMQ detects that the user has sent a timed message, it actually writes the message to a system topic dedicated to processing timed messages.

Five services in TimerMessageStore then divide the work and cooperate, and the whole process can be split into two stages: entering the time wheel and leaving the time wheel.

Entering the time wheel:

• TimerEnqueueGetService pulls messages from the system timing topic and puts them into enqueuePutQueue, where they wait to be processed by TimerEnqueuePutService.

• TimerEnqueuePutService builds the TimerLog records and places them in the corresponding scale of the time wheel.

Leaving the time wheel:

• TimerDequeueGetService turns the time wheel, takes out all TimerLog records of the current time scale, and puts them into dequeueGetQueue.

• TimerDequeueGetMessageService reads the messages from CommitLog according to the TimerLog records.

• TimerDequeuePutMessageService checks whether each message in the queue has expired. If it has, the message is put into the user's topic to wait for consumption. If it has not, the message is put back into the system timing topic to enter the time wheel again.
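The decision made in the last step can be sketched as follows. Two in-memory queues stand in for the user's topic and the system timing topic; the class, field, and method names are hypothetical and do not reflect the real TimerDequeuePutMessageService code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class DequeueRouting {
    static final class TimedMessage {
        final String body;
        final long deliverAtMs; // expected delivery time, epoch milliseconds

        TimedMessage(String body, long deliverAtMs) {
            this.body = body;
            this.deliverAtMs = deliverAtMs;
        }
    }

    // In-memory stand-ins for the two destinations.
    final Queue<TimedMessage> userTopic = new ArrayDeque<>();
    final Queue<TimedMessage> systemTimerTopic = new ArrayDeque<>();

    // A message whose time has come goes to the user's topic; otherwise it
    // goes back to the system timing topic to enter the wheel again.
    void route(TimedMessage message, long nowMs) {
        if (message.deliverAtMs <= nowMs) {
            userTopic.add(message);
        } else {
            systemTimerTopic.add(message);
        }
    }

    public static void main(String[] args) {
        DequeueRouting routing = new DequeueRouting();
        long now = System.currentTimeMillis();
        routing.route(new TimedMessage("due", now - 1), now);
        routing.route(new TimedMessage("not yet", now + 60_000), now);
        System.out.println("delivered: " + routing.userTopic.size()
                + ", re-enqueued: " + routing.systemTimerTopic.size());
    }
}
```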

In practice: using timed messages

Now that we understand the principle of RocketMQ second-level timed messages, let's look at how to use them. First, create a topic of the "timed/delayed message" type, using either the console or a CLI command.

As the above shows, what makes a timed message special happens at send time. The producer therefore sets the expected delivery time when sending, which is the only difference from sending an ordinary message.

When the scheduled time arrives, the message is delivered to the user's topic as an ordinary message. For consumers, consuming it is no different from consuming ordinary messages.

Note: a timed message has to pass through timed storage and wait to be triggered before it is delivered to the consumer. If a large number of timed messages are set to the same time, they all have to be processed at once when that time arrives, which puts excessive pressure on the system. It is therefore generally recommended not to set the same trigger time for a large number of messages.
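One possible way to follow this recommendation, where the business can tolerate slightly late delivery, is to add a small random offset to each delivery time before sending. The sketch below is an assumption of this article's editor rather than a RocketMQ feature; the class name and the maxJitterMs parameter are hypothetical.

```java
import java.util.Random;

class DeliveryJitter {
    // Returns a delivery time in [targetMs, targetMs + maxJitterMs), so that
    // messages aimed at the same instant are spread over a short window.
    static long withJitter(long targetMs, long maxJitterMs, Random random) {
        if (maxJitterMs <= 0) {
            return targetMs;
        }
        return targetMs + (long) (random.nextDouble() * maxJitterMs);
    }

    public static void main(String[] args) {
        Random random = new Random();
        long target = System.currentTimeMillis() + 30 * 60 * 1000L;
        for (int i = 0; i < 3; i++) {
            System.out.println(withJitter(target, 5_000L, random));
        }
    }
}
```

The offset only ever moves delivery later, which suits timeout checks such as the payment case; it is not appropriate when the message must arrive at an exact instant.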
