This topic describes how to create an ApsaraDB RDS for MySQL result table. It also describes the data definition language (DDL) statements, parameters in the WITH clause, and field type mappings used when you create an ApsaraDB RDS for MySQL result table.
DDL syntax
CREATE TABLE rds_sink (
id INT,
num BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'rds',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'url' = '<yourUrl>',
'userName' = '<yourUsername>'
);
- Flink converts each row of output data to a line of SQL statement and then writes
and executes the statements in the destination ApsaraDB RDS or Distributed Relational
Database Service (DRDS) database. If you want to write multiple rows of output data
at a time, you must add
? rewriteBatchedStatements=true
to the end of the URL. This improves system performance. - You can define an auto-increment primary key for the ApsaraDB RDS for MySQL database that stores the result table. If you want to use the auto-increment primary key, you do not need to declare the auto-increment field in the DDL statement. For example, if you use ID as an auto-increment field, do not declare the ID field in the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, the value for the auto-increment field is automatically filled.
- If a DRDS result table is partitioned, the partition key must be declared in PRIMARY KEY() of the DDL statement. Otherwise, you cannot write data to the partitioned table.
- You must declare at least one non-primary key in the DDL statement. Otherwise, an error is returned.
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the result table. | Yes | Set the value to rds .
|
password | The password that is used to access the database. | Yes | None. |
tableName | The name of the table. | Yes | None. |
url | The URL of the ApsaraDB RDS for MySQL database in a VPC. | Yes | For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance.
Note If you want to write multiple rows of output data at a time, you must add
? rewriteBatchedStatements=true to the end of the URL. This improves system performance when you write multiple rows
of output data at a time.
|
userName | The username that is used to access the database. | Yes | None. |
maxRetryTimes | The number of retries for writing data to the table after data writing fails. | No | Default value: 3. |
batchSize | The number of data records that can be written at a time. | No | Default value: 5000. |
flushIntervalMs | The time interval at which the cache is cleared. | No | Unit: milliseconds. Default value: 0. This value indicates that cache flushing is not enabled. If the value of this parameter is set to 2000, all cached data is automatically written to the result table when the number of input data records does not reach the value specified by the bufferSize parameter within 2,000 milliseconds. |
Field type mapping
Data type of ApsaraDB RDS for MySQL | Data type of Flink |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | SMALLINT |
INT | INT |
SMALLINT UNSIGNED | INT |
BIGINT | BIGINT |
INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
Sample code
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE rds_sink (
`name` VARCHAR,
`age` INT
)WITH (
'connector' = 'rds',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'url' = '<yourUrl>',
'userName' = '<yourUsername>'
);
INSERT INTO rds_sink
SELECT * FROM datagen_source;
FAQ
- Q: When output data is written to an ApsaraDB RDS for MySQL result table, is a new
data record inserted into the table or is the result table updated based on the primary
key value?
A: If a primary key is defined in the DDL statement, the following statement is executed to write the output data:
INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ... ;
. If the primary key value in the output data already exists in the table, the matching record is updated. Otherwise, the output data is inserted as a new record. If no primary key is defined in the DDL statement, theINSERT INTO
statement is executed to insert the output data. - Q: How do I perform GROUP BY operations based on the unique index of an ApsaraDB RDS
for MySQL result table?
A:
- You must declare the unique index in the GROUP BY clause in your Flink job.
- An ApsaraDB RDS for MySQL table has only one auto-increment primary key, which cannot be declared as a primary key in a Flink job.