The Elasticsearch connector lets you use Elasticsearch indexes as a source table, dimension table, or sink table in Realtime Compute for Apache Flink jobs.
Background information
Alibaba Cloud Elasticsearch is compatible with open source Elasticsearch features, including Security, Machine Learning, Graph, and application performance management (APM). It is suitable for data analytics, data search, and other scenarios, and provides enterprise-level services such as access control, security monitoring and alerting, and automatic report generation.
Supported modes: Sink (batch), Sink (streaming, append and upsert), Source (batch), Source (streaming)
| Category | Details |
|---|---|
| Supported table types | Source table, dimension table, sink table |
| Running mode | Batch and streaming |
| Data format | JSON |
| API type | DataStream and SQL |
| Data update or deletion in a sink table | Yes |
Monitoring metrics
| Table type | Metrics |
|---|---|
| Source table | pendingRecords, numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond |
| Dimension table | None |
| Sink table (VVR 6.0.6 or later) | numRecordsOut, numRecordsOutPerSecond |
For metric definitions, see Metrics.
Prerequisites
Before you begin, make sure that:
- An Elasticsearch index is created. For more information, see Create an index.
- A public or internal-facing access whitelist is configured for the Elasticsearch instance. For more information, see Configure a public or internal-facing access whitelist for an instance.
Limitations
- Source and dimension tables support Elasticsearch 6.8.x and 7.x. Elasticsearch 8.x is not supported.
- Sink tables support Elasticsearch 6.x, 7.x, and 8.x.
- Only full source table reads are supported. Incremental reads are not supported.
Key concepts
Write mode: upsert vs. append
The sink table operates in one of two modes, determined by whether a primary key is defined in the DDL.
| Mode | Trigger | Document ID | Supported messages |
|---|---|---|---|
| Upsert | Primary key defined | Primary key value | INSERT, UPDATE, DELETE |
| Append | No primary key | Random ID generated by Elasticsearch | INSERT only |
When a primary key is defined, it is used as the document ID. The connector uses the document-id.key-delimiter parameter (default: _) to join multiple primary key field values into a single document ID string (up to 512 bytes, no spaces), in the order defined in the DDL.
The following types cannot be used as primary key fields because they have no string representation: BYTES, ROW, ARRAY, and MAP.
DDL fields map to Elasticsearch document fields. Metadata such as document IDs is managed by Elasticsearch and cannot be written through the DDL.
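The ID construction rule above can be sketched in Python. This is an illustration of the documented rule, not the connector's internal code; the function name is hypothetical.

```python
def build_document_id(pk_values, delimiter="_"):
    """Join primary key field values, in DDL order, into a document ID.

    Mirrors the rule above: values are concatenated with the
    document-id.key-delimiter (default "_"), and the result must be
    at most 512 bytes with no spaces.
    """
    doc_id = delimiter.join(str(v) for v in pk_values)
    if " " in doc_id:
        raise ValueError("document ID must not contain spaces")
    if len(doc_id.encode("utf-8")) > 512:
        raise ValueError("document ID exceeds 512 bytes")
    return doc_id

# A composite key (user_id, region) yields "1001_eu-west"
print(build_document_id([1001, "eu-west"]))
```

This also shows why BYTES, ROW, ARRAY, and MAP cannot serve as key fields: the scheme depends on each value having a stable string form.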
Dynamic indexes
The index parameter for sink tables supports both static and dynamic values.
- Static index: A plain string, such as `myusers`. All records are written to the same index.
- Dynamic index: A template that uses `{field_name}` to generate the index name from a field value in each record.
For TIMESTAMP, DATE, and TIME fields, use {field_name|date_format_string} to format the value using Java's DateTimeFormatter. For example, setting index to myusers-{log_ts|yyyy-MM-dd} writes a record with log_ts = 2020-03-27 12:25:55 to the myusers-2020-03-27 index.
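Per-record template resolution can be sketched as follows. This is illustrative only: it translates just a handful of common Java date-format tokens to Python `strftime` codes, and the function name is an assumption.

```python
import re
from datetime import datetime

# Java DateTimeFormatter tokens handled by this sketch -> strftime codes.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M", "ss": "%S"}

def resolve_index(template, record):
    """Resolve a template like 'myusers-{log_ts|yyyy-MM-dd}' against one
    record (a dict of field name -> value)."""
    def substitute(match):
        field, _, fmt = match.group(1).partition("|")
        value = record[field]
        if fmt:  # {field|date_format_string}: format a temporal value
            for java, strf in _TOKENS.items():
                fmt = fmt.replace(java, strf)
            return value.strftime(fmt)
        return str(value)  # {field}: use the raw field value
    return re.sub(r"\{([^}]+)\}", substitute, template)

record = {"log_ts": datetime(2020, 3, 27, 12, 25, 55)}
print(resolve_index("myusers-{log_ts|yyyy-MM-dd}", record))  # myusers-2020-03-27
```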
Delivery guarantees
When checkpointing is enabled, the connector provides at-least-once delivery by waiting for all pending bulk requests to complete before each checkpoint. If you set sink.flush-on-checkpoint to false, the connector skips this wait and at-least-once delivery guarantees no longer apply.
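The effect of sink.flush-on-checkpoint can be modeled with a toy buffer. This is a sketch of the semantics described above, not the connector's implementation; the class and method names are made up for illustration.

```python
class BulkBufferSketch:
    """Toy model of the checkpoint behavior described above. With
    flush_on_checkpoint=True, every buffered request is sent before a
    checkpoint completes, giving at-least-once delivery; with False, a
    checkpoint can complete while records are still buffered, and those
    records are lost if the job fails before the next flush."""

    def __init__(self, flush_on_checkpoint=True):
        self.flush_on_checkpoint = flush_on_checkpoint
        self.pending = []   # buffered, not yet sent to Elasticsearch
        self.sent = []      # completed bulk requests

    def write(self, record):
        self.pending.append(record)

    def flush(self):
        self.sent.extend(self.pending)
        self.pending.clear()

    def on_checkpoint(self):
        if self.flush_on_checkpoint:
            self.flush()
        # Records still pending here would be lost on failure.
        return len(self.pending) == 0

sink = BulkBufferSketch(flush_on_checkpoint=True)
sink.write({"user_id": "a"})
print(sink.on_checkpoint())  # True: nothing pending, at-least-once holds
```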
Syntax
Source table
CREATE TABLE elasticsearch_source(
name STRING,
location STRING,
value FLOAT
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>'
);
Dimension table
CREATE TABLE es_dim(
field1 STRING, -- JOIN key; must be STRING type
field2 FLOAT,
field3 BIGINT,
PRIMARY KEY (field1) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>'
);
Primary key behavior for dimension tables:
- With a primary key: Only one JOIN key is allowed. The key must match the document ID in the Elasticsearch index.
- Without a primary key: One or more JOIN keys are allowed. The keys must be fields in the Elasticsearch document.
For the STRING type, Flink adds a .keyword suffix to field names by default to ensure compatibility. If this prevents matching with Text fields in Elasticsearch, set ignoreKeywordSuffix to true.
Sink table
CREATE TABLE es_sink(
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED -- Optional. If defined, used as the document ID; enables upsert mode.
) WITH (
'connector' = 'elasticsearch-7', -- Use elasticsearch-6 for Elasticsearch 6.x, elasticsearch-8 for 8.x (requires VVR 8.0.5+)
'hosts' = '<yourHosts>',
'index' = '<yourIndex>'
);
WITH parameters
Source table
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector` | String | Yes | — | Set to elasticsearch. |
| `endPoint` | String | Yes | — | Elasticsearch server address. Example: http://127.0.0.1:XXXX. |
| `indexName` | String | Yes | — | Name of the Elasticsearch index to read from. |
| `accessId` | String | No | Empty (no auth) | Username for the Elasticsearch instance. If set, accessKey must also be set. Store credentials as project variables to avoid leaking them. |
| `accessKey` | String | No | Empty | Password for the Elasticsearch instance. |
| `typeNames` | String | No | _doc | Elasticsearch type name. Do not set this for Elasticsearch 7.0 or later. |
| `batchSize` | Int | No | 2000 | Maximum number of documents returned per scroll request. |
| `keepScrollAliveSecs` | Int | No | 3600 | Maximum time in seconds to keep the scroll context alive. |
Sink table
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector` | String | Yes | — | elasticsearch-6, elasticsearch-7, or elasticsearch-8. elasticsearch-8 requires VVR 8.0.5 or later. |
| `hosts` | String | Yes | — | Elasticsearch server address. Example: 127.0.0.1:XXXX. |
| `index` | String | Yes | — | Target index name. Supports static strings and dynamic templates using `{field_name}` or `{field_name\|date_format_string}`. See Dynamic indexes. |
| `document-type` | String | Required for elasticsearch-6; not supported for elasticsearch-7 | — | Document type. Must match the type value on the Elasticsearch side when using elasticsearch-6. |
| `username` | String | No | Empty (no auth) | Elasticsearch username. If set, password must also be set. Store credentials as project variables. |
| `password` | String | No | Empty | Elasticsearch password. |
| `document-id.key-delimiter` | String | No | _ | Separator used to join multiple primary key fields into a document ID. The document ID is constructed by concatenating all primary key fields in DDL order using this separator. Maximum length: 512 bytes, no spaces. |
| `failure-handler` | String | No | fail | Policy for failed Elasticsearch requests. Options: fail (fail the job), ignore (discard the request), retry-rejected (retry requests rejected due to a full queue), or a custom subclass of ActionRequestFailureHandler. |
| `sink.flush-on-checkpoint` | Boolean | No | true | Whether to flush all pending requests before each checkpoint. Setting this to false removes at-least-once delivery guarantees. |
| `sink.bulk-flush.backoff.strategy` | Enum | No | DISABLED | Retry backoff strategy for failed flush operations. Options: DISABLED (no retry), CONSTANT (fixed delay between retries), EXPONENTIAL (exponentially increasing delay). |
| `sink.bulk-flush.backoff.max-retries` | Int | No | — | Maximum number of backoff retries. |
| `sink.bulk-flush.backoff.delay` | Duration | No | — | Delay between retries. For CONSTANT: the fixed delay. For EXPONENTIAL: the initial baseline delay. |
| `sink.bulk-flush.max-actions` | Int | No | 1000 | Maximum number of operations buffered per bulk request. Set to 0 to disable. |
| `sink.bulk-flush.max-size` | String | No | 2 MB | Maximum buffer size per bulk request, in MB. Set to 0 MB to disable. |
| `sink.bulk-flush.interval` | Duration | No | 1s | Flush interval. Set to 0s to disable. |
| `connection.path-prefix` | String | No | Empty | Prefix string added to every REST request path. |
| `retry-on-conflict` | Int | No | 0 | Maximum retries for update operations that fail due to a version conflict. If exceeded, the job fails. Requires VVR 4.0.13 or later; takes effect only when a primary key is defined. |
| `routing-fields` | String | No | — | One or more Elasticsearch field names used to route documents to specific shards. Separate multiple field names with ;. Empty fields are set to null. Requires VVR 8.0.6 or later; supported for elasticsearch-7 and elasticsearch-8 only. |
| `sink.delete-strategy` | Enum | No | DELETE_ROW_ON_PK | Behavior when a retraction message (-D or -U) is received. See the table below. |
| `sink.ignore-null-when-update` | Boolean | No | false | When true, null field values in incoming data are not written to Elasticsearch (the existing field value is preserved). When false, the field is updated to null. Requires VVR 11.1 or later; effective only when a primary key is defined and the data format is JSON. |
| `connection.request-timeout` | Duration | No | — | Timeout for requesting a connection from the connection manager. Requires VVR 11.5 or later. |
| `connect.timeout` | Duration | No | — | Timeout for establishing a connection. Requires VVR 11.5 or later. |
| `socket.timeout` | Duration | No | — | Maximum idle time between two consecutive data packets. Requires VVR 11.5 or later. |
| `sink.bulk-flush.update.doc_as_upsert` | Boolean | No | false | When true, sets doc_as_upsert on the Update Request. Use this when working with Elasticsearch preset data pipelines, which do not support partial updates in bulk operations. Requires VVR 11.5 or later. |
`sink.delete-strategy` options:
| Value | Behavior |
|---|---|
| `DELETE_ROW_ON_PK` (default) | Ignores -U messages. Deletes the document matching the primary key when a -D message is received. |
| `IGNORE_DELETE` | Ignores both -U and -D messages. The sink does not perform retractions. |
| `NON_PK_FIELD_TO_NULL` | Ignores -U messages. When a -D message is received, keeps the primary key value unchanged and sets all other fields to null. Useful for partial updates when multiple sinks write to the same Elasticsearch index. |
| `CHANGELOG_STANDARD` | Similar to DELETE_ROW_ON_PK, but also deletes the document when a -U message is received. Requires VVR 8.0.8 or later. |
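The four strategies can be summarized in code. This is a sketch of the table above; the function signature and the `pk_field` parameter are illustrative, not part of the connector API.

```python
def apply_retraction(strategy, message_kind, document, pk_field):
    """Return the action the sink takes for a retraction message.

    message_kind is "-U" (update-before) or "-D" (delete). The return
    value is "ignore", "delete", or a partial-update document.
    """
    if strategy == "IGNORE_DELETE":
        return "ignore"  # never retract
    if strategy == "DELETE_ROW_ON_PK":
        return "delete" if message_kind == "-D" else "ignore"
    if strategy == "CHANGELOG_STANDARD":
        return "delete"  # deletes on both -D and -U
    if strategy == "NON_PK_FIELD_TO_NULL":
        if message_kind == "-D":
            # Keep the primary key; set every other field to null.
            return {k: (v if k == pk_field else None)
                    for k, v in document.items()}
        return "ignore"
    raise ValueError(f"unknown strategy: {strategy}")

doc = {"user_id": "a", "uv": 7}
print(apply_retraction("NON_PK_FIELD_TO_NULL", "-D", doc, "user_id"))
# {'user_id': 'a', 'uv': None}
```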
Dimension table
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector` | String | Yes | — | Set to elasticsearch. |
| `endPoint` | String | Yes | — | Elasticsearch server address. Example: http://127.0.0.1:XXXX. |
| `indexName` | String | Yes | — | Name of the Elasticsearch index to look up from. |
| `accessId` | String | No | Empty (no auth) | Username for the Elasticsearch instance. If set, accessKey must also be set. Store credentials as project variables. |
| `accessKey` | String | No | Empty | Password for the Elasticsearch instance. |
| `typeNames` | String | No | _doc | Elasticsearch type name. Do not set this for Elasticsearch 7.0 or later. |
| `maxJoinRows` | Integer | No | 1024 | Maximum number of rows to return per input record. |
| `cache` | String | No | None | Cache policy. Options: ALL (load all data before the job runs; reload after the TTL expires), LRU (cache frequently accessed rows; look up the index on a cache miss), None (no cache). |
| `cacheSize` | Long | No | 100000 | Maximum number of rows to cache. Applies only when cache is set to LRU. |
| `cacheTTLMs` | Long | No | Long.MAX_VALUE (no expiry) | Cache time-to-live in milliseconds. For LRU: the timeout before a cached entry expires. For ALL: the interval between full cache reloads. |
| `ignoreKeywordSuffix` | Boolean | No | false | When true, disables the automatic .keyword suffix that Flink adds to STRING field names. Set to true if the suffix prevents matching with Text fields in Elasticsearch. |
| `cacheEmpty` | Boolean | No | true | Whether to cache empty lookup results. Applies only when cache is set to LRU. |
| `queryMaxDocs` | Integer | No | 10000 | Maximum number of documents returned per lookup for non-primary-key dimension tables. Cannot exceed the Elasticsearch server limit of 10,000. Reduce this value if you encounter memory issues. Requires VVR 8.0.8 or later; has no effect for primary key dimension tables. |
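The interaction of cacheSize, cacheTTLMs, and cacheEmpty under the LRU policy can be sketched as follows. This is an illustration of the described semantics, not the connector's implementation; the lookup function stands in for an Elasticsearch query.

```python
import time
from collections import OrderedDict

class LruDimCache:
    """Sketch of the LRU cache policy: hold up to cache_size lookup
    results, expire entries after ttl_ms, and fall back to a lookup
    function (standing in for an Elasticsearch query) on a miss."""

    def __init__(self, lookup_fn, cache_size=100000, ttl_ms=None, cache_empty=True):
        self.lookup_fn = lookup_fn
        self.cache_size = cache_size
        self.ttl_ms = ttl_ms          # None models Long.MAX_VALUE (no expiry)
        self.cache_empty = cache_empty
        self._entries = OrderedDict()  # key -> (value, inserted_at_ms)

    def get(self, key):
        now = time.time() * 1000
        if key in self._entries:
            value, inserted = self._entries[key]
            if self.ttl_ms is None or now - inserted < self.ttl_ms:
                self._entries.move_to_end(key)  # mark as recently used
                return value
            del self._entries[key]  # expired entry
        value = self.lookup_fn(key)  # cache miss: query the index
        if value is not None or self.cache_empty:
            self._entries[key] = (value, now)
            if len(self._entries) > self.cache_size:
                self._entries.popitem(last=False)  # evict least recently used
        return value
```

Note the role of cache_empty: caching empty results avoids re-querying the index for join keys that have no matching document.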
Type mappings
Flink parses Elasticsearch data in JSON format. For type mapping details, see Data type mapping.
Examples
Source table
CREATE TEMPORARY TABLE elasticsearch_source (
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>', -- Example: http://127.0.0.1:XXXX
'accessId' = '${secret_values.ak_id}', -- Store as a project variable
'accessKey' = '${secret_values.ak_secret}',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>' -- Omit for Elasticsearch 7.0 or later
);
CREATE TEMPORARY TABLE blackhole_sink (
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
Dimension table
This example joins a generated data stream with an Elasticsearch dimension table using a temporal join.
CREATE TEMPORARY TABLE datagen_source (
id STRING,
data STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_dim (
id STRING,
`value` FLOAT,
PRIMARY KEY (id) NOT ENFORCED -- Key must match the document ID in the Elasticsearch index
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
CREATE TEMPORARY TABLE blackhole_sink (
id STRING,
data STRING,
`value` FLOAT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT e.*, w.*
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
Sink table — flat schema
CREATE TEMPORARY TABLE datagen_source (
id STRING,
name STRING,
uv BIGINT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_sink (
user_id STRING,
user_name STRING,
uv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED -- Optional. If defined, used as the document ID; enables upsert mode.
) WITH (
'connector' = 'elasticsearch-6', -- Use elasticsearch-7 for Elasticsearch 7.x
'hosts' = '<yourHosts>', -- Example: 127.0.0.1:XXXX
'index' = '<yourIndex>',
'document-type' = '<yourElasticsearch.types>', -- Required for elasticsearch-6 only
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}'
);
INSERT INTO es_sink
SELECT id, name, uv
FROM datagen_source;
Sink table — nested schema
This example writes data with a nested ROW type, including ARRAY and MAP fields.
CREATE TEMPORARY TABLE datagen_source (
id STRING,
details ROW<
name STRING,
ages ARRAY<INT>,
attributes MAP<STRING, STRING>
>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_sink (
id STRING,
details ROW<
name STRING,
ages ARRAY<INT>,
attributes MAP<STRING, STRING>
>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '<yourHosts>',
'index' = '<yourIndex>',
'document-type' = '<yourElasticsearch.types>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}'
);
INSERT INTO es_sink
SELECT id, details
FROM datagen_source;