全部產品
Search
文件中心

Realtime Compute for Apache Flink:Elasticsearch

更新時間:Jan 06, 2026

本文為您介紹如何使用Elasticsearch連接器。

背景資訊

Elasticsearch相容開源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商業功能,致力於資料分析、資料搜尋等情境服務。為您提供企業級許可權管控、安全監控警示、自動報表產生等情境服務。

Elasticsearch連接器支援的資訊如下:

類別

詳情

支援類型

源表、維表和結果表

運行模式

批模式和流模式

資料格式

JSON

特有監控指標

  • 源表

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • 維表

  • 結果表 ( VVR 6.0.6及以上 )

    • numRecordsOut

    • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 源表和維表支援大於等於6.8.x,但小於8.x版本的Elasticsearch。

  • 結果表僅支援Elasticsearch 6.x、7.x和8.x版本。

  • 僅支援全量Elasticsearch源表,不支援增量Elasticsearch源表。

文法結構

  • 源表

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • 維表

    CREATE TABLE es_dim(
      field1 STRING, --作為JOIN時的Key,必須為STRING類型。
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    說明
    • 如果指定主鍵,則維表JOIN時的Key(欄位)有且只能有一個,且必須為Elasticsearch對應索引的文檔ID。

    • 如果不指定主鍵,則維表JOIN時的Key可以有一個或多個,需要為Elasticsearch對應索引的文檔中的欄位。

    • 對於String類型,為了保持相容性,預設會對錶中欄位名增加.keyword尾碼。如果因此無法匹配到Elasticsearch中的Text欄位,可以將配置項ignoreKeywordSuffix配置為true。

  • 結果表

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填寫elasticsearch-6
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    說明
    • Elasticsearch結果表會根據是否定義了主鍵,確定是在upsert模式或append模式下工作。

      • 如果定義了主鍵,則主鍵必須為文檔ID,Elasticsearch結果表將在upsert模式下工作,該模式可以處理包含UPDATE和DELETE的訊息。

      • 如果未定義主鍵,Elasticsearch將自動產生隨機的文檔ID,Elasticsearch結果表將在append模式工作,該模式只能消費INSERT訊息。

    • 某些類型(例如BYTES、ROW、ARRAY和MAP等)由於沒有對應的字串表示形式,所以不允許其作為主鍵欄位。

    • DDL中的欄位均對應Elasticsearch文檔中的欄位,不支援將文檔ID等Meta資訊寫入Elasticsearch結果表中,因為文檔ID等Meta資訊由Elasticsearch執行個體側維護。

WITH參數

源表

參數

說明

資料類型

是否必填

預設值

備忘

connector

源表類型。

String

固定值為elasticsearch。

endPoint

Server地址。

String

例如:http://127.0.0.1:XXXX

indexName

索引名稱。

String

無。

accessId

Elasticsearch執行個體的使用者名稱。

String

預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey

重要

為了避免您的使用者名稱和密碼資訊泄露,建議您使用變數的方式填寫,詳情請參見專案變數

accessKey

Elasticsearch執行個體的密碼。

String

typeNames

Type名稱。

String

_doc

Elasticsearch 7.0以上版本不建議設定該參數。

batchSize

每個scroll請求從Elasticsearch叢集擷取的最大文檔數。

Int

2000

無。

keepScrollAliveSecs

scroll上下文保留的最長時間。

Int

3600

單位為秒。

結果表

參數

說明

資料類型

是否必填

預設值

備忘

connector

結果表類型。

String

固定值為elasticsearch-6elasticsearch-7 elasticsearch-8

說明

僅Realtime Compute引擎VVR 8.0.5及以上版本支援配置為elasticsearch-8

hosts

Server地址。

String

例如:127.0.0.1:XXXX

index

索引名稱。

String

Elasticsearch結果表同時支援靜態索引和動態索引。在使用靜態和動態索引時,您需要注意以下幾點:

  • 如果使用靜態索引,則索引選項值應為純字串,例如myusers,所有記錄都將被寫入myusers索引。

  • 如果使用動態索引,可以使用{field_name}引用記錄中的欄位值以動態產生目標索引。您還可以使用{field_name|date_format_string}將TIMESTAMP、DATE和TIME類型的欄位值轉換為date_format_string指定的格式。date_format_string與Java的DateTimeFormatter相容。例如,如果設定為myusers-{log_ts|yyyy-MM-dd},則log_ts欄位值為2020-03-27 12:25:55的記錄將被寫入myusers-2020-03-27索引。

document-type

文件類型。

String

  • elasticsearch-6:必填

  • elasticsearch-7:不支援

當連接器類型為elasticsearch-6時,此處參數取值需要和Elasticsearch側的type參數取值保持一致。

username

使用者名稱。

String

預設為空白,不進行許可權驗證。如果定義了username,則必須定義非空的password。

重要

為了避免您的使用者名稱和密碼資訊泄露,建議您使用變數的方式填寫,詳情請參見專案變數

password

密碼。

String

document-id.key-delimiter

文檔ID的分隔字元。

String

_

在Elasticsearch結果表中,主鍵用於計算Elasticsearch的文檔ID。Elasticsearch結果表通過使用document-id.key-delimiter指定的鍵分隔字元,按照DDL中定義的順序串連所有主鍵欄位,從而為每一行產生一個文檔ID字串。

說明

文檔ID為最多512個位元組但不包含空格的字串。

failure-handler

Elasticsearch請求失敗時的故障處理策略。

String

fail

可選策略如下:

  • fail(預設值):如果請求失敗,則作業失敗。

  • ignore:忽略失敗並刪除請求。

  • retry-rejected:重新添加由於隊列容量滿而失敗的請求。

  • custom class name:用於使用ActionRequestFailureHandler子類進行故障處理。

sink.flush-on-checkpoint

是否在checkpoint時執行flush。

Boolean

true

  • true:預設值。

  • false:禁用該功能後,在Elasticsearch進行Checkpoint時,連接器將不等待確認所有pending請求是否已完成,故連接器不會為請求提供At-least-once保證。

sink.bulk-flush.backoff.strategy

如果由於臨時請求錯誤導致flush操作失敗,則設定sink.bulk-flush.backoff.strategy指定重試策略。

Enum

DISABLED

  • DISABLED(預設值):不執行重試,即第一次請求錯誤後失敗。

  • CONSTANT:常量回退,即每次回退等待時間相同。

  • EXPONENTIAL:指數回退,即每次回退等待時間指數遞增。

sink.bulk-flush.backoff.max-retries

最大回退重試次數。

Int

無。

sink.bulk-flush.backoff.delay

每次回退嘗試之間的延遲。

Duration

  • 對於CONSTANT回退策略:該值為每次重試之間的延遲。

  • 對於EXPONENTIAL回退策略:該值為初始基準延遲。

sink.bulk-flush.max-actions

每個批量請求的最大緩衝運算元。

Int

1000

0表示禁用該功能。

sink.bulk-flush.max-size

存放請求的緩衝區記憶體最大值。

String

2 MB

單位為MB,預設值為2 MB,0 MB表示禁用該功能。

sink.bulk-flush.interval

flush的間隔。

Duration

1s

單位為秒,預設值為1s,0s表示禁用該功能。

connection.path-prefix

要添加到每個REST通訊中的前置詞字元串。

String

無。

retry-on-conflict

更新操作中,允許因版本衝突異常而重試的最大次數。超過該次數後將拋出異常導致作業失敗。

Int

0

說明
  • 僅Realtime Compute引擎VVR 4.0.13及以上版本支援該參數。

  • 該參數僅在定義了主鍵的情況下生效。

routing-fields

指定一個或多個ES欄位名稱,用來將文檔路由到Elasticsearch的指定分區中。

String

多個欄位名以分號(;)進行分割。如果某個欄位資料為空白,則該欄位會被置為null。

說明

僅Realtime Compute引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支援該參數。

sink.delete-strategy

用來配置收到回撤(-D/-U)類型訊息時的行為

Enum

DELETE_ROW_ON_PK

可選行為如下:

  • DELETE_ROW_ON_PK(預設值):忽略-U類型的訊息,但是在收到-D類型的訊息時刪除主鍵對應的行(文檔)。

  • IGNORE_DELETE:忽略-U和-D 類型的訊息,Elasticsearch Sink不發生回撤。

  • NON_PK_FIELD_TO_NULL:忽略 -U類型的訊息,但是在收到-D類型的訊息時,會修改主鍵對應的行(文檔),主索引值保持不變,表 Schema中其他非主索引值均置為 NULL。主要用在多個Sink同時寫入同一張Elasticsearch表時部分更新的情境。

  • CHANGELOG_STANDARD:和 DELETE_ROW_ON_PK類似,唯一的區別是該模式收到-U類型的訊息時也會刪除主鍵對應的行(文檔)。

    說明

    僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

sink.ignore-null-when-update

更新資料時,如果傳入的資料欄位值為null,判斷是否更新對應欄位為null,或不更新該欄位。

BOOLEAN

false

參數取值如下:

  • true:不更新該欄位。僅Flink表設定主鍵且Elasticsearch資料格式為JSON時,才支援配置該參數為true。

  • false:更新該欄位為null。

說明

僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。

connection.request-timeout

從連線管理員請求串連的逾時時間。

Duration

說明

僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。

connect.timeout

建立串連的逾時時間

Duration

說明

僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。

socket.timeout

等待資料的逾時時間,即兩個連續資料包之間的最大空閑時間間隔

Duration

說明

僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。

sink.bulk-flush.update.doc_as_upsert

是否將文檔作為更新欄位

BOOLEAN

false

參數取值如下:

  • true:將Update Request的doc_as_upsert欄位設定為true

  • false:將Update Request的upsert欄位填充為document

根據https://github.com/elastic/elasticsearch/issues/105804,目前ES的預設資料管道不支援bulk update部分更新。如果使用者希望使用資料管道,可以將此配置項設定為true。

說明

僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。

維表

參數

說明

資料類型

是否必填

預設值

備忘

connector

維表類型。

String

固定值為elasticsearch。

endPoint

Server地址。

String

例如:http://127.0.0.1:XXXX

indexName

索引名稱。

String

無。

accessId

Elasticsearch執行個體的使用者名稱。

String

預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey

重要

為了避免您的使用者名稱和密碼資訊泄露,建議您使用變數的方式填寫,詳情請參見專案變數

accessKey

Elasticsearch執行個體的密碼。

String

typeNames

Type名稱。

String

_doc

Elasticsearch 7.0以上版本不建議設定該參數。

maxJoinRows

單行資料Join的最多行數。

Integer

1024

無。

cache

緩衝策略。

String

None

支援以下三種緩衝策略:

  • ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中的所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

  • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。

  • None:無緩衝。

cacheSize

緩衝大小,即緩衝多少行資料。

Long

100000

僅當cache選擇LRU緩衝策略時,cacheSize參數生效。

cacheTTLMs

緩衝失效的逾時時間。

Long

Long.MAX_VALUE

單位為毫秒。cacheTTLMs配置和cache配置有關:

  • 如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間,預設不到期。

  • 如果cache配置為ALL,則cacheTTLMs為設定緩衝重新載入的間隔時間,預設不重新載入。

ignoreKeywordSuffix

是否忽略自動為String欄位添加的.keyword尾碼。

Boolean

false

為了保證相容性,Flink將Elasticsearch中的Text類型轉換為String,並預設在String類型欄位名後增加.keyword尾碼。

參數取值如下:

  • true:忽略。

    如果因此無法匹配到Elasticsearch中的Text類型欄位,需要將該參數配置為true。

  • false:不忽略。

cacheEmpty

是否緩衝物理維表中尋找結果為空白的結果。

Boolean

true

僅當cache選擇LRU緩衝策略時,cacheEmpty參數生效。

queryMaxDocs

非主鍵維表的輸入端每條資料到來後,查詢Elasticsearch Server時返回的最大文檔條數。

Integer

10000

預設值10000是Elasticsearch Server返迴文檔條數的最大限制,該配置項的取值不能超過這個上限。

說明
  • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

  • 該參數僅對非主鍵維表生效,因為主鍵表中資料是唯一的。

  • 為了查詢的正確性,預設值給的比較大。但是該值會增大查詢Elasticsearch時的記憶體佔用,確實遇到記憶體問題後,可以適當降低該值來最佳化記憶體使用量。

類型映射

Flink以JSON來解析Elasticsearch資料,詳情請參見資料類型映射關係

使用樣本

  • 源表示例

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • 維表示例

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 結果表示例1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • 結果表示例2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;