本文為您介紹如何使用Elasticsearch連接器。
背景資訊
Elasticsearch相容開源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商業功能,致力於資料分析、資料搜尋等情境服務。為您提供企業級許可權管控、安全監控警示、自動報表產生等情境服務。
Elasticsearch連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 批模式和流模式 |
資料格式 | JSON |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立Elasticsearch索引,詳情請參見建立樣本。
已配置Elasticsearch公網或私網訪問白名單,詳情請參見配置執行個體公網或私網訪問白名單。
使用限制
源表和維表支援大於等於6.8.x,但小於8.x版本的Elasticsearch。
結果表僅支援Elasticsearch 6.x、7.x和8.x版本。
僅支援全量Elasticsearch源表,不支援增量Elasticsearch源表。
文法結構
源表
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );維表
CREATE TABLE es_dim( field1 STRING, --作為JOIN時的Key,必須為STRING類型。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );說明如果指定主鍵,則維表JOIN時的Key(欄位)有且只能有一個,且必須為Elasticsearch對應索引的文檔ID。
如果不指定主鍵,則維表JOIN時的Key可以有一個或多個,需要為Elasticsearch對應索引的文檔中的欄位。
對於String類型,為了保持相容性,預設會對錶中欄位名增加.keyword尾碼。如果因此無法匹配到Elasticsearch中的Text欄位,可以將配置項ignoreKeywordSuffix配置為true。
結果表
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填寫elasticsearch-6 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );說明Elasticsearch結果表會根據是否定義了主鍵,確定是在upsert模式或append模式下工作。
如果定義了主鍵,則主鍵必須為文檔ID,Elasticsearch結果表將在upsert模式下工作,該模式可以處理包含UPDATE和DELETE的訊息。
如果未定義主鍵,Elasticsearch將自動產生隨機的文檔ID,Elasticsearch結果表將在append模式工作,該模式只能消費INSERT訊息。
某些類型(例如BYTES、ROW、ARRAY和MAP等)由於沒有對應的字串表示形式,所以不允許其作為主鍵欄位。
DDL中的欄位均對應Elasticsearch文檔中的欄位,不支援將文檔ID等Meta資訊寫入Elasticsearch結果表中,因為文檔ID等Meta資訊由Elasticsearch執行個體側維護。
WITH參數
源表
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 源表類型。 | String | 是 | 無 | 固定值為elasticsearch。 |
endPoint | Server地址。 | String | 是 | 無 | 例如: |
indexName | 索引名稱。 | String | 是 | 無 | 無。 |
accessId | Elasticsearch執行個體的使用者名稱。 | String | 否 | 無 | 預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey。 重要 為了避免您的使用者名稱和密碼資訊泄露,建議您使用變數的方式填寫,詳情請參見專案變數。 |
accessKey | Elasticsearch執行個體的密碼。 | String | 否 | 無 | |
typeNames | Type名稱。 | String | 否 | _doc | Elasticsearch 7.0以上版本不建議設定該參數。 |
batchSize | 每個scroll請求從Elasticsearch叢集擷取的最大文檔數。 | Int | 否 | 2000 | 無。 |
keepScrollAliveSecs | scroll上下文保留的最長時間。 | Int | 否 | 3600 | 單位為秒。 |
結果表
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 結果表類型。 | String | 是 | 無 | 固定值為 說明 僅Realtime Compute引擎VVR 8.0.5及以上版本支援配置為 |
hosts | Server地址。 | String | 是 | 無 | 例如: |
index | 索引名稱。 | String | 是 | 無 | Elasticsearch結果表同時支援靜態索引和動態索引。在使用靜態和動態索引時,您需要注意以下幾點:
|
document-type | 文件類型。 | String |
| 無 | 當連接器類型為 |
username | 使用者名稱。 | String | 否 | 空 | 預設為空白,不進行許可權驗證。如果定義了username,則必須定義非空的password。 重要 為了避免您的使用者名稱和密碼資訊泄露,建議您使用變數的方式填寫,詳情請參見專案變數。 |
password | 密碼。 | String | 否 | 空 | |
document-id.key-delimiter | 文檔ID的分隔字元。 | String | 否 | _ | 在Elasticsearch結果表中,主鍵用於計算Elasticsearch的文檔ID。Elasticsearch結果表通過使用document-id.key-delimiter指定的鍵分隔字元,按照DDL中定義的順序串連所有主鍵欄位,從而為每一行產生一個文檔ID字串。 說明 文檔ID為最多512個位元組但不包含空格的字串。 |
failure-handler | Elasticsearch請求失敗時的故障處理策略。 | String | 否 | fail | 可選策略如下:
|
sink.flush-on-checkpoint | 是否在checkpoint時執行flush。 | Boolean | 否 | true |
|
sink.bulk-flush.backoff.strategy | 如果由於臨時請求錯誤導致flush操作失敗,則設定sink.bulk-flush.backoff.strategy指定重試策略。 | Enum | 否 | DISABLED |
|
sink.bulk-flush.backoff.max-retries | 最大回退重試次數。 | Int | 否 | 無 | 無。 |
sink.bulk-flush.backoff.delay | 每次回退嘗試之間的延遲。 | Duration | 否 | 無 |
|
sink.bulk-flush.max-actions | 每個批量請求的最大緩衝運算元。 | Int | 否 | 1000 | 0表示禁用該功能。 |
sink.bulk-flush.max-size | 存放請求的緩衝區記憶體最大值。 | String | 否 | 2 MB | 單位為MB,預設值為2 MB,0 MB表示禁用該功能。 |
sink.bulk-flush.interval | flush的間隔。 | Duration | 否 | 1s | 單位為秒,預設值為1s,0s表示禁用該功能。 |
connection.path-prefix | 要添加到每個REST通訊中的前置詞字元串。 | String | 否 | 空 | 無。 |
retry-on-conflict | 更新操作中,允許因版本衝突異常而重試的最大次數。超過該次數後將拋出異常導致作業失敗。 | Int | 否 | 0 | 說明
|
routing-fields | 指定一個或多個ES欄位名稱,用來將文檔路由到Elasticsearch的指定分區中。 | String | 否 | 無 | 多個欄位名以分號(;)進行分割。如果某個欄位資料為空白,則該欄位會被置為null。 說明 僅Realtime Compute引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支援該參數。 |
sink.delete-strategy | 用來配置收到回撤(-D/-U)類型訊息時的行為 | Enum | 否 | DELETE_ROW_ON_PK | 可選行為如下:
|
sink.ignore-null-when-update | 更新資料時,如果傳入的資料欄位值為null,判斷是否更新對應欄位為null,或不更新該欄位。 | BOOLEAN | 否 | false | 參數取值如下:
說明 僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。 |
connection.request-timeout | 從連線管理員請求串連的逾時時間。 | Duration | 否 | 無 | 說明 僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。 |
connect.timeout | 建立串連的逾時時間 | Duration | 否 | 無 | 說明 僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。 |
socket.timeout | 等待資料的逾時時間,即兩個連續資料包之間的最大空閑時間間隔 | Duration | 否 | 無 | 說明 僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。 |
sink.bulk-flush.update.doc_as_upsert | 是否將文檔作為更新欄位 | BOOLEAN | 否 | false | 參數取值如下:
根據https://github.com/elastic/elasticsearch/issues/105804,目前ES的預設資料管道不支援bulk update部分更新。如果使用者希望使用資料管道,可以將此配置項設定為true。 說明 僅Realtime Compute引擎VVR 11.5及以上版本支援該參數。 |
維表
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 維表類型。 | String | 是 | 無 | 固定值為elasticsearch。 |
endPoint | Server地址。 | String | 是 | 無 | 例如: |
indexName | 索引名稱。 | String | 是 | 無 | 無。 |
accessId | Elasticsearch執行個體的使用者名稱。 | String | 否 | 無 | 預設為空白,不進行許可權驗證。如果定義了accessId,則必須定義非空的accessKey。 重要 為了避免您的使用者名稱和密碼資訊泄露,建議您使用變數的方式填寫,詳情請參見專案變數。 |
accessKey | Elasticsearch執行個體的密碼。 | String | 否 | 無 | |
typeNames | Type名稱。 | String | 否 | _doc | Elasticsearch 7.0以上版本不建議設定該參數。 |
maxJoinRows | 單行資料Join的最多行數。 | Integer | 否 | 1024 | 無。 |
cache | 緩衝策略。 | String | 否 | None | 支援以下三種緩衝策略:
|
cacheSize | 緩衝大小,即緩衝多少行資料。 | Long | 否 | 100000 | 僅當cache選擇LRU緩衝策略時,cacheSize參數生效。 |
cacheTTLMs | 緩衝失效的逾時時間。 | Long | 否 | Long.MAX_VALUE | 單位為毫秒。cacheTTLMs配置和cache配置有關:
|
ignoreKeywordSuffix | 是否忽略自動為String欄位添加的.keyword尾碼。 | Boolean | 否 | false | 為了保證相容性,Flink將Elasticsearch中的Text類型轉換為String,並預設在String類型欄位名後增加.keyword尾碼。 參數取值如下:
|
cacheEmpty | 是否緩衝物理維表中尋找結果為空白的結果。 | Boolean | 否 | true | 僅當cache選擇LRU緩衝策略時,cacheEmpty參數生效。 |
queryMaxDocs | 非主鍵維表的輸入端每條資料到來後,查詢Elasticsearch Server時返回的最大文檔條數。 | Integer | 否 | 10000 | 預設值10000是Elasticsearch Server返迴文檔條數的最大限制,該配置項的取值不能超過這個上限。 說明
|
類型映射
Flink以JSON來解析Elasticsearch資料,詳情請參見資料類型映射關係。
使用樣本
源表示例
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;維表示例
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;結果表示例1
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;結果表示例2
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;