This topic provides the DDL syntax that is used to create a MySQL dimension table, describes the parameters in the WITH clause, and provides data type mappings.

What is a MySQL dimension table?

A MySQL dimension table is a MySQL table that a Flink job queries during a lookup join to enrich the records of a source table. MySQL dimension tables support all databases that are compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. ApsaraDB RDS for MySQL dimension tables are customized for ApsaraDB RDS for MySQL databases and support more parameters.

Prerequisites

Limits

Only Flink deployments that use Ververica Runtime (VVR) 4.0.11 or later support the MySQL connector.

DDL syntax

CREATE TABLE mysql_dim (
   id INT,
   name STRING,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql',
   'hostname' = '<yourHostname>',
   'port' = '3306',
   'username' = '<yourUsername>',
   'password' = '<yourPassword>',
   'database-name' = '<yourDatabaseName>',
   'table-name' = '<yourTableName>'
);

Parameters in the WITH clause

Parameter Description Data type Required Remarks
connector The type of the dimension table. String Yes Set the value to mysql.
hostname The IP address or hostname that is used to access the MySQL database. String Yes We recommend that you specify the virtual private cloud (VPC) IP address of the MySQL database.
Note If the MySQL database and fully managed Flink are not deployed in the same VPC, you must specify the public IP address.
username The username that is used to access the MySQL database. String Yes N/A.
password The password that is used to access the MySQL database. String Yes N/A.
database-name The name of the MySQL database. String Yes N/A.
table-name The name of the MySQL table. String Yes N/A.
port The port number that is used to access the MySQL database. Integer No Default value: 3306.
lookup.cache.strategy The cache policy. String No Valid values:
  • None: indicates that data is not cached. This is the default value.
  • LRU: indicates that only part of the data in the dimension table is cached, and the least recently used entries are evicted when the cache is full. Each time the system reads a data record from the source table, it first searches the cache. If the data is not found in the cache, the system queries the physical dimension table.
  • ALL: indicates that all the data in the dimension table is cached. Before the job runs, the system loads all data in the dimension table into the cache, and all subsequent lookups in the dimension table are served from the cache. If a data record is not found in the cache, the join key does not exist in the dimension table. The system reloads all data into the cache after the cache entries expire.

    If the remote table is small and a large number of missing keys exist (that is, many records in the source table cannot be associated with the dimension table based on the ON clause), we recommend that you set this parameter to ALL.

Note
  • If you set the lookup.cache.strategy parameter to LRU, you must configure the lookup.cache.max-rows parameter.
  • If you set the lookup.cache.strategy parameter to ALL, you must monitor the memory usage of the node to prevent out of memory (OOM) errors.
  • If you set the lookup.cache.strategy parameter to ALL, you must also increase the memory of the node that joins the tables, because the system loads data from the dimension table asynchronously. The additional memory should be twice the size of the remote table.
lookup.cache.max-rows The maximum number of data records that can be cached. Integer No
  • You must configure this parameter if the lookup.cache.strategy parameter is set to LRU.
  • You do not need to configure this parameter if the lookup.cache.strategy parameter is set to ALL.
lookup.cache.ttl The cache timeout period. Duration No The meaning of the lookup.cache.ttl parameter depends on the value of the lookup.cache.strategy parameter.
  • If you set the lookup.cache.strategy parameter to None, you do not need to configure the lookup.cache.ttl parameter because no data is cached.
  • If you set the lookup.cache.strategy parameter to LRU, the lookup.cache.ttl parameter indicates the cache timeout period. By default, cache entries do not expire.
  • If you set the lookup.cache.strategy parameter to ALL, the lookup.cache.ttl parameter indicates the cache refresh period. By default, the cache is not refreshed.
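The cache parameters above can be combined as in the following minimal sketch of an LRU-cached dimension table. It assumes the same columns as the sample code; the table name mysql_dim_lru is hypothetical, the values 100000 and 10min are illustrative rather than tuning recommendations, and the 10min string assumes the standard Flink syntax for Duration-type options.

```sql
-- Hypothetical dimension table that caches recently used rows (LRU).
-- The cache size and timeout below are illustrative values only.
CREATE TEMPORARY TABLE mysql_dim_lru (
  a INT,
  b STRING,
  c STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'lookup.cache.strategy' = 'LRU',
  -- Required when the strategy is LRU: at most 100,000 rows are cached.
  'lookup.cache.max-rows' = '100000',
  -- Cached rows expire after 10 minutes; without this option they never expire.
  'lookup.cache.ttl' = '10min'
);
```

With this configuration, a lookup join queries MySQL only on cache misses, so a larger lookup.cache.max-rows value reduces the load on the database at the cost of more memory on the join node.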

Data type mappings

MySQL data type | Flink data type
TINYINT | TINYINT
SMALLINT, TINYINT UNSIGNED | SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED | INT
BIGINT, INT UNSIGNED | BIGINT
BIGINT UNSIGNED | DECIMAL(20, 0)
FLOAT | FLOAT
DOUBLE, DOUBLE PRECISION | DOUBLE
NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s)
Note The value of p must be less than or equal to 38.
BOOLEAN, TINYINT(1) | BOOLEAN
DATE | DATE
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)], TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE]
CHAR(n) | CHAR(n)
VARCHAR(n) | VARCHAR(n)
BIT(n) | BINARY(⌈n/8⌉)
BINARY(n) | BINARY(n)
VARBINARY(N) | VARBINARY(N)
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES
Note Flink supports only MySQL binary large object (BLOB) records whose size is less than or equal to 2,147,483,647 (2^31 - 1) bytes.
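To illustrate the mappings above, the following sketch declares a Flink dimension table for a hypothetical MySQL table named user_profile. The table name, column names, and the Flink table name user_profile_dim are assumptions for illustration only; each trailing comment shows the MySQL column type that the Flink type corresponds to.

```sql
-- Flink DDL for a hypothetical MySQL table `user_profile`.
-- Trailing comments give the MySQL type of each column.
CREATE TEMPORARY TABLE user_profile_dim (
  id DECIMAL(20, 0),        -- BIGINT UNSIGNED
  age SMALLINT,             -- TINYINT UNSIGNED
  visits INT,               -- MEDIUMINT
  balance DECIMAL(10, 2),   -- DECIMAL(10, 2)
  is_vip BOOLEAN,           -- TINYINT(1)
  created_at TIMESTAMP(3),  -- DATETIME(3)
  bio STRING,               -- TEXT
  avatar BYTES,             -- BLOB
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'user_profile'
);
```

BIGINT UNSIGNED maps to DECIMAL(20, 0) rather than BIGINT because its maximum value, 18,446,744,073,709,551,615, does not fit in a signed 64-bit integer.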

Sample code

CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b BIGINT,
  c STRING,
  -- Processing-time attribute required by FOR SYSTEM_TIME AS OF in the lookup join.
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b STRING,
  c STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
-- Lookup join: each source row looks up mysql_dim at its processing time.
JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
  ON T.a = H.a;
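The sample above queries MySQL for every source record because the lookup.cache.strategy parameter defaults to None. The following minimal sketch shows a variant of the mysql_dim table that uses the ALL strategy, assuming the MySQL table is small enough to be held in memory on the join node. The table name mysql_dim_all and the 1h refresh period are illustrative assumptions; to use this variant, replace mysql_dim with mysql_dim_all in the INSERT statement.

```sql
-- Hypothetical variant of mysql_dim that loads the whole MySQL table into memory (ALL).
-- lookup.cache.max-rows is not needed for this strategy.
CREATE TEMPORARY TABLE mysql_dim_all (
  a INT,
  b STRING,
  c STRING
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'lookup.cache.strategy' = 'ALL',
  -- With ALL, the TTL is the refresh period: the full table is reloaded every hour.
  'lookup.cache.ttl' = '1h'
);
```

With this strategy, a source record whose key is not found in the cache is treated as having no match in the dimension table, so no query is sent to MySQL for it. Monitor the memory usage of the join node as described in the notes for the lookup.cache.strategy parameter.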