RDS Trigger

Last Updated: Mar 15, 2019

RDS triggers

Alibaba Cloud ApsaraDB Relational Database Service (RDS) is a stable, reliable, and scalable online database service. Alibaba Cloud integrates ApsaraDB RDS with Function Compute to meet user requirements on heterogeneous data forwarding and lightweight computing, such as field merging, splitting, and transformation. RDS is connected to Function Compute as an event source. By using the serverless computing capability of Function Compute, users can customize their data processing methods. A typical example is Redis cache elimination.

Take the bls business as an example. The bls-manager is a core control component that performs read and write operations on metabases. The bls-console is a peripheral component that manages the operations in the O&M console. The bls-console performs query operations and caches the retrieved data in Redis.

  • Problem

    Instead of promptly synchronizing the metadata updated by the bls-manager, the bls-console obtains metadata from the metabase only after the key expires.

  • Solution

    To ensure that the Redis cache is cleared or updated promptly after metadata changes, use Function Compute to subscribe to the RDS binlog in real time, and delete or update the corresponding key in Redis based on each DML statement.
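
The solution above can be sketched as a small helper that turns a JSON-format RDS event into a list of cache keys to delete. This is a minimal sketch under stated assumptions, not the bls implementation: the key scheme `<db>:<table>:<primary key>` is hypothetical, the after image is assumed to arrive in a field named `row`, and `cache` stands for any client object that exposes a `delete(key)` method (a Redis client would be one candidate).

```python
import base64
import json

# Operations that change row data and should therefore invalidate the cache.
DML_OPERATIONS = {"INSERT", "UPDATE", "DELETE"}


def primary_key_value(fields):
    """Return the decoded primary-key value from a list of event fields."""
    for field in fields:
        if field.get("isPk"):
            charset = field.get("charset") or "utf8"
            return base64.b64decode(field["value"]).decode(charset)
    return None


def keys_to_invalidate(event):
    """Collect the cache keys touched by the DML entries of one RDS event."""
    keys = []
    for entry in json.loads(event)["entries"]:
        if entry["operation"] not in DML_OPERATIONS:
            continue  # skip BEGIN, COMMIT, and DDL entries
        # Prefer the before image; fall back to the after image
        # (the field name "row" is an assumption).
        fields = entry.get("beforeRow") or entry.get("row") or []
        pk = primary_key_value(fields)
        if pk is not None:
            # Hypothetical key scheme: <db>:<table>:<primary key>
            keys.append("{}:{}:{}".format(entry["dbName"], entry["tableName"], pk))
    return keys


def invalidate(event, cache):
    """Delete every affected key; `cache` is any object with a delete(key) method."""
    keys = keys_to_invalidate(event)
    for key in keys:
        cache.delete(key)
    return keys
```

Inside a Function Compute handler, `invalidate(event, cache)` would be called with the incoming event and a cache client created outside the handler so that the connection can be reused across invocations.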

Current support scope of RDS trigger

  • Specifications

    ApsaraDB RDS high-availability edition and basic edition for MySQL 5.6 and 5.7 are supported. More types of databases will be supported in the future.

  • Network

    Both classic networks and VPC networks are supported.

  • Region

    The RDS trigger is currently supported in Beijing, Qingdao, Shanghai, Hangzhou, Shenzhen, Hong Kong, US East, US West, Singapore, Australia, and Germany. It is not supported in Japan, India, or Zhangjiakou.

Event definition

When an insert, update, or delete operation is performed on a specific table in a database under the RDS instance, the change details will be encoded as a JSON string or protobuf byte stream and will be transferred to Function Compute for processing.

Sample format of an RDS event

  • JSON (for example, to delete a row from a table):

    {
      "offset": "13054",
      "timestamp": "1539182054",
      "entries": [{
        "operation": "BEGIN",
        "timestamp": "1539182055",
        "id": "90bd7f98-cac6-11e8-bb08-506b4b2364ec:32690"
      }, {
        "operation": "DELETE",
        "timestamp": "1539182055",
        "id": "90bd7f98-cac6-11e8-bb08-506b4b2364ec:32690",
        "sequence": "1",
        "dbName": "fc-wp",
        "tableName": "wp_posts",
        "beforeRow": [{
          "name": "id",
          "charset": "utf8",
          "typeNum": -5,
          "orgTypeName": "bigint(20)",
          "isPk": true,
          "value": "MTI="
        }, {
          "name": "content",
          "charset": "utf8",
          "idx": 1,
          "typeNum": -4,
          "orgTypeName": "longtext",
          "value": "aGVsbG8="
        }, {
          "name": "ver",
          "charset": "utf8",
          "idx": 2,
          "typeNum": 4,
          "orgTypeName": "int(11)",
          "value": "MjQ="
        }]
      }, {
        "operation": "COMMIT",
        "timestamp": "1539182055",
        "id": "90bd7f98-cac6-11e8-bb08-506b4b2364ec:32690",
        "sequence": "2"
      }]
    }
  • Complete definition of protobuf

    syntax = "proto3";
    package protocol;

    enum DBType {
      MySQL = 0;
      Redis = 1;
      Mongo = 2;
      HBase = 3;
    }

    message Message {
      // The offset acknowledged after this message is consumed. You can write code.
      int64 offset = 1;
      // The timestamp mapping the record that is acknowledged after this message is consumed.
      int64 timestamp = 2;
      // Reserved: It can be used to transmit large data rows.
      int32 spare_flag = 3;
      int32 spare_seq = 4;
      // The version of the message.
      int32 version = 5;
      // The data source.
      DBType db_type = 6;
      // The data entry.
      repeated Entry entries = 7;
    }

    // The operation.
    enum OpType {
      UNKOWN_TYPE = 0;
      BEGIN = 1;
      COMMIT = 2;
      // The query not used in the following DDL operations.
      QUERY = 3;
      INSERT = 4;
      UPDATE = 5;
      DELETE = 6;
      CREATE = 7;
      ALTER = 8;
      DROP = 9;
      TRUNCATE = 10;
      RENAME = 11;
      // CREATE INDEX
      CINDEX = 12;
      // DROP INDEX
      DINDEX = 13;
      OPTIMIZE = 14;
      XA = 15;
    }

    message Entry {
      OpType operation = 1;
      // The timestamp, in seconds.
      int64 timestamp = 2;
      // The transaction ID.
      string id = 3;
      // The row number in a transaction.
      int64 sequence = 4;
      // The db_name of a DML operation, that is, the default db of the session during a DDL operation.
      string db_name = 5;
      // The table_name of the DML operation.
      string table_name = 6;
      // after image, that is, the image after an operation.
      repeated Field row = 7;
      // before image, that is, the image before an operation.
      repeated Field before_row = 8;
      // The non-DML SQL statements.
      string query = 9;
    }

    message Field {
      // The column name.
      string name = 1;
      // The character set.
      string charset = 3;
      // The column number.
      int32 idx = 2;
      // Corresponding to type in Java.
      int32 type_num = 4;
      // Corresponding to type num in MySQL.
      int32 org_type = 5;
      // The name of the original type in the db.
      string org_type_name = 6;
      // The reserved flag.
      int32 flag = 7;
      // Whether the value is null.
      bool is_null = 8;
      // Whether the value is pk.
      bool is_pk = 9;
      // Whether the value is unsigned.
      bool is_unsigned = 10;
      // Whether the value is a timestamp (The value displayed in the timestamp is related to the time zone. The value of the Standard Time Zone is recorded here).
      bool is_timestamp = 11;
      // All values are represented by bytes. The consumer needs to use charset and bytes together to generate a corresponding string. Then, the consumer converts the string to the type value corresponding to type_num.
      // If charset is empty, the metadata type is binary.
      bytes value = 12;
    }
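
The comments on the value field describe a two-step decoding: bytes plus charset give a string, and type_num then decides the final type. For the JSON event format, where values appear Base64-encoded as in the sample above, that could look roughly like the following. This is a sketch under assumptions: the `isNull` key is guessed from the protobuf field is_null, only a handful of integer type codes (taken from `java.sql.Types`) are converted, and the charset name is assumed to be one that Python's codecs recognize (for example `utf8`).

```python
import base64

# java.sql.Types codes treated as integers in this sketch:
# -6 TINYINT, 5 SMALLINT, 4 INTEGER, -5 BIGINT.
INTEGER_TYPE_NUMS = {-6, 5, 4, -5}


def decode_field(field):
    """Convert one JSON-format field into a Python value."""
    if field.get("isNull"):  # key name assumed from the protobuf field is_null
        return None
    raw = base64.b64decode(field.get("value", ""))
    charset = field.get("charset")
    if not charset:
        return raw  # empty charset: keep the value as binary
    text = raw.decode(charset)
    if field.get("typeNum") in INTEGER_TYPE_NUMS:
        return int(text)
    return text  # other types are left as strings in this sketch


def decode_row(fields):
    """Map column names to decoded values for a row or beforeRow list."""
    return {field["name"]: decode_field(field) for field in fields}
```

Applied to the beforeRow list of the DELETE entry in the JSON sample, `decode_row` yields `{"id": 12, "content": "hello", "ver": 24}`.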

Configure an RDS trigger

Trigger example: rds_trigger.yml

    triggerConfig:
      subscriptionObjects: ["db1.table1", "db1.table2"]
      retry: 1
      concurrency: 2
      eventFormat: json

Trigger parameters

Parameter name      | Constraint     | Default value | Type    | Description
--------------------|----------------|---------------|---------|------------
subscriptionObjects | None           | None          | String  | The subscription objects. Currently, subscription objects are supported down to the table level, so function execution can be triggered only by changes to the subscribed tables. Example: ["db1.table1", "db2.table2"]
retry               | [0,3]          | 3             | Integer | The number of retries. If the failure persists after the specified number of retries, the failed event is skipped and subsequent calls continue.
concurrency         | [1,5]          | 1             | Integer | The number of concurrent calls. If the event sequence matters, set this parameter to 1 so that functions are called in the order in which transactions are committed in the binlog. If the event sequence does not matter, set this parameter to a larger value to increase concurrency and improve performance.
eventFormat         | json, protobuf | protobuf      | String  | The format of an event.
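
The constraints and defaults listed above can be checked before a trigger is created. The helper below is a hypothetical client-side check, not part of Function Compute or its SDKs. It assumes that every subscription object has the form "database.table" and that omitted parameters take the listed default values.

```python
ALLOWED_FORMATS = ("json", "protobuf")


def validate_trigger_config(config):
    """Return the config with defaults applied; raise ValueError if it is invalid."""
    objects = config.get("subscriptionObjects")
    if not isinstance(objects, list) or not objects:
        raise ValueError("subscriptionObjects must be a non-empty list")
    for obj in objects:
        parts = obj.split(".") if isinstance(obj, str) else []
        if len(parts) != 2 or not all(parts):
            raise ValueError("expected 'database.table', got {!r}".format(obj))

    retry = config.get("retry", 3)
    if not isinstance(retry, int) or not 0 <= retry <= 3:
        raise ValueError("retry must be an integer in [0, 3]")

    concurrency = config.get("concurrency", 1)
    if not isinstance(concurrency, int) or not 1 <= concurrency <= 5:
        raise ValueError("concurrency must be an integer in [1, 5]")

    event_format = config.get("eventFormat", "protobuf")
    if event_format not in ALLOWED_FORMATS:
        raise ValueError("eventFormat must be json or protobuf")

    return {
        "subscriptionObjects": list(objects),
        "retry": retry,
        "concurrency": concurrency,
        "eventFormat": event_format,
    }
```

Passing only `{"subscriptionObjects": ["db1.table1"]}` returns a config with retry 3, concurrency 1, and eventFormat protobuf, which mirrors the defaults in the table.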

Example

You can create an RDS trigger in the Function Compute console, by using the fcli, or through SDKs. The following sections give an example for each method.

Example 1: Create an RDS trigger in the console

This example demonstrates how to create an RDS trigger in the console. You can create an RDS trigger either when or after creating a function. For more information about creating triggers, see Create a trigger and Trigger operation.

Log on to the Function Compute console, and then select a region and service. If no service is available, create a service by following the instructions provided in Create a service.

Create a trigger when creating a function

  1. Click Create Function. On the Create Function page, select Empty Function, and click Next.

  2. Select RDS Trigger, and complete the configuration according to the following figure. For example, set EventFormat to json.

rds1

  3. Create the function, and specify the parameters. Select In-line Edit, paste the following Python example code, and then click Next.

         import json
         import logging

         def handler(event, context):
             logger = logging.getLogger()
             # With eventFormat set to json, the event is a JSON document.
             eventObj = json.loads(event)
             logger.info("rds trigger event = {}".format(eventObj))
             return "OK"

  4. (Optional) Configure permissions and click Next. After you have confirmed the settings, click Create.

Create a trigger after creating a function

  1. Select an existing function and click Triggers > Create Trigger.
  2. On the Create Trigger page, configure the RDS trigger according to the following figure, and then click OK.

Note: You may use Quick Authorize to speed up the authorization process if fine-grained authorization is not required.

rds2

Example 2: Create an RDS trigger by using the fcli

First, create a YAML file that contains the trigger configuration. Assume that you create an RDS trigger based on table1 and table2 in db1 on the RDS instance. The YAML file is as follows:

    triggerConfig:
      subscriptionObjects: ["db1.table1", "db1.table2"]
      retry: 1
      concurrency: 2
      eventFormat: json

Create an RDS trigger in the corresponding function directory.

    mkt serviceName/functionName -t rds -c TriggerConfig.yaml

For more information about using the fcli, see the fcli Developer Guide.

Example 3: Create an RDS trigger through SDKs

The following example demonstrates how to create an RDS trigger using the fc-python-sdk. Function Compute also provides fc-nodejs-sdk and fc-java-sdk.

Create a trigger

    import fc2

    # Replace the placeholders with your own endpoint and credentials.
    client = fc2.Client(
        endpoint='<Your Endpoint>',
        accessKeyID='<Your AccessKeyID>',
        accessKeySecret='<Your AccessKeySecret>')

    service_name = 'serviceName'
    function_name = 'functionName'
    trigger_name = 'triggerName'
    trigger_type = 'rds'
    # The RDS instance that acts as the event source.
    source_arn = 'acs:rds:cn-hangzhou:<Your Account ID>:dbinstance/<Your Rds instance ID>'
    # The RAM role used to invoke the function.
    invocation_role = 'acs:ram::<Your Account ID>:role/<Your Invocation Role>'
    trigger_config = {
        "subscriptionObjects": ["db1.table1", "db1.table2"],
        "retry": 2,
        "concurrency": 1,
        "eventFormat": "json"
    }
    client.create_trigger(service_name, function_name, trigger_name, trigger_type, trigger_config, source_arn, invocation_role)

Create the function

    # For example, eventFormat is json
    import json
    import logging

    def handler(event, context):
        logger = logging.getLogger()
        eventObj = json.loads(event)
        logger.info("rds trigger event = {}".format(eventObj))
        return 'OK'
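
Before deploying, the handler can be exercised locally with a hand-built event. The snippet below is a rough sketch: it repeats the handler so that it runs on its own, uses a trimmed-down event shaped like the JSON sample in this document, and assumes that the event reaches the handler as UTF-8 encoded bytes. The context argument is not used by this handler, so `None` is passed in its place.

```python
import json
import logging


def handler(event, context):
    logger = logging.getLogger()
    event_obj = json.loads(event)
    logger.info("rds trigger event = {}".format(event_obj))
    return "OK"


# A trimmed-down event shaped like the JSON sample (assumed to arrive as bytes).
sample_event = json.dumps({
    "offset": "13054",
    "timestamp": "1539182054",
    "entries": [{"operation": "BEGIN", "timestamp": "1539182055"}],
}).encode("utf-8")

# The context object is unused by this handler, so None is enough locally.
result = handler(sample_event, None)
```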

If you have any questions, leave a comment or join the Function Compute customer support group (DingTalk group number: 11721331).