Function Compute:DTS triggers

Last Updated: Apr 01, 2026

A Data Transmission Service (DTS) trigger connects a DTS change tracking task to a Function Compute function through EventBridge. When the change tracking task captures incremental data from a database, the trigger batches and delivers the change records to your function for processing.

How it works

When you create a DTS trigger, Function Compute automatically creates an event stream in EventBridge. Each time the change tracking task captures incremental data, EventBridge pushes one or more change records to your function in a single invocation, based on your batch configuration.

You can view the trigger in the Function Compute console and the auto-created event stream in the EventBridge console.

Limits

  • The DTS change tracking task and the Function Compute function must be in the same region.

  • If the number of event streams reaches the quota limit, you cannot create additional DTS triggers. For quota details, see Limits.

Prerequisites

Before you begin, ensure that you have:

  • Created a DTS change tracking task in the same region as the function.

  • Created a consumer group for the change tracking task, and noted its account and password.

Create a DTS trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Functions.

  2. In the top navigation bar, select a region. On the Functions page, click the function you want to manage.

  3. On the function details page, click the Triggers tab, then click Create Trigger.

  4. In the Create Trigger panel, configure the parameters described below, then click OK.

Trigger parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| Trigger Type | Yes | None | Set to DTS. |
| Name | Yes | None | A custom name for the trigger. Example: dts-trigger. |
| Version Or Alias | No | LATEST | The function version or alias to attach the trigger to. To use a version other than LATEST, switch to it in the upper-right corner of the function details page before creating the trigger. See Manage versions and Manage aliases. |
| Change Tracking Task | Yes | None | The name of the DTS change tracking task to use as the event source. Example: dtsqntc2***. |
| Consumer Group | Yes | None | The consumer group that reads data from the change tracking task. A consumer group can be used by only one client at a time; make sure it is not running on any other client instance, or the specified consumer offset may become invalid. Example: test. |
| Account | Yes | None | The account you set when creating the consumer group. Example: test. |
| Password | Yes | None | The password you set when creating the consumer group. |
| Consumer Offset | Yes | None | The timestamp from which to start consuming records. Must fall within the retention window of the subscription instance. This offset applies only when the consumer group runs for the first time; if the trigger restarts later, consumption resumes from the last committed offset. Example: 2022-06-21 00:00:00. |
| Invocation Method | Yes | Synchronous Call | How the trigger invokes the function. See the table below. |
| Trigger State | No | Enabled | Whether to enable the trigger immediately after creation. |

Invocation method

| Method | Behavior | Max payload |
| --- | --- | --- |
| Synchronous Call | The trigger waits for the function to return before sending the next batch. Use this for sequential processing. | 32 MB |
| Asynchronous Invocation | The trigger dispatches each batch and moves on to the next without waiting. Use this for high-throughput scenarios. | 128 KB |

For advanced settings such as push configuration, retries, and dead-letter queues, see Advanced features.

After creation, the trigger appears on the Triggers tab. To modify or delete it, see Manage triggers.

Configure test parameters

The DTS trigger passes your function an event parameter: a JSON array of change records conforming to the CloudEvents specification. To test your function locally, configure a test event that simulates this input.

  1. On the Code tab of the function details page, click the icon next to Test Function and select Configure Test Parameters.

  2. In the Configure Test Parameters panel, select Create New Test Event or Edit Existing Test Event, enter a name, paste the event JSON, then click OK.
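For example, a pared-down test event with a single UPDATE record could look like the following. All values are placeholders; adjust them to match your own database and table:

```json
[
  {
    "id": "test-event-id",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "data": {
      "operationType": "UPDATE",
      "schema": { "databaseName": "testdb", "tableName": "message_info" },
      "beforeImage": { "values": ["1", "old"], "size": 5 },
      "afterImage": { "values": ["1", "new"], "size": 5 }
    }
  }
]
```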

Event format

Each invocation receives an array of change records. All records use dts:ConsumeMessage as the event type.

[
  {
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro",
    "data": {
      "id": 321,
      "operationType": "UPDATE",
      "sourceTimestamp": 1654847757,
      "offset": 3218099,
      "topicPartition": {
        "topic": "cn_hangzhou_rm_1234****_test_version2",
        "partition": 0,
        "hash": 0
      },
      "schema": {
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        }
      },
      "beforeImage": { "values": [...], "size": 45 },
      "afterImage":  { "values": [...], "size": 47 }
    }
  }
]

The outer fields follow the CloudEvents 1.0 specification. The data field carries DTS-specific change record content.
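A handler can use the CloudEvents type field to skip unrelated events and the schema metadata to route records by source table. The following Node.js sketch illustrates this; the function name groupByTable is illustrative, not part of any API:

```javascript
'use strict';

// Sketch: keep only DTS change records and group their payloads by
// source table, using the envelope and data fields shown above.
function groupByTable(records) {
  const byTable = {};
  for (const record of records) {
    if (record.type !== 'dts:ConsumeMessage') continue; // skip other event types
    const { databaseName, tableName } = record.data.schema;
    const key = `${databaseName}.${tableName}`;
    (byTable[key] = byTable[key] || []).push(record.data);
  }
  return byTable;
}
```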

Event data fields

The following fields are nested under data in each record.

| Field | Type | Description |
| --- | --- | --- |
| id | String | ID of the DTS data entry. |
| operationType | String | The type of operation in the DTS data entry, such as INSERT, UPDATE, or DELETE. |
| sourceTimestamp | Int | The timestamp that indicates when the DTS data entry was generated. |
| offset | Int | Offset of the DTS data entry in the topic partition. |
| topicPartition | Struct | Kafka partition information for the topic that the event was pushed to. |
| topicPartition.topic | String | Kafka topic name. |
| topicPartition.partition | Int | Partition number. |
| topicPartition.hash | Int | Underlying DTS storage parameter. |
| schema | Struct | Database and table metadata. |
| schema.databaseName | String | Source database name. |
| schema.tableName | String | Source table name. |
| schema.recordFields | Array | Field definitions, including name, type, and key information. |
| schema.recordFields[].fieldName | String | Field name. |
| schema.recordFields[].rawDataTypeNum | Int | Mapped field type value. Corresponds to dataTypeNumber in the deserialized output of the change tracking instance. See Use a Kafka client to consume tracked data. |
| schema.recordFields[].isPrimaryKey | Boolean | Whether the field is a primary key. |
| schema.recordFields[].isUniqueKey | Boolean | Whether the field has a unique key constraint. |
| schema.recordFields[].fieldPosition | String | Ordinal position of the field in the table. |
| schema.nameIndex | Struct | Field definitions indexed by field name, for fast lookup. |
| schema.schemaId | String | Unique identifier of the database schema. |
| schema.primaryIndexInfo | Struct | Primary key index details. |
| schema.primaryIndexInfo.indexType | String | Index type, such as PrimaryKey. |
| schema.primaryIndexInfo.indexFields | Array | Fields that form the primary key. |
| schema.primaryIndexInfo.cardinality | String | Primary key cardinality. |
| schema.primaryIndexInfo.nullable | Boolean | Whether the primary key allows null values. |
| schema.primaryIndexInfo.isFirstUniqueIndex | Boolean | Whether this is the first unique index. |
| schema.uniqueIndexInfo | Array | Unique index definitions. |
| schema.foreignIndexInfo | Array | Foreign key index definitions. |
| schema.normalIndexInfo | Array | Regular (non-unique) index definitions. |
| schema.databaseInfo | Struct | Source database engine information. |
| schema.databaseInfo.databaseType | String | Database engine type, such as MySQL. |
| schema.databaseInfo.version | String | Database engine version. |
| schema.totalRows | Int | Total number of rows in the table. |
| beforeImage | Struct | Field values before the operation, present for operations such as UPDATE and DELETE. |
| beforeImage.values | Array | Recorded field values. |
| beforeImage.size | Int | The size of the recorded fields. |
| afterImage | Struct | Field values after the operation, present for operations such as INSERT and UPDATE. |
| afterImage.values | Array | Recorded field values. |
| afterImage.size | Int | The size of the recorded fields. |
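The values arrays in beforeImage and afterImage carry bare values without field names. The Node.js sketch below pairs them with schema.recordFields, assuming the two arrays are aligned positionally; verify that assumption against your own records. The helper name namedValues is illustrative:

```javascript
'use strict';

// Sketch: pair each image value with its field name via
// schema.recordFields. Assumes values[i] corresponds to
// recordFields[i], which should be verified against real records.
function namedValues(schema, image) {
  if (!image || !Array.isArray(image.values)) return {};
  const named = {};
  (schema.recordFields || []).forEach((field, i) => {
    named[field.fieldName] = image.values[i];
  });
  return named;
}
```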

Write and test the function code

After creating the trigger, write your function handler to process the incoming change records.

  1. On the Code tab of the function details page, write your handler in the code editor, then click Deploy Code. The following Node.js example parses each change record from the event and logs the operation type, affected table, and changed fields:

    'use strict';
    
    exports.handler = (event, context, callback) => {
      const records = JSON.parse(event.toString());
    
      records.forEach(record => {
        const { operationType, schema, beforeImage, afterImage } = record.data;
    
        console.log(`Operation: ${operationType}`);
        console.log(`Table: ${schema.databaseName}.${schema.tableName}`);
    
        // For UPDATE and DELETE, log the values before the change.
        if (beforeImage) {
          console.log('Before:', JSON.stringify(beforeImage.values));
        }
    
        // For INSERT and UPDATE, log the values after the change.
        if (afterImage) {
          console.log('After:', JSON.stringify(afterImage.values));
        }
      });
    
      callback(null, 'processed');
    };
  2. Click Test Function.

What's next