Data Transmission Service (DTS) is an event source that integrates with Function Compute through EventBridge. This integration allows DTS triggers to invoke associated functions to process real-time incremental data from database change tracking tasks. This topic describes how to create a DTS trigger, configure input parameters, and write and test code in the Function Compute console.
Overview
When you create a trigger in the Function Compute console, Function Compute automatically creates event streams in EventBridge based on your trigger configuration.
After the trigger is created, you can view its information in the Function Compute console. You can also view the automatically created resources in the EventBridge console. When a DTS change tracking task captures incremental data from a database, the trigger invokes the associated function. Then, one or more message events are pushed to the function in batches for processing, based on your batch configuration.
Precautions
The DTS change tracking task used as the event source must be in the same region as the Function Compute function.
If the number of created event streams reaches the upper limit, you cannot create additional DTS triggers. For more information about the limits on the number of event streams, see Limits.
Prerequisites
EventBridge
Function Compute
Data Transmission Service (DTS)
Step 1: Create a DTS trigger
Log on to the Function Compute console. In the left-side navigation pane, click Functions.
In the top navigation bar, select a region. On the Functions page, click the function that you want to manage.
On the function details page, click the Triggers tab and then click Create Trigger.
In the Create Trigger panel, configure the parameters and click OK.
The basic configuration items are described below.

Trigger Type: The type of the trigger. For more information about supported trigger types, see Trigger overview. Example: DTS

Name: A custom name for the trigger. Example: dts-trigger

Version Or Alias: The default value is LATEST. To create a trigger for another version or alias, first switch to that version or alias in the upper-right corner of the function details page. For more information about versions and aliases, see Manage versions and Manage aliases. Example: LATEST

Change Tracking Task: The name of an existing change tracking task. Example: dtsqntc2***

Consumer Group: The name of the consumer group created to consume data from the tracking task. Important: Make sure the consumer group is not running on other client instances. Otherwise, the specified consumer offset may become invalid. Example: test

Account: The account that you set when you created the consumer group. Example: test

Password: The password that you set when you created the consumer group. Example: ******

Consumer Offset: The timestamp of the first data record that you want to consume. The consumer offset must be within the timestamp range of the subscription instance. Note: The consumer offset takes effect only the first time a new consumer group runs. If the task restarts later, consumption continues from the last consumer offset. Example: 2022-06-21 00:00:00

Invocation Method: The method used to invoke the function. Valid values:
Synchronous Call: Suitable for sequential invocation scenarios. A single event or a batch of events triggers a function invocation, and the system waits for the function to finish executing and return a result before the next event or batch of events triggers another invocation. The maximum payload of a synchronous call request is 32 MB. For more information, see Synchronous calls.
Asynchronous Invocation: Lets you consume events quickly. A single event or a batch of events triggers a function invocation, and Function Compute immediately returns a response while the function executes asynchronously; the next event or batch of events can then trigger another invocation. The maximum payload of an asynchronous invocation request is 128 KB. For more information, see Overview.
Example: Synchronous Call

Trigger State: Specifies whether to enable the trigger immediately after it is created. Enable Trigger is selected by default, which means the trigger is enabled as soon as it is created. Example: Enabled
For more information about advanced configuration items, such as push configuration, retries, and dead-letter queues, see Advanced features.
After the trigger is created, it is displayed on the Triggers tab. To modify or delete a trigger, see Trigger management.
Step 2: Configure function input parameters
The DTS event source passes an event input parameter to the function. You can manually pass the event to the function to simulate a triggering event.
On the Code tab of the function details page, click the drop-down icon next to Test Function and select Configure Test Parameters from the drop-down list. In the Configure Test Parameters panel, select Create New Test Event or Edit Existing Test Event, enter an event name and the event content, and then click OK.
The event uses the following format:

[
  {
    "data": {
      "id": 321****,
      "topicPartition": {
        "hash": 0,
        "partition": 0,
        "topic": "cn_hangzhou_rm_1234****_test_version2"
      },
      "offset": 3218099,
      "sourceTimestamp": 1654847757,
      "operationType": "UPDATE",
      "schema": {
        "recordFields": [
          { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
          { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
        ],
        "nameIndex": {
          "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
          "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
        "totalRows": 0
      },
      "beforeImage": {
        "recordSchema": {
          "recordFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          ],
          "nameIndex": {
            "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          },
          "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
          "databaseName": "hangzhou-test-db",
          "tableName": "message_info",
          "primaryIndexInfo": {
            "indexType": "PrimaryKey",
            "indexFields": [
              { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
            ],
            "cardinality": 0,
            "nullable": true,
            "isFirstUniqueIndex": false
          },
          "uniqueIndexInfo": [],
          "foreignIndexInfo": [],
          "normalIndexInfo": [],
          "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
          "totalRows": 0
        },
        "values": [
          { "data": 115 },
          {
            "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 },
            "charset": "utf8mb4"
          }
        ],
        "size": 45
      },
      "afterImage": {
        "recordSchema": {
          "recordFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          ],
          "nameIndex": {
            "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          },
          "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
          "databaseName": "hangzhou-test-db",
          "tableName": "message_info",
          "primaryIndexInfo": {
            "indexType": "PrimaryKey",
            "indexFields": [
              { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
            ],
            "cardinality": 0,
            "nullable": true,
            "isFirstUniqueIndex": false
          },
          "uniqueIndexInfo": [],
          "foreignIndexInfo": [],
          "normalIndexInfo": [],
          "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
          "totalRows": 0
        },
        "values": [
          { "data": 115 },
          {
            "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 },
            "charset": "utf8mb4"
          }
        ],
        "size": 47
      }
    },
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
  }
]

For information about the parameters defined in the CloudEvents specification, see Overview.
The following list describes the parameters contained in the data field.

id (String): The ID of the DTS data entry.

topicPartition (Struct): The partition information about the topic to which the event is pushed.
hash (Int): The underlying storage parameter of DTS.
partition (Int): The partition number.
topic (String): The topic name.

offset (Int): The offset of the DTS data entry.

sourceTimestamp (Int): The timestamp that indicates when the DTS data entry was generated.

operationType (String): The type of operation recorded in the DTS data entry.

schema (Struct): The schema information about the database.
recordFields (Array): The details of the fields.
fieldName (String): The field name.
rawDataTypeNum (Int): The mapped value of the field type. The value corresponds to the dataTypeNumber field in the deserialized incremental data from the change tracking instance. For more information, see Use a Kafka client to consume tracked data.
isPrimaryKey (Boolean): Indicates whether the field is a primary key field.
isUniqueKey (Boolean): Indicates whether the field has a unique key.
fieldPosition (Int): The field position.
nameIndex (Struct): The field details indexed by field name.
schemaId (String): The ID of the database schema.
databaseName (String): The database name.
tableName (String): The table name.
primaryIndexInfo (Struct): The primary key index information.
indexType (String): The index type.
indexFields (Array): The fields on which the index is created.
cardinality (Int): The cardinality of the primary keys.
nullable (Boolean): Indicates whether the primary keys can be null.
isFirstUniqueIndex (Boolean): Indicates whether the index is the first unique index.
uniqueIndexInfo (Array): The unique indexes.
foreignIndexInfo (Array): The foreign key indexes.
normalIndexInfo (Array): The regular indexes.
databaseInfo (Struct): The information about the database.
databaseType (String): The database engine.
version (String): The database engine version.
totalRows (Int): The total number of rows in the table.

beforeImage (Struct): The image that records the field values before the operation is performed.
values (Array): The recorded field values.
size (Int): The size of the recorded fields.

afterImage (Struct): The image that records the field values after the operation is performed.
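In the sample event above, string column values in beforeImage.values and afterImage.values arrive as raw byte arrays (the hb field) together with a charset, while numeric columns carry the value directly in data. The following sketch shows one way to turn such an entry back into a usable JavaScript value. The helper name decodeFieldValue is illustrative, not an official API, and it assumes the utf8mb4 bytes can be decoded as UTF-8 in Node.js.

```javascript
// Hypothetical helper: convert one entry of beforeImage.values /
// afterImage.values into a plain JavaScript value.
// Assumption: string columns carry their raw bytes in data.hb (as in the
// sample event), and numeric columns carry the value in data directly.
function decodeFieldValue(value) {
  if (value && value.data && Array.isArray(value.data.hb)) {
    // MySQL utf8mb4 data is decoded here as UTF-8.
    return Buffer.from(value.data.hb).toString('utf8');
  }
  return value ? value.data : null;
}

// The "topic" column from the sample beforeImage: bytes [104, 101, 108, 108, 111]
console.log(decodeFieldValue({
  data: { hb: [104, 101, 108, 108, 111] },
  charset: 'utf8mb4'
})); // hello

// The numeric "id" column is passed through unchanged
console.log(decodeFieldValue({ data: 115 })); // 115
```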
Step 3: Write and test the function code
After you create the trigger, you can write and test the function code. When the DTS change tracking task captures incremental data from the database, the trigger automatically invokes the function.
On the Code tab of the function details page, write code in the code editor and then click Deploy Code.
This topic uses Node.js code as an example.
'use strict';

/*
To enable the initializer feature, implement the initializer function as follows:
exports.initializer = (context, callback) => {
  console.log('initializing');
  callback(null, '');
};
*/

exports.handler = (event, context, callback) => {
  console.log("event: %s", event);
  // Parse the event parameters and process the event.
  callback(null, 'return result');
};

Click Test Function.
More information
In addition to the Function Compute console, you can configure triggers by using one of the following methods:
Use the Serverless Devs tool to configure triggers. For more information, see Common commands of Serverless Devs.
Use SDKs to configure triggers. For more information, see SDKs.
To modify or delete an existing trigger, see Manage triggers.