You can use data cleansing templates provided by Function Compute to process message data, or modify the template code based on your business requirements to meet custom cleansing needs. This topic uses content splitting for data in ApsaraMQ for RocketMQ as an example to introduce the types of message processing templates and how to use them.
Overview
The data cleansing feature provides basic operator capabilities based on Function Compute. After you create a message data cleansing task in ApsaraMQ for RocketMQ, you can log on to Function Compute to modify the code and function configurations.
Operator | Description |
Message filtering | Matches message content based on regular expressions and sends the matched messages to specified destinations. For more information, see Event patterns. |
Message conversion | Converts message content based on string match conditions, for example by changing character case, and sends the converted messages to specified destinations. For more information, see Event transformation. |
Content splitting | Splits message content based on regular expressions and sends the split messages to specified destinations. |
Dynamic routing | Matches message content based on regular expressions and sends the matched messages to specified destinations and the unmatched messages to the default destination. |
Content enrichment | Enriches message content based on external data sources. For example, if the original content of a message contains AccountID, the system uses AccountID to query a database for the customer region, inserts the region into the message body, and sends the message to specified destination services. |
Content mapping | Maps message content based on regular expressions. For example, the system can mask sensitive fields in messages or trim messages to reduce their size. |
Examples
Content splitting
For example, you need to split the original student list message [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4] into three separate messages, and then send these three messages to respective destination services. To implement this, you can use the content splitting operator. The split messages are as follows:
message:
[Jack, Male, Class 4]
message:
[Alice, Female, Class 3]
message:
[John, Male, Class 4]
The following figure shows the process.
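As a rough illustration of the splitting logic, the following Python sketch splits the example message on the `|` delimiter with a regular expression. The function name `split_message` and its `delimiter_pattern` parameter are hypothetical and are not taken from the Function Compute template.

```python
import re
from typing import List


def split_message(body: str, delimiter_pattern: str = r"\|") -> List[str]:
    """Split one message body into several parts using a regex delimiter."""
    inner = body.strip()
    # Drop the enclosing square brackets used in the example message, if present.
    if inner.startswith("[") and inner.endswith("]"):
        inner = inner[1:-1]
    # Split on the delimiter and discard empty fragments.
    return [part.strip() for part in re.split(delimiter_pattern, inner) if part.strip()]


parts = split_message("[Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4]")
for part in parts:
    print(f"[{part}]")
```

Each element of `parts` corresponds to one message that would be sent to the destination.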
Dynamic routing
The following message contains information about toothpaste of three brands:
message:
[BrandA, toothpaste, $12.98, 100g
BrandB, toothpaste, $7.99, 80g
BrandC, toothpaste, $1.99, 100g]
The list needs to be sent to destination topics based on custom dynamic rules. The rules are as follows:
Send the messages that start with BrandA to the BrandA-item-topic and BrandA-discount-topic topics.
Send the messages that start with BrandB to the BrandB-item-topic and BrandB-discount-topic topics.
Send other messages to the Unknown-brand-topic topic.
The following sample code shows the JSON format of the rules:
{
"defaultTopic": "Unknown-brand-topic",
"rules": [
{
"regex": "^BrandA",
"targetTopics": [
"BrandA-item-topic",
"BrandA-discount-topic"
]
},
{
"regex": "^BrandB",
"targetTopics": [
"BrandB-item-topic",
"BrandB-discount-topic"
]
}
]
}
The following figure shows the process.
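The following Python sketch shows one way the JSON rules above could be applied to individual messages. It assumes that the first matching rule wins and that each line of the list is routed separately; the `route` function is hypothetical and does not reproduce the template code.

```python
import json
import re
from typing import Dict, List

RULES_JSON = """
{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {"regex": "^BrandA", "targetTopics": ["BrandA-item-topic", "BrandA-discount-topic"]},
    {"regex": "^BrandB", "targetTopics": ["BrandB-item-topic", "BrandB-discount-topic"]}
  ]
}
"""


def route(lines: List[str], rules_json: str) -> Dict[str, List[str]]:
    """Return a mapping of topic name to the messages routed to that topic."""
    config = json.loads(rules_json)
    # Compile each rule's regular expression once.
    compiled = [(re.compile(rule["regex"]), rule["targetTopics"]) for rule in config["rules"]]
    routed: Dict[str, List[str]] = {}
    for line in lines:
        # Assumption: the first matching rule wins; unmatched lines go to the default topic.
        topics = next((t for rx, t in compiled if rx.search(line)), [config["defaultTopic"]])
        for topic in topics:
            routed.setdefault(topic, []).append(line)
    return routed


items = [
    "BrandA, toothpaste, $12.98, 100g",
    "BrandB, toothpaste, $7.99, 80g",
    "BrandC, toothpaste, $1.99, 100g",
]
routed = route(items, RULES_JSON)
for topic, messages in routed.items():
    print(topic, messages)
```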
Content enrichment
In this example, the IP address in a message is enriched with its region. The following sample code provides an example of an access log of a service:
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}
The following sample code shows how to create a MySQL table that stores the mapping between IP addresses and their sources (region and ISP):
CREATE TABLE `tb_ip` (
  `IP` VARCHAR(256) NOT NULL,
  `Region` VARCHAR(256) NOT NULL,
  `ISP` VARCHAR(256) NOT NULL,
  PRIMARY KEY (`IP`)
);
The following sample code provides an example of a processed message:
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}
The following figure shows the process.
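The lookup step can be sketched in Python as follows. To keep the example self-contained, an in-memory SQLite database stands in for MySQL, and the IP address, ISP value, and `enrich` function are hypothetical placeholders rather than values or code from the template.

```python
import json
import sqlite3

# Assumption: in-memory SQLite stands in for the MySQL database; the schema mirrors tb_ip.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tb_ip ("
    " IP VARCHAR(256) NOT NULL,"
    " Region VARCHAR(256) NOT NULL,"
    " ISP VARCHAR(256) NOT NULL,"
    " PRIMARY KEY (IP))"
)
# Hypothetical mapping row, for illustration only.
conn.execute("INSERT INTO tb_ip VALUES (?, ?, ?)", ("192.0.2.10", "beijing", "example-isp"))


def enrich(message: str, db: sqlite3.Connection) -> str:
    """Add a "region" field to the message if the host IP is found in tb_ip."""
    record = json.loads(message)
    row = db.execute(
        "SELECT Region FROM tb_ip WHERE IP = ?", (record["hostIP"],)
    ).fetchone()
    if row is not None:
        record["region"] = row[0]
    return json.dumps(record)


enriched = enrich('{"accountID": "164901546557****", "hostIP": "192.0.2.10"}', conn)
print(enriched)
```

In this sketch, a message whose IP address is not in the table is passed through unchanged, which is one possible design choice; a real task might instead route such messages elsewhere.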
Content mapping
The following message contains the registration information about the employees of a company.
Zhang San, Employee ID 1, 131 1111 1111
Li Si, Employee ID 2, 132 2222 2222
Wang Wu, Employee ID 3, 133 3333 3333
The names, IDs, and phone numbers of employees in the preceding message are confidential. Therefore, you need to mask the names, IDs, and phone numbers before you send the message to the destination service. The following example shows the masked message:
Zhang *, Employee ID *, ***********
Li *, Employee ID *, ***********
Wang *, Employee ID *, ***********
The following figure shows the process.
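A regular-expression-based mask could look like the following Python sketch. The masking policy here is an assumption chosen for illustration: it keeps the first word of the name, replaces the rest of the name and the employee ID with `*`, and replaces every phone digit with `*`. The `mask_line` function and `LINE_PATTERN` are hypothetical names.

```python
import re

# Assumed line layout: "<name>, Employee ID <digits>, <phone digits and spaces>".
LINE_PATTERN = re.compile(
    r"^(?P<name>[^,]+),\s*Employee ID\s*(?P<id>\d+),\s*(?P<phone>[\d ]+)$"
)


def mask_line(line: str) -> str:
    """Mask the name, employee ID, and phone number in one registration line."""
    match = LINE_PATTERN.match(line.strip())
    if match is None:
        # Fail safe: mask the whole line if it does not have the expected layout.
        return "*" * len(line)
    first_word = match.group("name").split()[0]
    digits = re.sub(r"\D", "", match.group("phone"))
    return f"{first_word} *, Employee ID *, {'*' * len(digits)}"


records = [
    "Zhang San, Employee ID 1, 131 1111 1111",
    "Li Si, Employee ID 2, 132 2222 2222",
    "Wang Wu, Employee ID 3, 133 3333 3333",
]
masked = [mask_line(record) for record in records]
for line in masked:
    print(line)
```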
Procedure
1. Create ApsaraMQ for RocketMQ instance topics
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. In the top navigation bar, select a region, and then click Create Instance.
In the Create RocketMQ Instance panel, select an Instance Version, such as 4.0 Series, select Standard Instance for Instance Type, enter an instance name such as test, enter an instance description, and then click OK.
On the Instances page, click the target instance. In the left-side navigation pane of the instance details page, click Topics, and then click Create Topic.
In the Create Topic panel, enter a topic name, such as source-topic or target-topic, enter a Description, select Normal Message for Message Type, and then click OK.
Note: You need to create at least two topics: one as the event source to send original messages and the other as the event target to receive cleansed data. The two topics can belong to the same RocketMQ instance or different RocketMQ instances.
2. Create an event stream
Log on to the EventBridge console. In the left-side navigation pane, click Event Streams. In the top navigation bar, select a region, and then click Create Event Stream.
On the Create Event Stream page, configure the event source, filtering rules, data cleansing template, and event target in the Source, Filtering, Transformation, and Sink configuration wizards, and then click Save.

① Source and ④ Sink
For Source, select the ApsaraMQ for RocketMQ instance test and select source-topic for Topic.
For Sink, select the ApsaraMQ for RocketMQ instance test and select target-topic for Topic.
Keep the default values for other configuration items.
② Filtering
This is an optional configuration. Keep the default value and click Next.
③ Transformation
Select Alibaba Cloud Service: Function Compute.
Select Create Function Template. A new FC function named EventStreaming_Transform_Customized_**** will be created when you create the event stream.
Function Template: In this example, the Content Splitting template is selected. Options include Content Splitting, Content Mapping, Content Enrichment, and Dynamic Routing. You can select one of the preceding templates based on your business requirements. The templates provide the basic data processing logic, which you can use directly or customize.
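To give a feel for what a customized transformation function might look like, the following Python sketch uses the standard Function Compute handler signature `handler(event, context)`. The payload shape is an assumption, not a documented contract: the sketch supposes that `event` is a JSON array of records, each with a string `data` field, and that the function returns the transformed records as a list. The actual template code generated for the event stream is not reproduced here and may differ.

```python
import json


def handler(event, context):
    """Sketch of a transform function: one input batch in, one output batch out."""
    # Assumption: `event` is a JSON array of records, each with a string "data" field.
    records = json.loads(event)
    output = []
    for record in records:
        body = record.get("data", "").strip("[]")
        for part in body.split("|"):
            if part.strip():
                # Copy the other fields of the record and replace only the payload.
                output.append({**record, "data": part.strip()})
    return output


# Hypothetical input batch with a made-up "id" field, for local testing only.
batch = json.dumps(
    [{"id": "evt-1", "data": "[Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4]"}]
)
result = handler(batch, None)
for record in result:
    print(record)
```

Running the function locally with a sample batch like this is a quick way to check custom logic before you deploy it.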

3. Test and verify
3.1 Send original messages in the source RocketMQ instance topic
Log on to the ApsaraMQ for RocketMQ console. Find the topic source-topic of the Source instance that you configured when creating the event stream. In the Actions column, click Quick Start.
In the Start Message Production And Consumption panel, enter the original message [Jack, Male, Class 4|Alice, Female, Class 3|John, Male, Class 4], and click OK to send the message.
3.2 Confirm that messages are correctly split in the destination RocketMQ instance topic
In the ApsaraMQ for RocketMQ console, find the topic target-topic of the Sink instance that you configured when creating the event stream. Click the topic name and select the Message Query tab.
Select Query By Topic for Query Method, and then click Search. The query results show that the message sent in the previous step was split into three messages before it reached the destination instance. Click Details in each message row to confirm that the three messages are "data": "Jack, Male, Class 4", "data": "Alice, Female, Class 3", and "data": "John, Male, Class 4".
4. Clean up resources
After testing, if you do not need to use this feature in the short term, release the created resources promptly to avoid unnecessary costs. For more information, see Delete topics, Delete RocketMQ instances, and Delete functions.