This topic provides the DDL syntax that is used to create an Elasticsearch dimension table, describes the parameters in the WITH and CACHE clauses, and provides sample code.
What is Alibaba Cloud Elasticsearch?
Alibaba Cloud Elasticsearch is compatible with the features of open source Elasticsearch and with commercial features such as Security, Machine Learning, Graph, and Application Performance Management (APM). It is suitable for scenarios such as data analysis and data search, and it provides enterprise-level services, including access control, security monitoring and alerting, and automatic report generation.
Prerequisites
- An Elasticsearch index is created. For more information, see Step 1: Create a cluster.
- A whitelist is configured to allow access to an Elasticsearch cluster over the Internet or a virtual private cloud (VPC). For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Limits
Elasticsearch connectors are supported only by Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later.
DDL syntax
CREATE TABLE es_dim (
  field1 BIGINT,  -- If this field is used as the key to join the dimension table with another table, it must be of the STRING type.
  field2 VARBINARY,
  field3 VARCHAR
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);
- Only one key can be used to join a dimension table with another table. The key is the ID of a document in your Elasticsearch index.
- Elasticsearch V5.5 and later versions are supported.
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the dimension table. | Yes | Set the value to elasticsearch. |
endPoint | The endpoint of Elasticsearch. | Yes | Example: http://127.0.0.1:9200. |
accessId | The username that is used to access the Elasticsearch cluster. | No | N/A. |
accessKey | The password that is used to access the Elasticsearch cluster. | No | N/A. |
indexName | The name of the Elasticsearch index. | Yes | N/A. |
typeNames | The type of the document. | No | Default value: _doc. Note: We recommend that you do not configure this parameter if you use a version of Elasticsearch later than V7.0. |
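As a minimal sketch, the following dimension table sets only the parameters marked as required in the table above, which may suit a cluster of Elasticsearch later than V7.0, where typeNames is best left unset. The endpoint reuses the example from the table. The table name es_dim_minimal, the index name my_index, and the mapping of the `value` column to a document field of the same name are hypothetical, and omitting accessId and accessKey assumes that the cluster accepts requests without a username and password.

```sql
-- Hypothetical minimal dimension table: only the required WITH parameters are set.
-- typeNames is omitted, so the documented default (_doc) applies.
-- accessId and accessKey are omitted, assuming the cluster does not require authentication.
CREATE TEMPORARY TABLE es_dim_minimal (
  id STRING,     -- join key: holds the ID of the Elasticsearch document
  `value` FLOAT  -- assumed to correspond to a document field named "value"
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = 'http://127.0.0.1:9200',
  'indexName' = 'my_index'  -- hypothetical index name
);
```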
Parameters in the CACHE clause
Parameter | Description | Required | Remarks |
---|---|---|---|
cache | The cache policy. | No | Valid values include LRU, which caches a subset of the dimension table data. |
cacheSize | The maximum number of data records that can be cached. | No | This parameter is available only if you set the cache parameter to LRU. Default value: 10000. |
cacheTTLMs | The cache timeout period. Unit: milliseconds. | No | The effect of this parameter depends on the value of the cache parameter. |
cacheEmpty | Specifies whether to cache empty results. | No | Default value: true. |
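The following sketch shows how the cache parameters might be combined with the connector parameters. It assumes, as with other dimension tables in Realtime Compute for Apache Flink, that the cache parameters are written in the same WITH clause as the connector parameters, and that the LRU policy is written as the string 'LRU'. The table name es_dim_cached is hypothetical, and the 60000 ms timeout is only an illustrative value; cacheSize and cacheEmpty are set to their documented defaults.

```sql
-- Sketch: LRU cache for an Elasticsearch dimension table.
-- Assumption: cache parameters are placed in the WITH clause alongside the connector parameters.
CREATE TEMPORARY TABLE es_dim_cached (
  id STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'cache' = 'LRU',         -- keep up to cacheSize entries, evicting the least recently used
  'cacheSize' = '10000',   -- documented default; applies only when cache is LRU
  'cacheTTLMs' = '60000',  -- illustrative: expire cached entries after 60 seconds
  'cacheEmpty' = 'true'    -- documented default: also cache lookups that find no document
);
```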
Data type mapping
Realtime Compute for Apache Flink parses Elasticsearch data in the JavaScript Object Notation (JSON) format. For more information, see Data type mapping.
Sample code
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- Select exactly the three sink columns; e.*, w.* would also include proctime and a second id column.
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
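Because the datagen source in the sample code produces random id strings, most lookups are unlikely to find a document with a matching ID. The following variant is a sketch that assumes the index contains documents whose IDs are the strings "1" through "1000". It uses the open source Flink datagen options fields.id.kind, fields.id.start, and fields.id.end to emit those IDs, and a LEFT JOIN so that source rows without a matching document are still written, with `value` set to NULL. It reuses the es_dim table declared in the sample code; the table names datagen_seq_source and blackhole_sink_left are hypothetical, and support for these datagen options depends on the Flink version that your VVR version is based on.

```sql
-- Sketch: generate document IDs "1" to "1000" so that lookups can hit existing documents.
-- Assumption: the Elasticsearch index contains documents with these IDs.
CREATE TEMPORARY TABLE datagen_seq_source (
  id STRING,
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'sequence',  -- emit sequential values instead of random strings
  'fields.id.start' = '1',
  'fields.id.end' = '1000'        -- the source finishes after emitting the last value
);

CREATE TEMPORARY TABLE blackhole_sink_left (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- LEFT JOIN keeps source rows whose ID has no matching document; `value` is then NULL.
INSERT INTO blackhole_sink_left
SELECT s.id, s.data, w.`value`
FROM datagen_seq_source AS s
LEFT JOIN es_dim FOR SYSTEM_TIME AS OF s.proctime AS w
  ON s.id = w.id;
```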