When you perform a dimension table join, you can configure a cache policy to improve job throughput.
Background information
- If the cache parameter is set to LRU, only part of the dimension table data is cached. The system creates a local LRU cache for each join node. Each time Realtime Compute for Apache Flink reads a record from the source table, it searches the cache first. If matching data is found in the cache, it is returned directly, which reduces I/O requests. If no matching data is found in the cache, Realtime Compute for Apache Flink queries the dimension table and stores the matching data in the cache for subsequent lookups.
To limit the volume of data stored in the cache, set cacheSize. To update dimension table data regularly, set cacheTTLMs to control the cache expiration time. cacheTTLMs applies to all cached records: if a cached record is not accessed within the specified period, it is removed from the cache.
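The LRU settings above could be combined in a table definition like the following minimal sketch. The table user_profile, its columns, the connector type, and all parameter values are hypothetical placeholders; check whether the connector you use supports the LRU policy before relying on it.

```sql
-- Hypothetical dimension table that caches at most 100000 rows and
-- removes a cached row that has not been accessed for 10 minutes.
CREATE TABLE user_profile (
  id varchar,
  city varchar,
  PRIMARY KEY (id)
) with (
  type = 'your_connector_type',  -- Placeholder: a connector that supports LRU caching.
  -- Connector-specific connection parameters go here.
  cache = 'LRU',
  cacheSize = '100000',          -- Maximum number of cached rows.
  cacheTTLMs = '600000'          -- 600000 ms = 10 minutes.
);
```

A larger cacheSize raises the cache hit rate at the cost of memory on each join node, while a shorter cacheTTLMs picks up dimension table updates sooner at the cost of more lookups against the dimension table.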
- If the cache parameter is set to ALL, all data in the dimension table is cached. The system creates an asynchronous thread for the join node to synchronize data between the cache and the dimension table. Input data is blocked from the moment the job starts until the cache is fully loaded. This ensures that all dimension table data is in the cache before input data processing starts.
Realtime Compute for Apache Flink searches only the cache in subsequent dimension table lookups. If no matching data is found in the cache, the join key does not exist in the dimension table. When the cached data expires, Realtime Compute for Apache Flink reloads the dimension table data into the cache. Reloading does not affect ongoing dimension table joins: the reloaded data is held in temporary memory, and the old cache is atomically replaced after all dimension table data has been reloaded.
If cache is set to ALL, dimension table joins achieve high performance because few I/O requests are initiated. However, the memory must be large enough to hold two copies of the dimension table data, because the current cache and the copy being reloaded coexist during reloading.
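As a sketch of how the ALL policy's reload behavior can be tuned, the following options could be added to the WITH clause of a dimension table declared with cache = 'ALL'. The one-hour interval and the blackout period are illustrative values only, and the cacheReloadTimeBlackList format follows the example given in the parameter table of this topic.

```sql
-- Cache-related options only; the connector and connection options of the
-- table are omitted from this fragment.
cache = 'ALL',
cacheTTLMs = '3600000',                                           -- Reload the whole table every 3600000 ms (1 hour).
cacheReloadTimeBlackList = '2017-11-10 23:30 -> 2017-11-11 08:00' -- Skip reloads during this period.
```

Because each reload temporarily holds a second copy of the table, the reload interval also determines how often that extra memory is needed.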
Optimization method
Parameter | Description | Required | Remarks |
---|---|---|---|
cache | The cache policy. | No | Valid values: LRU (part of the dimension table data is cached) and ALL (all dimension table data is cached). |
cacheSize | The cache size. Unit: rows. | No | This parameter takes effect only when cache is set to LRU. Default value: 10000. |
cacheTTLMs | The cache expiration period or the cache reloading interval. Unit: milliseconds. | No | If cache is set to LRU, this parameter specifies the cache expiration period. If cache is set to ALL, it specifies the interval at which the cache is reloaded. |
cacheReloadTimeBlackList | The periods during which cache reloading is not allowed, for example, during Double 11. This parameter takes effect only when cache is set to ALL. | No | This parameter is empty by default. Separate multiple periods with commas. Example: '2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00'. |
partitionedJoin | Specifies whether to enable Partitioned All Cache. | No | Default value: false, which disables Partitioned All Cache. If Partitioned All Cache is enabled, source table data is shuffled by join key before it is joined with the dimension table. |
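A minimal fragment that enables Partitioned All Cache is shown below. It pairs partitionedJoin with cache = 'ALL' on the assumption, suggested by the feature's name, that it applies to the ALL policy; passing the Boolean as the string 'true' follows the quoting style of the other WITH options in this topic.

```sql
-- Cache-related options only; add them to the WITH clause of a
-- dimension table whose primary key is the join key.
cache = 'ALL',
partitionedJoin = 'true'  -- Shuffle source records by join key before the dimension table lookup.
```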
Sample code
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id)
) with (
type = 'odps',
endPoint = 'your_end_point_name',
project = 'your_project_name',
tableName = 'your_table_name',
accessId = 'your_access_id',
accessKey = 'your_access_key',
`partition` = 'ds=20180905',  -- The MaxCompute table partition to read.
cache = 'ALL'                 -- Load all dimension table data into the cache.
);
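The white_list table takes effect as a dimension table only when a query joins it with a source table. The following is a minimal sketch that assumes a source table named event with an id column and a sink table named result_table have already been declared; both tables are hypothetical. It uses the processing-time FOR SYSTEM_TIME AS OF PROCTIME() syntax for dimension table joins.

```sql
-- event (source) and result_table (sink) are hypothetical tables.
-- Each event record looks up white_list by id; because white_list uses
-- cache = 'ALL', the lookup is served from the in-memory cache.
INSERT INTO result_table
SELECT e.id, w.name, w.age
FROM event AS e
JOIN white_list FOR SYSTEM_TIME AS OF PROCTIME() AS w
ON e.id = w.id;
```

With an inner join as shown, an event whose id is not found in the cache produces no output row, which matches the ALL behavior described above: a miss means the join key does not exist in the dimension table.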