This topic describes how to create an Elasticsearch dimension table in Realtime Compute for Apache Flink. It also describes the parameters in the WITH and CACHE clauses used when you create such a dimension table.

Notice: This topic applies only to Blink 3.2.2 and later.

DDL syntax

In Realtime Compute for Apache Flink, you can use an Elasticsearch table as a dimension table. The following code shows an example:
CREATE TABLE es_stream_sink(
  field1 LONG, 
  field2 VARBINARY, 
  field3 VARCHAR,
  PRIMARY KEY(field1),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type ='elasticsearch',
  endPoint = '<yourEndPoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);
Note: Elasticsearch supports data updates based on the primary key. You can specify only one field as the primary key.
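
As a rough illustration of how a dimension table like this is typically queried, the following sketch joins a stream against es_stream_sink. The source table orders_stream and its columns order_id and user_id are hypothetical, and the FOR SYSTEM_TIME AS OF PROCTIME() join form is assumed to be the one your Blink version uses for dimension table joins.

```sql
-- Hypothetical source table: orders_stream(order_id VARCHAR, user_id LONG).
-- Replace it with your own stream definition.
SELECT
  o.order_id,
  o.user_id,
  d.field3                      -- attribute looked up from the dimension table
FROM orders_stream AS o
JOIN es_stream_sink FOR SYSTEM_TIME AS OF PROCTIME() AS d
  ON o.user_id = d.field1;      -- join on the dimension table's primary key
```

The join condition uses field1 because it is the primary key of the dimension table.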

Parameters in the WITH clause

Parameter | Description | Default value | Required
type | The type of the dimension table. | elasticsearch | Yes
endPoint | The endpoint of an Elasticsearch server, for example, http://127.0.0.1:9211. | None | Yes
accessId | The AccessKey ID that is used to access an Elasticsearch instance. | None | Yes
accessKey | The AccessKey secret that is used to access an Elasticsearch instance. | None | Yes
index | The index name, which is similar to the database name. | None | Yes
typeName | The type name, which is similar to the database table name. | None | Yes
maxRetryTimes | The maximum number of retries for writing data into the table. | 30 | No
timeout | The read timeout period. Unit: milliseconds. | 600000 | No
discovery | Specifies whether node discovery is enabled. If it is enabled, the client refreshes the server list every five minutes. | false | No
compression | Specifies whether to compress request bodies in the GZIP format. | true | No
multiThread | Specifies whether to enable multithreading for JestClient. | true | No
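
The optional parameters can be set next to the required ones. The following is a minimal sketch, not a recommended configuration: the table name es_dim_tuned and the chosen values are illustrative, and writing the values as quoted strings is an assumption based on the style of the DDL example in this topic.

```sql
-- Sketch only: es_dim_tuned and all non-default values are illustrative.
CREATE TABLE es_dim_tuned (
  field1 LONG,
  field3 VARCHAR,
  PRIMARY KEY (field1),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type = 'elasticsearch',
  endPoint = '<yourEndPoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>',
  timeout = '30000',       -- read timeout in milliseconds (default: 600000)
  discovery = 'true',      -- enable node discovery (default: false)
  compression = 'false'    -- do not GZIP-compress request bodies (default: true)
);
```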

Parameters in the CACHE clause

Parameter Description Remarks
cache The cache policy.
  • None (default value): indicates that no data is cached.
  • LRU: indicates that partial data in the dimension table is cached. The system searches the cache each time it receives a data record. If the system does not find the record in the cache, it searches for the data record in the physical dimension table.

    If this cache policy is used, you must configure the cacheSize and cacheTTLMs parameters.

  • ALL: indicates that all data in the dimension table is cached. Before a Realtime Compute for Apache Flink job starts to run, the system loads all data in the dimension table to the cache. All subsequent queries on the dimension table are served from the cache. If a data record is not found in the cache, the join key is treated as nonexistent. After the cache expires, the system reloads all data into the cache.
cacheSize The cache size, which is the maximum number of data records that can be cached. Default value: 10000. You can specify this parameter only if you set the cache parameter to LRU.
cacheTTLMs The cache refresh interval. Unit: milliseconds. The cache does not time out by default. The effect of this parameter depends on the cache policy.
  • If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the timeout period of cache entries.
  • If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the cache is reloaded. The cache is not reloaded by default.
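
The following sketch shows one way the cache parameters might be combined for the LRU policy. It assumes that the cache parameters are written in the WITH clause together with the connection parameters and that values are quoted strings. The table name es_dim_cached and the values are illustrative only.

```sql
-- Sketch only: es_dim_cached and the cache values are illustrative.
CREATE TABLE es_dim_cached (
  field1 LONG,
  field3 VARCHAR,
  PRIMARY KEY (field1),
  PERIOD FOR SYSTEM_TIME
) WITH (
  type = 'elasticsearch',
  endPoint = '<yourEndPoint>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>',
  cache = 'LRU',           -- cache part of the dimension table
  cacheSize = '50000',     -- keep at most 50,000 records (default: 10000)
  cacheTTLMs = '600000'    -- cached entries time out after 10 minutes
);
```

For the ALL policy, cacheSize does not apply, and cacheTTLMs instead sets how often the whole dimension table is reloaded into the cache.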