This topic provides the DDL syntax that is used to create a full Elasticsearch source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Note The Elasticsearch connector can be used for source tables in both streaming jobs and batch jobs.

What is Elasticsearch?

Alibaba Cloud Elasticsearch is compatible with open-source Elasticsearch features and with business features such as Security, Machine Learning, Graph, and Application Performance Management (APM). It is suitable for scenarios such as data analysis and data search, and it provides enterprise-level permission control, security monitoring and alerting, and automatic report generation.

Prerequisites

An Elasticsearch index is created. For more information, see Step 1: Create a cluster.

Limits

  • You can create full Elasticsearch source tables only when Elasticsearch V5.5 or later is used.
  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports full Elasticsearch source table connectors.
  • Only full Elasticsearch source tables are supported. Incremental Elasticsearch source tables are not supported.

DDL syntax

CREATE TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);
Note The fields in the DDL statements correspond to the fields in Elasticsearch documents. Document IDs cannot be written to the result table.

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the source table. | Yes | Set the value to elasticsearch.
endPoint | The endpoint of the Elasticsearch cluster. | Yes | Example: http://127.0.0.1:9200.
accessId | The username that is used to access the Elasticsearch cluster. | No | N/A.
accessKey | The password that is used to access the Elasticsearch cluster. | No | N/A.
indexName | The name of the index. | Yes | N/A.
typeNames | The type of the document. | No | Default value: _doc.
batchSize | The maximum number of documents that can be obtained from the Elasticsearch cluster for each scroll request. | No | Default value: 2000.
keepScrollAliveSecs | The maximum retention period of the scroll context. | No | Default value: 3600. Unit: seconds.

Note We recommend that you do not set the typeNames parameter in versions later than Elasticsearch V7.0.
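
To show how the optional scroll parameters fit into the WITH clause, the following sketch sets batchSize and keepScrollAliveSecs explicitly. The table name elasticsearch_source_tuned and the values 1000 and 600 are assumptions chosen for illustration, not recommended settings. The angle-bracket placeholders must be replaced with your own values.

```sql
-- Hypothetical table name; the option values below are illustrative only.
CREATE TEMPORARY TABLE elasticsearch_source_tuned (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  -- Fetch at most 1000 documents per scroll request (default: 2000).
  'batchSize' = '1000',
  -- Keep the scroll context alive for up to 600 seconds (default: 3600).
  'keepScrollAliveSecs' = '600'
);
```

A smaller batchSize generally means smaller responses but more scroll requests, so a suitable value is likely to depend on document size and cluster load.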

Data type mapping

Flink parses Elasticsearch data in the JSON format. For more information, see Data type mapping.
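
As a minimal sketch of how JSON fields line up with DDL columns, the example below pairs a hypothetical document with a matching table declaration. The document content and the table name elasticsearch_json_example are invented for illustration. The sketch assumes that JSON strings are read as STRING and that a JSON number can be read as FLOAT. The Data type mapping topic remains the authoritative reference.

```sql
-- Hypothetical Elasticsearch document (not taken from a real index):
--   { "name": "sensor-1", "location": "hangzhou", "value": 3.5 }
--
-- Each column name matches a field name in the document.
CREATE TEMPORARY TABLE elasticsearch_json_example (
  name STRING,      -- JSON string field "name"
  location STRING,  -- JSON string field "location"
  `value` FLOAT     -- JSON number field "value"
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
```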

Sample code

-- Source table: reads all documents from the specified Elasticsearch index.
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

-- Result table: the blackhole connector discards every row that it receives.
CREATE TEMPORARY TABLE blackhole_sink (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- Copy every document from the source table to the result table.
INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
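
The sample above only discards the rows that it reads. As a further sketch, the following statements count documents per location and write the counts to a print result table so that the output can be inspected in the logs. The table name print_sink and the column name doc_count are hypothetical, and the sketch assumes that the print connector is available in your environment.

```sql
-- Assumes the elasticsearch_source table from the sample code above is already declared.
-- Hypothetical result table that prints each row to the task logs.
CREATE TEMPORARY TABLE print_sink (
  location STRING,
  doc_count BIGINT
) WITH (
  'connector' = 'print'
);

-- Count the documents read from the index for each location.
INSERT INTO print_sink
SELECT location, COUNT(*) AS doc_count
FROM elasticsearch_source
GROUP BY location;
```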