RocketMQ - Message Filtering


• Business background

• What is message filtering

• How to use message filtering

• Analyze message filtering

• Subsequent articles

Business background

In an e-commerce system, the operations and product teams need to query order and product data through a big data system.

Early on, many companies do not have a particularly mature technical architecture, so the system architecture may look like this:

The big data system queries the MySQL database directly. When it sends large SQL queries, it can easily put CPU and I/O pressure on MySQL.

This can easily slow down the e-commerce system's own queries.

To avoid putting load directly on the MySQL database while still letting the big data system query the data, companies with a more mature architecture typically use the following design:

Usually, MySQL data is synchronized to a Hive database through a data exchange platform.

The architecture above also uses RocketMQ.

In fact, even with RocketMQ removed, data can still be synchronized from MySQL to the Hive database normally.

So here's the question: Why use RocketMQ?

Let's reason by contradiction: what would happen without RocketMQ? First, we can be certain that the data can still be synchronized from the MySQL database to the Hive database.

But what are the drawbacks?

Without RocketMQ, if another data team also needs data from the e-commerce database, custom development is required all over again.

With RocketMQ in place, if other data teams need the data, you only need to give them a Topic.

In other words, RocketMQ is introduced mainly to make later expansion and data sharing easier.

But the design above has another issue: the big data system may only need order data. Data from other tables, such as product data, is not required.

This is where message filtering comes in.

Message filtering

In RocketMQ, a message can carry only one tag, which the producer sets when it sends the message. A consumer, on the other hand, can subscribe to messages with one or more tags.
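To make the tag model concrete, here is a minimal in-memory sketch in plain Java with no RocketMQ dependency. The TaggedMessage and TagSubscription classes are hypothetical names chosen for illustration; they only model the rule that a message carries exactly one tag while a consumer may accept several.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a message: one topic, exactly one tag, a body.
final class TaggedMessage {
    final String topic;
    final String tag;
    final String body;

    TaggedMessage(String topic, String tag, String body) {
        this.topic = topic;
        this.tag = tag;
        this.body = body;
    }
}

// Hypothetical stand-in for a consumer subscription that accepts one or more tags.
final class TagSubscription {
    private final String topic;
    private final Set<String> acceptedTags;

    TagSubscription(String topic, String... tags) {
        this.topic = topic;
        this.acceptedTags = new HashSet<>(Arrays.asList(tags));
    }

    // Keep only the messages on our topic whose single tag is one we accept.
    List<TaggedMessage> filter(List<TaggedMessage> messages) {
        List<TaggedMessage> kept = new ArrayList<>();
        for (TaggedMessage m : messages) {
            if (topic.equals(m.topic) && acceptedTags.contains(m.tag)) {
                kept.add(m);
            }
        }
        return kept;
    }
}
```

In the business scenario above, the big data system would subscribe with only an order tag and would never see the product messages published to the same topic.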

How to use it?

Next, let's see how to use message filtering in RocketMQ, starting with the producer:

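import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class FilterProducer {
    public static void main(String[] args) throws Exception {
        // Producer group
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");
        // Set the NameServer address (assumed here: localhost:9876, the address used when starting the broker)
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            // Build the message
            Message message = new Message("filterTopic", ("helloWorld" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Attach the user property "a" that the consumer filters on
            message.putUserProperty("a", String.valueOf(i));
            // Send the message
            SendResult sendResult = producer.send(message);
            // Print the send result
            System.out.println("Send result:" + sendResult);
        }
        // Shut down the producer
        producer.shutdown();
    }
}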


Here a key-value user property, the attribute a, is added to each message.

Next, let's look at consumers:

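import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        // Consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group");
        // Register the NameServer address (assumed here: localhost:9876)
        consumer.setNamesrvAddr("localhost:9876");
        // Only receive messages whose user property a is greater than 5
        MessageSelector ms = MessageSelector.bySql("a > 5");
        // Subscribe to the topic with the selector
        consumer.subscribe("filterTopic", ms);
        // Set where consumption starts (assumed here: the first offset)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                    System.out.println("Consumer message:" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("Start Consumer");
    }
}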

The run results are as follows:

Start Consumer

Consumer message:helloWorld8

Consumer message:helloWorld7

Consumer message:helloWorld9

Consumer message:helloWorld6
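The result above can be reproduced without a broker. The following sketch is a plain-Java simulation, not RocketMQ code: it assumes the producer stored the loop index in the user property a, and it applies the condition a > 5 to each of the ten messages. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the "a > 5" filter over the ten example messages.
final class SqlFilterSimulation {

    // Returns the bodies of messages whose property "a" (the loop index) exceeds the threshold.
    static List<String> bodiesWithAGreaterThan(int threshold) {
        List<String> matched = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int a = i;               // value assumed to be stored in property "a"
            if (a > threshold) {     // the condition expressed by "a > 5"
                matched.add("helloWorld" + i);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Prints [helloWorld6, helloWorld7, helloWorld8, helloWorld9]
        System.out.println(bodiesWithAGreaterThan(5));
    }
}
```

The simulation returns the four messages in index order. In the real run they arrive in a different order because the listener consumes concurrently, so ordering is not guaranteed.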

The syntax of message filtering
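According to the RocketMQ documentation, the SQL92 filter supports numeric comparison (>, >=, <, <=, BETWEEN, =), string comparison (=, <>, IN), IS NULL and IS NOT NULL, and the logical operators AND, OR, and NOT. Constants may be numbers, single-quoted strings, NULL, or the booleans TRUE and FALSE. The sketch below only collects a few expression strings of the kind that could be passed to MessageSelector.bySql. The property names region and b, the string values, and the class name are hypothetical, and the strings are not validated against a broker here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Example filter expressions, keyed by the syntax feature they illustrate.
final class FilterExpressionExamples {

    static Map<String, String> examples() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("numeric comparison", "a > 5");
        m.put("numeric range", "a BETWEEN 0 AND 3");
        m.put("string equality", "region = 'hangzhou'");
        m.put("string set membership", "region IN ('hangzhou', 'shanghai')");
        m.put("null check", "b IS NOT NULL");
        m.put("logical combination", "a > 5 AND b IS NOT NULL");
        return m;
    }

    public static void main(String[] args) {
        // Print each feature with its example expression.
        examples().forEach((feature, expr) -> System.out.println(feature + ": " + expr));
    }
}
```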

Points to note

If an error like the following is reported when starting the consumer:

CODE: 1 DESC: The broker does not support consumer to filter message by SQL92

it is because property (SQL92) filtering was not enabled when the broker was started.

How to enable it?

Add enablePropertyFilter=true to the broker.conf file.

When starting the broker, you also need to specify the broker.conf configuration file:

nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
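Because broker.conf is a key=value properties file, a small pre-flight check can confirm the flag before a consumer is started. This is a hypothetical helper, not part of RocketMQ: it only parses text with java.util.Properties and treats a missing key as disabled.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: checks whether broker.conf text enables SQL92 property filtering.
final class BrokerConfCheck {

    static boolean propertyFilterEnabled(String brokerConfText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(brokerConfText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // A missing key is treated as disabled.
        return Boolean.parseBoolean(props.getProperty("enablePropertyFilter", "false").trim());
    }

    public static void main(String[] args) {
        // The brokerName line is only sample content for the demo.
        String conf = "brokerName=broker-a\nenablePropertyFilter=true\n";
        System.out.println(propertyFilterEnabled(conf)); // prints true
    }
}
```

To check a real file, its contents could be read into a string first, for example with java.nio.file.Files, and passed to the same method.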

Analysis of Message Filtering

First, starting from the code, let's look at the three overloads of the subscribe method:

Let's take a look at the first option:

subscribe(String topic, String subExpression) filters directly by tag:

consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
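As a rough illustration of what such a tag expression means, the sketch below splits the expression on || and checks whether a message's tag is among the alternatives, treating * or an empty expression as "match everything". It is a simplified plain-Java model with hypothetical names, not the broker's actual matching code.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of matching a message tag against an expression like "TAGA || TAGB".
final class TagExpression {
    private final boolean matchAll;
    private final Set<String> tags = new HashSet<>();

    TagExpression(String expression) {
        String expr = expression == null ? "" : expression.trim();
        this.matchAll = expr.isEmpty() || expr.equals("*");
        if (!matchAll) {
            // "\\|\\|" is the regex for the literal two-character separator "||".
            for (String part : expr.split("\\|\\|")) {
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    tags.add(tag);
                }
            }
        }
    }

    // True when the expression accepts every tag or lists this tag explicitly.
    boolean matches(String messageTag) {
        return matchAll || (messageTag != null && tags.contains(messageTag));
    }
}
```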

Let's take a look at the second option:

subscribe(String topic, String fullClassName, String filterClassSource)

This overload takes a custom filter implementation class.

In outline, you implement the match method of the MessageFilter interface.

fullClassName: the fully qualified name of the class

filterClassSource: the Java source code of the class

• 1. The Broker machine starts multiple FilterServer filtering processes

• 2. After the Consumer starts, it uploads a Java filter class to the FilterServer

• 3. The Consumer pulls messages from the FilterServer; the FilterServer pulls messages from the Broker, filters them with the uploaded Java class, and returns the result to the Consumer
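The three steps above can be modelled in a few lines of plain Java. Everything here is a hypothetical stand-in: SimpleMessage, the SimpleMessageFilter interface (shaped loosely after the match method mentioned above), and FilterServerModel only imitate the flow in memory, where the filter server pulls from the broker, applies the consumer-supplied filter, and hands back what passes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message with user properties.
final class SimpleMessage {
    final String body;
    final Map<String, String> properties = new HashMap<>();

    SimpleMessage(String body) {
        this.body = body;
    }
}

// Hypothetical filter contract: return true to deliver the message.
interface SimpleMessageFilter {
    boolean match(SimpleMessage msg);
}

// In-memory imitation of a FilterServer sitting between broker and consumer.
final class FilterServerModel {
    private final List<SimpleMessage> brokerStore; // messages held by the "broker"
    private SimpleMessageFilter uploadedFilter;    // filter supplied by the consumer

    FilterServerModel(List<SimpleMessage> brokerStore) {
        this.brokerStore = brokerStore;
    }

    // Step 2: the consumer hands its filter over after it starts.
    void upload(SimpleMessageFilter filter) {
        this.uploadedFilter = filter;
    }

    // Step 3: pull from the broker, filter, and return the result to the consumer.
    List<SimpleMessage> pull() {
        List<SimpleMessage> result = new ArrayList<>();
        for (SimpleMessage msg : brokerStore) {
            // With no filter uploaded, every message passes through.
            if (uploadedFilter == null || uploadedFilter.match(msg)) {
                result.add(msg);
            }
        }
        return result;
    }
}
```

The point of this design is that arbitrary Java logic runs next to the broker, so messages that fail the filter never cross the network to the consumer.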

Let's take a look at the third option:

subscribe(String topic, MessageSelector messageSelector)

This is the overload used in the example above, with a MessageSelector built from a SQL expression.

That's all for this section.

If you have any questions, please leave a comment.

Subsequent articles

RocketMQ - Getting Started (Updated)

RocketMQ - Architecture and Roles (Updated)

RocketMQ - Message Sending (Updated)

RocketMQ - Consumer Information

RocketMQ - Consumer's Broadcast Mode and Cluster Mode (Updated)

RocketMQ - Sequential Message (Updated)

RocketMQ - Delay Message (Updated)

RocketMQ - Batch Messages

RocketMQ - Filter Messages (Updated)

RocketMQ - Transaction Message (Updated)

RocketMQ - Message Storage

RocketMQ - High Availability

RocketMQ - High Performance

RocketMQ - Master-Slave Replication

RocketMQ - Disk Flushing Mechanism

RocketMQ - Idempotency

RocketMQ - Message Retry

RocketMQ - Dead Letter Queue
