Database change events captured by Data Transmission Service (DTS) often need to reach downstream systems in real time. A message inflow task streams row-level inserts, updates, and deletes from a DTS change tracking instance to an ApsaraMQ for RocketMQ topic -- without custom consumer code.
How it works
A message inflow task bridges DTS change tracking and ApsaraMQ for RocketMQ through the following pipeline:
DTS captures row-level changes (inserts, updates, and deletes) from a source database and writes them to a change tracking instance.
The message inflow task reads change events from the DTS instance through a consumer group, starting from the consumer offset you specify.
Events are batched based on a message count threshold or a time interval.
An event pattern filters the data, and a Function Compute function transforms it.
The processed messages are delivered to a topic on your ApsaraMQ for RocketMQ instance.
Each consumer group must run on only one client. If multiple clients share the same consumer group, the consumer offset may become invalid.
Supported regions
DTS is available as a message inflow source in the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), China (Guangzhou), China (Chengdu), and China (Hong Kong).
Create a message inflow task
Before you begin, make sure that you have:
An ApsaraMQ for RocketMQ instance in the Running state. For more information, see Create resources
A DTS change tracking task in the Normal state. For more information, see Manage a change tracking task
A consumer group created in the change tracking task. For more information, see Create consumer groups
To create the task:
Log on to the ApsaraMQ for RocketMQ console.
In the left-side navigation pane, choose Message Integration > Tasks.
In the top navigation bar, select a region such as China (Hangzhou).
On the Tasks page, click Create Task.
On the Create Task page, specify the Task Name and Description, then configure the following steps.
Step 1: Configure the source
Set Data Provider to Data Transmission Service (DTS) and configure the following parameters. Then click Next Step.
| Parameter | Description | Example |
|---|---|---|
| Region | Auto-filled with the region you selected for the task. | China (Hangzhou) |
| Data Subscription Task | The ID of the change tracking task created in the DTS console. | dts8jqe\*\*\*\* |
| Access Method | The access method of the source database instance. Read-only. | RDS |
| Instance ID | The ID of the source database instance. Read-only. | rm-bp18mj3q2dzyb\*\*\*\* |
| Consumer Group | The consumer group for reading data from the change tracking task. Must run on only one client; otherwise the consumer offset may become invalid. | test |
| Account | The account name you specified when you created the consumer group. | test |
| Password | The account password you specified when you created the consumer group. | \*\*\*\*\*\* |
| Consumer Offset | The timestamp from which to start consuming data. Must fall within the data range of the change tracking task. Takes effect only on first consumption. If the change tracking task restarts, consumption resumes from the last recorded offset. | 2022-06-21 00:00:00 |
| Messages | The maximum number of messages that can be sent in each function invocation. The system sends messages only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. | 100 |
| Interval (Unit: Seconds) | The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. A value of 0 means messages are sent immediately after aggregation. | 3 |
Step 2: Configure filtering
Define an event pattern to filter change events before they reach the sink. For details, see Event patterns.
Step 3: Configure transformation
Specify a data cleansing method to process messages with Function Compute. Supported operations include splitting, mapping, enrichment, and dynamic routing. For details, see Use Function Compute to perform message cleansing.
Step 4: Configure the sink
Set Service Type to ApsaraMQ for RocketMQ and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Version | The RocketMQ version of the target instance. Valid values: RocketMQ 4.x, RocketMQ 5.x. | RocketMQ 5.x |
| Instance ID | The target ApsaraMQ for RocketMQ instance. | rmq-cn-\*\*\*\* |
| Topic | The target topic on the RocketMQ instance. | topic |
| Message Body | The method for constructing the message body. Options: Complete Data, Data Extraction, Fixed Value, Template. | Data Extraction: $.data.body |
| Custom Property | Custom properties to attach to each message. Options: Not Specified, Data Extraction, Template. | Template (see example below) |
| Message Key | The message key for indexing. Options: Not Specified, Data Extraction, Fixed Value, Template. | Data Extraction: $.data.systemProperties.KEYS |
| Message Tag | The message tag for filtering. Options: Not Specified, Data Extraction, Fixed Value, Template. | Data Extraction: $.data.systemProperties.TAGS |
Custom Property template example
Use the Parameters field to extract values with JSONPath expressions, then reference them in the Template field with ${variable} syntax.
Parameters:
{
"userProperties": "$.data.userProperties",
"msgId": "$.data.systemProperties.UNIQ_KEY"
}Template:
{
"EB_SYS_EMBED_OBJECT": "${userProperties}",
"UNIQ_KEY": "${msgId}"
}Step 5: Configure task properties
Configure the retry policy for failed event delivery and the method for handling faults. For details, see Retry policies and dead-letter queues.
Step 6: Save and verify
Click Save. On the Tasks page, find the task you created. When the Status column changes from Starting to Running, the task is active.
Manage a message inflow task
On the Tasks page, find the task and use the Actions column to perform the following operations:
| Action | Description |
|---|---|
| Details | View basic information, properties, and monitoring metrics for the task. |
| Edit | Modify the task configuration and properties. |
| Enable / Pause | Enable or disable the task. Confirm by clicking OK in the dialog. |
| Delete | Delete the task. Confirm by clicking OK in the dialog. |