
Realtime Compute for Apache Flink:Elasticsearch

Last Updated: Mar 26, 2026

The Elasticsearch connector lets you use Elasticsearch indexes as a source table, dimension table, or sink table in Realtime Compute for Apache Flink jobs.

Background information

Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch features, including Security, Machine Learning, Graph, and application performance management (APM). It is suitable for data analytics, data search, and other scenarios, and provides enterprise-level services such as access control, security monitoring and alerting, and automatic report generation.

Supported modes: Sink: Batch | Sink: Streaming Append & Upsert Mode | Source: Batch | Source: Streaming

Category Details
Supported table types Source table, dimension table, sink table
Running mode Batch and streaming
Data format JSON
API type DataStream and SQL
Data update or deletion in a sink table Yes

Monitoring metrics

Table type Metrics
Source table pendingRecords, numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond
Dimension table None
Sink table (VVR 6.0.6 or later) numRecordsOut, numRecordsOutPerSecond

For metric definitions, see Metrics.

Prerequisites

Before you begin, make sure that an Elasticsearch instance is created and can be accessed from your Realtime Compute for Apache Flink workspace.

Limitations

  • Source and dimension tables support Elasticsearch 6.8.x and later versions in the 6.x and 7.x series. Elasticsearch 8.x and later are not supported for source and dimension tables.

  • Sink tables support Elasticsearch 6.x, 7.x, and 8.x.

  • Only full source table reads are supported. Incremental reads are not supported.

Key concepts

Write mode: upsert vs. append

The sink table operates in one of two modes, determined by whether a primary key is defined in the DDL.

Mode Trigger Document ID Supported messages
Upsert Primary key defined Primary key value INSERT, UPDATE, DELETE
Append No primary key Random ID generated by Elasticsearch INSERT only

When a primary key is defined, it is used as the document ID. The connector uses the document-id.key-delimiter parameter (default: _) to join multiple primary key field values into a single document ID string (up to 512 bytes, no spaces), in the order defined in the DDL.

The following types cannot be used as primary key fields because they have no string representation: BYTES, ROW, ARRAY, and MAP.
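For example, a sink with a composite primary key might be declared as follows. This is a sketch; the host and index values are placeholders, and the delimiter shown is the default:

```sql
CREATE TABLE es_upsert_sink (
  user_id STRING,
  item_id STRING,
  cnt     BIGINT,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = '<yourHosts>',
  'index'     = '<yourIndex>',
  -- Default delimiter. A record with user_id = 'u1' and item_id = 'i2'
  -- is written with document ID 'u1_i2' (fields joined in DDL order).
  'document-id.key-delimiter' = '_'
);
```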

DDL fields map to Elasticsearch document fields. Metadata such as document IDs is managed by Elasticsearch and cannot be written through the DDL.

Dynamic indexes

The index parameter for sink tables supports both static and dynamic values.

  • Static index: A plain string, such as myusers. All records are written to the same index.

  • Dynamic index: A template using {field_name} to generate the index name from a field value in each record.

For TIMESTAMP, DATE, and TIME fields, use {field_name|date_format_string} to format the value using Java's DateTimeFormatter. For example, setting index to myusers-{log_ts|yyyy-MM-dd} writes a record with log_ts = 2020-03-27 12:25:55 to the myusers-2020-03-27 index.
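A dynamic-index sink corresponding to this example might look like the following sketch (host value is a placeholder):

```sql
CREATE TABLE es_daily_sink (
  user_id STRING,
  log_ts  TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = '<yourHosts>',
  -- A record with log_ts = 2020-03-27 12:25:55 is written to
  -- the myusers-2020-03-27 index.
  'index'     = 'myusers-{log_ts|yyyy-MM-dd}'
);
```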

Delivery guarantees

When checkpointing is enabled, the connector provides at-least-once delivery by waiting for all pending bulk requests to complete before each checkpoint. If you set sink.flush-on-checkpoint to false, the connector skips this wait and at-least-once delivery guarantees no longer apply.
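A sink that keeps the default at-least-once behavior can state the option explicitly, as in this sketch (host and index are placeholders):

```sql
CREATE TABLE es_sink_at_least_once (
  id STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = '<yourHosts>',
  'index'     = '<yourIndex>',
  -- Default value. Flushes all pending bulk requests before each
  -- checkpoint; setting this to 'false' drops the at-least-once guarantee.
  'sink.flush-on-checkpoint' = 'true'
);
```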

Syntax

Source table

CREATE TABLE elasticsearch_source(
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);

Dimension table

CREATE TABLE es_dim(
  field1 STRING,  -- JOIN key; must be STRING type
  field2 FLOAT,
  field3 BIGINT,
  PRIMARY KEY (field1) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);

Primary key behavior for dimension tables:

  • With primary key: Only one JOIN key is allowed. The key must match the document ID in the Elasticsearch index.

  • Without primary key: One or more JOIN keys are allowed. The keys must be fields in the Elasticsearch document.

For the STRING type, Flink adds a .keyword suffix to field names by default to ensure compatibility. If this prevents matching with Text fields in Elasticsearch, set ignoreKeywordSuffix to true.

Sink table

CREATE TABLE es_sink(
  user_id   STRING,
  user_name STRING,
  uv        BIGINT,
  pv        BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- Optional. If defined, used as the document ID; enables upsert mode.
) WITH (
  'connector' = 'elasticsearch-7',  -- Use elasticsearch-6 for Elasticsearch 6.x, elasticsearch-8 for 8.x (requires VVR 8.0.5+)
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);

WITH parameters

Source table

Parameter Type Required Default Description
connector String Yes Set to elasticsearch.
endPoint String Yes Elasticsearch server address. Example: http://127.0.0.1:XXXX.
indexName String Yes Name of the Elasticsearch index to read from.
accessId String No Empty (no auth) Username for the Elasticsearch instance. If set, accessKey must also be set. Store credentials as Project variables to avoid leaking them.
accessKey String No Empty Password for the Elasticsearch instance.
typeNames String No _doc Elasticsearch type name. Do not set this for Elasticsearch 7.0 or later.
batchSize Int No 2000 Maximum number of documents returned per scroll request.
keepScrollAliveSecs Int No 3600 Maximum time in seconds to keep the scroll context alive.

Sink table

Parameter Type Required Default Description
connector String Yes elasticsearch-6, elasticsearch-7, or elasticsearch-8. elasticsearch-8 requires VVR 8.0.5 or later.
hosts String Yes Elasticsearch server address. Example: 127.0.0.1:XXXX.
index String Yes Target index name. Supports static strings and dynamic templates using {field_name} or {field_name|date_format_string}. See Dynamic indexes.
document-type String Required for elasticsearch-6; not supported for elasticsearch-7 Document type. Must match the type value on the Elasticsearch side when using elasticsearch-6.
username String No Empty (no auth) Elasticsearch username. If set, password must also be set. Store credentials as Project variables.
password String No Empty Elasticsearch password.
document-id.key-delimiter String No _ Separator used to join multiple primary key fields into a document ID. The document ID is constructed by concatenating all primary key fields in DDL order using this separator. Maximum length: 512 bytes, no spaces.
failure-handler String No fail Policy for failed Elasticsearch requests. Options: fail (fail the job), ignore (discard the request), retry-rejected (retry requests rejected due to a full queue), or a custom subclass of ActionRequestFailureHandler.
sink.flush-on-checkpoint Boolean No true Whether to flush all pending requests before each checkpoint. Set to false to disable — this removes at-least-once delivery guarantees.
sink.bulk-flush.backoff.strategy Enum No DISABLED Retry backoff strategy for failed flush operations. Options: DISABLED (no retry), CONSTANT (fixed delay between retries), EXPONENTIAL (exponentially increasing delay).
sink.bulk-flush.backoff.max-retries Int No Maximum number of backoff retries.
sink.bulk-flush.backoff.delay Duration No Delay between retries. For CONSTANT: the fixed delay. For EXPONENTIAL: the initial baseline delay.
sink.bulk-flush.max-actions Int No 1000 Maximum number of operations buffered per bulk request. Set to 0 to disable.
sink.bulk-flush.max-size String No 2 MB Maximum buffer size per bulk request, in MB. Set to 0 MB to disable.
sink.bulk-flush.interval Duration No 1s Flush interval. Set to 0s to disable.
connection.path-prefix String No Empty Prefix string added to every REST request path.
retry-on-conflict Int No 0 Maximum retries for update operations that fail due to a version conflict. If exceeded, the job fails. Requires VVR 4.0.13 or later; takes effect only when a primary key is defined.
routing-fields String No One or more Elasticsearch field names used to route documents to specific shards. Separate multiple field names with ;. Empty fields are set to null. Requires VVR 8.0.6 or later; supported for elasticsearch-7 and elasticsearch-8 only.
sink.delete-strategy Enum No DELETE_ROW_ON_PK Behavior when a retraction message (-D or -U) is received. See the table below.
sink.ignore-null-when-update Boolean No false When true, null field values in incoming data are not written to Elasticsearch (the existing field value is preserved). When false, the field is updated to null. Requires VVR 11.1 or later; only effective when a primary key is set and the data format is JSON.
connection.request-timeout Duration No Timeout for requesting a connection from the connection manager. Requires VVR 11.5 or later.
connect.timeout Duration No Timeout for establishing a connection. Requires VVR 11.5 or later.
socket.timeout Duration No Maximum idle time between two consecutive data packets. Requires VVR 11.5 or later.
sink.bulk-flush.update.doc_as_upsert Boolean No false When true, sets doc_as_upsert on the Update Request. Use this when working with Elasticsearch preset data pipelines, which do not support partial updates in bulk operations. Requires VVR 11.5 or later.

`sink.delete-strategy` options:

Value Behavior
DELETE_ROW_ON_PK (default) Ignores -U messages. Deletes the document matching the primary key when a -D message is received.
IGNORE_DELETE Ignores both -U and -D messages. The sink does not perform retractions.
NON_PK_FIELD_TO_NULL Ignores -U messages. When a -D message is received, keeps the primary key value unchanged and sets all other fields to null. Useful for partial updates when multiple sinks write to the same Elasticsearch index.
CHANGELOG_STANDARD Similar to DELETE_ROW_ON_PK, but also deletes the document when a -U message is received. Requires VVR 8.0.8 or later.
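For instance, when several jobs write disjoint fields of the same document, NON_PK_FIELD_TO_NULL keeps the document alive across retractions. The following is a sketch with placeholder host and index values:

```sql
CREATE TABLE es_partial_update_sink (
  user_id STRING,
  uv      BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts'     = '<yourHosts>',
  'index'     = '<yourIndex>',
  -- On a -D message, keep the document and its primary key value,
  -- and set all non-key fields to null instead of deleting the document.
  'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
);
```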

Dimension table

Parameter Type Required Default Description
connector String Yes Set to elasticsearch.
endPoint String Yes Elasticsearch server address. Example: http://127.0.0.1:XXXX.
indexName String Yes Name of the Elasticsearch index to look up from.
accessId String No Empty (no auth) Username for the Elasticsearch instance. If set, accessKey must also be set. Store credentials as Project variables.
accessKey String No Empty Password for the Elasticsearch instance.
typeNames String No _doc Elasticsearch type name. Do not set this for Elasticsearch 7.0 or later.
maxJoinRows Integer No 1024 Maximum number of rows to return per input record.
cache String No None Cache policy. Options: ALL (load all data before the job runs; reload after TTL expires), LRU (cache frequently accessed rows; look up the index on a cache miss), None (no cache).
cacheSize Long No 100000 Maximum number of rows to cache. Applies only when cache is set to LRU.
cacheTTLMs Long No Long.MAX_VALUE (no expiry) Cache time-to-live in milliseconds. For LRU: the timeout before a cached entry expires. For ALL: the interval between full cache reloads.
ignoreKeywordSuffix Boolean No false When true, disables the automatic .keyword suffix that Flink adds to STRING field names. Set to true if the suffix prevents matching with Text fields in Elasticsearch.
cacheEmpty Boolean No true Whether to cache empty lookup results. Applies only when cache is set to LRU.
queryMaxDocs Integer No 10000 Maximum number of documents returned per lookup for non-primary key dimension tables. Cannot exceed the Elasticsearch server limit of 10,000. Reduce this value if you encounter memory issues. Requires VVR 8.0.8 or later; has no effect for primary key dimension tables.
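Putting the cache parameters together, an LRU-cached dimension table might be configured as in this sketch (endpoint and index are placeholders; the TTL value is illustrative):

```sql
CREATE TABLE es_dim_cached (
  id      STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint'  = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  'cache'     = 'LRU',
  'cacheSize' = '100000',   -- maximum number of cached rows (default)
  'cacheTTLMs' = '600000',  -- cached entries expire after 10 minutes
  'cacheEmpty' = 'true'     -- also cache empty lookup results (default)
);
```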

Type mappings

Flink parses Elasticsearch data in JSON format. For type mapping details, see Data type mapping.

Examples

Source table

CREATE TEMPORARY TABLE elasticsearch_source (
  name     STRING,
  location STRING,
  `value`  FLOAT
) WITH (
  'connector'  = 'elasticsearch',
  'endPoint'   = '<yourEndPoint>',          -- Example: http://127.0.0.1:XXXX
  'accessId'   = '${secret_values.ak_id}',  -- Store as a project variable
  'accessKey'  = '${secret_values.ak_secret}',
  'indexName'  = '<yourIndexName>',
  'typeNames'  = '<yourTypeName>'           -- Omit for Elasticsearch 7.0 or later
);

CREATE TEMPORARY TABLE blackhole_sink (
  name     STRING,
  location STRING,
  `value`  FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;

Dimension table

This example joins a generated data stream with an Elasticsearch dimension table using a temporal join.

CREATE TEMPORARY TABLE datagen_source (
  id       STRING,
  data     STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_dim (
  id      STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED  -- Key must match the document ID in the Elasticsearch index
) WITH (
  'connector' = 'elasticsearch',
  'endPoint'  = '<yourEndPoint>',
  'accessId'  = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id      STRING,
  data    STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;

Sink table — flat schema

CREATE TEMPORARY TABLE datagen_source (
  id   STRING,
  name STRING,
  uv   BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  user_id   STRING,
  user_name STRING,
  uv        BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- Optional. If defined, used as the document ID; enables upsert mode.
) WITH (
  'connector'     = 'elasticsearch-6',           -- Use elasticsearch-7 for Elasticsearch 7.x
  'hosts'         = '<yourHosts>',               -- Example: 127.0.0.1:XXXX
  'index'         = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>', -- Required for elasticsearch-6 only
  'username'      = '${secret_values.ak_id}',
  'password'      = '${secret_values.ak_secret}'
);

INSERT INTO es_sink
SELECT id, name, uv
FROM datagen_source;

Sink table — nested schema

This example writes data with a nested ROW type, including ARRAY and MAP fields.

CREATE TEMPORARY TABLE datagen_source (
  id      STRING,
  details ROW<
    name       STRING,
    ages       ARRAY<INT>,
    attributes MAP<STRING, STRING>
  >
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  id      STRING,
  details ROW<
    name       STRING,
    ages       ARRAY<INT>,
    attributes MAP<STRING, STRING>
  >,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'elasticsearch-6',
  'hosts'         = '<yourHosts>',
  'index'         = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>',
  'username'      = '${secret_values.ak_id}',
  'password'      = '${secret_values.ak_secret}'
);

INSERT INTO es_sink
SELECT id, details
FROM datagen_source;