This topic describes how to create an AnalyticDB for MySQL V3.0 dimension table and explains the parameters in the WITH and CACHE clauses that you use when you create the table.

Notice This topic applies only to Blink-3.5.0-hotfix and later.

Syntax

CREATE TABLE dim_ads(
    `name` VARCHAR,
    id VARCHAR,
    PRIMARY KEY (`name`),
    PERIOD FOR SYSTEM_TIME
)with(
    type='ADB30',
    url='jdbc:mysql://<Internal endpoint>/<databaseName>',
    tableName='xxx',
    userName='xxx',
    password='xxx'
);
Note
  • You must specify a primary key when you declare a dimension table.
  • When you join a dimension table with another table, the ON clause must contain an equality (=) condition for every primary key field.
  • You can declare a primary key or a unique index column of the AnalyticDB for MySQL table as the primary key of the dimension table.
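
The rule about primary key fields in the ON clause can be sketched with a composite key. This is a minimal illustration, not an official sample: the table names dim_city and orders and all column names are hypothetical, and orders is assumed to be a source table declared elsewhere in the job.

```sql
-- Hypothetical dimension table with a two-column primary key.
CREATE TABLE dim_city (
    country VARCHAR,
    city VARCHAR,
    population BIGINT,
    PRIMARY KEY (country, city),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='ADB30',
    url='jdbc:mysql://<Internal endpoint>/<databaseName>',
    tableName='xxx',
    userName='xxx',
    password='xxx'
);

-- Every primary key field (country and city) appears as an equality
-- condition in the ON clause. "orders" is an assumed source table.
SELECT
    s.order_id,
    d.population
FROM orders AS s
JOIN dim_city FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON s.country = d.country AND s.city = d.city;
```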

Parameters in the WITH clause

Parameter | Description | Required | Remarks
type | The type of the dimension table. | Yes | Set the value to ADB30.
url | The URL of the AnalyticDB for MySQL database. | Yes | Example: url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'.
tableName | The name of the table. | Yes | N/A.
userName | The username that is used to access the AnalyticDB for MySQL database. | Yes | N/A.
password | The password that is used to access the AnalyticDB for MySQL database. | Yes | N/A.
maxRetryTimes | The maximum number of retries for writing data to the table. | No | Default value: 3.
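
As a sketch of how the optional parameter fits alongside the required ones, the following declaration sets maxRetryTimes explicitly. The table name dim_ads_retry and the value 5 are illustrative, and passing the value as a quoted string is an assumption based on how the other parameters are written.

```sql
-- Hypothetical declaration that overrides the default retry count of 3.
CREATE TABLE dim_ads_retry (
    `name` VARCHAR,
    id VARCHAR,
    PRIMARY KEY (`name`),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='ADB30',                                          -- required
    url='jdbc:mysql://<Internal endpoint>/<databaseName>', -- required
    tableName='xxx',                                       -- required
    userName='xxx',                                        -- required
    password='xxx',                                        -- required
    maxRetryTimes='5'                                      -- optional; illustrative value
);
```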

Parameters in the CACHE clause

Parameter Description Remarks
cache The cache policy. AnalyticDB for MySQL V3.0 supports the following cache policies:
  • None: indicates that no data is cached. This is the default value.
  • LRU: indicates that only part of the data in the dimension table is cached. Each time the system receives a data record from the source table, it searches the cache first. If the record is not in the cache, the system searches for it in the physical dimension table.

    If you use this cache policy, you must configure the cacheSize and cacheTTLMs parameters.

  • ALL: indicates that all the data in the dimension table is cached. Before a job runs, Realtime Compute for Apache Flink loads all the data in the dimension table into the cache, and all subsequent lookups on the dimension table search the cache. If a data record is not found in the cache, the join key does not exist. The system reloads all the data into the cache after the cache entries expire.

    If the remote table stores a small amount of data and many lookups are for missing keys (keys from the source table that match no record in the dimension table based on the ON clause), we recommend that you set this parameter to ALL.

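    If you use this cache policy, you must configure the cacheTTLMs parameter, and you can also configure the cacheReloadTimeBlackList parameter. The cacheSize parameter applies only to the LRU policy.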

Note
  • If you set the cache parameter to ALL, you must increase the memory of the node where the dimension table is joined with another table, because Realtime Compute for Apache Flink loads the data from the dimension table asynchronously. Increase the memory by twice the size of the data in the remote table.
  • If a dimension table stores a large volume of data and the cache parameter is set to ALL, an out of memory (OOM) error may occur or a full garbage collection (GC) may be time-consuming. To address this issue, you can use the following methods:
    • If the cache parameter can be set to ALL for a dimension table, enable the partitionedJoin feature. For a Blink version earlier than Blink 3.6.0, the full data of the dimension table is loaded for each concurrent job by default. For a Blink version later than Blink 3.6.0, the partitionedJoin feature is available if you set the cache parameter to ALL. After you enable the partitionedJoin feature, only the required data is cached for each concurrent job.
    • Use an ApsaraDB for HBase or ApsaraDB RDS dimension table that uses key-value pairs to store data.
cacheSize The cache size. This parameter is available only if you set the cache parameter to LRU. Default value: 10000. Unit: rows.
cacheTTLMs The interval at which Realtime Compute for Apache Flink refreshes the cache. The system reloads the latest data in the dimension table at this interval so that the data in the source table is associated with the latest data in the dimension table. Unit: milliseconds. This parameter is empty by default, which means that updates in the dimension table are not reloaded.
cacheReloadTimeBlackList The time periods during which the cache is not refreshed. This parameter takes effect when the cache parameter is set to ALL and is useful during large-scale online promotional events such as Double 11. This parameter is optional and is empty by default. Example: '2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00'. Use the following delimiters to separate time periods:
  • Separate multiple time periods with commas (,).
  • Separate the start time and end time of each time period with a hyphen and a greater-than sign (->).
partitionedJoin Specifies whether to enable the partitionedJoin feature. If the partitionedJoin feature is enabled, shuffling is implemented based on JOIN keys before the source table is joined with the dimension table. This process provides the following benefits:
  • If you set the cache parameter to LRU, the cache hit rate increases.
  • If you set the cache parameter to ALL, memory usage is reduced because only the required data is cached for each concurrent job.
The default value of this parameter is false, which indicates that the partitionedJoin feature is disabled.
Note To enable the partitionedJoin feature, specify the following setting: partitionedJoin = 'true'.
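
The cache parameters described above can be combined as in the following sketch. It assumes that cache parameters are written in the WITH clause as quoted strings, as the partitionedJoin = 'true' setting suggests. The table names dim_lru and dim_all and all numeric values are illustrative, not recommended settings.

```sql
-- LRU policy: cache at most 20000 rows and refresh the cache every 10 minutes.
CREATE TABLE dim_lru (
    `name` VARCHAR,
    id VARCHAR,
    PRIMARY KEY (`name`),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='ADB30',
    url='jdbc:mysql://<Internal endpoint>/<databaseName>',
    tableName='xxx',
    userName='xxx',
    password='xxx',
    cache='LRU',
    cacheSize='20000',    -- unit: rows (illustrative value)
    cacheTTLMs='600000'   -- unit: milliseconds (illustrative value)
);

-- ALL policy: cache the whole table, reload it every hour, skip reloads
-- during one time period, and shuffle by JOIN key before the join.
CREATE TABLE dim_all (
    `name` VARCHAR,
    id VARCHAR,
    PRIMARY KEY (`name`),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='ADB30',
    url='jdbc:mysql://<Internal endpoint>/<databaseName>',
    tableName='xxx',
    userName='xxx',
    password='xxx',
    cache='ALL',
    cacheTTLMs='3600000', -- unit: milliseconds (illustrative value)
    cacheReloadTimeBlackList='2017-11-10 23:30 -> 2017-11-11 08:00',
    partitionedJoin='true'
);
```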

Sample code

CREATE TABLE datahub_input1 (
  id      BIGINT,
  name    VARCHAR,
  age     BIGINT
) WITH (
  type='datahub'
);

CREATE TABLE phoneNumber (
  name VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name),
  PERIOD FOR SYSTEM_TIME -- The identifier of a dimension table.
) WITH (
  type='ADB30'
);

CREATE TABLE result_infor (
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) WITH (
  type='rds'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM datahub_input1 AS t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() AS w -- You must include this clause in the INSERT INTO statement if you are performing a JOIN operation on the dimension table.
ON t.name = w.name;