This topic provides the DDL syntax that is used to create a MaxCompute dimension table, describes the parameters in the WITH and CACHE clauses, and provides data type mappings and answers to some frequently asked questions.

What is MaxCompute?

MaxCompute is a fast, fully managed computing platform for large-scale data warehousing. MaxCompute can process exabytes of data. It provides solutions for storing and computing large amounts of structured data in data warehouses and provides analytics and modeling services. For more information about MaxCompute, see What is MaxCompute?.

Prerequisites

A MaxCompute table is created. For more information about how to create a MaxCompute table, see Create tables.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports MaxCompute connectors.

Usage notes

If you use a MaxCompute connector version later than vvr-3.0.4-flink-1.12, job failovers may occur. Therefore, we recommend that you use a MaxCompute connector of vvr-3.0.4-flink-1.12 or earlier.

DDL syntax

CREATE TABLE odps_dim (
  id VARCHAR,
  name VARCHAR,
  age INT,
  PRIMARY KEY (id, name) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****',
  'cache' = 'ALL'
);
Note
  • When you join a dimension table with another table, the ON condition must contain equality conditions on all primary key fields, as shown in the example after this list.
  • Do not declare the partition key columns of a MaxCompute dimension table in the DDL statement.
  • Primary key fields must be declared before non-primary key fields, in the same order as they are listed in the PRIMARY KEY clause.
  • When you declare a dimension table, you can define a primary key to create one-to-one data mappings between the source table and the dimension table. If you want to create one-to-many data mappings between the source table and the dimension table, do not define a primary key.
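
The following example shows how such a join is typically written in Flink SQL. This is a minimal sketch: the source table datahub_input, its fields, and its WITH options are assumptions made only to illustrate the join syntax, and the dimension table is the odps_dim table from the preceding DDL statement.

-- Hypothetical source table. Replace the connector options with those of your actual source.
CREATE TEMPORARY TABLE datahub_input (
  id VARCHAR,
  name VARCHAR,
  proc_time AS PROCTIME()  -- processing-time attribute that the dimension table join requires
) WITH (
  'connector' = '<yourSourceConnector>'
);

-- Join the source table with the MaxCompute dimension table.
-- The ON condition contains equality conditions on all primary key fields (id and name).
SELECT s.id, s.name, d.age
FROM datahub_input AS s
JOIN odps_dim FOR SYSTEM_TIME AS OF s.proc_time AS d
ON s.id = d.id AND s.name = d.name;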

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the dimension table. Yes Set the value to odps.
endPoint The endpoint of MaxCompute. Yes For more information, see Endpoints.
tunnelEndpoint The endpoint of MaxCompute Tunnel. Yes For more information, see Endpoints.
Note This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC).
project The name of the MaxCompute project. Yes None.
tableName The name of the table. Yes None.
accessId The AccessKey ID that is used to access the MaxCompute project. Yes None.
accessKey The AccessKey secret that is used to access the MaxCompute project. Yes None.
partition The name of a partition. No
  • Static partitions
    • A MaxCompute table that has only one level of partitions

      For example, if only one partition key column ds exists, 'partition' = 'ds=20180905' indicates that data in the ds=20180905 partition is read.

    • A MaxCompute table that has multiple levels of partitions

      For example, if two partition key columns ds and hh exist, 'partition' = 'ds=20180905,hh=*' indicates that data in all hh subpartitions of the ds=20180905 partition is read.

      Note When you filter partitions, you must declare the values of all partition levels. In the preceding example, if you declare only 'partition' = 'ds=20180905', no partition data is read.
  • Dynamic partitions
    • 'partition' = 'max_pt()' is supported. It specifies that the partition with the largest value in alphabetical order among all partitions is loaded.
    • 'partition' = 'max_pt_with_done()' is supported. It specifies that the partition with the largest value in alphabetical order among the partitions that have a corresponding .done partition is loaded.
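
For example, a dimension table that always reads the most recent partition can declare the max_pt() dynamic partition, as shown in the following sketch. The placeholders are assumptions that you must replace with your own endpoint, project, table, and AccessKey values.

CREATE TABLE odps_latest_dim (
  id VARCHAR,
  name VARCHAR,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  -- Load the partition with the largest value in alphabetical order, for example the latest ds partition.
  'partition' = 'max_pt()',
  'cache' = 'ALL'
);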

Parameters in the CACHE clause

Parameter Description Remarks
cache The cache policy. You must set the cache parameter to ALL for a MaxCompute dimension table and explicitly declare the setting in the DDL statement.

ALL: indicates that all data in the dimension table is cached. Before the system runs a job, the system loads all data in the dimension table into the cache, and all subsequent lookups on the dimension table are served from the cache. If a record cannot be found in the cache, the join key does not exist. After the cache entries expire, the system reloads all data in the dimension table into the cache.

If the remote table contains a small amount of data and a large number of keys in the source table cannot be matched in the dimension table based on the ON clause (missing keys), we recommend that you set this parameter to ALL. If you set this parameter to ALL, you must configure the cacheTTLMs and cacheReloadTimeBlackList parameters.

Note
  • If the cache parameter is set to ALL, you must increase the memory of the join node because the system asynchronously loads data of the dimension table. We recommend that you set the memory of the join node to at least four times the size of the data in the remote table. The exact size also depends on the MaxCompute storage compression algorithm.
  • If you use an ultra-large MaxCompute dimension table, frequent garbage collections that are triggered by allocation failures may cause job exceptions. To resolve this issue, you can increase the memory of the node where the dimension table is joined with another table. If the issue persists, we recommend that you convert the dimension table to a key-value dimension table that supports the least recently used (LRU) cache policy. For example, you can use an ApsaraDB for HBase dimension table as the key-value dimension table.
cacheSize The maximum number of data records that can be cached. You can configure the cacheSize parameter based on your business requirements. By default, up to 100,000 rows of a MaxCompute dimension table can be cached.

If a MaxCompute dimension table contains more than 100,000 rows of data, we recommend that you set the cacheSize parameter to a value that is greater than the number of rows in the dimension table. Otherwise, the error Partition null table count ODPS tables row count exceeds maxRowCount limit {2} is returned.

cacheTTLMs The cache timeout period. Unit: milliseconds. If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the cache is refreshed. The cache is not refreshed by default.
cacheReloadTimeBlackList The periods of time during which the cache is not refreshed. This parameter takes effect only when the cache parameter is set to ALL. The cache is not refreshed during the time periods that you specify for this parameter. This parameter is useful for large-scale online promotional events such as Double 11. This parameter is empty by default. The following example shows the format of the values: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Use delimiters based on the following rules:
  • Separate multiple time periods with commas (,).
  • Separate the start time and end time of each time period with a hyphen and a closing angle bracket (->).
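
The following sketch shows how these cache parameters can be combined in the WITH clause. The values are assumptions for illustration only; tune cacheSize to the row count of your dimension table and the other parameters to your refresh and maintenance windows.

CREATE TABLE odps_dim_cached (
  id VARCHAR,
  name VARCHAR,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'max_pt()',
  'cache' = 'ALL',
  -- Cache up to 500,000 rows. Set this to a value greater than the row count of the dimension table.
  'cacheSize' = '500000',
  -- Reload the full cache every hour (3,600,000 milliseconds).
  'cacheTTLMs' = '3600000',
  -- Do not reload the cache during this time period.
  'cacheReloadTimeBlackList' = '2017-10-24 14:00 -> 2017-10-24 15:00'
);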

Data type mapping

Data type of MaxCompute Data type of Flink
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
BOOLEAN BOOLEAN
DATETIME TIMESTAMP
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
STRING VARCHAR
DECIMAL DECIMAL
BINARY VARBINARY
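
As an illustration of these mappings, assume a MaxCompute table whose columns are id STRING, amount DECIMAL, and created DATETIME. A corresponding Flink dimension table could be declared as follows; the table name, column names, and placeholders are assumptions.

CREATE TABLE odps_typed_dim (
  id VARCHAR,        -- MaxCompute STRING maps to Flink VARCHAR
  amount DECIMAL,    -- MaxCompute DECIMAL maps to Flink DECIMAL
  created TIMESTAMP, -- MaxCompute DATETIME maps to Flink TIMESTAMP
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'odps',
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'tablename' = '<yourTableName>',
  'accessid' = '<yourAccessKeyId>',
  'accesskey' = '<yourAccessKeySecret>',
  'partition' = 'ds=2018****',
  'cache' = 'ALL'
);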