
ApsaraMQ for Kafka: Data cleansing

Last Updated: Mar 11, 2026

When messages flow between Kafka topics, you often need to split, route, enrich, or mask their content before delivery. Data cleansing performs these transformations inline, using Function Compute templates.

How it works

A data cleansing task follows a four-stage pipeline:

  1. Source -- The ApsaraMQ for Kafka instance and topic to consume messages from.

  2. Filtering (optional) -- A pattern-matching rule that selects which messages to process. If no filter is set, all messages pass through.

  3. Transformation -- A Function Compute template that processes message content. Four built-in operators are available: content splitting, dynamic routing, content enrichment, and content mapping.

  4. Sink -- The ApsaraMQ for Kafka instance and topic to deliver the processed messages to.
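To make the data flow concrete, the following Python sketch models the four stages in memory. It is not the service's implementation: `run_pipeline` is a hypothetical name, the source and sink are plain Python lists instead of Kafka topics, and the filter is modeled as a regular expression for simplicity, whereas the service uses its own pattern-matching rules.

```python
import re
from typing import Callable, Iterable, List, Optional


def run_pipeline(
    source: Iterable[str],
    transform: Callable[[str], List[str]],
    sink: List[str],
    filter_regex: Optional[str] = None,
) -> None:
    """Hypothetical in-memory model of source -> filter -> transform -> sink."""
    # Assumption: the filter is a regex; None means "no filter, pass everything".
    pattern = re.compile(filter_regex) if filter_regex else None
    for message in source:                                # 1. Source
        if pattern and not pattern.search(message):       # 2. Filtering (optional)
            continue
        for output in transform(message):                 # 3. Transformation
            sink.append(output)                           # 4. Sink
```

Passing `filter_regex=None` lets every message through, which mirrors the behavior when no filter is set. A transformation may return zero, one, or several messages, which is what allows an operator such as content splitting to fan one message out into many.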

After you create a task, the corresponding function appears in the Function Compute console. Open it there to view the processing logic or to customize it beyond what the template provides.

Data cleansing is also available for ApsaraMQ for RocketMQ, ApsaraMQ for MQTT, ApsaraMQ for RabbitMQ, and Simple Message Queue (formerly MNS).

Operators

  • Content splitting -- Splits a single message into multiple messages based on a regular expression and sends each one to the sink individually.

  • Dynamic routing -- Matches message content against regular expressions and routes each message to one or more destination topics based on the match result. Messages that match no rule go to a default topic.

  • Content enrichment -- Looks up external data (for example, from a MySQL database) based on a field in the message and appends the result to the message body.

  • Content mapping -- Applies regular expressions to transform message content, for example to mask sensitive fields or to reduce message size.

Operator examples

Split a message into individual records

Split a single message that contains multiple records into individual messages, one per record.

Input: A message containing a list of student records, semicolon-delimited.

[Jack, Male, Class 4; Alice, Female, Class 3; John, Male, Class 4]

Output: Three separate messages, each containing one student record.

[Jack, Male, Class 4]
[Alice, Female, Class 3]
[John, Male, Class 4]
Figure: Content splitting process
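The splitting step can be approximated in a few lines of Python. This sketch assumes that records are separated by semicolons inside a single pair of square brackets, as in the example input; `split_records` is a hypothetical helper, not part of the Content Splitting template.

```python
import re
from typing import List


def split_records(message: str) -> List[str]:
    """Split one bracketed, semicolon-delimited message into one message per record."""
    # Strip the outer brackets, e.g. "[a; b]" -> "a; b".
    body = message.strip().removeprefix("[").removesuffix("]")
    # Split on ";" with any surrounding whitespace, drop empty parts,
    # and re-wrap each record in brackets to match the example output.
    return [f"[{part.strip()}]" for part in re.split(r"\s*;\s*", body) if part.strip()]


for record in split_records("[Jack, Male, Class 4; Alice, Female, Class 3; John, Male, Class 4]"):
    print(record)
```

Each returned string would then be delivered to the sink as a separate message.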

Route messages by content pattern

Route messages to different destination topics based on content patterns.

Input: A message containing product entries from multiple brands.

[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

Routing rules:

  • Messages starting with BrandA go to BrandA-item-topic and BrandA-discount-topic.

  • Messages starting with BrandB go to BrandB-item-topic and BrandB-discount-topic.

  • All other messages go to Unknown-brand-topic.

These rules are defined in JSON:

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}
Figure: Dynamic routing process
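The following Python sketch shows one way to apply these JSON rules to a single message. It assumes that rules are evaluated in order and that the first matching rule wins; the template's actual evaluation order is not specified here. `load_rules` and `route` are hypothetical helpers.

```python
import json
import re
from typing import List, Pattern, Tuple

# The routing rules from the JSON example, embedded as a string.
RULES_JSON = """
{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {"regex": "^BrandA", "targetTopics": ["BrandA-item-topic", "BrandA-discount-topic"]},
    {"regex": "^BrandB", "targetTopics": ["BrandB-item-topic", "BrandB-discount-topic"]}
  ]
}
"""


def load_rules(text: str) -> Tuple[str, List[Tuple[Pattern, List[str]]]]:
    """Parse the routing config and precompile each rule's regex."""
    config = json.loads(text)
    rules = [(re.compile(r["regex"]), r["targetTopics"]) for r in config["rules"]]
    return config["defaultTopic"], rules


def route(message: str, default_topic: str, rules: List[Tuple[Pattern, List[str]]]) -> List[str]:
    """Return the destination topics for one message.

    Assumption: rules are checked in order and the first match wins.
    """
    for pattern, topics in rules:
        if pattern.search(message):
            return list(topics)
    return [default_topic]


default_topic, rules = load_rules(RULES_JSON)
print(route("BrandC, toothpaste, $1.99, 100g", default_topic, rules))
```

Precompiling the regular expressions once at load time avoids recompiling them for every message.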

Enrich messages with external data

Append external data to messages by looking up a field value in an external data source.

Input: An access log entry containing an account ID and host IP.

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}

Enrichment source: A MySQL table that maps IP addresses to regions.

CREATE TABLE `tb_ip` (
    `IP` VARCHAR(256) NOT NULL,
    `Region` VARCHAR(256) NOT NULL,
    `ISP` VARCHAR(256) NOT NULL,
    PRIMARY KEY (`IP`)
);

Output: The original message with a region field appended.

{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}
Figure: Content enrichment process
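A minimal Python sketch of the lookup-and-append step follows. To keep it self-contained, it uses an in-memory SQLite database with the same `tb_ip` columns as a stand-in for MySQL, and the inserted row is made up. It also assumes that a message whose `hostIP` has no match passes through unchanged. `build_demo_db` and `enrich` are hypothetical names.

```python
import json
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    """In-memory SQLite stand-in for the MySQL `tb_ip` table (same columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tb_ip ("
        " IP VARCHAR(256) NOT NULL,"
        " Region VARCHAR(256) NOT NULL,"
        " ISP VARCHAR(256) NOT NULL,"
        " PRIMARY KEY (IP))"
    )
    # Hypothetical row: the IP and ISP values are invented for illustration.
    conn.execute("INSERT INTO tb_ip VALUES (?, ?, ?)", ("192.168.0.1", "beijing", "example-isp"))
    return conn


def enrich(message: str, conn: sqlite3.Connection) -> str:
    """Look up the message's hostIP and append the matching region, if any."""
    record = json.loads(message)
    row = conn.execute(
        "SELECT Region FROM tb_ip WHERE IP = ?", (record["hostIP"],)
    ).fetchone()
    if row is not None:
        record["region"] = row[0]
    # Assumption: unmatched messages are forwarded without a region field.
    return json.dumps(record)
```

Using a parameterized query (`?`) rather than string formatting keeps message content from being interpreted as SQL.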

Mask sensitive fields in messages

Transform or mask specific fields in messages using regular expressions.

Input: Employee registration records containing sensitive data: names, employee IDs, and phone numbers.

James, Employee ID 1, 131 1111 1111
Mary, Employee ID 2, 132 2222 2222
David, Employee ID 3, 133 3333 3333

Output: The same records with sensitive fields masked.

Ja*, Employee ID *, ***********
Ma*, Employee ID *, ***********
Dav*, Employee ID *, ***********
Figure: Content mapping process
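Regex-based masking of this kind can be sketched in Python as an ordered list of pattern and replacement pairs. The rules below are assumptions, because the exact rules used by the Content Mapping template are not given. In particular, this sketch keeps a fixed two-character name prefix, so it produces `Da*` for David rather than the `Dav*` shown in the sample output. `MASK_RULES` and `mask` are hypothetical names.

```python
import re

# Hypothetical masking rules, applied in order: (compiled pattern, replacement).
MASK_RULES = [
    # Keep the first two characters of the leading name; replace the rest with one "*".
    (re.compile(r"^(\w{2})\w*"), r"\1*"),
    # Replace the employee ID number with a single "*".
    (re.compile(r"(Employee ID )\d+"), r"\1*"),
    # Replace each digit of a trailing "ddd dddd dddd" phone number with "*", dropping spaces.
    (re.compile(r"\d{3} \d{4} \d{4}$"), lambda m: "*" * sum(c.isdigit() for c in m.group(0))),
]


def mask(record: str) -> str:
    """Apply every masking rule to one record."""
    for pattern, replacement in MASK_RULES:
        record = pattern.sub(replacement, record)
    return record


print(mask("James, Employee ID 1, 131 1111 1111"))
```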

Create a data cleansing task

Procedure

  1. Log in to the ApsaraMQ for Kafka console. In the left-side navigation pane, choose Message Integration > Tasks. Click Create Task.

  2. On the Create Task page, configure the four pipeline stages as follows. Each template provides the base processing logic; to customize it, open the corresponding function in the Function Compute console after you create the task.

    • Source and Sink -- Select different ApsaraMQ for Kafka instances.

    • Filtering -- Specify a matching rule to filter events. Leave it blank to match all events. For details, see Message filtering.

    • Transformation -- Select a Function Compute template: Content Splitting, Content Mapping, Content Enrichment, or Dynamic Routing. In this example, the Content Splitting template is selected.

    Figure: Create Task page

    Figure: Transformation template selection