This topic provides the DDL syntax that is used to create an Elasticsearch dimension table, describes the WITH clause parameters and cache parameters, and provides sample code.

Note The Elasticsearch connector supports the features that are provided by Alibaba Cloud Elasticsearch but does not support HTTPS-based access.

What is Alibaba Cloud Elasticsearch?

Alibaba Cloud Elasticsearch supports features of open source Elasticsearch, such as Security, Machine Learning, Graph, and Application Performance Management (APM). Alibaba Cloud Elasticsearch is suitable for a variety of scenarios, such as data analysis and data search. Alibaba Cloud Elasticsearch provides enterprise-class services. These services include access control, security monitoring and alerting, and automatic generation of reports.

Prerequisites

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Elasticsearch connector.

DDL syntax

CREATE TABLE es_dim(
  field1 STRING, -- If this field is used as a key to join the dimension table with another table, the field must be of the STRING type.
  field2 FLOAT,
  field3 BIGINT,
  PRIMARY KEY (field1) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);
Note
  • If you define a primary key for a dimension table, only one key can be used to join the dimension table with another table. The key is the ID of a document in your Elasticsearch index.
  • If you do not define a primary key for the dimension table, one or more keys can be used to join the dimension table with another table. The keys are the fields of a document in your Elasticsearch index.
  • Elasticsearch V5.5 and later are supported.
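
The second case in the preceding note, a dimension table without a primary key, can be sketched as follows. This is a minimal example that assumes a hypothetical index whose documents contain user_id, region, and score fields. The table names orders_source, es_dim_multi, and enriched_sink and all column names are illustrative only. Because no primary key is declared, the join matches on document fields instead of the document ID, and both join keys are declared as STRING. The optional typeNames parameter is omitted.

```sql
-- Hypothetical source table; datagen produces random rows for testing.
CREATE TEMPORARY TABLE orders_source (
  user_id STRING,
  region STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Dimension table without a primary key: the join keys are document fields.
CREATE TEMPORARY TABLE es_dim_multi (
  user_id STRING,
  region STRING,
  score FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>'
);

-- Hypothetical sink that discards the joined rows.
CREATE TEMPORARY TABLE enriched_sink (
  user_id STRING,
  region STRING,
  score FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- Join on two document fields instead of the document ID.
INSERT INTO enriched_sink
SELECT s.user_id, s.region, d.score
FROM orders_source AS s
JOIN es_dim_multi FOR SYSTEM_TIME AS OF s.proctime AS d
ON s.user_id = d.user_id AND s.region = d.region;
```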

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the dimension table. | Yes | Set the value to elasticsearch. |
| endPoint | The endpoint of Elasticsearch. | Yes | Example: http://127.0.XX.XX:9200. |
| accessId | The username that is used to access the Elasticsearch cluster. | No | N/A. |
| accessKey | The password that is used to access the Elasticsearch cluster. | No | N/A. |
| indexName | The name of the index. | Yes | N/A. |
| typeNames | The type of the document. | No | Default value: _doc. Note We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than 7.0. |

Cache parameters

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| cache | The cache policy. | No | Valid values: None (default): data is not cached. LRU: only part of the data in the dimension table is cached. Each time the system receives a data record, it searches the cache first and queries the physical dimension table only if the record is not found in the cache. If you use the LRU policy, configure the cacheSize and cacheTTLMs parameters. |
| cacheSize | The maximum number of data records that can be cached. | No | This parameter takes effect only if you set the cache parameter to LRU. Default value: 10000. |
| cacheTTLMs | The cache timeout period. Unit: milliseconds. | No | If you set the cache parameter to LRU, this parameter specifies how long a cache entry is kept before it expires. Default value: 10000. If you set the cache parameter to None, no data is cached and this parameter can be left empty. |
| cacheEmpty | Specifies whether to cache empty results. | No | Default value: true. |
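
The cache parameters are set in the WITH clause alongside the connection parameters. The following DDL is a minimal sketch of an LRU-cached dimension table. The table name es_dim_cached and the cache values are illustrative assumptions, not recommended settings, and should be tuned to the size and update frequency of your index.

```sql
CREATE TEMPORARY TABLE es_dim_cached (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'cache' = 'LRU',         -- Cache recently looked-up records.
  'cacheSize' = '50000',   -- Illustrative value: keep at most 50,000 records.
  'cacheTTLMs' = '60000',  -- Illustrative value: entries expire after 60 seconds.
  'cacheEmpty' = 'false'   -- Do not cache lookups that return no result.
);
```

A longer cacheTTLMs value reduces the number of queries sent to Elasticsearch but delays the point at which updated documents become visible to the join.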

Data type mappings

Realtime Compute for Apache Flink parses Elasticsearch data in the JavaScript Object Notation (JSON) format. For more information, see Data type mappings.

Sample code

CREATE TEMPORARY TABLE datagen_source (
  id STRING, 
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen' 
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole' 
);

-- Select exactly the three columns that blackhole_sink declares.
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
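
The sample uses an inner join, so source rows that have no matching document in the index are dropped. If unmatched rows should be kept, a LEFT JOIN can be used instead. The following statement is a sketch based on standard Flink SQL lookup join syntax. It reuses the tables defined in the sample code and assumes that the Elasticsearch dimension table handles left lookup joins in the same way as other dimension tables.

```sql
-- Keep every source row; `value` is NULL when no document matches.
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
LEFT JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
```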