Advanced RocketMQ in One Diagram: Consumers
Introduction: San Jijun has read quite a few books and gone over this source-code-based RocketMQ diagram many times. For RocketMQ, this one picture really is all you need to remember! I used to think the Broker simply pushed messages to the consumer, and that the consumer was done once it had consumed them. In reality, the consumer handles everything from startup and message pulling to message consumption, load balancing, message retry, and consumption offset management during result processing. The consumer, supposedly the pampered one, turns out to be the hardest-working part of the system, and that is now beyond dispute.
Foreword
San Jijun has read quite a few books and gone over this diagram, organized from the RocketMQ source code, many times. For RocketMQ, this one picture is all you need to remember! If you find it useful, remember to like and follow.
This article is the seventh in the series "Advanced RocketMQ in One Diagram". The previous installments covered:
1: Advanced RocketMQ in One Diagram - Overall Architecture
2: Advanced RocketMQ in One Diagram - NameServer
3: Advanced RocketMQ in One Diagram - Message Sending
4: Advanced RocketMQ in One Diagram - Communication Mechanism
5: Advanced RocketMQ in One Diagram - Message Storage
6: Advanced RocketMQ in One Diagram - Message Flushing & Index Building
The producer has sent the message and the Broker has stored it; now the RocketMQ consumer can finally consume it. You would think consumers live like gods, waiting to be dressed and fed, with the Broker serving messages straight to their mouths. Reality delivers a hard slap in the face: messages have to be fetched by the consumer itself, along with a pile of other dirty work. One swift dimensional strike, and the consumer is demoted from god to plain laborer.
We are all workers too, and workers understand each other, so you will get the hang of RocketMQ consumers in no time. As before, we start from an example and use the key code as the entry point to analyze the design and implementation of the RocketMQ consumer in depth.
Consumer example
• First, instantiate a DefaultMQPushConsumer and tell it the NameServer address, so that the consumer can fetch routing information from the NameServer.
In addition to DefaultMQPushConsumer there is DefaultMQPullConsumer. With DefaultMQPullConsumer the consumption behavior is driven mainly by the business code, which must call the API to pull messages itself; with DefaultMQPushConsumer it is driven mainly by RocketMQ. DefaultMQPushConsumer is in fact built on pulling and merely achieves the effect of a push: it automatically pulls messages back, then calls the MessageListener implemented by the business and hands the messages over. It is also the more commonly used of the two in practice, so we mainly analyze the principle of DefaultMQPushConsumer.
• Next, the consumer needs to know which Topics it may consume; that is, each consumer must subscribe to one or more Topics and specify a tag. We have already run into tags when sending and storing messages: they mainly describe the business attributes of a message. A consumer can subscribe to only certain tags under a Topic, i.e. filter messages by tag.
• The consumer also needs some initialization. The Broker does not push messages to consumers; consumers fetch them on demand. The business code does not care how messages are pulled from the Broker; that is the quiet, behind-the-scenes work of DefaultMQPushConsumer. So we need to start the consumer; once started, it pulls routing information from the NameServer and continuously pulls messages from the Broker.
• Once a message has been pulled back, what should be done with it? That is different for every consumer (it is decided by the business), so it is handled by the MessageListener our business code defines. Finally, the consumer also needs to acknowledge receipt, i.e. tell the Broker whether consumption succeeded. A minimal runnable example that puts these steps together is sketched below.
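To make the steps above concrete, here is a minimal runnable sketch against the standard Apache RocketMQ 4.x Java client. The group name, Topic, tag expression and NameServer address are placeholders chosen for illustration, not values from this article.

```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // 1. Instantiate a push consumer and tell it where the NameServer is.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 2. Subscribe to a Topic and filter by tag ("*" would mean all tags).
        consumer.subscribe("ExampleTopic", "TagA || TagB");

        // 3. Register the business callback that processes pulled messages and
        //    reports the result back to the Broker via the returned status.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("received %s: %s%n", msg.getMsgId(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // or RECONSUME_LATER to retry
            }
        });

        // 4. Start the consumer; from here on it fetches routing info and messages by itself.
        consumer.start();
    }
}
```

Note that consumer.start() returns right away; pulling and the listener callbacks all happen on the client's internal threads.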
As the example shows, we only start the consumer and never call any message-pulling API directly. So how exactly do the messages crawl along the network cable to us? Since everything begins with starting the consumer, let's first look at what actually happens during consumer startup.
Consumer startup
This diagram reads like the verdicts of the Twelve Beauties of Jinling: the fate of the RocketMQ consumer is sealed right at startup. The consumer is destined to be a laborer, not a god. Let's see what chores this worker has to take care of first:
• Check whether the consumer's configuration complies with the specification, including the consumer group name, consumption type, queue allocation strategy and other parameters; copy the subscription data into the Rebalance service object; check the consumer instance name and, if it is still the default, replace it with the current process id.
The consumption type is the cluster consumption or broadcast consumption mentioned in the RocketMQ overview; the queue allocation strategy and Rebalance are analyzed later in this article.
• Obtain or create an MQClientInstance. This is the same MQClientInstance that the producer starts in the producer article; it manages the production and consumption behavior of all producers and consumers in the current process. Clients with the same clientId share one MQClientInstance, and the clientId is composed of the local IP and the instanceName (default value DEFAULT).
• Set the Rebalance object's parameters (consumer group, consumption type, queue allocation strategy, MQClientInstance and so on), which will be used later.
• Initialize PullAPIWrapper, the wrapper class around the Broker API; as the name suggests, it comes in handy during message pulling. Message filter hooks are registered at the same time.
• Initialize the offset manager and load the offsets. Offset management is consumption progress management, and there are two kinds of offset manager: local and remote. Under cluster consumption the offsets are stored on the Broker and managed by the remote manager; under broadcast consumption they are stored locally and managed by the local manager (a short sketch of this choice follows after this list).
• Register the consumer instance locally. If registration succeeds, the consumer is considered started successfully.
• Start the MQClientInstance. The startup process is the same as for the producer; mainly, the NettyRemotingClient and a number of scheduled tasks are started.
• Initialize the consumption service and start it. The reason the user "feels" that messages are actively pushed by the Broker is that DefaultMQPushConsumer pulls messages locally through PullMessageService and then pushes them to the user's consumption code in the form of a callback. DefaultMQPushConsumer and DefaultMQPullConsumer obtain messages the same way: essentially, a pull.
• Update the local subscription relationship and routing information; check with the Broker whether the consumer's filter type is supported; send the consumer group's heartbeat to all Brokers in the cluster.
• Trigger a Rebalance immediately. The Rebalance process is explained in detail later.
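To make the offset manager step above more tangible, here is a small self-contained sketch of the decision it reflects: cluster consumption keeps progress on the Broker, broadcast consumption keeps it locally. The types below are illustrative stand-ins invented for this sketch; the real 4.x client uses RemoteBrokerOffsetStore and LocalFileOffsetStore for the same purpose.

```java
// Toy model of the offset-store choice made at consumer startup; not the actual client source.
public class OffsetStoreChoiceSketch {

    enum MessageModel { CLUSTERING, BROADCASTING }

    interface OffsetStore {
        void persist(String queue, long offset);
    }

    // Cluster consumption: progress lives on the Broker and is shared by the consumer group.
    static class RemoteStore implements OffsetStore {
        public void persist(String queue, long offset) {
            System.out.printf("report offset %d of %s to the Broker%n", offset, queue);
        }
    }

    // Broadcast consumption: every instance consumes every message, so progress stays local.
    static class LocalStore implements OffsetStore {
        public void persist(String queue, long offset) {
            System.out.printf("write offset %d of %s to a local file%n", offset, queue);
        }
    }

    static OffsetStore create(MessageModel model) {
        return model == MessageModel.BROADCASTING ? new LocalStore() : new RemoteStore();
    }

    public static void main(String[] args) {
        create(MessageModel.CLUSTERING).persist("ExampleTopic-0", 42L);
    }
}
```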
Feeling a bit lost in the fog? I certainly was the first time I read this; I could not tell what each step was for. Don't panic, we will analyze each piece in detail when the relevant process comes up. For now, just get familiar with the names, so that when we meet a component later we know it was initialized at startup.
Message consumption process
Next, let's see how the consumer is reduced from god to laborer. First the consumer has to pull messages from the Broker, then hand them to the business thread for consumption; squeezed in the middle, it is a pure tool. The entry point for pulling messages is the PullMessageService initialized at startup.
• DefaultMQPushConsumer starts the message pull service PullMessageService during initialization. It is a looping thread service: its run() method keeps taking a pull task (PullRequest) from the pullRequestQueue, then looks up the corresponding consumer instance by the consumer group in the task and lets it execute the pull (a toy sketch of this loop follows after this list).
• A PullRequest is a message pull task; it encapsulates the consumer group, the MessageQueue to pull from, the ProcessQueue that caches pulled messages locally, the pull offset, and so on. The pullRequestQueue is, obviously, the queue that stores PullRequests, and it is maintained by the RebalanceService. We analyze the Rebalance process in detail later in this article, so we will not expand on it here.
• DefaultMQPushConsumerImpl.pullMessage() checks whether the queue currently being processed has been dropped, whether the service is running, and so on, then applies local flow control: if the number of locally cached messages exceeds the configured maximum (1000 by default, adjustable), or the number of locally cached bytes exceeds the configured maximum, the pull is delayed by 50 ms. It also checks whether the subscription is empty; if so, the pull is likewise delayed by 50 ms.
• Wrap the pull callback PullCallback: if the network request succeeds, PullCallback.onSuccess is invoked; if an exception occurs, PullCallback.onException is invoked.
• Look up brokerAddr by brokerName and brokerId. If it is not found, first pull routing information from the NameServer and then look up brokerAddr again. Build the pull request header PullMessageRequestHeader (Topic, queueId, offset, etc.), then call pullMessageAsync() to send the request to the server.
• pullMessageAsync encapsulates the request into a RemotingCommand and constructs the callback InvokeCallback; when the remote response comes back, InvokeCallback.operationComplete is invoked.
• Then comes the Netty-based network request flow we are already familiar with; the whole request/response process is the same as for producer message sending. Netty is also initialized during consumer startup, and the main job here is to obtain or create a Netty Channel: first look up the Channel in the local channelTables map, keyed by the Broker address; if it is not there, create it via Bootstrap.connect(brokerAddr) and put it into the cache. Then a key-value pair of the request's opaque id and its ResponseFuture is put into the responseTable cache, and channel.writeAndFlush() transmits the request to the specified Broker over the network.
• When the client sends the request, the Broker's NettyRemotingServer WorkerGroup handles the read event and calls NettyServerHandler.channelRead0() to process the data, which eventually reaches the processRequestCommand method. That method looks up the corresponding Processor in the local processorTable cache according to the RequestCode of the request and lets it execute the subsequent logic. Here we are pulling messages, so the PullMessageProcessor is obtained. Its detailed processing is analyzed later; for now we only need to know that it calls MessageStore.getMessage() to fetch the messages and return them to the consumer.
• Message query: as the storage structure introduced in the message storage article showed, messages are actually stored in the CommitLog, and the index file ConsumeQueue is maintained to speed up message lookup. Let's see how the wanted messages are found from these two files. Based on the Topic, queueId, offset and other information in the request, the Broker finds the entries of the messages to return in the ConsumeQueue, reads the physical offsets from those entries, and then fetches the actual messages from the CommitLog by physical offset. After processing, they are returned to the consumer. The query can be broken into the following steps.
• Pre-pull checks: verify that the DefaultMessageStore service has not been shut down (it is shut down on a normal process exit) and that it is readable.
• The findConsumeQueue method locates the ConsumeQueue index file by Topic and queueId, then checks whether the requested offset is valid against that file: it is valid only if it is greater than the ConsumeQueue's minOffset and less than its maxOffset; otherwise the next pullable offset is recalculated.
• Read up to maxMsgNums = 32 messages in a loop. Each iteration sequentially reads a message's physical offset, size and tag hashcode from the ConsumeQueue, performs hash filtering first, then uses the physical offset and size of the messages that pass the filter to look up the message bodies in the CommitLog and adds them to the result list (the fixed layout of a ConsumeQueue entry is sketched after this list).
• Record monitoring statistics and return the pulled messages.
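Here is the promised toy model of the PullMessageService loop: one service thread blocks on a queue of PullRequest tasks and dispatches each one. The real client additionally applies the 50 ms flow-control delays described above and re-enqueues the next PullRequest from the pull callback; the types and method bodies below are simplified stand-ins, not the actual source.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Toy version of the pull loop; the real PullMessageService also supports delayed
// re-enqueueing (flow control) and looks the consumer up by its group name.
public class PullLoopSketch implements Runnable {

    // What a pull task carries: which queue to pull and from which offset.
    record PullRequest(String consumerGroup, String topic, int queueId, long nextOffset) {}

    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            try {
                PullRequest request = pullRequestQueue.take(); // blocks until Rebalance enqueues a task
                pullMessage(request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void pullMessage(PullRequest request) {
        // In the real client this is an asynchronous network pull; its callback stores the
        // messages into the ProcessQueue and puts the next PullRequest back onto the queue.
        System.out.printf("pull %s queue %d from offset %d%n",
                request.topic(), request.queueId(), request.nextOffset());
    }

    // Called by the Rebalance logic (and by the pull callback) to schedule a pull.
    public void executePullRequestImmediately(PullRequest request) {
        pullRequestQueue.offer(request);
    }
}
```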
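The ConsumeQueue lookup above is easier to picture once you see the fixed 20-byte entry layout: 8 bytes of CommitLog physical offset, 4 bytes of message size, and 8 bytes of tag hashcode. The decoder below only illustrates that layout; it is not the Broker's DefaultMessageStore code.

```java
import java.nio.ByteBuffer;

// Illustrative decoder for a single ConsumeQueue entry (fixed 20 bytes per entry).
public class ConsumeQueueEntry {
    public static final int ENTRY_SIZE = 20;

    final long commitLogOffset; // physical offset of the message in the CommitLog
    final int size;             // total size of the message record in the CommitLog
    final long tagsCode;        // hash of the message tag, used for server-side tag filtering

    ConsumeQueueEntry(ByteBuffer buf) {
        this.commitLogOffset = buf.getLong();
        this.size = buf.getInt();
        this.tagsCode = buf.getLong();
    }

    // Position the buffer at the n-th logical entry and decode it; the Broker then uses
    // (commitLogOffset, size) to read the full message body from the CommitLog.
    static ConsumeQueueEntry read(ByteBuffer consumeQueueFile, long logicOffset) {
        ByteBuffer view = consumeQueueFile.duplicate();
        view.position((int) (logicOffset * ENTRY_SIZE));
        return new ConsumeQueueEntry(view);
    }
}
```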
At this point the consumer has won a stage victory: the messages have been pulled back from the Broker. Cause for celebration indeed, with gongs and drums and firecrackers everywhere... but it is far too early to be happy. With the Broker above and the business threads below, the consumer is about to set out on a bumpy road of callbacks.