This topic provides the DDL syntax that is used to create an Elasticsearch result table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

Note The features supported by the Elasticsearch connector are consistent with those of Alibaba Cloud Elasticsearch. The connector can access Elasticsearch clusters over HTTPS.

What is Elasticsearch?

Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch features, and business features such as Security, Machine Learning, Graph, and Application Performance Management (APM). Alibaba Cloud Elasticsearch is suitable for scenarios such as data analysis and data search. Alibaba Cloud Elasticsearch provides enterprise-level services, including access control, security monitoring and alerting, and automatic generation of reports.

Prerequisites

Limits

Only Flink deployments that use Ververica Runtime (VVR) 2.0.0 or later support the Elasticsearch connector.

DDL syntax

CREATE TABLE es_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- The primary key is optional. If you specify a primary key, it is used as the document ID. If you do not, a random value is used as the document ID.
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>',
  'username' = '<yourElasticsearch.accessId>',
  'password' = '<yourElasticsearch.accessKey>'
);
Note
  • Only Elasticsearch 6.x and 7.x are supported for result tables.
  • The fields in the DDL statement correspond to fields in Elasticsearch documents. The document ID cannot be declared as a field of the result table.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to elasticsearch-6 or elasticsearch-7.
hosts The endpoint of the Elasticsearch cluster. Yes Example: 127.0.0.1:9200.
index The name of the index. Yes The index can be static, such as myusers, or dynamic, such as myusers-{log_ts|yyyy-MM-dd}. For more information, see the Dynamic index section of this topic.
document-type The type of a document. Required for elasticsearch-6; not supported by elasticsearch-7. N/A.
username The username that is used to access the Elasticsearch cluster. No This parameter is empty by default. This indicates that permission verification is not required.
password The password that is used to access the Elasticsearch cluster. No If you specify the username parameter, you must also specify the password parameter.
document-id.key-delimiter The delimiter that is used to concatenate primary key fields into a document ID. No Default value: _.
failure-handler The troubleshooting policy that is used when an Elasticsearch request fails. No Valid values:
  • fail: The job fails. This is the default value.
  • ignore: The failure is ignored and the request is dropped.
  • retry_rejected: The request is retried if the failure is caused by full queue capacity.
  • custom class name: The ActionRequestFailureHandler subclass is used to troubleshoot the failure.
sink.flush-on-checkpoint Specifies whether to flush pending requests when Flink creates a checkpoint. No Default value: true. If this feature is disabled, the connector does not wait for pending requests to complete when Flink creates a checkpoint. Therefore, the connector cannot provide at-least-once guarantees for the requests.
sink.bulk-flush.backoff.strategy The retry policy that is used when a flush operation fails due to a temporary request error. No Valid values:
  • DISABLED: The flush operation is not retried. The flush operation fails when the first request error occurs. This is the default value.
  • CONSTANT: The waiting time for each flush operation is the same.
  • EXPONENTIAL: The waiting time for each flush operation exponentially increases.
sink.bulk-flush.backoff.max-retries The maximum number of retries. No Default value: 8.
sink.bulk-flush.backoff.delay The delay between retries. No The meaning depends on the backoff strategy:
  • CONSTANT: the fixed delay between retries.
  • EXPONENTIAL: the initial baseline delay; the actual delay increases exponentially with each retry.
sink.bulk-flush.max-actions The maximum number of actions that are buffered for each bulk request. No Default value: 1000. If this parameter is set to 0, this feature is disabled.
sink.bulk-flush.max-size The maximum memory size of the buffer in which requests are saved. No Default value: 2. Unit: MB. If this parameter is set to 0, this feature is disabled.
sink.bulk-flush.interval The interval between flush operations. No Default value: 1. Unit: seconds. If this parameter is set to 0, this feature is disabled.
connection.path-prefix The prefix that must be added to each REST communication. No This parameter is empty by default.
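The flush and retry options described above can be combined. The following sketch shows a sink that flushes when any of the three thresholds is reached first, and retries rejected requests with exponential backoff. All table names, field names, and option values here are illustrative, not recommendations:

```sql
CREATE TABLE es_sink_tuned (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  -- Flush when 500 actions are buffered, the buffer reaches 5 MB,
  -- or 5 seconds have elapsed, whichever comes first.
  'sink.bulk-flush.max-actions' = '500',
  'sink.bulk-flush.max-size' = '5',
  'sink.bulk-flush.interval' = '5',
  -- Retry requests that were rejected because the queue was full.
  'failure-handler' = 'retry_rejected',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.max-retries' = '5',
  'sink.bulk-flush.backoff.delay' = '1000'  -- initial baseline delay
);
```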

Document ID

The working mode of the Elasticsearch sink varies based on whether a primary key is specified. If a primary key is specified, the Elasticsearch sink works in upsert mode. In this mode, messages including UPDATE and DELETE messages are consumed. If no primary key is specified, the Elasticsearch sink works in append mode. In this mode, only INSERT messages are consumed.

In the Elasticsearch sink, primary keys are used to calculate document IDs of Elasticsearch. A document ID is a string that contains a maximum of 512 bytes without spaces. The Elasticsearch sink uses the key delimiter specified by document-id.key-delimiter to concatenate all primary key fields in the order defined in the DDL statement and generates a document ID for each row. Some data types such as BYTES, ROW, ARRAY, and MAP cannot be represented as strings. Therefore, fields of these data types cannot be used as primary key fields. If no primary key is specified, Elasticsearch automatically generates random document IDs.
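For example, with the default key delimiter _, a table declared with a composite primary key (the table and field names below are illustrative) produces document IDs by joining the key fields in the order defined in the DDL statement:

```sql
CREATE TABLE es_sink_composite (
  user_id STRING,
  item_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  -- Optional; '_' is the default delimiter.
  'document-id.key-delimiter' = '_'
);
-- A row with user_id = 'u1' and item_id = 'i2' is written with document ID u1_i2.
```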

Dynamic index

The Elasticsearch sink supports static and dynamic indexes.
  • If you use a static index, the index option value must be a string, such as myusers. All records are written to the myusers index.
  • If you use a dynamic index, you can use {field_name} to reference field values in the records and dynamically generate the destination index. You can also use {field_name|date_format_string} to convert field values of the TIMESTAMP, DATE, and TIME types into the format specified by date_format_string, which is compatible with DateTimeFormatter in Java. For example, if you set the dynamic index to myusers-{log_ts|yyyy-MM-dd}, a record whose log_ts field value is 2020-03-27 12:25:55 is written to the myusers-2020-03-27 index.
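The dynamic index pattern above can be sketched as a complete table declaration. The table and field names here are illustrative:

```sql
CREATE TABLE es_sink_daily (
  log_ts TIMESTAMP(3),
  user_id STRING,
  pv BIGINT
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  -- {log_ts|yyyy-MM-dd} is replaced with the formatted log_ts value of each record.
  'index' = 'myusers-{log_ts|yyyy-MM-dd}'
);
-- A record with log_ts = TIMESTAMP '2020-03-27 12:25:55' is written to the myusers-2020-03-27 index.
```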

Data type mapping

Flink parses Elasticsearch data in the JSON format. For more information, see Data type mapping.

Sample code

CREATE TEMPORARY TABLE datagen_source (
  id STRING, 
  name STRING,
  uv BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- The primary key is optional. If you specify a primary key, the primary key is used as the document ID. If you do not specify a primary key, a random value is used as the document ID. 
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  -- document-type is omitted because it is not supported by elasticsearch-7.
  'username' = '<yourElasticsearch.accessId>',
  'password' = '<yourElasticsearch.accessKey>'
);

INSERT INTO es_sink
SELECT id, name, uv
FROM datagen_source;