This topic describes how to create a Phoenix5 dimension table in Realtime Compute for Apache Flink. It also describes the parameters in the WITH and CACHE clauses used when you create a Phoenix5 dimension table.

Notice Only Blink versions later than Blink 3.4.0 support Phoenix5 dimension tables.

Syntax

create table US_POPULATION_DIM (
  `STATE` varchar,
  CITY varchar,
  POPULATION BIGINT,
  PRIMARY KEY (`STATE`, CITY),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type = 'PHOENIX5',
  serverUrl = '<YourServerUrl>',
  tableName = '<YourTableName>'
);           

Parameters in the WITH clause

Parameter Description Required Remarks
type The type of the dimension table. Yes Set the value to PHOENIX5.
serverUrl The URL of the Phoenix5 query server.
  • If Phoenix5 is created in a cluster, the value of this parameter is the URL of Server Load Balancer (SLB).
  • If Phoenix5 is created on a single server, the value of this parameter is the URL of the server.
Yes You must enable the HBase SQL service in an ApsaraDB for HBase instance.
The value of the serverUrl parameter is in the http://host:port format. In the format:
  • host: indicates the domain name of the Phoenix5 service.
  • port: indicates the port number of the Phoenix5 service. Set the value to 8765.
tableName The name of the Phoenix5 table. Yes The name of the Phoenix5 table is in the SchemaName.TableName format. In the format:
  • SchemaName: indicates the schema name, which can be empty. This means that the schema name is not used and only the table name is used. In this case, the default schema of the database is used.
  • TableName: indicates the name of the table.

Parameters in the CACHE clause

Parameter Description Required Remarks
cache The cache policy. No Valid values:
  • None: indicates that no data is cached. This is the default value.
  • LRU: indicates that only the specified data in the dimension table is cached. The system searches the cache each time it receives a data record. If the system does not find the record in the cache, it searches for the data record in the physical dimension table.

    If this cache policy is used, you must configure the cacheSize and cacheTTLMs parameters.

  • ALL: indicates that all the data in the dimension table is cached. Before Realtime Compute for Apache Flink runs a job, Realtime Compute for Apache Flink loads all the data in the dimension table to the cache and then searches the cache for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

    If the data amount of a remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. (The source table and dimension table cannot be associated based on the ON clause.)

    If you use this cache policy, you must configure the cacheSize and cacheTTLMs parameters.

Note If you set the cache parameter to ALL, you must increase the memory of the node for joining tables because Realtime Compute for Apache Flink asynchronously loads data from the dimension table. The increased memory size is twice that of the remote table.
cacheSize The maximum number of data records that can be cached. No This parameter is available only if you set the cache parameter to LRU. Default value: 10000. Unit: rows.
cacheTTLMs The cache timeout period. Unit: milliseconds. No If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the time allowed before cache entries expire. Cache entries do not expire by default. If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the cache is loaded. The cache is not reloaded by default.
cacheReloadTimeBlackList The time periods during which the cache is not refreshed. This parameter takes effect when the cache parameter is set to ALL. The cache is not refreshed during the time periods that you specify for this parameter. This parameter is useful for large-scale online promotional events such as Double 11. No This parameter is optional. It is empty by default. For example, you can set this parameter to '2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00'. Multiple time periods are separated by commas (,). The start time and end time of each time period are separated with a hyphen and a greater-than sign (->).

Sample code

CREATE TABLE datahub_input1 (
  id  BIGINT,
  name  VARCHAR,
  age   BIGINT
) WITH (
  type='datahub'
);

create table phoneNumber(
  name VARCHAR,
  phoneNumber BIGINT,
  primary key(name),
  PERIOD FOR SYSTEM_TIME--Define the change period of the dimension table. 
)with(
  type='PHOENIX5'
);

CREATE table result_infor(
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
)with(
  type='rds'
);

INSERT INTO result_infor
SELECT
  t.id,
  w.phoneNumber,
  t.name
FROM datahub_input1 as t
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w -- You must include this clause in the INSERT INTO statement if you are performing a JOIN operation on the dimension table. 
ON t.name = w.name;