This topic describes how to create a MaxCompute dimension table in Realtime Compute for Apache Flink. This topic also describes the parameters in the WITH clause, cache parameters, and data type mappings used when you create a MaxCompute dimension table.
- Blink 2.1.1 and later versions support MaxCompute dimension tables.
- For more information about the query syntax of a dimension table, see JOIN statements for dimension tables.
- To use a MaxCompute dimension table, you must grant read permissions on the MaxCompute table to the account that is used to access MaxCompute.
DDL syntax
CREATE TABLE white_list (
  id varchar,
  name varchar,
  age int,
  PRIMARY KEY (id),
  PERIOD FOR SYSTEM_TIME -- The identifier of a dimension table.
) WITH (
  type = 'odps',
  endPoint = '<YourEndPoint>',
  project = '<YourProjectName>',
  tableName = '<YourTableName>',
  accessId = '<yourAccessKeyId>',
  accessKey = '<yourAccessKeySecret>',
  `partition` = 'ds=2018****',
  cache = 'ALL'
);
- When you declare a dimension table, you must specify a primary key. The primary key of a MaxCompute dimension table must be unique. Records that have duplicate primary key values are removed.
- When you join a dimension table with another table, the ON condition must contain equality conditions that include all primary key columns.
- partition is a keyword and must be escaped with backticks (`), for example, `partition`.
- If the dimension table is a partitioned table, do not declare the partition key columns in the DDL statement.
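To illustrate the primary-key rules above, the following sketch (hypothetical table and column names) declares a dimension table with a composite primary key and joins on every primary key column:

```sql
CREATE TABLE dim_user (
  id     VARCHAR,
  region VARCHAR,
  phone  VARCHAR,
  PRIMARY KEY (id, region),  -- composite primary key
  PERIOD FOR SYSTEM_TIME     -- The identifier of a dimension table.
) WITH (
  type = 'odps',
  endPoint = '<YourEndPoint>',
  project = '<YourProjectName>',
  tableName = '<YourTableName>',
  accessId = '<yourAccessKeyId>',
  accessKey = '<yourAccessKeySecret>',
  cache = 'ALL'
);

-- The ON condition must contain equality conditions on all primary key columns:
SELECT s.id, d.phone
FROM source_table AS s
JOIN dim_user FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON s.id = d.id AND s.region = d.region;
```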
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
type | The type of the dimension table. | Yes | Set the value to odps. |
endPoint | The endpoint of MaxCompute. | Yes | For more information, see Endpoints. |
tunnelEndpoint | The endpoint of MaxCompute Tunnel. | Yes | For more information, see Endpoints. Note: This parameter is required if MaxCompute is deployed in a virtual private cloud (VPC). |
project | The name of the MaxCompute project. | Yes | N/A. |
tableName | The name of the table. | Yes | N/A. |
accessId | The AccessKey ID that is used to access MaxCompute. | Yes | N/A. |
accessKey | The AccessKey secret that is used to access MaxCompute. | Yes | N/A. |
partition | The name of a partition. | No | N/A. |
maxRowCount | The maximum number of rows that Realtime Compute for Apache Flink can load from the table. | No | Default value: 100000. Note: If the table contains more than 100,000 rows, you must configure this parameter. We recommend that you set it to a value greater than the actual number of rows to be loaded. |
Cache parameters
Parameter | Description | Remarks |
---|---|---|
cache | The cache policy. | You must set the cache parameter to ALL for a MaxCompute dimension table and explicitly declare the setting in the DDL statement. ALL: all data in the dimension table is cached. Before a job runs, the system loads all data in the dimension table into the cache, and all subsequent queries against the dimension table are served from the cache. If a record is not found in the cache, the join key does not exist. The system reloads the full cache after cache entries expire. If the amount of data in the remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause. Note: If you set this parameter to ALL, you must configure the cacheTTLMs and cacheReloadTimeBlackList parameters. |
cacheSize | The maximum number of data records that can be cached. | You can configure the cacheSize parameter based on your business requirements. By default, 100,000 rows of a MaxCompute dimension table can be cached. |
cacheTTLMs | The cache timeout period. | Unit: milliseconds. If you set the cache parameter to ALL, the timeout period specifies the interval at which Realtime Compute for Apache Flink reloads the cache. By default, the cache is not reloaded. |
cacheReloadTimeBlackList | The periods of time during which the cache is not refreshed. This parameter takes effect only when the cache parameter is set to ALL. The cache is not refreshed during the periods that you specify. This parameter is suitable for large-scale online promotional events such as Double 11. | Empty by default. Example format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple time periods with commas (,). Separate the start time and end time of each period with an arrow (->). |
partitionedJoin | Specifies whether each concurrent task caches the full data of the dimension table. | Optional. Default value: false, which indicates that each concurrent task caches the full data of the dimension table. If you set this parameter to true, each concurrent task caches only partial data of the dimension table. |
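As a sketch of how these cache parameters fit together, the following WITH clause (placeholder credentials, illustrative timeout and blacklist values) enables full caching with an hourly reload and skips reloads during a promotional window:

```sql
CREATE TABLE odps_dim_cached (
  name VARCHAR,
  phoneNumber BIGINT,
  PRIMARY KEY (name),
  PERIOD FOR SYSTEM_TIME -- The identifier of a dimension table.
) WITH (
  type = 'odps',
  endPoint = '<YourEndPoint>',
  project = '<YourProjectName>',
  tableName = '<YourTableName>',
  accessId = '<yourAccessKeyId>',
  accessKey = '<yourAccessKeySecret>',
  `partition` = 'ds=20180905',
  cache = 'ALL',
  cacheTTLMs = '3600000',  -- reload the full cache every hour (illustrative value)
  cacheReloadTimeBlackList = '2017-11-10 23:30 -> 2017-11-11 08:00',  -- no reloads in this window
  partitionedJoin = 'true' -- each concurrent task caches only partial data
);
```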
Sample code
CREATE TABLE datahub_input1 (
id BIGINT,
name VARCHAR,
age BIGINT
) with (
type='datahub'
);
CREATE TABLE odps_dim (
name VARCHAR,
phoneNumber BIGINT,
PRIMARY KEY (name),
PERIOD FOR SYSTEM_TIME -- The identifier of a dimension table.
) with (
type = 'odps',
endPoint = '<yourEndpointName>',
project = '<yourProjectName>',
tableName = '<yourTableName>',
accessId = '<yourAccessId>',
accessKey = '<yourAccessPassword>',
`partition` = 'ds=20180905',-- For more information about dynamic or static partitions, see the description for the parameters in the WITH clause.
cache = 'ALL'
);
CREATE TABLE result_infor (
  id BIGINT,
  phoneNumber BIGINT,
  name VARCHAR
) WITH (
  type = 'print'
);
INSERT INTO result_infor
SELECT
t.id,
w.phoneNumber,
t.name
FROM datahub_input1 as t
JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() as w -- You must include this clause when you perform a JOIN operation on a dimension table.
ON t.name = w.name;
Data type mappings
Data type of MaxCompute | Data type of Blink |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
DECIMAL | DECIMAL |
BINARY | VARBINARY |
STRING | VARCHAR |
FAQ
- Q: What is the difference between max_pt() and max_pt_with_done()?
  A: max_pt() loads the partition that ranks first in alphabetical order among all partitions. max_pt_with_done() loads the partition that ranks first in alphabetical order among the partitions that have a corresponding partition with the .done suffix. For example, assume that a table contains the following partitions:
  - ds=20190101
  - ds=20190101.done
  - ds=20190102
  - ds=20190102.done
  - ds=20190103
  In this case, `partition`='max_pt_with_done()' returns the ds=20190102 partition, and `partition`='max_pt()' returns the ds=20190103 partition.
  Note: Only Blink 3.3.2 and later support `partition` = 'max_pt_with_done()'.
- Q: The failover message RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask is reported when a job is running. What do I do?
  A: Dimension table joins in Blink 1.0 have known issues. We recommend that you upgrade to Blink 2.1.1 or later. If you still want to use Blink 1.0, suspend your job and then resume it. You can troubleshoot this issue based on the first error message in the failover history.
- Q: What do the endPoint and tunnelEndpoint parameters mean in the Alibaba Cloud public cloud? What happens if the two parameters are incorrectly configured?
  A: For more information about the endPoint and tunnelEndpoint parameters, see Endpoints. If either parameter is incorrectly configured in a VPC, one of the following task exceptions may occur:
  - If the endPoint parameter is incorrectly configured, the task stops at a progress of 91%.
  - If the tunnelEndpoint parameter is incorrectly configured, the task fails.
- Q: What do I do if the error message "ErrorMessage=Authorization Failed [4019], You have NO privilege 'ODPS:***'" appears when a job is running?
  A: This error occurs because the user identity specified in the DDL statement cannot be used to access MaxCompute. You must use an Alibaba Cloud account, a RAM user, or a RAM role to authenticate the user identity. For more information, see User authentication.
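The partition functions discussed in the first FAQ entry are set through the partition parameter in the WITH clause. A minimal sketch (placeholder values; max_pt_with_done() requires Blink 3.3.2 or later):

```sql
CREATE TABLE odps_max_pt_dim (
  id VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id),
  PERIOD FOR SYSTEM_TIME -- The identifier of a dimension table.
) WITH (
  type = 'odps',
  endPoint = '<YourEndPoint>',
  project = '<YourProjectName>',
  tableName = '<YourTableName>',
  accessId = '<yourAccessKeyId>',
  accessKey = '<yourAccessKeySecret>',
  `partition` = 'max_pt()',  -- or 'max_pt_with_done()' in Blink 3.3.2 and later
  cache = 'ALL'
);
```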
If you have any questions, submit a ticket and set the product name to MaxCompute.