This topic provides the DDL syntax that is used to create an Elasticsearch dimension table, describes the parameters in the WITH and CACHE clauses, and provides sample code.

Note Elasticsearch connectors support the features that are provided by Alibaba Cloud Elasticsearch but do not support HTTPS-based access.

What is Alibaba Cloud Elasticsearch?

Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch and provides commercial features such as Security, Machine Learning, Graph, and Application Performance Management (APM). It is suitable for scenarios such as data analysis and data search, and offers enterprise-level services that include access control, security monitoring and alerting, and automatic report generation.

Prerequisites

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Elasticsearch connectors.

DDL syntax

CREATE TABLE es_dim(
  field1 BIGINT,    -- If this field is used as the key to join the dimension table with another table, it must be declared as STRING.
  field2 VARBINARY,
  field3 VARCHAR
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);
Note
  • Only one key can be used to join a dimension table with another table. The key is the ID of a document in your Elasticsearch index. For an example, see the sketch after this note.
  • Elasticsearch V5.5 and later versions are supported.
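
The following is a minimal sketch of such a lookup join. The source table orders, its fields order_key and proctime, and the choice of field3 as the join key are assumptions for illustration; the single equality condition in the ON clause identifies the Elasticsearch document by its ID.

-- Hypothetical source table and field names; the single ON condition maps to the document ID.
SELECT o.order_key, d.field2, d.field3
FROM orders AS o
JOIN es_dim FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.order_key = d.field3;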

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the dimension table. | Yes | Set the value to elasticsearch.
endPoint | The endpoint of the Elasticsearch cluster. | Yes | Example: http://127.0.0.1:9200.
accessId | The username that is used to access the Elasticsearch cluster. | No | N/A.
accessKey | The password that is used to access the Elasticsearch cluster. | No | N/A.
indexName | The name of the Elasticsearch index. | Yes | N/A.
typeNames | The type of the document. | No | Default value: _doc. We recommend that you do not configure this parameter if you use a version of Elasticsearch later than V7.0.
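
For example, the following sketch declares a dimension table for an Elasticsearch cluster later than V7.0 and therefore omits typeNames. The table name es_dim_v7 and its fields are assumptions for illustration; the endpoint and credential values are placeholders.

CREATE TABLE es_dim_v7 (
  id STRING,
  score FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>'
  -- typeNames is omitted: the default value _doc applies, which is the
  -- recommended configuration for versions later than V7.0.
);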

Parameters in the CACHE clause

Parameter | Description | Required | Remarks
cache | The cache policy. | No | Valid values:
  • None: No data is cached. This is the default value.
  • LRU: Only some data in the dimension table is cached. Each time the system receives a data record, the system searches the cache first. If the record is not found in the cache, the system searches the physical dimension table. If you use this policy, you must also configure the cacheSize and cacheTTLMs parameters.
cacheSize | The maximum number of data records that can be cached. | No | This parameter takes effect only if you set the cache parameter to LRU. Default value: 10000.
cacheTTLMs | The cache timeout period. Unit: milliseconds. | No | The effect of this parameter depends on the value of the cache parameter:
  • If cache is set to None, cacheTTLMs can be left empty. This indicates that cache entries do not expire.
  • If cache is set to LRU, cacheTTLMs specifies how long a cache entry remains valid. Default value: 10000.
cacheEmpty | Specifies whether to cache empty query results. | No | Default value: true.
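
The cache parameters are typically specified in the WITH clause alongside the connector options. The following is a minimal sketch that enables the LRU policy; the table name es_dim_cached and the chosen values are assumptions for illustration.

CREATE TABLE es_dim_cached (
  id STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'cache' = 'LRU',         -- cache only part of the dimension table
  'cacheSize' = '10000',   -- keep at most 10,000 records in the cache
  'cacheTTLMs' = '60000',  -- expire cached entries after 60 seconds
  'cacheEmpty' = 'true'    -- also cache lookups that return no result
);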

Data type mapping

Realtime Compute for Apache Flink parses Elasticsearch data in the JavaScript Object Notation (JSON) format. For more information, see Data type mapping.

Sample code

CREATE TEMPORARY TABLE datagen_source (
  id STRING, 
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen' 
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole' 
);

INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;