This topic provides the DDL syntax that is used to create an AnalyticDB for MySQL V3.0 dimension table, describes the parameters in the WITH clause and cache parameters, and provides data type mappings.

What is AnalyticDB for MySQL?

AnalyticDB for MySQL is a cloud-native enterprise-class data warehousing service that integrates database and big data technologies. AnalyticDB for MySQL supports high-throughput real-time data addition, removal, and modification, low-latency real-time analysis, and complex extract, transform, load (ETL) operations. AnalyticDB for MySQL is compatible with upstream and downstream ecosystem tools and can be used to build enterprise-class report systems, data warehouses, and data service engines.

Prerequisites

  • An AnalyticDB for MySQL cluster and an AnalyticDB for MySQL table are created. For more information, see Create a cluster and CREATE TABLE.
  • A whitelist is configured for the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the AnalyticDB for MySQL V3.0 connector.

DDL syntax

CREATE TABLE adb30_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'cache' = 'ALL',
  'cacheSize' = '500'
);

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the dimension table. | Yes | Set the value to adb3.0.
password | The password that is used to access the AnalyticDB for MySQL database. | Yes | N/A
tableName | The name of the table. | Yes | N/A
url | The URL of the database. | Yes | The virtual private cloud (VPC) endpoint of the AnalyticDB for MySQL V3.0 database.
userName | The username that is used to access the AnalyticDB for MySQL database. | Yes | N/A
maxRetryTimes | The maximum number of retries to write data to the table after data fails to be written. | No | The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 3.X, the default value is 3.
  • If the VVR version is 10.X, the default value is 10.
CACHE | Cache parameters. These parameters are used to configure the cache policy, cache size, and cache timeout period. | No | For more information, see Cache parameters.

Cache parameters

Parameter | Description | Required | Remarks
cache | The cache policy. | No | Valid values:
  • None: indicates that data is not cached.
  • LRU: indicates that only part of the data in the dimension table is cached. Each time the system receives a data record, it searches the cache first. If the record is not found in the cache, the system searches the physical dimension table.
  • ALL: indicates that all data in the dimension table is cached. This is the default value. Before the job runs, the system loads all data in the dimension table to the cache, and all subsequent lookups against the dimension table are served from the cache. If a record is not found in the cache, the join key does not exist in the dimension table. The system reloads all data into the cache after cache entries expire.

    If the amount of data in the remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. A missing key is a join key from the source table that has no matching record in the dimension table when the two tables are joined based on the ON clause.

Note
  • If you set the cache parameter to ALL, monitor the memory usage of the node to prevent out of memory (OOM) errors.
  • If you set the cache parameter to ALL, increase the memory of the node that joins the tables because the system loads data from the dimension table asynchronously. The memory that you add must be twice the data size of the remote table.
cacheSize | The maximum number of rows of data that can be cached. | No | Configure this parameter when the cache parameter is set to LRU. Default value: 100000.
Note
  • If the cache parameter is set to None, data is not cached, so you do not need to configure the cacheSize parameter and can leave it empty.
  • If the cache parameter is set to ALL, you do not need to configure the cacheSize parameter because the system loads all data in the physical table to the cache.
cacheTTLMs | The cache timeout period. Unit: milliseconds. | No | The effect of this parameter depends on the cache policy:
  • If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the cache timeout period. Default value: Long.MAX_VALUE. The default value indicates that cache entries do not expire.
  • If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the system reloads the data in the physical table. Default value: Long.MAX_VALUE. The default value indicates that data in the physical table is not reloaded.
Note If the cache parameter is set to None, data is not cached. Therefore, you do not need to configure the cacheTTLMs parameter.
maxJoinRows | The maximum number of results that are returned each time a data record in the primary table is queried and matched with data records in the dimension table. | No | Default value: 1024. If you can estimate that a data record in the primary table corresponds to a maximum of n data records in the dimension table, you can set the maxJoinRows parameter to n to ensure efficient matching in Realtime Compute for Apache Flink.
Note When you join the primary table with a dimension table, this parameter limits the number of dimension table records that are returned for each input data record in the primary table.
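
The cache parameters described above can be combined as in the following sketch of a dimension table that uses the LRU policy. The table name adb30_dim_lru and all numeric values are illustrative assumptions, not recommended settings. Choose them based on the size of your dimension table and on how stale a cached record is allowed to be.

```sql
-- Hypothetical dimension table that caches only the rows that are looked up.
CREATE TEMPORARY TABLE adb30_dim_lru (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'cache' = 'LRU',          -- cache part of the dimension table
  'cacheSize' = '10000',    -- illustrative value: keep at most 10,000 rows
  'cacheTTLMs' = '600000',  -- illustrative value: entries expire after 10 minutes
  'maxJoinRows' = '16'      -- illustrative value: at most 16 matches per input record
);
```

If the cache parameter were set to ALL instead, cacheSize could be omitted and cacheTTLMs would act as the interval at which the whole table is reloaded.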

Data type mappings

Data type of AnalyticDB for MySQL V3.0 | Data type of Flink
BOOLEAN | BOOLEAN
TINYINT | INT
SMALLINT | INT
INT | INT
BIGINT | BIGINT
DOUBLE | DOUBLE
VARCHAR | VARCHAR
DATETIME | TIMESTAMP
DATE | DATE
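
As a sketch of how these mappings apply, the following Flink DDL declares one column for each mapping. The table name and column names are hypothetical, and the comment on each line states the column type that is assumed on the AnalyticDB for MySQL V3.0 side.

```sql
-- Hypothetical dimension table; each comment shows the assumed
-- AnalyticDB for MySQL V3.0 type of the underlying column.
CREATE TEMPORARY TABLE adb30_typed_dim (
  is_active  BOOLEAN,    -- BOOLEAN
  level_code INT,        -- TINYINT is declared as INT in Flink
  region_id  INT,        -- SMALLINT is declared as INT in Flink
  user_id    INT,        -- INT
  order_cnt  BIGINT,     -- BIGINT
  score      DOUBLE,     -- DOUBLE
  user_name  VARCHAR,    -- VARCHAR
  updated_at TIMESTAMP,  -- DATETIME is declared as TIMESTAMP in Flink
  birthday   DATE        -- DATE
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);
```

The rows to watch are TINYINT, SMALLINT, and DATETIME, because the Flink type differs from the type that is used in AnalyticDB for MySQL.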

Sample code

CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b VARCHAR,
  c STRING,
  `proctime` AS PROCTIME()
) with (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) with (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b VARCHAR
) with (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
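
The statement above uses an inner join, so source records whose key is missing from the dimension table are dropped. The following variant is a sketch that keeps those records. It assumes that this connector handles LEFT JOIN in the same way as other Flink lookup joins, and it reuses the tables defined in the sample code.

```sql
-- Variant of the sample query: keep source records without a match.
-- For a missing key, H.b is NULL in the output row.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
LEFT JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
```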