Realtime Compute for Apache Flink: Elasticsearch connector

Last Updated: Jan 24, 2024

This topic describes how to use the Elasticsearch connector.

Background information

Alibaba Cloud Elasticsearch is compatible with the features of open source Elasticsearch, such as Security, Machine Learning, Graph, and Application Performance Monitoring (APM). Alibaba Cloud Elasticsearch provides enterprise-class services, such as access control, security monitoring and alerting, and automatic generation of reports.

The following table describes the capabilities supported by the Elasticsearch connector.

| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, and result table |
| Running mode | Batch mode and streaming mode |
| Data format | JSON |
| Metric | Source tables: pendingRecords, numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond. Dimension tables: none. Result tables in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later: numRecordsOut, numRecordsOutPerSecond. |
| API type | DataStream API and SQL API |
| Data update or deletion in a result table | Supported |

Note

For more information about the metrics and how to view the metrics, see Report metrics of fully managed Flink to other platforms.

Prerequisites

Limits

  • The Elasticsearch connector can be used for source tables and dimension tables only when the Elasticsearch version is V5.5 or later.

  • The Elasticsearch connector can be used for result tables only when the Elasticsearch version is V6.X, V7.X, or V8.X.

  • Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Elasticsearch connector.

  • The Elasticsearch connector can be used only for full Elasticsearch source tables and cannot be used for incremental Elasticsearch source tables.

Syntax

  • Statement for creating a source table

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      `value` FLOAT -- value is a reserved keyword and must be enclosed in backticks.
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • Statement for creating a dimension table

    CREATE TABLE es_dim(
      field1 STRING, -- If this field is used as a join key, it must be of the STRING data type.
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    Note
    • If you define a primary key for a dimension table, only one key can be used to join the dimension table with another table. The key is the ID of a document in your Elasticsearch index.

    • If you do not define a primary key for the dimension table, one or more keys can be used to join the dimension table with another table. The keys are the fields of a document in your Elasticsearch index.

    • By default, the .keyword suffix is added to the names of the fields of the STRING data type to ensure compatibility. If the fields of the TEXT data type in the Elasticsearch table cannot be matched, you can set the value of the ignoreKeywordSuffix parameter to true.

  • Statement for creating a result table

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- If the Elasticsearch version is V6.X, enter elasticsearch-6.
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    Note
    • The Elasticsearch result table works in upsert mode or append mode, depending on whether a primary key is defined.

      • If a primary key is defined, the primary key is used as the document ID and the result table works in upsert mode. In this mode, the result table can consume UPDATE and DELETE messages in addition to INSERT messages.

      • If no primary key is defined, Elasticsearch automatically generates a random document ID and the result table works in append mode. In this mode, the result table can consume only INSERT messages.

    • Specific data types, such as BYTES, ROW, ARRAY, and MAP, cannot be represented as strings. Therefore, fields of these data types cannot be used as primary key fields.

    • The fields in the DDL statement correspond to the fields in an Elasticsearch document. Metadata, such as document IDs, is maintained on Elasticsearch clusters. Therefore, metadata cannot be written to Elasticsearch result tables.
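
The two write modes described in the preceding notes can be sketched as follows. This is a minimal illustration, not a reference configuration: the table names, the field names, and the `$` delimiter are assumptions chosen for the example, and only the option names come from this topic.

```sql
-- Upsert mode: a primary key is defined, so UPDATE and DELETE messages are accepted.
-- With a composite key, the key fields are concatenated in DDL order by using the
-- delimiter, so a row with user_id 'u1' and item_id 'i9' would get the document ID 'u1$i9'.
CREATE TEMPORARY TABLE es_sink_upsert (
  user_id STRING,
  item_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-id.key-delimiter' = '$'  -- Illustrative value. The default is _.
);

-- Append mode: no primary key is defined, so only INSERT messages are accepted
-- and Elasticsearch generates a random document ID for each row.
CREATE TEMPORARY TABLE es_sink_append (
  user_id STRING,
  item_id STRING,
  pv BIGINT
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
```

Both tables declare only STRING and BIGINT key and value fields, because data types such as BYTES, ROW, ARRAY, and MAP cannot be used as primary key fields.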

Parameters in the WITH clause

  • Parameters only for source tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the source table.

    STRING

    Yes

    No default value

    Set the value to elasticsearch.

    endPoint

    The endpoint of the Elasticsearch cluster.

    STRING

    Yes

    No default value

    Example: http://127.0.0.1:XXXX.

    indexName

    The name of the Elasticsearch index.

    STRING

    Yes

    No default value

    N/A.

    accessId

    The username that is used to access the Elasticsearch cluster, which is the AccessKey ID of your Alibaba Cloud account.

    STRING

    No

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    accessKey

    The password that is used to access the Elasticsearch cluster, which is the AccessKey secret of your Alibaba Cloud account.

    STRING

    No

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    typeNames

    The names of types.

    STRING

    No

    _doc

    We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than V7.0.

    batchSize

    The maximum number of documents that can be obtained from the Elasticsearch cluster for each scroll request.

    INT

    No

    2000

    N/A.

    keepScrollAliveSecs

    The maximum retention period of the scroll context.

    INT

    No

    3600

    Unit: seconds.
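
As a hedged sketch of how the optional scroll parameters fit into a source table definition, the following statement lowers the batch size and the scroll retention period. The table name and the values 500 and 600 are assumptions for illustration only; suitable values depend on document size and cluster load.

```sql
CREATE TEMPORARY TABLE elasticsearch_source_tuned (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  'batchSize' = '500',            -- Fetch at most 500 documents per scroll request (default: 2000).
  'keepScrollAliveSecs' = '600'   -- Keep the scroll context for 600 seconds (default: 3600).
);
```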

  • Parameters only for result tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the result table.

    STRING

    Yes

    No default value

    Set the value to elasticsearch-6, elasticsearch-7, or elasticsearch-8.

    hosts

    The endpoint of the Elasticsearch cluster.

    STRING

    Yes

    No default value

    Example: 127.0.0.1:XXXX.

    index

    The name of the index in the Elasticsearch cluster.

    STRING

    Yes

    No default value

    The Elasticsearch result table supports both static and dynamic indexes. When you use static and dynamic indexes, take note of the following points:

    • If you use a static index, the index option value must be a string, such as myusers. All records are written to the myusers index.

    • If you use a dynamic index, you can use {field_name} to reference the field values in the records to dynamically generate the destination index. You can also use {field_name|date_format_string} to convert field values of the TIMESTAMP, DATE, and TIME data types into the format specified by date_format_string. date_format_string is compatible with DateTimeFormatter in Java. For example, if you set the dynamic index to myusers-{log_ts|yyyy-MM-dd}, a record whose log_ts field has the value 2020-03-27 12:25:55 is written to the myusers-2020-03-27 index.

    document-type

    The type of a document.

    STRING

    • If the connector parameter is set to elasticsearch-6, this parameter must be configured.

    • If the connector parameter is set to elasticsearch-7, this parameter is not supported.

    No default value

    If the connector parameter is set to elasticsearch-6, the value of this parameter must be the same as the value of the type parameter that is configured for Elasticsearch.

    username

    The username that is used to access the Elasticsearch cluster.

    STRING

    No

    Empty

    By default, this parameter is empty. This indicates that permission verification is not required.

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    password

    The password that is used to access the Elasticsearch cluster.

    STRING

    No

    Empty

    If you specify the username parameter, you must also specify the password parameter.

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    document-id.key-delimiter

    The delimiter that is used to join multiple primary key fields into a document ID.

    STRING

    No

    _

    In the Elasticsearch result table, the primary key is used to calculate the document ID. The result table concatenates all primary key fields, in the order that is defined in the DDL statement, by using the delimiter that is specified by document-id.key-delimiter. The resulting string is used as the document ID of each row.

    Note

    A document ID is a string that contains a maximum of 512 bytes without spaces.

    failure-handler

    The fault handling policy that is used when an Elasticsearch request fails.

    STRING

    No

    fail

    Valid values:

    • fail: The deployment fails if the request fails. This is the default value.

    • ignore: The failure is ignored and the request is dropped.

    • retry_rejected: The request is retried if the failure is caused by full queue capacity.

    • custom class name: A custom subclass of ActionRequestFailureHandler is used to handle the failure.

    sink.flush-on-checkpoint

    Specifies whether the flush operation is triggered during checkpointing.

    BOOLEAN

    No

    true

    • true: The flush operation is triggered during checkpointing. This is the default value.

    • false: The flush operation is not triggered during checkpointing. After this feature is disabled, the Elasticsearch connector does not wait to check whether all pending requests are complete during checkpointing. Therefore, the Elasticsearch connector does not provide the at-least-once guarantee for requests.

    sink.bulk-flush.backoff.strategy

    The retry policy that is used if a flush operation fails due to a temporary request error.

    Enum

    No

    DISABLED

    • DISABLED: The flush operation is not retried. The flush operation fails when the first request error occurs. This is the default value.

    • CONSTANT: The flush operation is retried, and the delay between retries is constant.

    • EXPONENTIAL: The flush operation is retried, and the delay between retries increases exponentially.

    sink.bulk-flush.backoff.max-retries

    The maximum number of retries.

    INT

    No

    No default value

    N/A.

    sink.bulk-flush.backoff.delay

    The delay between retries.

    Duration

    No

    No default value

    • If the sink.bulk-flush.backoff.strategy parameter is set to CONSTANT, the value of this parameter is the delay between retries.

    • If the sink.bulk-flush.backoff.strategy parameter is set to EXPONENTIAL, the value of this parameter is the initial baseline delay.

    sink.bulk-flush.max-actions

    The maximum number of actions that can be buffered for each bulk request.

    INT

    No

    1000

    The value 0 indicates that this feature is disabled.

    sink.bulk-flush.max-size

    The maximum memory size of the buffer in which requests are saved.

    STRING

    No

    2 MB

    Default value: 2. Unit: MB. If this parameter is set to 0, this feature is disabled.

    sink.bulk-flush.interval

    The interval at which flush operations are performed.

    Duration

    No

    1s

    Default value: 1. Unit: seconds. If this parameter is set to 0, this feature is disabled.

    connection.path-prefix

    The prefix that must be added to each REST communication.

    STRING

    No

    Empty

    N/A.

    retry-on-conflict

    The maximum number of retries that are allowed due to version conflicts in an update operation. If the number of retries exceeds the value of this parameter, an exception occurs and the deployment fails.

    INT

    No

    0

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    • This parameter takes effect only when a primary key is specified.
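
The dynamic index and bulk flush parameters above can be combined as in the following sketch. The table name, the field names, and all numeric values are assumptions for illustration. The `|` separator in the index pattern follows the dynamic index syntax of the open source Flink Elasticsearch connector and is assumed to apply here as well.

```sql
CREATE TEMPORARY TABLE es_sink_daily (
  user_id STRING,
  log_ts TIMESTAMP(3),
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  -- Dynamic index: a row whose log_ts is 2020-03-27 12:25:55 goes to myusers-2020-03-27.
  'index' = 'myusers-{log_ts|yyyy-MM-dd}',
  -- Flush when 500 actions are buffered or every 2 seconds, whichever comes first.
  'sink.bulk-flush.max-actions' = '500',
  'sink.bulk-flush.interval' = '2s',
  -- Retry failed flush operations up to 3 times, starting from a 1-second delay
  -- that increases exponentially.
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.max-retries' = '3',
  'sink.bulk-flush.backoff.delay' = '1s'
);
```

Because sink.flush-on-checkpoint is left at its default value of true, pending requests are still flushed during checkpointing, which is what the at-least-once guarantee relies on.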

  • Parameters only for dimension tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the dimension table.

    STRING

    Yes

    No default value

    Set the value to elasticsearch.

    endPoint

    The endpoint of the Elasticsearch cluster.

    STRING

    Yes

    No default value

    Example: http://127.0.0.1:XXXX.

    indexName

    The name of the index in the Elasticsearch cluster.

    STRING

    Yes

    No default value

    N/A.

    accessId

    The username that is used to access the Elasticsearch cluster.

    STRING

    No

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    accessKey

    The password that is used to access the Elasticsearch cluster.

    STRING

    No

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    typeNames

    The names of types.

    STRING

    No

    _doc

    We recommend that you do not configure this parameter if the version of your Elasticsearch cluster is later than V7.0.

    maxJoinRows

    The maximum number of dimension table rows that can be joined with a single row of input data.

    INTEGER

    No

    1024

    N/A.

    cache

    The cache policy.

    STRING

    No

    ALL

    Valid values:

    • ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

    • LRU: Partial data in the dimension table is cached. The system searches for data in the cache each time a data record is read from the source table. If the data is not found, the system searches for the data in the physical dimension table.

    • None: No data is cached.

    cacheSize

    The maximum number of rows of data that can be cached.

    LONG

    No

    100000

    The cacheSize parameter takes effect only when you set the cache parameter to LRU.

    cacheTTLMs

    The cache timeout period.

    LONG

    No

    Long.MAX_VALUE

    Unit: milliseconds. The configuration of the cacheTTLMs parameter varies based on the cache parameter.

    • If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.

    • If you set the cache parameter to ALL, the cacheTTLMs parameter specifies the interval at which the system refreshes the cache. By default, the cache is not refreshed.

    ignoreKeywordSuffix

    Specifies whether to remove the .keyword suffix that is automatically added to the name of the field of the STRING data type.

    BOOLEAN

    No

    false

    By default, the .keyword suffix is added to the names of the fields of the STRING data type to ensure compatibility. If the fields of the TEXT data type in the Elasticsearch table cannot be matched, you can set the value of the ignoreKeywordSuffix parameter to true.

    cacheEmpty

    Specifies whether to cache the empty results that are found in the physical dimension table.

    BOOLEAN

    No

    true

    The cacheEmpty parameter takes effect only when the cache parameter is set to LRU.
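
As a minimal sketch of the cache parameters, the following dimension table uses the LRU policy with a bounded cache and a 10-minute expiration. The table name and the values 50000 and 600000 are illustrative assumptions; appropriate values depend on how many distinct keys the job looks up and how quickly the index changes.

```sql
CREATE TEMPORARY TABLE es_dim_lru (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>',
  'cache' = 'LRU',           -- Cache only the rows that are actually looked up.
  'cacheSize' = '50000',     -- Takes effect only when cache is set to LRU.
  'cacheTTLMs' = '600000',   -- Cache entries expire after 10 minutes.
  'cacheEmpty' = 'true'      -- Also cache lookups that return no rows.
);
```

With the ALL policy, cacheSize and cacheEmpty would not apply, and cacheTTLMs would instead set the interval at which the full cache is reloaded.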

Data type mappings

Flink parses Elasticsearch data in the JSON format. For more information, see Data Type Mapping.

Sample code

  • The following sample code shows how to create a source table.

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • The following sample code shows how to create a dimension table.

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.id, e.data, w.`value` -- Select only the three columns that blackhole_sink declares.
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • The following sample code shows how to create a result table.

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is optional. If you specify a primary key, the primary key is used as the document ID. If you do not specify a primary key, a random value is used as the document ID. 
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
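
The result table sample above writes only INSERT messages. As a hedged sketch of upsert mode in practice, the following job writes a continuously updated aggregation to a result table that has a primary key. The table names click_source and es_sink_agg and their fields are hypothetical and are not part of the original samples.

```sql
CREATE TEMPORARY TABLE click_source (
  user_id STRING,
  user_name STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink_agg (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is required here because the query emits UPDATE messages.
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);

-- Each new click changes the count of its user, so the same document ID is overwritten.
INSERT INTO es_sink_agg
SELECT user_id, COUNT(*) AS pv
FROM click_source
GROUP BY user_id;
```

Without the primary key, the result table would work in append mode and could not consume the UPDATE messages that the GROUP BY query produces.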