Use the data cleaning templates provided by Function Compute to process message data. You can also modify the template code to meet custom cleaning requirements. This topic uses an example that splits content from ApsaraMQ for RocketMQ messages to describe the types of message processing templates and how to use them.
Function introduction
Message data cleaning tasks provide basic operator capabilities. The underlying logic uses Function Compute. After you create a data cleaning task for ApsaraMQ for RocketMQ messages, log on to the Function Compute console to customize the code and modify the function configuration.
Operator | Operator description |
Message filtering | Matches message content against a regular expression and sends matching messages to the destination. For more information, see Event patterns. |
Message transformation | Replaces message content based on a string match, such as converting character case. Sends the transformed message to the destination. For more information, see Transform event content. |
Content splitting | Splits message content based on a regular expression and sends each split part as a separate message to the destination. |
Dynamic routing | Matches message content against a regular expression. Routes matching messages to a corresponding destination and non-matching messages to a default destination. |
Content enrichment | Enriches message content based on an enrichment source. For example, if an original message contains an AccountID, you can query a database using the AccountID to get the customer region, add the region to the original message body, and send the message to the destination service. |
Content mapping | Maps message content based on a regular expression. For example, you can mask sensitive fields in a message or reduce the message to the minimum set of fields that you need. |
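The filtering and transformation operators in the table can be illustrated with a minimal Python sketch. The function names filter_messages and transform_message are hypothetical and are not part of the Function Compute templates. The sketch only shows the regular-expression match and string replacement that the descriptions refer to.

```python
import re
from typing import List


def filter_messages(messages: List[str], pattern: str) -> List[str]:
    """Keep only the messages whose content matches the regular expression."""
    compiled = re.compile(pattern)
    return [m for m in messages if compiled.search(m)]


def transform_message(message: str, old: str, new: str) -> str:
    """Replace every occurrence of `old` with `new` (a plain string match)."""
    return message.replace(old, new)


kept = filter_messages(["BrandA, toothpaste", "BrandC, toothpaste"], r"^BrandA")
upper = transform_message("brandA, toothpaste", "brandA", "BRANDA")
```

In a real task the kept or transformed messages would then be delivered to the destination. That delivery step is outside this sketch.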
Scenario examples
Content splitting
For example, suppose the original message is a student list, [Zhang San, male, Class 4|Li Si, female, Class 3|Wang Wu, male, Class 4], and you need to split it into three separate messages and push each one to the destination service. To do this, use the content splitting operator. The split messages are as follows:
message:
[Zhang San, male, Class 4]
message:
[Li Si, female, Class 3]
message:
[Wang Wu, male, Class 4]
The following figure shows the result.
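The splitting step can be sketched in Python as follows. This is an illustration only and is not the code of the content splitting template. The function name split_message and the choice of "|" as the delimiter pattern are assumptions based on the sample message above.

```python
import re
from typing import List


def split_message(body: str, delimiter_pattern: str = r"\|") -> List[str]:
    """Split one message body into several bodies on a regex delimiter."""
    inner = body.strip()
    # Remove the surrounding brackets used in the sample message, if present.
    if inner.startswith("[") and inner.endswith("]"):
        inner = inner[1:-1]
    # Split on the delimiter and drop empty fragments.
    return [part.strip() for part in re.split(delimiter_pattern, inner) if part.strip()]


messages = split_message(
    "[Zhang San, male, Class 4|Li Si, female, Class 3|Wang Wu, male, Class 4]"
)
for m in messages:
    print(m)
```

Each element of the returned list corresponds to one outgoing message. Passing a different delimiter_pattern adapts the sketch to other separators.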
Dynamic routing
For example, the following is a list of toothpaste information.
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]
You need to route the list to destination topics based on custom dynamic rules. The rules are as follows.
If a message starts with BrandA, send it to the BrandA-item-topic and BrandA-discount-topic topics.
If a message starts with BrandB, send it to the BrandB-item-topic and BrandB-discount-topic topics.
Send all other messages to Unknown-brand-topic.
The rules are described in JSON format as follows.
{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}
The following figure shows the result.
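A minimal Python sketch of how such a rule set could be evaluated is shown below. It is not the dynamic routing template itself. The function name route is hypothetical, and the sketch assumes that each line of the list is treated as one message and that the first matching rule wins.

```python
import json
import re
from typing import List

# The routing rules from this section, embedded as a JSON string.
RULES_JSON = """
{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {"regex": "^BrandA", "targetTopics": ["BrandA-item-topic", "BrandA-discount-topic"]},
    {"regex": "^BrandB", "targetTopics": ["BrandB-item-topic", "BrandB-discount-topic"]}
  ]
}
"""


def route(message: str, config: dict) -> List[str]:
    """Return the target topics for a message (assumption: first match wins)."""
    for rule in config["rules"]:
        if re.search(rule["regex"], message):
            return list(rule["targetTopics"])
    # No rule matched, so fall back to the default topic.
    return [config["defaultTopic"]]


config = json.loads(RULES_JSON)
for line in [
    "BrandA, toothpaste, $12.98, 100g",
    "BrandB, toothpaste, $7.99, 80g",
    "BrandC, toothpaste, $1.99, 100g",
]:
    print(line, "->", route(line, config))
```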
Content enrichment
This example enriches access log data with the region that an IP address belongs to. Assume that the access log for a service is as follows.
{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}
You need to collect statistics on where the IP addresses come from. The mapping from IP address to region is stored in a MySQL database.
CREATE TABLE `tb_ip` (
  `IP` VARCHAR(256) NOT NULL,
  `Region` VARCHAR(256) NOT NULL,
  `ISP` VARCHAR(256) NOT NULL,
  PRIMARY KEY (`IP`)
);
The processed message is as follows.
{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "Beijing"
}
The following figure shows the result.
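The lookup-and-merge step can be sketched in Python as follows. To keep the example self-contained, an in-memory SQLite table stands in for the MySQL table tb_ip. The function names build_lookup and enrich are hypothetical, the ISP value is a placeholder, and the masked IP string from the sample log is used literally as the key. A real function would connect to MySQL instead.

```python
import sqlite3


def build_lookup() -> sqlite3.Connection:
    """Create an in-memory stand-in for the MySQL table tb_ip (assumption)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tb_ip ("
        "IP TEXT PRIMARY KEY, Region TEXT NOT NULL, ISP TEXT NOT NULL)"
    )
    # Sample row: the masked IP from the log, with a placeholder ISP value.
    conn.execute(
        "INSERT INTO tb_ip VALUES (?, ?, ?)",
        ("192.168.XX.XX", "Beijing", "placeholder-isp"),
    )
    conn.commit()
    return conn


def enrich(message: dict, conn: sqlite3.Connection) -> dict:
    """Return a copy of the message with a region field added when the IP is known."""
    row = conn.execute(
        "SELECT Region FROM tb_ip WHERE IP = ?", (message["hostIP"],)
    ).fetchone()
    enriched = dict(message)
    if row is not None:
        enriched["region"] = row[0]
    return enriched


conn = build_lookup()
result = enrich({"accountID": "164901546557****", "hostIP": "192.168.XX.XX"}, conn)
print(result)
```

Messages whose IP address is not in the table are returned unchanged in this sketch. Whether to drop, forward, or flag such messages is a design choice for the real function.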
Content mapping
For example, the following is the registration information for a company's employees, which includes private information such as employee IDs and phone numbers.
Zhang San, Employee ID 1, 131 1111 1111
Li Si, Employee ID 2, 132 2222 2222
Wang Wu, Employee ID 3, 133 3333 3333
You need to mask the private employee information in the messages and then push them to the destination service. The result is as follows.
Zhang*, Employee ID *, *** **** ****
Li*, Employee ID *, *** **** ****
Wang*, Employee ID *, *** **** ****
The following figure shows the result.
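The masking shown above can be sketched with regular expressions in Python. This is an illustration under stated assumptions and is not the content mapping template: the function name mask_record is hypothetical, and the sketch assumes every record has exactly three comma-separated fields (name, employee ID, phone number).

```python
import re


def mask_record(record: str) -> str:
    """Mask the given name, the employee ID digits, and the phone digits."""
    name, employee_id, phone = [field.strip() for field in record.split(",")]
    # Keep only the first word of the name and replace the rest with "*".
    masked_name = name.split()[0] + "*"
    # Collapse the whole ID number into a single "*".
    masked_id = re.sub(r"\d+", "*", employee_id)
    # Replace each phone digit with "*" so the grouping stays visible.
    masked_phone = re.sub(r"\d", "*", phone)
    return ", ".join([masked_name, masked_id, masked_phone])


for line in [
    "Zhang San, Employee ID 1, 131 1111 1111",
    "Li Si, Employee ID 2, 132 2222 2222",
    "Wang Wu, Employee ID 3, 133 3333 3333",
]:
    print(mask_record(line))
```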
Procedure
1. Create an ApsaraMQ for RocketMQ instance and topics
Log on to the Message Queue for Apache RocketMQ console. In the navigation pane on the left, select Instances. In the top menu bar, select a region, and then click Create Instance.
In the Create RocketMQ Instance panel, select an Instance Version, such as 4.0 Series. Select Standard Instance for Instance Type. Enter an instance name, such as test, and an instance description, and then click OK.
On the Instance List page, click the target instance. In the navigation pane on the left of the instance details page, select Topic Management, and then click Create Topic.
In the Create Topic panel, specify the topic name, such as source-topic and target-topic. Enter a Description, select Normal message for Message type, and click OK.
Note: You must create at least two topics. One topic acts as the event source to send original messages, and the other acts as the event target to receive the cleaned data. The two topics can belong to the same ApsaraMQ for RocketMQ instance or different instances.
2. Create an event stream
Log on to the EventBridge console. In the navigation pane on the left, select Event Streams. In the top menu bar, select a region, and then click Create Event Stream.
On the Create Event Stream page, use the Source, Filtering, Transform, and Sink configuration wizard to configure the event source, filtering rules, data cleaning template, and event target, respectively. Then click Save.

①Source and ④Sink
For Source, select the ApsaraMQ for RocketMQ instance named test and the topic named source-topic.
For Sink, select the ApsaraMQ for RocketMQ instance named test and the topic named target-topic.
Keep the default values for the other configuration items.
②Filtering
Optional configuration. Keep the default value and then click Next.
③Transform
Select an Alibaba Cloud service: Function Compute.
Select New Function Template: An FC function EventStreaming_Transform_Customized_**** will be created when you create an event stream.
Function templates: This topic uses the content splitting template as an example. The available templates include content splitting, content mapping, content enrichment, and dynamic routing. You can select any of these templates based on your needs. Each template provides basic data processing logic that you can use as-is or customize.

3. Test and verify
3.1 Send an original message from the source ApsaraMQ for RocketMQ instance topic
Log on to the Message Queue for Apache RocketMQ console. Find the topic source-topic of the Source instance that you configured when you created the event stream. In the Actions column, click Quick Experience.
In the Quick Experience for Message Production and Consumption panel, enter the raw message [Zhang San, male, Class 4|Li Si, female, Class 3|Wang Wu, male, Class 4], and click OK to send the message.
3.2 Confirm that the message is correctly split in the destination ApsaraMQ for RocketMQ instance topic
In the Message Queue for Apache RocketMQ console, locate the topic target-topic of the Sink (destination) instance that you configured when you created the event stream. Click the topic name, and then select the Message Query tab.
For Query method, select Query by Topic, and then click Query. The query results show that the message sent in the previous step was split into three messages when it was delivered to the destination. Click Details for each message row to view the three messages: "data": "Zhang San, male, Class 4", "data": "Li Si, female, Class 3", and "data": "Wang Wu, male, Class 4".
4. Clean up resources
After you complete the test, if you no longer need to use this feature, release the created resources promptly to avoid unnecessary charges. For more information, see Delete a topic, Delete an ApsaraMQ for RocketMQ instance, and Delete a function.