This topic provides the DDL syntax that is used to create an ApsaraDB RDS for MySQL result table, describes the parameters in the WITH clause, and provides data type mappings.

What is ApsaraDB RDS for MySQL?

Originally based on a branch of MySQL, ApsaraDB RDS for MySQL provides excellent performance. It is a tried and tested solution that handled the high-volume concurrent traffic during Double 11. ApsaraDB RDS for MySQL provides basic features such as whitelist configuration, backup and restoration, Transparent Data Encryption (TDE), data migration, and management for instances, accounts, and databases. For more information about ApsaraDB RDS for MySQL, see Overview of ApsaraDB RDS for MySQL.

Prerequisites

An ApsaraDB RDS for MySQL database and an ApsaraDB RDS for MySQL table are created. For more information, see Create accounts and databases for an ApsaraDB RDS for MySQL instance.

DDL syntax

CREATE TABLE rds_sink (
   id INT,
   num BIGINT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'rds',
   'password' = '<yourPassword>',
   'tableName' = '<yourTablename>',
   'url' = '<yourUrl>',
   'userName' = '<yourUsername>'
);
Note
  • Flink converts each row of output data to a line of SQL statement and then writes and executes the statement in the destination ApsaraDB RDS or Distributed Relational Database Service (DRDS) database. In this case, data is written to the result table. To write multiple rows of output data at a time, add ?rewriteBatchedStatements=true to the end of the URL. This improves the system performance.
  • ApsaraDB RDS for MySQL supports the auto-increment primary key. Therefore, you do not need to declare the auto-increment field in the DDL statement. For example, if you use ID as an auto-increment field, do not declare the ID field in the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, the value for the auto-increment field is automatically filled.
  • If a DRDS result table is partitioned, the shard key must be declared in PRIMARY KEY() of the DDL statement. Otherwise, you cannot write data to the partitioned table.
  • You must declare at least one non-primary key in the DDL statement. Otherwise, an error is returned.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to rds.
password The password that is used to log on to the database. Yes N/A.
tableName The name of the table. Yes N/A.
url The URL of the database. Yes The virtual private cloud (VPC) endpoint of the ApsaraDB RDS database. This also indicates the internal endpoint. For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. Set the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace databaseName with the name of your database.
Note To write multiple rows of output data at a time, add ?rewriteBatchedStatements=true to the end of the URL. This improves the system performance.
userName The username that is used to log on to the database. Yes N/A.
maxRetryTimes The number of retries for writing data to the table after data writing fails. No Default value: 3.
batchSize The number of data records that are written at a time. No Default value: 5000.
flushIntervalMs The interval at which the cache is cleared. No Unit: milliseconds. Default value: 0. This value indicates that cache flushing is disabled. If you set the value to 2000, all cached data is automatically written to the result table when the number of input data records does not reach the specified value within 2 seconds.

Data type mapping

Data type of ApsaraDB RDS for MySQL Data type of Flink
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
TINYINT UNSIGNED SMALLINT
INT INT
SMALLINT UNSIGNED INT
BIGINT BIGINT
INT UNSIGNED BIGINT
BIGINT UNSIGNED DECIMAL(20,0)
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

Sample code

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO rds_sink
SELECT  * FROM datagen_source;

FAQ

  • Q: When output data is written to an ApsaraDB RDS for MySQL result table, is a new data record inserted into the table or is the result table updated based on the primary key value?

    A: If a primary key is defined in the DDL statement, the following statement is executed to write output data: INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;. If the primary key value in the output data exists in the table, the matching record is updated. Otherwise, the output data is inserted as a new record. If no primary key is defined in the DDL statement, the INSERT INTO statement is executed to insert the output data.

  • Q: How do I perform the GROUP BY operation by using the unique index of an ApsaraDB RDS for MySQL result table?
    A: Use the following method to resolve the issue:
    • You must declare the unique index in the GROUP BY clause in your Flink job.
    • An ApsaraDB RDS for MySQL table has only one auto-increment primary key, which cannot be declared as a primary key in a Flink job.