This topic provides the DDL syntax that is used to create an ApsaraDB RDS for MySQL dimension table, describes the parameters in the WITH and CACHE clauses, and provides data type mappings and sample code.

What is ApsaraDB RDS for MySQL?

ApsaraDB RDS for MySQL is developed based on a branch of MySQL and provides excellent performance. ApsaraDB RDS for MySQL is a tried and tested solution that handled the high-volume concurrent traffic during Double 11. ApsaraDB RDS for MySQL provides basic features such as whitelist configuration, backup and restoration, Transparent Data Encryption (TDE), data migration, and management of instances, accounts, and databases. For more information about ApsaraDB RDS for MySQL, see Overview of ApsaraDB RDS for MySQL.

Prerequisites

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the ApsaraDB RDS for MySQL connector.

DDL syntax

CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'cache' = 'ALL'
);

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the dimension table. | Yes | Set the value to rds. |
| password | The password that is used to access the ApsaraDB RDS for MySQL database. | Yes | N/A. |
| tableName | The name of the table. | Yes | N/A. |
| url | The URL of the ApsaraDB RDS for MySQL database. | Yes | The virtual private cloud (VPC) endpoint of the ApsaraDB RDS for MySQL database. The value is an internal endpoint. For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. Set the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace databaseName with the name of your database. |
| userName | The username that is used to access the ApsaraDB RDS for MySQL database. | Yes | N/A. |
| maxRetryTimes | The maximum number of retries to write data to the table after data writing fails. | No | Default value: 3. |
| CACHE | The parameters in the CACHE clause. These parameters are used to configure the cache policy, cache size, and cache timeout period. | No | For more information, see Parameters in the CACHE clause. |
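
The following is a minimal sketch of a WITH clause that uses the url format and the optional maxRetryTimes parameter described above. The table name rds_dim_example is hypothetical, and every value in angle brackets is a placeholder, not a real endpoint or credential.

```sql
-- Sketch only: replace every value in angle brackets before you use this DDL.
CREATE TEMPORARY TABLE rds_dim_example (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',                                        -- fixed value for this connector
  'url' = 'jdbc:mysql://<Internal endpoint>/<databaseName>',  -- internal (VPC) endpoint and database name
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'maxRetryTimes' = '3'                                       -- optional; 3 is the documented default
);
```

Setting maxRetryTimes explicitly is not required. It is shown here only to illustrate where an optional parameter goes.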

Parameters in the CACHE clause

Parameter Description Required Remarks
cache The cache policy. No Valid values:
  • None: indicates that data is not cached. This is the default value in versions earlier than VVR 4.0.6.
    Note In VVR 4.0.6 and later, the default value of this parameter is ALL.
  • LRU: indicates that only part of the data in the dimension table is cached. For each input data record, the system searches the cache first. If the record is not in the cache, the system looks it up in the physical dimension table.
  • ALL: indicates that all the data in the dimension table is cached. Before the system runs a job, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

    If the amount of data in a remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. A missing key is a join key in the source table that cannot be associated with any record in the dimension table based on the ON clause.

Note
  • You must configure the cacheSize parameter if the cache parameter is set to LRU.
  • If you set the cache parameter to ALL, you must monitor the memory usage of the node to prevent out-of-memory (OOM) errors.
  • If you set the cache parameter to ALL, you must increase the memory of the node that joins the tables, because the system loads data from the dimension table asynchronously. The additional memory must be twice the size of the remote table.
cacheSize The maximum number of data records that can be cached. No
  • You must configure this parameter if the cache parameter is set to LRU.
  • You do not need to configure this parameter if the cache parameter is set to ALL.
cacheTTLMs The cache timeout period. Unit: milliseconds. No The setting of cacheTTLMs is related to cache.
  • If you set the cache parameter to None, the cacheTTLMs parameter can be left empty. This indicates that cache entries do not expire.
  • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.
  • If you set the cache parameter to ALL, the cacheTTLMs parameter specifies the interval at which the system refreshes the cache. By default, the cache is not refreshed.
maxJoinRows The maximum number of dimension table records that can be returned for each data record in the primary table during a join. No Default value: 1024. If you estimate that each data record in the primary table matches at most n data records in the dimension table, set maxJoinRows to n to ensure efficient matching in Realtime Compute for Apache Flink.
Note When you join the primary table with a dimension table, the number of matched dimension table records returned for each input data record cannot exceed the value of this parameter.
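
The following sketch shows how the cache parameters described above might be combined for the LRU and ALL policies. The table names rds_dim_lru and rds_dim_all and all numeric values are illustrative assumptions, not recommendations. Values in angle brackets are placeholders. cacheSize is omitted in the ALL variant because the cacheSize description states that it is not needed for ALL.

```sql
-- Variant 1 (assumed values): LRU cache, for example for a large dimension table.
CREATE TEMPORARY TABLE rds_dim_lru (
  a INT,
  b VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'cache' = 'LRU',
  'cacheSize' = '100000',    -- illustrative: cache at most 100,000 data records
  'cacheTTLMs' = '600000',   -- illustrative: cache entries expire after 10 minutes
  'maxJoinRows' = '1'        -- assumes each input record matches at most one dimension record
);

-- Variant 2 (assumed values): ALL cache, for example for a small dimension table.
CREATE TEMPORARY TABLE rds_dim_all (
  a INT,
  b VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'cache' = 'ALL',
  'cacheTTLMs' = '3600000'   -- illustrative: reload the whole dimension table every hour
);
```

With LRU, cacheTTLMs bounds how stale a cached record can become. With ALL, it sets how often the full table is reloaded, so a shorter interval puts more read load on the database.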

Data type mappings

| Data type of ApsaraDB RDS for MySQL | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| TINYINT UNSIGNED | SMALLINT |
| INT | INT |
| SMALLINT UNSIGNED | INT |
| BIGINT | BIGINT |
| INT UNSIGNED | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20,0) |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
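
The following sketch applies these mappings to a hypothetical MySQL table. The table user_profile, its columns, and the Flink table name user_profile_dim are assumptions made for illustration. Values in angle brackets are placeholders.

```sql
-- Hypothetical MySQL table, shown as a comment for reference:
--   CREATE TABLE user_profile (
--     id         BIGINT UNSIGNED,
--     age        TINYINT UNSIGNED,
--     visits     INT UNSIGNED,
--     balance    DECIMAL(10, 2),
--     nickname   VARCHAR(64),
--     updated_at TIMESTAMP
--   );

-- Flink dimension table declared with the mapped data types.
CREATE TEMPORARY TABLE user_profile_dim (
  id         DECIMAL(20, 0),  -- BIGINT UNSIGNED maps to DECIMAL(20,0)
  age        SMALLINT,        -- TINYINT UNSIGNED maps to SMALLINT
  visits     BIGINT,          -- INT UNSIGNED maps to BIGINT
  balance    DECIMAL(10, 2),  -- DECIMAL maps to DECIMAL
  nickname   VARCHAR,         -- VARCHAR maps to VARCHAR
  updated_at TIMESTAMP        -- TIMESTAMP maps to TIMESTAMP
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = 'user_profile',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```

Each unsigned MySQL type maps to a wider Flink type because the signed type of the same width cannot hold the upper half of the unsigned range.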

Sample code

CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) with (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) with (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://xxx',
  'userName' = '<yourUsername>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b STRING
) with (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;