This topic provides the DDL syntax that is used to create a MySQL result table, describes the parameters in the WITH clause, and provides data type mappings.

What is a MySQL result table?

MySQL result tables support all databases that are compatible with the MySQL protocol. The databases include ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. ApsaraDB RDS for MySQL result tables are customized for ApsaraDB RDS for MySQL databases and support more parameters.

Prerequisites

Limits

The MySQL connector is supported only by Flink that uses Ververica Runtime (VVR) 4.0.11 or later.

DDL syntax

CREATE TABLE mysql_sink (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
Note
  • ApsaraDB RDS for MySQL supports auto-increment primary keys, so you do not need to declare the auto-increment field in the DDL statement. For example, if ID is an auto-increment field, omit the ID field from the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, the database automatically fills in the value of the auto-increment field.
  • You must declare at least one non-primary key field in the DDL statement. Otherwise, an error is returned.
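
As a sketch of the first point in the note, assume a hypothetical MySQL table whose id column is declared as AUTO_INCREMENT. The table layout, the column names, and the result table name mysql_sink_auto_id are illustrative and do not come from this topic. The result table declares only the remaining columns, and MySQL assigns id on each insert:

```sql
-- Hypothetical MySQL-side table (created in MySQL, not in Flink):
--   CREATE TABLE orders (
--     id   BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
--     name VARCHAR(255),
--     num  BIGINT
--   );

-- Flink result table: the auto-increment column id is intentionally not declared.
-- name and num are non-primary key fields, so the second point in the note is also met.
CREATE TEMPORARY TABLE mysql_sink_auto_id (
  `name` VARCHAR(255),
  `num` BIGINT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'  -- would point at the hypothetical orders table
);
```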

Parameters in the WITH clause

| Parameter | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| connector | The type of the result table. | String | Yes | Set the value to mysql. |
| hostname | The IP address or hostname that is used to access the MySQL database. | String | Yes | We recommend that you enter the IP address of a virtual private cloud (VPC). Note: If the MySQL database and fully managed Flink are not deployed in the same VPC, you must specify the public IP address. |
| username | The username that is used to access the MySQL database. | String | Yes | N/A |
| password | The password that is used to access the MySQL database. | String | Yes | N/A |
| database-name | The name of the MySQL database. | String | Yes | N/A |
| table-name | The name of the MySQL table. | String | Yes | N/A |
| port | The port number that is used to access the MySQL database. | Integer | No | Default value: 3306. |
| sink.max-retries | The maximum number of retries that are allowed if an attempt to write data to the table fails. | Integer | No | Default value: 3. |
| sink.buffer-flush.max-rows | The maximum number of data records that can be cached in memory. | Integer | No | Default value: 100. |
| sink.buffer-flush.interval | The interval at which the cache is flushed. If the number of cached data records does not reach the upper limit within this interval, all cached data is written to the result table. | Duration | No | Default value: 1. Unit: seconds. |
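
The optional write parameters can be set alongside the required ones. The following is a minimal sketch with illustrative values, not recommendations. The table name mysql_sink_tuned is hypothetical. The sketch also assumes that the Duration value of sink.buffer-flush.interval is written as a string such as '2s', as is usual for Flink Duration options. Verify the accepted format for your VVR version.

```sql
CREATE TEMPORARY TABLE mysql_sink_tuned (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'sink.max-retries' = '5',              -- allow up to 5 retries for a failed write (default: 3)
  'sink.buffer-flush.max-rows' = '500',  -- cache up to 500 records in memory (default: 100)
  'sink.buffer-flush.interval' = '2s'    -- assumed format: flush cached records every 2 seconds (default: 1 second)
);
```

Larger values for the two buffer parameters generally mean fewer, larger writes to MySQL at the cost of higher latency before a record becomes visible in the table.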

Data type mappings

| Data type of MySQL | Data type of Flink |
| --- | --- |
| TINYINT | TINYINT |
| SMALLINT, TINYINT UNSIGNED | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | INT |
| BIGINT, INT UNSIGNED | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20, 0) |
| FLOAT | FLOAT |
| DOUBLE, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s). Note: The value of p must be less than or equal to 38. |
| BOOLEAN, TINYINT(1) | BOOLEAN |
| DATE | DATE |
| TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
| DATETIME [(p)], TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
| CHAR(n) | CHAR(n) |
| VARCHAR(n) | VARCHAR(n) |
| BIT(n) | BINARY(⌈n/8⌉) |
| BINARY(n) | BINARY(n) |
| VARBINARY(N) | VARBINARY(N) |
| TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES. Note: Flink supports only MySQL binary large object (BLOB) records whose size is less than or equal to 2,147,483,647 (2^31 - 1) bytes. |
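
To show how the mappings are applied, the following sketch declares a result table for a hypothetical MySQL table. The table layout, the column names, and the result table name mysql_sink_types are illustrative assumptions and do not come from this topic. Each Flink column uses the type that the mapping table lists for the corresponding MySQL column.

```sql
-- Hypothetical MySQL-side table (created in MySQL, not in Flink):
--   CREATE TABLE device_events (
--     id      INT PRIMARY KEY,
--     counter BIGINT UNSIGNED,
--     active  TINYINT(1),
--     price   DECIMAL(10, 2),
--     flags   BIT(10),
--     created DATETIME(3),
--     note    TEXT,
--     payload BLOB
--   );

CREATE TEMPORARY TABLE mysql_sink_types (
  `id` INT,                  -- MySQL INT
  `counter` DECIMAL(20, 0),  -- MySQL BIGINT UNSIGNED
  `active` BOOLEAN,          -- MySQL TINYINT(1)
  `price` DECIMAL(10, 2),    -- MySQL DECIMAL(10, 2); p = 10 is within the limit of 38
  `flags` BINARY(2),         -- MySQL BIT(10): ceil(10 / 8) = 2
  `created` TIMESTAMP(3),    -- MySQL DATETIME(3)
  `note` STRING,             -- MySQL TEXT
  `payload` BYTES,           -- MySQL BLOB
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'  -- would point at the hypothetical device_events table
);
```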

Sample code

-- Source table: the datagen connector generates random rows for testing.
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

-- Result table: rows are written to the specified MySQL table.
CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

-- Write every generated row to the MySQL result table.
INSERT INTO mysql_sink
SELECT * FROM datagen_source;