A Data Transmission Service (DTS) trigger connects a DTS change tracking task to a Function Compute function through EventBridge. When the change tracking task captures incremental data from a database, the trigger batches and delivers the change records to your function for processing.
How it works
When you create a DTS trigger, Function Compute automatically creates an event stream in EventBridge. Each time the change tracking task captures incremental data, EventBridge pushes one or more change records to your function in a single invocation, based on your batch configuration.
You can view the trigger in the Function Compute console and the auto-created event stream in the EventBridge console.
Limits
The DTS change tracking task and the Function Compute function must be in the same region.
If the number of event streams reaches the quota limit, you cannot create additional DTS triggers. For quota details, see Limits.
Prerequisites
Before you begin, ensure that you have:
Activated EventBridge and granted the required permissions. See Activate EventBridge and grant permissions.
A Function Compute function. See Create a function.
A DTS change tracking task. See Manage a change tracking task.
A consumer group for the change tracking task. See Create a consumer group.
Create a DTS trigger
Log on to the Function Compute console. In the left-side navigation pane, click Functions.
In the top navigation bar, select a region. On the Functions page, click the function you want to manage.
On the function details page, click the Triggers tab, then click Create Trigger.
In the Create Trigger panel, configure the parameters described below, then click OK.
Trigger parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| Trigger Type | Yes | — | Set to DTS. |
| Name | Yes | — | A custom name for the trigger. Example: dts-trigger. |
| Version Or Alias | No | LATEST | The function version or alias to attach the trigger to. To use a version other than LATEST, switch to it in the upper-right corner of the function details page before creating the trigger. See Manage versions and Manage aliases. |
| Change Tracking Task | Yes | — | The name of the DTS change tracking task to use as the event source. Example: dtsqntc2***. |
| Consumer Group | Yes | — | The consumer group that reads data from the change tracking task. A consumer group can be used by only one client at a time, so make sure it is not running on any other client instance; otherwise, the specified consumer offset may become invalid. Example: test. |
| Account | Yes | — | The account you set when creating the consumer group. Example: test. |
| Password | Yes | — | The password you set when creating the consumer group. |
| Consumer Offset | Yes | — | The timestamp from which to start consuming records. Must fall within the retention window of the subscription instance. This offset applies only when the consumer group runs for the first time. If the trigger restarts later, consumption resumes from the last committed offset. Example: 2022-06-21 00:00:00. |
| Invocation Method | Yes | Synchronous Call | How the trigger invokes the function. See the table below. |
| Trigger State | No | Enabled | Whether to enable the trigger immediately after creation. |
Invocation method
| Method | Behavior | Max payload |
|---|---|---|
| Synchronous Call | The trigger waits for the function to return before sending the next batch. Use this for sequential processing. | 32 MB |
| Asynchronous Invocation | The trigger dispatches each batch and moves to the next without waiting. Use this for high-throughput scenarios. | 128 KB |
For advanced settings such as push configuration, retries, and dead-letter queues, see Advanced features.
After creation, the trigger appears on the Triggers tab. To modify or delete it, see Manage triggers.
Configure test parameters
The DTS trigger passes an event parameter to your function — a JSON array of change records conforming to the CloudEvents specification. To test your function locally, configure a test event that simulates this input.
On the Code tab of the function details page, click the icon next to Test Function and select Configure Test Parameters. In the Configure Test Parameters panel, select Create New Test Event or Edit Existing Test Event, enter a name, paste the event JSON, then click OK.
Event format
Each invocation receives an array of change records. All records use dts:ConsumeMessage as the event type.
```json
[
  {
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro",
    "data": {
      "id": 321,
      "operationType": "UPDATE",
      "sourceTimestamp": 1654847757,
      "offset": 3218099,
      "topicPartition": {
        "topic": "cn_hangzhou_rm_1234****_test_version2",
        "partition": 0,
        "hash": 0
      },
      "schema": {
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        }
      },
      "beforeImage": { "values": [...], "size": 45 },
      "afterImage": { "values": [...], "size": 47 }
    }
  }
]
```

The outer fields follow the CloudEvents 1.0 specification. The data field carries DTS-specific change record content.
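As an illustration of consuming these envelope fields, the sketch below extracts routing metadata from a single record. The `acs:dts:<region>:<account>:<instance>` layout of `subject` is inferred from the sample above, and `describeRecord` is a hypothetical helper, not part of any SDK; verify both against your own records.

```javascript
'use strict';

// Hypothetical helper: extract routing metadata from one change record.
// The subject ARN layout is inferred from the sample event in this topic.
function describeRecord(record) {
  const [, service, region] = record.subject.split(':');
  return {
    service,                               // "dts"
    region,                                // e.g. "cn-hangzhou"
    // sourceTimestamp is in seconds; convert to milliseconds for Date.
    capturedAt: new Date(record.data.sourceTimestamp * 1000).toISOString(),
    partition: record.data.topicPartition.partition,
  };
}
```

For the sample record above, `capturedAt` resolves to the same instant as the CloudEvents `time` field.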
Event data fields
The following fields are nested under data in each record.
| Field | Type | Description |
|---|---|---|
| id | String | The ID of the DTS data entry. |
| operationType | String | The operation type of the DTS data entry, such as INSERT, UPDATE, or DELETE. |
| sourceTimestamp | Int | The timestamp, in seconds, when the DTS data entry was generated. |
| offset | Int | The offset of the DTS data entry in the topic partition. |
| topicPartition | Struct | Kafka partition information for the topic to which the event was pushed. |
| topicPartition.topic | String | The Kafka topic name. |
| topicPartition.partition | Int | The partition number. |
| topicPartition.hash | Int | An underlying DTS storage parameter. |
| schema | Struct | Database and table metadata. |
| schema.databaseName | String | The name of the source database. |
| schema.tableName | String | The name of the source table. |
| schema.recordFields | Array | Field definitions, including name, type, and key information. |
| schema.recordFields[].fieldName | String | The field name. |
| schema.recordFields[].rawDataTypeNum | Int | The mapped field type value. Corresponds to dataTypeNumber in the deserialized output of the change tracking instance. See Use a Kafka client to consume tracked data. |
| schema.recordFields[].isPrimaryKey | Boolean | Whether the field is a primary key. |
| schema.recordFields[].isUniqueKey | Boolean | Whether the field has a unique key constraint. |
| schema.recordFields[].fieldPosition | Int | The ordinal position of the field in the table. |
| schema.nameIndex | Struct | Field definitions indexed by field name, for fast lookup. |
| schema.schemaId | String | The unique identifier of the database schema. |
| schema.primaryIndexInfo | Struct | Primary key index details. |
| schema.primaryIndexInfo.indexType | String | The index type, such as PrimaryKey. |
| schema.primaryIndexInfo.indexFields | Array | The fields that form the primary key. |
| schema.primaryIndexInfo.cardinality | Int | The primary key cardinality. |
| schema.primaryIndexInfo.nullable | Boolean | Whether the primary key allows null values. |
| schema.primaryIndexInfo.isFirstUniqueIndex | Boolean | Whether this is the first unique index. |
| schema.uniqueIndexInfo | Array | Unique index definitions. |
| schema.foreignIndexInfo | Array | Foreign key index definitions. |
| schema.normalIndexInfo | Array | Regular (non-unique) index definitions. |
| schema.databaseInfo | Struct | Source database engine information. |
| schema.databaseInfo.databaseType | String | The database engine type, such as MySQL. |
| schema.databaseInfo.version | String | The database engine version. |
| schema.totalRows | Int | The total number of rows in the table. |
| beforeImage | Struct | The field values before the operation. Not present for INSERT operations. |
| beforeImage.values | Array | The recorded field values. |
| beforeImage.size | Int | The size of the recorded values. |
| afterImage | Struct | The field values after the operation. Not present for DELETE operations. |
| afterImage.values | Array | The recorded field values. |
| afterImage.size | Int | The size of the recorded values. |
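Putting the schema and image fields together, the hypothetical helper below pairs `schema.recordFields` with an image's `values` array to rebuild a field-name-to-value map. The assumption that `values` is ordered to match `recordFields` is not stated in this topic; confirm it against real change records before relying on it.

```javascript
'use strict';

// Hypothetical helper: turn a before/after image into a keyed object.
// Assumption (verify against real records): image.values appears in the
// same order as schema.recordFields.
function imageToObject(schema, image) {
  if (!image) return null; // e.g. no beforeImage for an INSERT
  const result = {};
  schema.recordFields.forEach((field, i) => {
    result[field.fieldName] = image.values[i];
  });
  return result;
}
```

This makes downstream code independent of field positions, at the cost of trusting the ordering assumption above.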
Write and test the function code
After creating the trigger, write your function handler to process the incoming change records.
On the Code tab of the function details page, write your handler in the code editor, then click Deploy Code. The following Node.js example parses each change record from the event and logs the operation type, affected table, and changed fields:
```javascript
'use strict';

exports.handler = (event, context, callback) => {
  // The event payload is a JSON array of change records.
  const records = JSON.parse(event.toString());
  records.forEach(record => {
    const { operationType, schema, beforeImage, afterImage } = record.data;
    console.log(`Operation: ${operationType}`);
    console.log(`Table: ${schema.databaseName}.${schema.tableName}`);
    // For UPDATE and DELETE, log the values before the change.
    if (beforeImage) {
      console.log('Before:', JSON.stringify(beforeImage.values));
    }
    // For INSERT and UPDATE, log the values after the change.
    if (afterImage) {
      console.log('After:', JSON.stringify(afterImage.values));
    }
  });
  callback(null, 'processed');
};
```

Click Test Function to invoke the function with your configured test event and verify the output in the logs.
What's next
Configure triggers using Serverless Devs. See Common commands of Serverless Devs.
Configure triggers using the SDK. See SDKs.
Modify or delete an existing trigger. See Manage triggers.