This topic provides the DDL syntax that is used to create a MySQL dimension table, describes the parameters in the WITH clause, and provides data type mappings.
What is a MySQL dimension table?
A MySQL dimension table can be used with any database that is compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. ApsaraDB RDS for MySQL dimension tables are tailored to ApsaraDB RDS for MySQL databases and support additional parameters.
Prerequisites
- A MySQL database and a MySQL table are created. For more information, see Create a database and account for an ApsaraDB RDS for MySQL instance, Create a database and account for a PolarDB for MySQL instance, or Create a database and account for a self-managed MySQL instance.
- An IP address whitelist is configured. For more information, see Configure an ApsaraDB RDS for MySQL whitelist, Configure a PolarDB for MySQL whitelist, or Configure a self-managed MySQL whitelist.
Limits
The MySQL connector is supported only by Flink deployments that use Ververica Runtime (VVR) 4.0.11 or later.
DDL syntax
CREATE TABLE mysql_dim (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
Parameters in the WITH clause
Parameter | Description | Data type | Required | Remarks |
---|---|---|---|---|
connector | The type of the dimension table. | String | Yes | Set the value to mysql. |
hostname | The IP address or hostname that is used to access the MySQL database. | String | Yes | We recommend that you enter the IP address of a virtual private cloud (VPC). Note: If the MySQL database and fully managed Flink are not deployed in the same VPC, you must specify the public IP address. |
username | The username that is used to access the MySQL database. | String | Yes | N/A. |
password | The password that is used to access the MySQL database. | String | Yes | N/A. |
database-name | The name of the MySQL database. | String | Yes | N/A. |
table-name | The name of the MySQL table. | String | Yes | N/A. |
port | The port number that is used to access the MySQL database. | Integer | No | Default value: 3306. |
lookup.cache.strategy | The cache policy. | String | No | Valid values: |
lookup.cache.max-rows | The maximum number of data records that can be cached. | Integer | No | |
lookup.cache.ttl | The cache timeout period. | Duration | No | The effect of the lookup.cache.ttl parameter depends on the value of the lookup.cache.strategy parameter. |
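To illustrate where the optional cache parameters go, the following sketch extends the DDL from the DDL syntax section. The table name mysql_dim_cached and the values 10000 and 10 min are arbitrary assumptions for illustration, not recommendations, and <yourCacheStrategy> is a placeholder for one of the valid values of lookup.cache.strategy.

```sql
-- Sketch only: the cache size and timeout below are illustrative assumptions.
CREATE TEMPORARY TABLE mysql_dim_cached (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Optional cache settings; all three are documented as not required.
  'lookup.cache.strategy' = '<yourCacheStrategy>',
  'lookup.cache.max-rows' = '10000',  -- upper bound on cached records
  'lookup.cache.ttl' = '10 min'       -- Duration value; cache timeout period
);
```

Because lookup.cache.ttl interacts with lookup.cache.strategy, it is worth checking how the chosen strategy treats the timeout before relying on a particular value.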
Data type mappings
Data type of MySQL | Data type of Flink |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | SMALLINT |
INT | INT |
MEDIUMINT | INT |
SMALLINT UNSIGNED | INT |
BIGINT | BIGINT |
INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
TINYINT(1) | BOOLEAN |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(n) | VARBINARY(n) |
TINYTEXT | STRING |
TEXT | STRING |
MEDIUMTEXT | STRING |
LONGTEXT | STRING |
TINYBLOB | BYTES |
BLOB | BYTES |
MEDIUMBLOB | BYTES |
LONGBLOB | BYTES |
Note For NUMERIC(p, s) and DECIMAL(p, s), the value of p must be less than or equal to 38.
Note Flink supports only MySQL binary large object (BLOB) records whose size is less than or equal to 2,147,483,647 (2^31 - 1) bytes.
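As a worked example of applying the mappings, the following sketch shows a MySQL table and a Flink dimension table declared over it. The table and column names (product, product_dim, and so on) are hypothetical, and each Flink column type is taken from the corresponding row of the mapping table.

```sql
-- Hypothetical MySQL table; names are illustrative only.
CREATE TABLE product (
  id         BIGINT UNSIGNED NOT NULL PRIMARY KEY,
  name       VARCHAR(64),
  price      NUMERIC(10, 2),
  in_stock   BOOLEAN,
  flags      BIT(12),
  label      TINYTEXT,
  updated_at DATETIME(3)
);

-- Flink dimension table over the MySQL table above.
CREATE TEMPORARY TABLE product_dim (
  id         DECIMAL(20, 0),  -- BIGINT UNSIGNED
  name       VARCHAR(64),     -- VARCHAR(n)
  price      DECIMAL(10, 2),  -- NUMERIC(p, s), p <= 38
  in_stock   BOOLEAN,         -- BOOLEAN
  flags      BINARY(2),       -- BIT(12): ceil(12 / 8) = 2 bytes
  label      STRING,          -- TINYTEXT
  updated_at TIMESTAMP(3),    -- DATETIME(3)
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'product'
);
```

BIGINT UNSIGNED maps to DECIMAL(20, 0) because its maximum value, 2^64 - 1, has 20 decimal digits and does not fit in a signed BIGINT.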
Sample code
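-- Source table that generates random rows; proctime is the processing-time
-- attribute that the lookup join requires.
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- MySQL dimension table that is looked up for each source row.
CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

-- Sink table that discards all rows it receives.
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

-- Join each source row with the dimension table row that has the same value of a.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
  ON T.a = H.a;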
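The sample uses an inner join, so source rows without a matching row in mysql_dim produce no output. If unmatched rows should be kept, a LEFT JOIN variant along the following lines may be used. This is a sketch that assumes the same three tables as the sample and is not part of the original sample code.

```sql
-- Sketch: keep every source row; H.b is NULL when mysql_dim has no match.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
LEFT JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
  ON T.a = H.a;
```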