This topic provides the DDL syntax that is used to create a full Elasticsearch source table, describes the parameters in the WITH clause, and provides data type mappings and sample code.
What is Elasticsearch?
Alibaba Cloud Elasticsearch is compatible with open-source Elasticsearch features and with business features such as Security, Machine Learning, Graph, and Application Performance Management (APM). It is suitable for scenarios such as data analysis and data search, and it provides enterprise-level services that include access control, security monitoring and alerting, and automatic report generation.
Prerequisites
- An Elasticsearch index is created. For more information, see Step 1: Create a cluster.
- A whitelist is configured to access an Elasticsearch cluster over the Internet or a virtual private cloud (VPC). For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Limits
- You can create full Elasticsearch source tables only when Elasticsearch V5.5 or later is used.
- Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports full Elasticsearch source table connectors.
- Only full Elasticsearch source tables are supported. Incremental Elasticsearch source tables are not supported.
DDL syntax
CREATE TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);
Parameters in the WITH clause
| Parameter | Description | Required | Remarks |
|---|---|---|---|
| connector | The type of the source table. | Yes | Set the value to elasticsearch. |
| endPoint | The endpoint of the Elasticsearch cluster. | Yes | Example: http://127.0.0.1:9200. |
| accessId | The username that is used to access the Elasticsearch cluster. | No | N/A. |
| accessKey | The password that is used to access the Elasticsearch cluster. | No | N/A. |
| indexName | The name of the index. | Yes | N/A. |
| typeNames | The type of the document. | No | Default value: |
| batchSize | The maximum number of documents that can be obtained from the Elasticsearch cluster for each scroll request. | No | Default value: 2000. |
| keepScrollAliveSecs | The maximum retention period of the scroll context. | No | Default value: 3600. Unit: seconds. |

Note: We recommend that you do not configure the typeNames parameter if the version of your Elasticsearch cluster is later than 7.0.
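As a sketch of how the optional parameters fit together, the following DDL assumes a cluster later than 7.0, so it leaves out typeNames as the note on that parameter recommends, and it sets batchSize and keepScrollAliveSecs explicitly. The table name elasticsearch_source_tuned and the values 1000 and 600 are illustrative assumptions, not tuning recommendations.

```sql
-- Sketch only: the table name and the parameter values are illustrative assumptions.
CREATE TEMPORARY TABLE elasticsearch_source_tuned (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  -- typeNames is omitted because the cluster is assumed to be later than 7.0.
  'batchSize' = '1000',          -- at most 1,000 documents per scroll request (default: 2000)
  'keepScrollAliveSecs' = '600'  -- keep the scroll context for at most 600 seconds (default: 3600)
);
```

A smaller batchSize shrinks each scroll response but increases the number of requests needed to read the whole index, so the scroll retention period should still cover the time the job needs between consecutive requests.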
Data type mapping
Flink parses Elasticsearch data in the JSON format. For more information, see Data type mapping.
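To make the mapping concrete, the sketch below pairs a hypothetical document with the columns that would read it. It assumes that fields are matched to columns by name, as the Flink JSON format does. The document content is invented for illustration.

```sql
-- Hypothetical document stored in the index (an assumption for illustration):
--   {"name": "sensor-1", "location": "Hangzhou", "value": 21.5}
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,      -- reads the JSON string field "name"
  location STRING,  -- reads the JSON string field "location"
  `value` FLOAT     -- reads the JSON number field "value"; VALUE is a reserved keyword, so it is quoted
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>'
);
```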
Sample code
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
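Because the source reads the full index, the same kind of source table can also feed an aggregation. The following sketch counts documents per location and writes the result to a print sink for inspection. The table names es_docs and print_sink, the column doc_count, and the query itself are assumptions for illustration. The print connector is the standard Flink debugging sink.

```sql
-- Sketch only: table names, doc_count, and the query are illustrative assumptions.
CREATE TEMPORARY TABLE es_docs (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>'
);

-- The print sink writes each result row to the TaskManager logs.
CREATE TEMPORARY TABLE print_sink (
  location STRING,
  doc_count BIGINT
) WITH (
  'connector' = 'print'
);

-- Count the documents read from the index for each location.
INSERT INTO print_sink
SELECT location, COUNT(*) AS doc_count
FROM es_docs
GROUP BY location;
```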