This topic provides the DDL syntax that is used to create an Elasticsearch dimension table, describes the parameters in the WITH and CACHE clauses, and provides sample code.

Note: Elasticsearch connectors support the features that are provided by Alibaba Cloud Elasticsearch but do not support HTTPS-based access.

What is Alibaba Cloud Elasticsearch?

Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch and provides commercial features such as security, machine learning, graph, and application performance management (APM). It is suitable for scenarios such as data analysis and data search, and it provides enterprise-level services, including access control, security monitoring and alerting, and automatic report generation.

Prerequisites

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Elasticsearch connectors.

DDL syntax

CREATE TABLE es_dim(
  field1 LONG, -- If you use this field as the key to join the dimension table with another table, declare it as STRING instead of LONG.
  field2 VARBINARY, 
  field3 VARCHAR
) WITH (
  'connector' ='elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);
Note
  • Only one key can be used to join a dimension table with another table. The key is the ID of a document in your Elasticsearch index.
  • Elasticsearch V5.5 and later are supported.

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the dimension table. | Yes | Set the value to elasticsearch.
endPoint | The endpoint of Elasticsearch. | Yes | Example: http://127.0.0.1:9200.
accessId | The username that is used to access the Elasticsearch cluster. | No | N/A.
accessKey | The password that is used to access the Elasticsearch cluster. | No | N/A.
indexName | The name of the index. | Yes | N/A.
typeNames | The type of the document. | No | Default value: _doc. Note: We recommend that you do not specify this parameter if you use a version of Elasticsearch later than V7.0.
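
As an illustration of the required and optional parameters, the following sketch declares a dimension table with only the three required parameters. It assumes a cluster later than V7.0 that does not require authentication. The table name es_dim_v7 and its columns are hypothetical, and the placeholders must be replaced with your own values.

```sql
CREATE TEMPORARY TABLE es_dim_v7 (
  id STRING,        -- Join key: the document ID, declared as STRING.
  `value` FLOAT     -- Hypothetical field read from the document.
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',    -- For example, http://127.0.0.1:9200.
  'indexName' = '<yourIndexName>'
  -- accessId and accessKey are omitted because they are optional.
  -- typeNames is omitted, so the default value _doc applies.
);
```

If the cluster requires a username and password, add the accessId and accessKey parameters as shown in the DDL syntax section.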

Parameters in the CACHE clause

Parameter | Description | Required | Remarks
cache | The cache policy. | No | Valid values:
  • None: No data is cached. This is the default value.
  • LRU: Only some of the data in the dimension table is cached. Each time the system receives a data record, it searches the cache first. If the record is not found in the cache, the system searches the physical dimension table.

    If you use this cache policy, you must configure the cacheSize and cacheTTLMs parameters.

cacheSize | The maximum number of data records that can be cached. | No | This parameter takes effect only if you set the cache parameter to LRU. Default value: 10000.
cacheTTLMs | The period of time after which cache entries expire. | No | Unit: milliseconds. By default, cache entries do not expire. The effect of this parameter depends on the cache policy:
  • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the period of time before cache entries expire.
  • If you set the cache parameter to None, the cacheTTLMs parameter can be left empty because no data is cached.
cacheEmpty | Specifies whether to cache empty results. | No | Default value: true.
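
The cache parameters can be combined as in the following sketch. It assumes that the cache parameters are set in the WITH clause together with the connector parameters, as is usual for Flink SQL connector options. The table name es_dim_cached and the values 50000 and 60000 are illustrative only and are not recommendations.

```sql
CREATE TEMPORARY TABLE es_dim_cached (
  id STRING,        -- Join key: the document ID, declared as STRING.
  `value` FLOAT     -- Hypothetical field read from the document.
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'cache' = 'LRU',           -- Cache only some of the dimension table data.
  'cacheSize' = '50000',     -- Keep at most 50,000 records in the cache.
  'cacheTTLMs' = '60000',    -- Cache entries expire after 60 seconds.
  'cacheEmpty' = 'true'      -- Also cache lookups that return no result.
);
```

A shorter cacheTTLMs value keeps cached data closer to the index contents at the cost of more lookups against Elasticsearch. A longer value reduces lookups but lets stale entries live longer.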

Data type mapping

Flink parses Elasticsearch data in the JSON format. For more information, see Data type mapping.

Sample code

CREATE TEMPORARY TABLE datagen_source (
  id STRING, 
  data STRING,
  proctime as PROCTIME()
) WITH (
  'connector' = 'datagen' 
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT
) WITH (
  'connector' ='elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole' 
);

-- Select only the columns that match the sink schema (id, data, value).
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
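
The join above drops source records that have no matching document. As a hedged variant, the following sketch uses the standard Flink SQL LEFT JOIN form of a dimension table join to keep those records. It reuses the datagen_source, es_dim, and blackhole_sink tables from the sample code and assumes that the Elasticsearch dimension table supports LEFT JOIN in the same way as other dimension tables.

```sql
-- Keep every source record; value is NULL if no document has the given ID.
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
LEFT JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
```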