This topic provides the DDL syntax that is used to create an ApsaraDB RDS for MySQL result table, describes the parameters in the WITH clause, and provides data type mappings.
What is ApsaraDB RDS for MySQL?
ApsaraDB RDS for MySQL is developed based on a branch of MySQL and has handled the high-volume concurrent traffic of the Double 11 shopping festival. It provides basic features such as whitelist configuration, backup and restoration, Transparent Data Encryption (TDE), data migration, and management of instances, accounts, and databases. For more information about ApsaraDB RDS for MySQL, see Overview of ApsaraDB RDS for MySQL.
Prerequisites
- An ApsaraDB RDS for MySQL database and an ApsaraDB RDS for MySQL table are created. For more information, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
- An IP address whitelist is configured for the ApsaraDB RDS for MySQL database. For more information, see Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.
Limits
Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports ApsaraDB RDS for MySQL connectors.
DDL syntax
CREATE TABLE rds_sink (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = '<yourUrl>'
);
- Flink converts each row of output data into an SQL statement and executes the statement in the destination ApsaraDB RDS or Distributed Relational Database Service (DRDS) database to write the data to the result table. If you want to write multiple rows of data to the result table at the same time, you must add ?rewriteBatchedStatements=true to the URL to improve system performance.
- ApsaraDB RDS for MySQL databases support auto-increment primary keys. Therefore, you do not need to declare the auto-increment field in the DDL statement. For example, if you use ID as an auto-increment field, do not declare the ID field in the DDL statement of Flink. When a row of output data is written to an ApsaraDB RDS for MySQL database, the value of the auto-increment field is automatically filled.
- If a DRDS result table is partitioned, the shard key must be declared in PRIMARY KEY() of the DDL statement. Otherwise, you cannot write data to the partitioned table.
- You must declare at least one non-primary key field in the DDL statement. Otherwise, an error is returned.
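The notes above can be sketched as a single DDL statement. This is a minimal illustration under stated assumptions, not a definitive configuration: the table name orders_sink and the columns order_no and amount are hypothetical, the physical table is assumed to have an auto-increment column named id that is left out of the Flink DDL, and order_no is assumed to be the key of the table (and the shard key if the table is a partitioned DRDS table).

```sql
-- Hypothetical sink table. The physical MySQL table is assumed to have an
-- AUTO_INCREMENT column `id`, which is deliberately not declared here.
CREATE TEMPORARY TABLE orders_sink (
  order_no VARCHAR,  -- assumed key; also the shard key for a partitioned DRDS table
  amount   BIGINT,   -- at least one non-primary key field must be declared
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- rewriteBatchedStatements=true is appended as a query parameter of the JDBC URL.
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>?rewriteBatchedStatements=true'
);
```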
Parameters in the WITH clause
Parameter | Description | Type | Required | Remarks |
---|---|---|---|---|
connector | The type of the result table. | String | Yes | Set the value to rds. |
tableName | The name of the table. | String | Yes | N/A. |
userName | The username that is used to log on to the database. | String | Yes | N/A. |
password | The password that is used to log on to the database. | String | Yes | N/A. |
url | The URL of the database. | String | Yes | The virtual private cloud (VPC) endpoint of the database, which is also the internal endpoint. For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. Specify the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace databaseName with the name of your database. Note: If you want to write multiple rows of data to the result table at the same time, you must add ?rewriteBatchedStatements=true to the URL to improve system performance. |
maxRetryTimes | The maximum number of retries after an initial attempt to write data to the table fails. | Integer | No | The default value of this parameter varies based on the VVR version of Flink. Note: We recommend that you use VVR 4.0.10 or a later patch version, or 4.1.1 or a later patch version. |
batchSize | The number of data records that can be written at a time. | Integer | No | The default value of this parameter varies based on the VVR version of Flink. Note: We recommend that you use VVR 4.0.10 or a later patch version, or 4.1.1 or a later patch version. |
bufferSize | The number of data records that are cached in the memory. A write operation is triggered when the number of cached records reaches the threshold specified by the batchSize or bufferSize parameter. | Integer | No | Default value: 10000. |
flushIntervalMs | The interval at which the cache is cleared. If the number of input data records does not reach the value specified by the batchSize parameter within this interval, all cached data is written to the result table. | Integer | No | Unit: milliseconds. The default value of this parameter varies based on the VVR version of Flink. Note: We recommend that you use VVR 4.0.10 or a later patch version, or 4.1.1 or a later patch version. |
ignoreDelete | Specifies whether to skip delete operations. | Boolean | No | Valid values: true and false. Note: Only Flink that uses VVR 4.0.7 or later supports this parameter. |
connectionMaxActive | The size of the database connection pool. | Integer | No | Default value: 40. |
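As a rough illustration of how the optional parameters in the table fit into the WITH clause, the following sketch sets each of them explicitly. The table name rds_sink_tuned and all numeric values are hypothetical and chosen only for illustration, except where a comment marks a value as the default listed in the table. Option values in a WITH clause are written as quoted strings.

```sql
-- Hypothetical sink definition that sets every optional parameter.
CREATE TEMPORARY TABLE rds_sink_tuned (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>?rewriteBatchedStatements=true',
  'maxRetryTimes' = '3',         -- illustrative value, not a documented default
  'batchSize' = '500',           -- illustrative value, not a documented default
  'bufferSize' = '10000',        -- default listed in the table
  'flushIntervalMs' = '2000',    -- illustrative value, in milliseconds
  'ignoreDelete' = 'true',       -- skip delete operations; requires VVR 4.0.7 or later
  'connectionMaxActive' = '40'   -- default listed in the table
);
```

With these settings, a write is triggered when the batchSize or bufferSize threshold is reached, or when the flushIntervalMs interval elapses first.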
Data type mappings
Data type of ApsaraDB RDS for MySQL | Data type of Flink |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | SMALLINT |
INT | INT |
SMALLINT UNSIGNED | INT |
BIGINT | BIGINT |
INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
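To show how the mappings apply in practice, the following sketch pairs a MySQL table with a matching Flink result table. Both table names and all column names are hypothetical. The unsigned MySQL columns are declared with the wider Flink types listed in the table above.

```sql
-- MySQL side: hypothetical physical table, shown only for comparison.
CREATE TABLE metrics (
  id         BIGINT UNSIGNED NOT NULL,
  severity   TINYINT UNSIGNED,
  hits       INT UNSIGNED,
  score      DOUBLE,
  created_at TIMESTAMP,
  PRIMARY KEY (id)
);

-- Flink side: each column uses the Flink type from the mapping table.
CREATE TEMPORARY TABLE metrics_sink (
  id         DECIMAL(20, 0),  -- BIGINT UNSIGNED   -> DECIMAL(20,0)
  severity   SMALLINT,        -- TINYINT UNSIGNED  -> SMALLINT
  hits       BIGINT,          -- INT UNSIGNED      -> BIGINT
  score      DOUBLE,          -- DOUBLE            -> DOUBLE
  created_at TIMESTAMP,       -- TIMESTAMP         -> TIMESTAMP
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = 'metrics',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = '<yourUrl>'
);
```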
Sample code
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO rds_sink
SELECT * FROM datagen_source;