The original RDS trigger for Function Compute will be retired on January 31, 2023. We recommend that you switch to the latest data path for RDS triggers, which this topic builds from Data Transmission Service (DTS), EventBridge, and Function Compute. This topic describes how to configure the new data path and highlights the differences in event parameters.
Prerequisites
You have created a database account to configure a DTS change tracking task. For more information, see Create an Account.
You have enabled EventBridge and granted the required permissions. For more information, see Enable EventBridge and Grant Permissions.
You have created a service and a function in Function Compute. For more information, see Create a Service and Create a Function.
Procedure
Configure DTS
Log on to the Data Transmission Service console. In the top navigation bar, select a region.
In the navigation pane on the left, click Data Subscription, and then click Create Task.
On the Create Task page, in the Configure Source and Destination Database Information wizard, set Source Database Information and Consumption Network Type, and then click Test Connection to Proceed.
The key configuration items are described below. Use the default values for all other configuration items.
Note: This connection test requires a public network connection. Make sure that your RDS instance has public network access enabled. You can configure this on the Database Connection page of the RDS instance.
| Category | Configuration Item | Description | Example |
| --- | --- | --- | --- |
| Source Database Information | Database Type | Select the database type to track. | MySQL |
| Source Database Information | RDS Instance ID | Select the RDS instance that you want to subscribe to. | rm-bp1pw60i18f2x**** |
| Source Database Information | Database Account | Enter the RDS database account for the subscription. | db_chi |
| Source Database Information | Database Password | Enter the password for the database account. | ************* |
| Consumption Network Type | Virtual Private Cloud (VPC) | Select the VPC for the change tracking instance. Note: The VPC and vSwitch for consumption must match those configured for the RDS instance. You can view them on the Database Connection page of the RDS instance. | vpc-bp12c5dzorfoizcez**** |
| Consumption Network Type | vSwitch | Select the vSwitch for the change tracking instance. Note: The VPC and vSwitch for consumption must match those configured for the RDS instance. You can view them on the Database Connection page of the RDS instance. | vsw-bp1lt2oxenx87jvc8**** |
On the Create Task page, in the Configure Task Objects and Advanced Configuration wizard, select the data types and tables to track. Then, complete the advanced configuration and precheck as prompted.
On the Create Task page, in the Purchase wizard, purchase an instance as prompted. Then, start the task from the change tracking task list.
Click the task ID to open the task. On the Task Management page, click Data Consumption and then click Create Consumer Group. In the Create Consumer Group panel, set Consumer Group Name, Account, and Password.
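The requirement that the consumption VPC and vSwitch match those of the RDS instance is easy to get wrong when you copy IDs between console pages. The following is a minimal sketch of a local sanity check, not part of any Alibaba Cloud SDK. The `NetworkSettings` class, the `find_mismatches` function, and the sample IDs are all hypothetical names introduced for illustration; you would fill in the values by hand from the Database Connection page and from the change tracking task.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NetworkSettings:
    """Hypothetical holder for the two IDs that must match."""
    vpc_id: str
    vswitch_id: str


def find_mismatches(rds, consumer):
    """Return human-readable problems; an empty list means the settings match."""
    problems = []
    if rds.vpc_id != consumer.vpc_id:
        problems.append(
            "VPC mismatch: RDS uses {}, consumption uses {}".format(
                rds.vpc_id, consumer.vpc_id))
    if rds.vswitch_id != consumer.vswitch_id:
        problems.append(
            "vSwitch mismatch: RDS uses {}, consumption uses {}".format(
                rds.vswitch_id, consumer.vswitch_id))
    return problems


if __name__ == "__main__":
    # Placeholder IDs, copied by hand from the two console pages.
    rds_settings = NetworkSettings("vpc-example", "vsw-example")
    consumer_settings = NetworkSettings("vpc-example", "vsw-other")
    for problem in find_mismatches(rds_settings, consumer_settings):
        print(problem)
```

Running the check before you click Test Connection to Proceed catches a mismatched ID early; it does not replace the connection test itself.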
Configure EventBridge
Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.
In the top navigation bar, select a region and click Create Event Stream.
In the Create Event Stream panel, configure the following parameters and then click Create.
On the Basic Information tab, set Event Stream Name and Description, and then click Next.
On the Event Source tab, set Event Provider to Database DTS. Select the Consumer Group that you created in step 6 of Configure DTS. Enter the Password and Consumer Offset. Then, click Next.
Optional: On the Rules tab, configure event rules and then click Next.
On the Target tab, set Service Type to Function Compute. Then, enter the names of the service and function that you created.
After the event stream is created, you can view it in the event stream list. Its status is Running. The Event Source is set to Database DTS and the Event Target is set to Function Compute.
After you complete all the configurations, the data path is ready.
Update Your Function Code Based on Event Differences
The event format for the new data path for the RDS trigger differs from the format of the original RDS trigger. You must adjust your function’s event parsing logic accordingly.
The original RDS trigger uses an object-based event format. For more information, see Code Example: Using protobuf-formatted events with the RDS trigger.
The new RDS trigger, which uses the DTS and EventBridge data path configured in this topic, uses an array-based event format. Each element in the array is a string that is converted from an object, so each element must be parsed again after the array itself is parsed. For more information, see Data Transmission Service (DTS).
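The difference between the two shapes can be illustrated with a short Python sketch. The `record` dictionary and its field values are hypothetical placeholders, not the real DTS schema; the only point is that the array-based format needs two decoding passes, one for the array and one for each string element.

```python
import json

# Hypothetical record used only to illustrate the two shapes.
record = {"id": "example-1", "data": {"op": "INSERT"}}

# Object-based format: the payload is a single JSON object.
old_style_payload = json.dumps(record)

# Array-based format: the payload is a JSON array whose elements are
# strings, and each string is itself a serialized JSON object.
new_style_payload = json.dumps([json.dumps(record)])


def decode_new_style(payload):
    """Decode an array-based payload into a list of dicts (two passes)."""
    return [json.loads(item) for item in json.loads(payload)]


decoded_old = json.loads(old_style_payload)
decoded_new = decode_new_style(new_style_payload)
```

After a single `json.loads` call on the array-based payload, each element is still a `str`, which is why code written for the object-based format fails if it is reused unchanged.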
The following sections provide Python and Java code examples for both paths. Compare the examples and update your function’s event parsing logic.
Python
Event parsing code for the original RDS trigger:
# -*- coding: utf-8 -*-
import json
import logging


def handler(event, context):
    logger = logging.getLogger()
    # The original trigger delivers a single JSON object.
    eventObj = json.loads(event)
    logger.info("rds trigger event = {}".format(eventObj))
    # Add your custom logic here, such as updating a Redis cache.
    return "OK"

Event parsing code for the new data path:
# -*- coding: utf-8 -*-
import json
import logging


def handler(event, context):
    logger = logging.getLogger()
    # Convert the event input to a list of strings.
    evt_list = json.loads(event)
    for evt in evt_list:
        # Parse each string in the list into JSON.
        evt_obj = json.loads(evt)
        logger.info(evt_obj)
    return "OK"
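While you migrate, it can help to have one parsing routine that tolerates both formats. The following is a minimal sketch under that assumption, not an official handler. The `parse_records` helper is a hypothetical name; it assumes the event arrives as `bytes` or `str` containing JSON and that a top-level object means the original format while a top-level array means the new one.

```python
import json
import logging

logger = logging.getLogger()


def parse_records(event):
    """Return a list of dicts from either the object-based or array-based format."""
    if isinstance(event, bytes):
        event = event.decode("utf-8")
    decoded = json.loads(event)
    if isinstance(decoded, dict):
        # Object-based format: wrap the single object in a list.
        return [decoded]
    records = []
    for item in decoded:
        # Array-based format: elements are JSON strings. An element that is
        # already a dict is passed through unchanged.
        records.append(json.loads(item) if isinstance(item, str) else item)
    return records


def handler(event, context):
    for record in parse_records(event):
        logger.info("rds change record = %s", record)
        # Add your custom logic here.
    return "OK"
```

Because both formats are normalized to a list of dictionaries, the custom logic inside the loop does not need to know which data path delivered the event. Once the original trigger is retired, the dictionary branch can be removed.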
Java
Event parsing code for the original RDS trigger:
package example;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import com.aliyun.fc.runtime.Context;
import com.aliyun.fc.runtime.StreamRequestHandler;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class App implements StreamRequestHandler {

    @Override
    public void handleRequest(
            InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
        // Read data from the input stream into a byte[] buffer.
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        for (int length; (length = inputStream.read(buffer)) != -1; ) {
            result.write(buffer, 0, length);
        }

        // Convert the byte[] data to a string and parse it as a JSON object.
        String stringData = result.toString(StandardCharsets.UTF_8.name());
        JsonObject jsonObj = (JsonObject) (new JsonParser().parse(stringData));
        System.out.println(jsonObj);

        outputStream.write("OK".getBytes(StandardCharsets.UTF_8));
    }
}

Sample pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>example</groupId>
  <artifactId>FCJavaDemo</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>FCJavaDemo</name>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.aliyun.fc.runtime</groupId>
      <artifactId>fc-java-core</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-my-jar-with-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.test.skip>true</maven.test.skip>
  </properties>
</project>

Event parsing code for the new data path:
package example;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import com.aliyun.fc.runtime.Context;
import com.aliyun.fc.runtime.StreamRequestHandler;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class App implements StreamRequestHandler {

    @Override
    public void handleRequest(
            InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
        // Read data from the input stream into a byte[] buffer.
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        for (int length; (length = inputStream.read(buffer)) != -1; ) {
            result.write(buffer, 0, length);
        }

        // Convert the byte[] data to a String[].
        String stringData = result.toString(StandardCharsets.UTF_8.name());
        Gson gson = new Gson();
        String[] stringArrayData = gson.fromJson(stringData, String[].class);

        // Iterate over each string in the array and parse it into JSON.
        // The data field contains the raw DTS data.
        for (String elem : stringArrayData) {
            JsonObject jsonObj = (JsonObject) (new JsonParser().parse(elem));
            System.out.println(jsonObj.get("data"));
        }

        outputStream.write("OK".getBytes(StandardCharsets.UTF_8));
    }
}

Sample pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>example</groupId>
  <artifactId>FCJavaDemo</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>FCJavaDemo</name>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.aliyun.fc.runtime</groupId>
      <artifactId>fc-java-core</artifactId>
      <version>1.4.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-my-jar-with-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.test.skip>true</maven.test.skip>
  </properties>
</project>