All Products
Search
Document Center

ApsaraMQ for RocketMQ:Synchronize DTS change tracking data to ApsaraMQ for RocketMQ

Last Updated:Mar 11, 2026

Database change events captured by Data Transmission Service (DTS) often need to reach downstream systems in real time. A message inflow task streams row-level inserts, updates, and deletes from a DTS change tracking instance to an ApsaraMQ for RocketMQ topic -- without custom consumer code.

How it works

A message inflow task bridges DTS change tracking and ApsaraMQ for RocketMQ through the following pipeline:

  1. DTS captures row-level changes (inserts, updates, and deletes) from a source database and writes them to a change tracking instance.

  2. The message inflow task reads change events from the DTS instance through a consumer group, starting from the consumer offset you specify.

  3. Events are batched based on a message count threshold or a time interval.

  4. An event pattern filters the data, and a Function Compute function transforms it.

  5. The processed messages are delivered to a topic on your ApsaraMQ for RocketMQ instance.

Note

Each consumer group must run on only one client. If multiple clients share the same consumer group, the consumer offset may become invalid.

Supported regions

DTS is available as a message inflow source in the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), China (Guangzhou), China (Chengdu), and China (Hong Kong).

Create a message inflow task

Before you begin, make sure that you have:

To create the task:

  1. Log on to the ApsaraMQ for RocketMQ console.

  2. In the left-side navigation pane, choose Message Integration > Tasks.

  3. In the top navigation bar, select a region such as China (Hangzhou).

  4. On the Tasks page, click Create Task.

  5. On the Create Task page, specify the Task Name and Description, then configure the following steps.

Step 1: Configure the source

Set Data Provider to Data Transmission Service (DTS) and configure the following parameters. Then click Next Step.

ParameterDescriptionExample
RegionAuto-filled with the region you selected for the task.China (Hangzhou)
Data Subscription TaskThe ID of the change tracking task created in the DTS console.dts8jqe\*\*\*\*
Access MethodThe access method of the source database instance. Read-only.RDS
Instance IDThe ID of the source database instance. Read-only.rm-bp18mj3q2dzyb\*\*\*\*
Consumer GroupThe consumer group for reading data from the change tracking task. Must run on only one client; otherwise the consumer offset may become invalid.test
AccountThe account name you specified when you created the consumer group.test
PasswordThe account password you specified when you created the consumer group.\*\*\*\*\*\*
Consumer OffsetThe timestamp from which to start consuming data. Must fall within the data range of the change tracking task. Takes effect only on first consumption. If the change tracking task restarts, consumption resumes from the last recorded offset.2022-06-21 00:00:00
MessagesThe maximum number of messages that can be sent in each function invocation. The system sends messages only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.100
Interval (Unit: Seconds)The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. A value of 0 means messages are sent immediately after aggregation.3

Step 2: Configure filtering

Define an event pattern to filter change events before they reach the sink. For details, see Event patterns.

Step 3: Configure transformation

Specify a data cleansing method to process messages with Function Compute. Supported operations include splitting, mapping, enrichment, and dynamic routing. For details, see Use Function Compute to perform message cleansing.

Step 4: Configure the sink

Set Service Type to ApsaraMQ for RocketMQ and configure the following parameters.

ParameterDescriptionExample
VersionThe RocketMQ version of the target instance. Valid values: RocketMQ 4.x, RocketMQ 5.x.RocketMQ 5.x
Instance IDThe target ApsaraMQ for RocketMQ instance.rmq-cn-\*\*\*\*
TopicThe target topic on the RocketMQ instance.topic
Message BodyThe method for constructing the message body. Options: Complete Data, Data Extraction, Fixed Value, Template.Data Extraction: $.data.body
Custom PropertyCustom properties to attach to each message. Options: Not Specified, Data Extraction, Template.Template (see example below)
Message KeyThe message key for indexing. Options: Not Specified, Data Extraction, Fixed Value, Template.Data Extraction: $.data.systemProperties.KEYS
Message TagThe message tag for filtering. Options: Not Specified, Data Extraction, Fixed Value, Template.Data Extraction: $.data.systemProperties.TAGS

Custom Property template example

Use the Parameters field to extract values with JSONPath expressions, then reference them in the Template field with ${variable} syntax.

Parameters:

{
  "userProperties": "$.data.userProperties",
  "msgId": "$.data.systemProperties.UNIQ_KEY"
}

Template:

{
  "EB_SYS_EMBED_OBJECT": "${userProperties}",
  "UNIQ_KEY": "${msgId}"
}

Step 5: Configure task properties

Configure the retry policy for failed event delivery and the method for handling faults. For details, see Retry policies and dead-letter queues.

Step 6: Save and verify

Click Save. On the Tasks page, find the task you created. When the Status column changes from Starting to Running, the task is active.

Manage a message inflow task

On the Tasks page, find the task and use the Actions column to perform the following operations:

ActionDescription
DetailsView basic information, properties, and monitoring metrics for the task.
EditModify the task configuration and properties.
Enable / PauseEnable or disable the task. Confirm by clicking OK in the dialog.
DeleteDelete the task. Confirm by clicking OK in the dialog.