This topic provides the DDL syntax that is used to create an ApsaraDB RDS for MySQL result table, describes the parameters in the WITH clause, and provides data type mappings.

What is ApsaraDB RDS for MySQL?

ApsaraDB RDS for MySQL is developed based on a branch of MySQL and provides excellent performance. ApsaraDB RDS for MySQL is a tried and tested solution that handled the high-volume concurrent traffic during Double 11. ApsaraDB RDS for MySQL provides basic features such as whitelist configuration, backup and restoration, Transparent Data Encryption (TDE), data migration, and management of instances, accounts, and databases. For more information about ApsaraDB RDS for MySQL, see Overview of ApsaraDB RDS for MySQL.

Prerequisites

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports ApsaraDB RDS for MySQL connectors.

DDL syntax

CREATE TABLE rds_sink (
   id INT,
   num BIGINT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'rds',
   'tableName' = '<yourTablename>',
   'userName' = '<yourUsername>',
   'password' = '<yourPassword>',
   'url' = '<yourUrl>'
);
Note
  • Flink converts each row of output data into an SQL statement and executes that statement against the destination ApsaraDB RDS or Distributed Relational Database Service (DRDS) database to write the row to the result table. To write multiple rows to the result table in one batch, you must append ?rewriteBatchedStatements=true to the URL. This improves write performance.
  • ApsaraDB RDS for MySQL supports auto-increment primary keys, so you do not need to declare an auto-increment field in the Flink DDL statement. For example, if ID is an auto-increment field, omit the ID field from the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, the database fills in the value of the auto-increment field automatically.
  • If a DRDS result table is partitioned, the shard key must be declared in PRIMARY KEY() of the DDL statement. Otherwise, you cannot write data to the partitioned table.
  • You must declare at least one non-primary key field in the DDL statement. Otherwise, an error is returned.
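
The first two notes can be sketched as follows. The sketch assumes a hypothetical MySQL table named orders whose id column is an auto-increment primary key. The table name, column names, and placeholder values are illustrative only. The Flink DDL statement omits id and appends ?rewriteBatchedStatements=true to the URL.

```sql
-- Assumed table on the ApsaraDB RDS for MySQL side (hypothetical schema).
CREATE TABLE orders (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `item` VARCHAR(64),
  `num` BIGINT,
  PRIMARY KEY (`id`)
);

-- Flink result table. The auto-increment column id is not declared,
-- so the database fills it in for each written row.
-- The URL carries rewriteBatchedStatements=true for batched writes.
CREATE TEMPORARY TABLE rds_sink (
  `item` VARCHAR,
  `num` BIGINT
) WITH (
  'connector' = 'rds',
  'tableName' = 'orders',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>?rewriteBatchedStatements=true'
);
```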

Parameters in the WITH clause

Parameter Description Type Required Remarks
connector The type of the result table. String Yes Set the value to rds.
tableName The name of the table. String Yes N/A.
userName The username that is used to log on to the database. String Yes N/A.
password The password that is used to log on to the database. String Yes N/A.
url The URL of the database. String Yes The virtual private cloud (VPC) endpoint of the database, which is also called the internal endpoint. For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. Specify the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace databaseName with the name of your database.
Note To write multiple rows to the result table in one batch, you must append ?rewriteBatchedStatements=true to the URL. This improves write performance.
maxRetryTimes The maximum number of retries after an attempt to write data to the table fails. Integer No The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 4.0.7 or later, the default value is 10.
  • If the VVR version ranges from 4.0.0 to 4.0.6, the default value is 3.
  • If the VVR version is 3.X or earlier, the default value is 3.
Note We recommend that you use VVR 4.0.10 or a later patch version, or 4.1.1 or a later patch version.
batchSize The number of data records that can be written at a time. Integer No The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 4.0.7 or later, the default value is 4096.
  • If the VVR version ranges from 4.0.0 to 4.0.6, the default value is 5000.
  • If the VVR version is 3.X or earlier, the default value is 100.
Note We recommend that you use VVR 4.0.10 or a later patch version, or 4.1.1 or a later patch version.
bufferSize The number of data records that are cached in the memory. A write operation is triggered when the number of cached records reaches the value of either the batchSize or the bufferSize parameter. Integer No Default value: 10000.
Note
  • Only Flink that uses VVR 4.0.7 or later supports this parameter.
  • This parameter takes effect only after you specify the primary key.
flushIntervalMs The interval at which the cache is flushed. If the number of input data records does not reach the value of the batchSize parameter within this interval, all cached data is written to the result table. Integer No Unit: milliseconds. The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 4.0.7 or later, the default value is 2000.
  • If the VVR version ranges from 4.0.0 to 4.0.6, the default value is 0.
  • If the VVR version is 3.X or earlier, the default value is 1000.
Note We recommend that you use VVR 4.0.10 or a later patch version, or 4.1.1 or a later patch version.
ignoreDelete Specifies whether to skip delete operations. Boolean No Valid values:
  • true: Delete operations are skipped.
  • false: Delete operations are not skipped. This is the default value.
Note Only Flink that uses VVR 4.0.7 or later supports this parameter.
connectionMaxActive The size of the database connection pool. Integer No Default value: 40.
Note
  • Only Flink that uses VVR 4.0.7 or later supports this parameter.
  • If requests for a connection from the database connection pool time out, the pool does not have enough connections. Increase the value of this parameter.
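
As an illustration of the optional parameters, the sketch below sets each of them explicitly to the default values listed for VVR 4.0.7 or later. It assumes that option values are written as quoted strings, in the same way as the required options in the DDL syntax section. The placeholder values are not real. A primary key is declared because bufferSize takes effect only when a primary key is specified.

```sql
-- A sketch only: every optional parameter is set to its documented
-- default for VVR 4.0.7 or later. Adjust the values for your workload.
CREATE TEMPORARY TABLE rds_sink (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>',
  'maxRetryTimes' = '10',
  'batchSize' = '4096',
  'bufferSize' = '10000',
  'flushIntervalMs' = '2000',
  'ignoreDelete' = 'false',
  'connectionMaxActive' = '40'
);
```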

Data type mappings

Data type of ApsaraDB RDS for MySQL    Data type of Flink
BOOLEAN                                BOOLEAN
TINYINT                                TINYINT
SMALLINT                               SMALLINT
TINYINT UNSIGNED                       SMALLINT
INT                                    INT
SMALLINT UNSIGNED                      INT
BIGINT                                 BIGINT
INT UNSIGNED                           BIGINT
BIGINT UNSIGNED                        DECIMAL(20,0)
FLOAT                                  FLOAT
DECIMAL                                DECIMAL
DOUBLE                                 DOUBLE
DATE                                   DATE
TIME                                   TIME
TIMESTAMP                              TIMESTAMP
VARCHAR                                VARCHAR
VARBINARY                              VARBINARY
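
To show how the mappings apply, the sketch below pairs a hypothetical MySQL table that uses unsigned integer columns with a matching Flink result table. The table name metrics and all column names are assumptions for illustration. Each unsigned column is declared in Flink with the type listed for it in the mapping table.

```sql
-- Assumed table on the ApsaraDB RDS for MySQL side (hypothetical schema).
CREATE TABLE metrics (
  `id` INT NOT NULL,
  `level` TINYINT UNSIGNED,
  `hits` INT UNSIGNED,
  `total` BIGINT UNSIGNED,
  PRIMARY KEY (`id`)
);

-- Matching Flink result table:
--   TINYINT UNSIGNED -> SMALLINT
--   INT UNSIGNED     -> BIGINT
--   BIGINT UNSIGNED  -> DECIMAL(20,0)
CREATE TEMPORARY TABLE rds_sink (
  `id` INT,
  `level` SMALLINT,
  `hits` BIGINT,
  `total` DECIMAL(20, 0),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = 'metrics',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>'
);
```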

Sample code

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO rds_sink
SELECT * FROM datagen_source;