全部产品
Search
文档中心

函数计算:【产品变更】原RDS触发器下线通知

更新时间:Mar 14, 2026

函数计算原RDS触发器将于2023年01月31日下线,建议您使用函数计算提供的最新RDS触发链路,即。本文介绍RDS触发器的最新链路配置步骤以及Event参数的差异性。

前提条件

操作步骤

配置DTS

  1. 登录数据传输服务控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,单击数据订阅,然后单击创建任务

  3. 创建任务页面的配置源库及目标库信息配置向导,设置源库信息消费网络类型,然后单击测试连接以进行下一步

    主要配置项解释如下,其余配置项保持默认值即可。

    说明

    此测试连接必须通过公网连接,请确保RDS实例已开通外网连接功能。您可以在RDS实例的数据库连接页面配置。

    配置项

    说明

    示例

    源库信息区域

    数据库类型

    选择待订阅的数据库类型。

    MySQL

    RDS实例ID

    选择待订阅的RDS实例。

    rm-bp1pw60i18f2x****

    数据库账号

    填写待订阅的RDS数据库账号。

    db_chi

    数据库密码

    填写待订阅的RDS数据库账号对应的密码。

    *************

    消费网络类型区域

    专有网络

    选择数据订阅实例所属的专有网络。

    说明

    消费网络的专有网络虚拟交换机必须与RDS实例设置的VPC和vSwitch一致。您可以在RDS实例的数据库连接页面查看。

    vpc-bp12c5dzorfoizcez****

    虚拟交换机

    选择数据订阅实例所属的交换机。

    说明

    消费网络的专有网络虚拟交换机必须与RDS实例设置的VPC和vSwitch一致。您可以在RDS实例的数据库连接页面查看。

    vsw-bp1lt2oxenx87jvc8****

  4. 创建任务页面的配置任务对象及高级配置配置向导,选择订阅的数据类型和数据表,然后根据界面提示完成高级配置和预检查操作。

  5. 创建任务页面的购买配置向导,根据界面提示购买实例,然后在订阅任务列表,启动任务。

  6. 单击任务ID打开任务,在任务管理页面,单击数据消费,然后单击新增消费组,在弹出的新增消费组面板设置消费组名称账号密码等。

配置EventBridge

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,设置以下配置项,然后单击创建

    1. 基本信息页签,设置事件流名称描述,然后单击下一步

    2. 事件源页签,事件提供方选择数据库 DTS,选择步骤6设置的消费组,并填写密码消费位点。然后单击下一步

    3. 可选:规则页签,设置事件规则,然后单击下一步

    4. 目标页签,服务类型选择函数计算,然后填写已创建的服务函数

    创建完成后,您可以在事件流列表查看事件源数据库DTS至事件目标函数计算的任务,其状态为运行中

完成以上所有配置后,的数据链路配置成功。

根据Event差异调整函数代码

RDS触发器最新链路的数据格式和原RDS触发器的数据格式不相同,具体差异点如下所示。需要在代码中将函数对Event的解析处理方式做调整。

以下提供Python和Java两条链路的Event解析Demo。您可以对比两者的差异,修改函数Event解析逻辑。

Python

  • 事件解析代码

    # -*- coding: utf-8 -*-
    import json
    import logging
    
    def handler(event, context):
      logger = logging.getLogger()
      eventObj = json.loads(event)
      logger.info("rds trigger event = {}".format(eventObj))
      # 您可以在此处增加自己的处理逻辑,例如,更新Redis缓存。
      return "OK"
  • 事件解析代码

    # -*- coding: utf-8 -*-
    import logging
    import json
    
    def handler(event, context):
      logger = logging.getLogger()
      # 将Event入参转换为String列表。
      evt_list = json.loads(event)
      for evt in evt_list:
          # 将列表中每一个元素转为Json类型。
          evt_obj = json.loads(evt)
          logger.info(evt_obj)
      return "OK"

Java

  • 事件解析代码

    package example;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    import com.google.gson.*;
    import com.google.gson.JsonParser;
    import com.google.gson.JsonObject;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.StreamRequestHandler;
    
    import java.io.*;
    import java.nio.charset.StandardCharsets;
    
    public class App implements StreamRequestHandler {
        @Override
        public void handleRequest(
                InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
              // 从input入参读取数据,并存储到byte[] buffer中。
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            for (int length; (length = inputStream.read(buffer)) != -1; ) {
                result.write(buffer, 0, length);
            }
    
            // 将byte[]数据转换为Object类型。
            String stringData = result.toString(StandardCharsets.UTF_8.name());
            JsonObject jsonObj = (JsonObject)(new JsonParser().parse(stringData));
            System.out.println(jsonObj);
            outputStream.write(new String("OK").getBytes());
        }
    }
                        

    pom.xml文件示例如下。

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>example</groupId>
      <artifactId>FCJavaDemo</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>FCJavaDemo</name>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.aliyun.fc.runtime</groupId>
          <artifactId>fc-java-core</artifactId>
          <version>1.4.1</version>
        </dependency>
        <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
          <version>2.8.5</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.83</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-my-jar-with-dependencies</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    
      <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </project>
  • 事件解析代码

    package example;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    import com.google.gson.*;
    import com.google.gson.JsonParser;
    import com.google.gson.JsonObject;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.StreamRequestHandler;
    
    import java.io.*;
    import java.nio.charset.StandardCharsets;
    
    public class App implements StreamRequestHandler {
    
        @Override
        public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
              // 从input入参读取数据,并存储到byte[] buffer中
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            for (int length; (length = inputStream.read(buffer)) != -1; ) {
                result.write(buffer, 0, length);
            }
    
        // 将byte[]类型数据转换为String[]类型。
            String stringData = result.toString(StandardCharsets.UTF_8.name());
            Gson gson =new Gson();
            String[] stringArrayData = gson.fromJson(stringData, String[].class);
    
            // 遍历String[]中的每一个元素,并将每一个元素转为Json格式数据。其中data字段为DTS原数据。
            for(String elem : stringArrayData){
                JsonObject jsonObj = (JsonObject)(new JsonParser().parse(elem));
                System.out.println(jsonObj.get("data"));
            }
            outputStream.write((new String("OK")).getBytes());
        }
    }

    pom.xml文件示例如下。

    <project xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>example</groupId>
      <artifactId>FCJavaDemo</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>FCJavaDemo</name>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.aliyun.fc.runtime</groupId>
          <artifactId>fc-java-core</artifactId>
          <version>1.4.1</version>
        </dependency>
        <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
          <version>2.8.5</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.83</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-my-jar-with-dependencies</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    
      <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </project>