RocketMQ - Message Filtering
Catalogue
• Business background
• What is message filtering
• How to use message filtering
• Analyzing message filtering
• Subsequent articles
Business background
In e-commerce systems, the operations and product teams need to query order and product data through a big data system.
Many companies do not start out with a mature technical architecture, so the system architecture may look like this:
The big data system queries the MySQL database directly. When it sends large SQL queries, it can easily put CPU and I/O pressure on MySQL.
This can easily slow down the queries of the e-commerce system itself.
To avoid a direct impact on the MySQL database while still allowing the big data system to query data, companies with a more mature architecture typically use the following design:
Usually, MySQL data is synchronized to the Hive database through a data exchange platform.
At the same time, the above architecture also uses RocketMQ.
In fact, if RocketMQ were removed from the architecture above, data could still be synchronized from MySQL to the Hive database normally.
So here's the question: why use RocketMQ?
Let's argue by contradiction: what would happen without RocketMQ? First, we can be certain that the data can still be synchronized from the MySQL database to the Hive database.
But what are the drawbacks?
Without RocketMQ, if another data team also needs data from the e-commerce database, custom development is needed all over again.
With RocketMQ in place, if other data teams need the data, you only need to give them a Topic.
In other words, RocketMQ is introduced mainly for the convenience of later expansion and data sharing.
But the design above has another issue: the big data system may only need data from the order tables. Data from other tables, such as product data, is not required.
This is where message filtering comes in.
Message filtering
In RocketMQ, a message can carry only one tag, which the producer sets when it sends the message. A consumer, on the other hand, can subscribe to messages with one or more tags.
How to use it?
Next, let's see how to use message filtering in RocketMQ. First, the producer:
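import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class FilterProducer {
    public static void main(String[] args) throws Exception {
        // Producer group
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            // Build the message
            Message message = new Message("filterTopic", ("helloWorld" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Attach the user property "a" that the SQL filter will evaluate
            message.putUserProperty("a", String.valueOf(i));
            // Send the message
            SendResult sendResult = producer.send(message);
            // Print the send result
            System.out.println("Send result:" + sendResult);
        }
        // Shut down the producer
        producer.shutdown();
    }
}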
A key-value user property has been added to each message here: the property a, whose value is the loop index.
Next, let's look at the consumer:
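import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        // Consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group");
        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Only select messages whose property "a" is greater than 5
        MessageSelector ms = MessageSelector.bySql("a > 5");
        // Subscribe to the topic with the SQL selector
        consumer.subscribe("filterTopic", ms);
        // Start consuming from the first offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Register the message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                    System.out.println("Consumer message:" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Start Consumer");
    }
}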
The output is as follows:
Start Consumer
Consumer message:helloWorld8
Consumer message:helloWorld7
Consumer message:helloWorld9
Consumer message:helloWorld6
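To see why only helloWorld6 through helloWorld9 arrive, the selection can be modelled locally without a broker. The sketch below is not RocketMQ code: the class SqlFilterSketch and its methods are hypothetical names, and the predicate is hard-coded rather than parsed from the expression string. It assumes that the string property a is compared as a number when a > 5 is evaluated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local model of the example above; none of these names come from RocketMQ.
class SqlFilterSketch {

    // Hard-coded stand-in for the selector "a > 5".
    // User properties are stored as strings, so the value is parsed first.
    static boolean matchesAGreaterThan5(Map<String, String> properties) {
        String raw = properties.get("a");
        if (raw == null) {
            return false; // a missing property is treated as no match in this sketch
        }
        try {
            return Long.parseLong(raw) > 5;
        } catch (NumberFormatException e) {
            return false; // a non-numeric value is treated as no match in this sketch
        }
    }

    // Rebuilds the ten messages sent by FilterProducer and keeps the matching bodies.
    static List<String> selectedBodies() {
        List<String> selected = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Map<String, String> properties = new LinkedHashMap<>();
            properties.put("a", String.valueOf(i));
            if (matchesAGreaterThan5(properties)) {
                selected.add("helloWorld" + i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Prints [helloWorld6, helloWorld7, helloWorld8, helloWorld9]
        System.out.println(selectedBodies());
    }
}
```

The sketch lists the bodies in sending order. The real consumer may print them in a different order, because MessageListenerConcurrently consumes messages concurrently.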
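The syntax of message filtering
RocketMQ supports a basic subset of SQL92 for filtering on message properties:
• Numeric comparison: >, >=, <, <=, BETWEEN, =
• Character comparison: =, <>, IN
• IS NULL or IS NOT NULL
• Logical operators: AND, OR, NOT
The supported constant types are:
• Numeric, such as 123 or 3.1415
• Character, such as 'abc', which must be enclosed in single quotes
• NULL, a special constant
• Boolean, TRUE or FALSE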
Points to note
If you get the following error when starting a consumer:
CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
it is because SQL92 property filtering was not enabled when the broker was started.
How to enable it?
Add enablePropertyFilter=true to the broker.conf file.
When starting the broker, you also need to specify the broker.conf configuration file:
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
Analysis of Message Filtering
First, let's look at the three overloads of the subscribe method in the code:
Let's take a look at the first option:
subscribe(String topic, String subExpression) filters directly by tag:
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
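A tag expression like the one above can be read as a set of accepted tags: a message is selected when its single tag is in the set, and the special expression * accepts every tag. The following sketch models that rule locally. It is a simplified illustration under those assumptions, not the broker's implementation, and the class TagFilterSketch and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified local model of tag matching; not RocketMQ's own code.
class TagFilterSketch {

    // Splits an expression like "TAGA || TAGB" into the set of accepted tags.
    static Set<String> parse(String subExpression) {
        Set<String> tags = new HashSet<>();
        for (String part : subExpression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // A message carries one tag; it matches if the expression is "*" or lists that tag.
    static boolean matches(String subExpression, String messageTag) {
        if ("*".equals(subExpression.trim())) {
            return true;
        }
        return messageTag != null && parse(subExpression).contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "TAGA || TAGB || TAGC";
        System.out.println(matches(expression, "TAGB")); // true
        System.out.println(matches(expression, "TAGD")); // false
        System.out.println(matches("*", "TAGD"));        // true
    }
}
```

In the business scenario from the beginning of this article, the order and product messages could each carry their own tag, so that the big data system subscribes only to the order tag.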
Let's take a look at the second option:
subscribe(String topic, String fullClassName, String filterClassSource)
This option uses a custom filter implementation class.
In outline: implement the match method of the MessageFilter interface.
fullClassName: the fully qualified name of the class
filterClassSource: the Java source code of the class
• 1. The Broker machine starts multiple FilterServer filtering processes
• 2. After the Consumer starts, it passes a Java class to the Broker
• 3. The Consumer pulls messages from the FilterServer, while the FilterServer pulls messages from the Broker, filters them according to the uploaded Java class, and returns the filtered messages to the Consumer
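The three steps above can be pictured with a small local model. Everything in it is a stand-in: SketchMessage, SketchFilter and FilterServerSketch are hypothetical types defined here for illustration, not the RocketMQ MessageFilter interface or the real FilterServer process, and the uploaded class is represented by a lambda instead of Java source code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local model of the FilterServer flow; all types here are illustrative stand-ins.
class FilterServerSketch {

    // Stand-in for a stored message: a body plus its user properties.
    static final class SketchMessage {
        final String body;
        final Map<String, String> properties = new HashMap<>();

        SketchMessage(String body) {
            this.body = body;
        }
    }

    // Stand-in for the filter class that the consumer uploads.
    interface SketchFilter {
        boolean match(SketchMessage message);
    }

    private final List<SketchMessage> brokerStore; // messages held by the broker
    private final SketchFilter uploadedFilter;     // filter passed in by the consumer

    FilterServerSketch(List<SketchMessage> brokerStore, SketchFilter uploadedFilter) {
        this.brokerStore = brokerStore;
        this.uploadedFilter = uploadedFilter;
    }

    // The consumer calls this; the filter server reads from the broker store,
    // applies the uploaded filter, and returns only the matching messages.
    List<SketchMessage> pull() {
        List<SketchMessage> result = new ArrayList<>();
        for (SketchMessage message : brokerStore) {
            if (uploadedFilter.match(message)) {
                result.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SketchMessage order = new SketchMessage("order-1");
        order.properties.put("table", "order");
        SketchMessage product = new SketchMessage("product-1");
        product.properties.put("table", "product");

        // The consumer only wants rows that came from the order table.
        SketchFilter onlyOrders = m -> "order".equals(m.properties.get("table"));
        FilterServerSketch server = new FilterServerSketch(Arrays.asList(order, product), onlyOrders);

        for (SketchMessage m : server.pull()) {
            System.out.println(m.body); // prints order-1
        }
    }
}
```

In this model the filter runs before anything is returned to the consumer, so messages that do not match never reach it.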
Let's take a look at the third option:
subscribe(String topic, MessageSelector messageSelector)
This is the SQL-based selector used in the producer and consumer example earlier in this article.
That's all for this section.
If you have any questions, please leave a comment.
Subsequent articles
RocketMQ - Getting Started (Updated)
RocketMQ - Architecture and Roles (Updated)
RocketMQ - Message Sending (Updated)
RocketMQ - Consumer Information
RocketMQ - Consumer's Broadcast Mode and Cluster Mode (Updated)
RocketMQ - Sequential Message (Updated)
RocketMQ - Delay Message (Updated)
RocketMQ - Batch Messages
RocketMQ - Filter Messages (Updated)
RocketMQ - Transaction Message (Updated)
RocketMQ - Message Storage
RocketMQ - High Availability
RocketMQ - High Performance
RocketMQ - Master-Slave Replication
RocketMQ - Disk Flushing Mechanism
RocketMQ - Idempotency
RocketMQ - Message Retry
RocketMQ - Dead Letter Queue