This topic provides the DDL syntax that is used to create an ApsaraDB RDS for MySQL dimension table, describes the parameters in the WITH and CACHE clauses, and provides data type mappings and sample code.

What is ApsaraDB RDS for MySQL?

ApsaraDB RDS for MySQL is developed based on MySQL and provides high performance. ApsaraDB RDS for MySQL is a tried and tested solution that can handle high-volume concurrent traffic during Double 11. ApsaraDB RDS for MySQL provides basic features such as whitelist configuration, backup and restoration, Transparent Data Encryption (TDE), data migration, and management of instances, accounts, and databases. For more information about ApsaraDB RDS for MySQL, see Overview of ApsaraDB RDS for MySQL.

Prerequisites

Limits

The ApsaraDB RDS for MySQL connector is supported only in Flink that uses Ververica Runtime (VVR) 2.0.0 or later.

DDL syntax

CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'cache' = 'ALL'
);

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the dimension table. | Yes | Set the value to rds. |
| password | The password that is used to access the ApsaraDB RDS for MySQL database. | Yes | N/A. |
| tableName | The name of the table. | Yes | N/A. |
| url | The URL of the ApsaraDB RDS for MySQL database. | Yes | The virtual private cloud (VPC) endpoint of the ApsaraDB RDS for MySQL database. The value is an internal endpoint. For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. Specify the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace databaseName with the name of your database. |
| userName | The username that is used to access the ApsaraDB RDS for MySQL database. | Yes | N/A. |
| maxRetryTimes | The maximum number of retries to write data to the table after data initially fails to be written. | No | Default value: 3. |
| CACHE | The parameters in the CACHE clause. These parameters are used to configure the cache policy, cache size, and cache timeout period. | No | For more information, see Parameters in the CACHE clause. |
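
To make the url format above concrete, the following is a minimal sketch of a dimension table declaration with the placeholders filled in. The endpoint rm-example.mysql.rds.aliyuncs.com, the port 3306, the database name mydb, and the table name dim_user are hypothetical values invented for this illustration. Replace them with the values of your own instance.

```sql
CREATE TEMPORARY TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  -- Hypothetical internal (VPC) endpoint, port, and database name.
  'url' = 'jdbc:mysql://rm-example.mysql.rds.aliyuncs.com:3306/mydb',
  -- Hypothetical table name in the mydb database.
  'tableName' = 'dim_user',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Optional. 3 is the documented default value.
  'maxRetryTimes' = '3'
);
```

Option values are written as quoted strings here because Flink SQL expects string literals in the WITH clause.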

Parameters in the CACHE clause

Parameter Description Required Remarks
cache The cache policy. No Valid values:
  • None: indicates that data is not cached. Default value: None.
  • LRU: indicates that only some data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.
  • ALL: indicates that all the data in the dimension table is cached. Before a job runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

    If the remote table contains a small amount of data and many lookups are for missing keys, we recommend that you set this parameter to ALL. A missing key is a key in the source table that cannot be matched with any row in the dimension table based on the ON clause.

Note
  • You must configure the cacheSize parameter if the cache parameter is set to LRU.
  • If you set the cache parameter to ALL, you must monitor the memory usage of the node to prevent out of memory (OOM) errors.
  • If you set the cache parameter to ALL, you must increase the memory of the node that joins the tables because the system asynchronously loads data from the dimension table. The amount of memory to add is twice the size of the data in the remote table.
cacheSize The maximum number of data records that can be cached. No
  • You must configure this parameter if the cache parameter is set to LRU.
  • You do not need to configure this parameter if the cache parameter is set to ALL.
cacheTTLMs The cache timeout period. Unit: milliseconds. No The effect of the cacheTTLMs parameter depends on the value of the cache parameter.
  • If you set the cache parameter to None, the cacheTTLMs parameter can be left empty. This indicates that cache entries do not expire.
  • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.
  • If you set the cache parameter to ALL, the cacheTTLMs parameter specifies the interval at which the system refreshes the cache. By default, the cache is not refreshed.
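
The cache policies described above could be configured as in the following sketch. The table names rds_dim_lru and rds_dim_all and the values 10000, 600000, and 3600000 are illustrative assumptions, not recommended settings. Choose values that fit the size and update frequency of your own dimension table.

```sql
-- LRU policy: cache only part of the dimension table.
CREATE TEMPORARY TABLE rds_dim_lru (
  a INT,
  b VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'cache' = 'LRU',
  -- Illustrative value: cache at most 10,000 records.
  'cacheSize' = '10000',
  -- Illustrative value: cache entries expire after 10 minutes.
  'cacheTTLMs' = '600000'
);

-- ALL policy: load the whole dimension table before the job runs.
CREATE TEMPORARY TABLE rds_dim_all (
  a INT,
  b VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'cache' = 'ALL',
  -- Illustrative value: reload the full cache every hour.
  'cacheTTLMs' = '3600000'
);
```

With the LRU policy, a shorter cacheTTLMs value picks up changes in the physical table sooner but sends more lookups to the database. With the ALL policy, each refresh reloads the entire table, so a very short interval may add noticeable load.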

Data type mapping

| Data type of ApsaraDB RDS for MySQL | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| TINYINT UNSIGNED | SMALLINT |
| INT | INT |
| SMALLINT UNSIGNED | INT |
| BIGINT | BIGINT |
| INT UNSIGNED | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20,0) |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
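
As a sketch of how the mappings above apply, the following example pairs a hypothetical MySQL table with a Flink declaration that uses the mapped types. The table name orders_dim and all column names are invented for this illustration.

```sql
-- MySQL side (run in the ApsaraDB RDS for MySQL database). Hypothetical table.
CREATE TABLE orders_dim (
  order_id   BIGINT UNSIGNED NOT NULL PRIMARY KEY,
  item_count SMALLINT UNSIGNED,
  status     TINYINT UNSIGNED,
  user_id    INT UNSIGNED,
  amount     DECIMAL(10, 2),
  note       VARCHAR(255)
);

-- Flink side: each column is declared with the mapped Flink type.
CREATE TEMPORARY TABLE orders_dim (
  order_id   DECIMAL(20, 0),  -- BIGINT UNSIGNED
  item_count INT,             -- SMALLINT UNSIGNED
  status     SMALLINT,        -- TINYINT UNSIGNED
  user_id    BIGINT,          -- INT UNSIGNED
  amount     DECIMAL(10, 2),  -- DECIMAL
  note       VARCHAR          -- VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = 'orders_dim',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```

Each unsigned MySQL type maps to a wider Flink type because its maximum value exceeds the range of the signed type of the same width. For example, SMALLINT UNSIGNED can hold values up to 65535, which does not fit in a signed SMALLINT.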

Sample code

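-- Source table: generates random test data and exposes a processing-time attribute.
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Dimension table: backed by an ApsaraDB RDS for MySQL table.
CREATE TEMPORARY TABLE rds_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://xxx',
  'userName' = '<yourUsername>'
);

-- Result table: discards all data that is written to it.
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

-- Join each source record with the dimension table row that has the same key.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;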