This topic describes how to create an Elasticsearch result table in Realtime Compute for Apache Flink and explains the parameters in the WITH clause that you specify when you create the table.

Notice This topic applies only to Blink 3.2.2 and later.

DDL syntax

In Realtime Compute for Apache Flink, you can use Elasticsearch to store output data. The following sample code shows how to create an Elasticsearch result table.
CREATE TABLE es_stream_sink(
  field1 LONG,
  field2 VARBINARY,
  field3 VARCHAR,
  PRIMARY KEY(field1)
)WITH(
  type = 'elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>'
);
Note
  • Elasticsearch supports data updates based on the PRIMARY KEY field. You can use only one field as the PRIMARY KEY field.
  • If you specify the PRIMARY KEY field, values in the field are used as document IDs.
  • If you do not specify the PRIMARY KEY field, document IDs are randomly generated. For more information, see Index API.
  • In full update mode, a later document overwrites an earlier document that has the same document ID.
  • In incremental update mode, the system updates only the corresponding fields of the existing document based on the field values in the input data.
  • By default, all updates use UPSERT semantics: a document is inserted if it does not exist and is updated if it already exists.
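
The incremental update mode described in the preceding note is selected with the updateMode parameter listed in the general configurations. The following is a minimal sketch rather than a definitive configuration: the table name es_inc_sink and the fields user_id and last_login are hypothetical, and the endpoint and credentials are placeholders.

```sql
-- Sketch only: table and field names are hypothetical.
CREATE TABLE es_inc_sink(
  user_id LONG,
  last_login VARCHAR,
  PRIMARY KEY(user_id)        -- values of user_id are used as document IDs
)WITH(
  type = 'elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>',
  updateMode = 'inc'          -- the default value is full
);
```

With this setting, a record whose user_id matches an existing document is expected to update that document's fields instead of replacing the whole document.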

Parameters in the WITH clause (general configurations)

Parameter | Description | Required | Default value
type | The type of the connector. | Yes | elasticsearch
endPoint | The endpoint of an Elasticsearch cluster, such as http://127.0.0.1:9211. | Yes | N/A
accessId | The AccessKey ID that is used to access the Elasticsearch cluster. Note: If you use the Kibana plug-in to access the Elasticsearch cluster, enter the Kibana logon ID. | Yes | N/A
accessKey | The AccessKey secret that is used to access the Elasticsearch cluster. Note: If you use the Kibana plug-in to access the Elasticsearch cluster, enter the Kibana logon password. | Yes | N/A
index | The name of the index. | Yes | N/A
typeName | The type of a document. | Yes | _doc
bufferSize | The maximum number of data records that can be stored in the buffer before data is deduplicated. | No | 1000
maxRetryTimes | The maximum number of retries for writing data to a table. | No | 30
timeout | The read timeout period. Unit: milliseconds. | No | 600000
discovery | Specifies whether node discovery is enabled. If this feature is enabled, the client refreshes the server list every 5 minutes. | No | false
compression | Specifies whether to compress request bodies in the gzip format. | No | true
multiThread | Specifies whether to enable multithreading for JestClient. | No | true
ignoreWriteError | Specifies whether to ignore write exceptions. | No | false
settings | The settings used to create indexes. | No | N/A
updateMode | The update mode that is used after the primary key is specified. Valid values: full (the existing document is overwritten) and inc (only the fields in the input data are updated). | No | full
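
To illustrate how the optional parameters fit into the WITH clause, the following sketch overrides a few of the defaults. The table name es_tuned_sink is hypothetical and the values are arbitrary examples, not recommendations. Writing the values as quoted strings is an assumption based on the style of the other samples in this topic.

```sql
-- Sketch only: the overridden values are arbitrary examples.
CREATE TABLE es_tuned_sink(
  field1 LONG,
  field3 VARCHAR,
  PRIMARY KEY(field1)
)WITH(
  type = 'elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>',
  bufferSize = '2000',        -- default: 1000
  maxRetryTimes = '10',       -- default: 30
  timeout = '300000',         -- read timeout in milliseconds; default: 600000
  ignoreWriteError = 'true'   -- default: false
);
```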

Parameters in the WITH clause (dynamic indexing)

Parameter | Description | Required | Remarks
dynamicIndex | Specifies whether to enable dynamic indexing. | No | Valid values: true (dynamic indexing is enabled) and false (dynamic indexing is disabled; this is the default value).
indexField | The field name of the index. | Required if the dynamicIndex parameter is set to true | Only the TIMESTAMP (in seconds), DATE, and LONG data types are supported.
indexInterval | The interval at which an index is changed. | Required if the dynamicIndex parameter is set to true | Valid values: d (one day; this is the default value), m (one month), and w (one week).
mapping | The mappings between the types and formats of fields in a document configured when dynamic indexing is enabled. The following example shows how to set the mapping between the type and format of the sendTime field: {"properties": {"sendTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}}} | No | This parameter is empty by default. Note: Blink 3.7.0 and later support this parameter. Elasticsearch V7.0 does not support this parameter.
Note
  • Only Blink 2.2.7 and later support the dynamic indexing feature.
  • After dynamic indexing is enabled, the index name specified by the index parameter in the general configurations is used as the unified alias for the indexes that are created subsequently. An alias can correspond to multiple indexes.
  • The actual index name is the alias followed by a date suffix that depends on the value of indexInterval:
    • d -> alias + "yyyyMMdd"
    • m -> alias + "yyyyMM"
    • w -> alias + "yyyyMMW"
  • You can use the Index API to change a single actual index, but the alias supports only the GET method. For more information about how to change the alias, see Index Aliases.
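
The mapping parameter can be combined with dynamic indexing roughly as follows. This is a sketch under stated assumptions: the table name es_mapped_sink and the fields id, sendTime, and eventTime are hypothetical, and passing the mapping as a single-quoted JSON string is an assumption about the expected value format. As noted in the table, the parameter requires Blink 3.7.0 or later and is not supported by Elasticsearch V7.0.

```sql
-- Sketch only: names and the quoting of the mapping value are assumptions.
CREATE TABLE es_mapped_sink(
  id LONG,
  sendTime VARCHAR,           -- stored as a date by using the mapping below
  eventTime TIMESTAMP,        -- field from which the index is derived
  PRIMARY KEY(id)
)WITH(
  type = 'elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',      -- used as the alias when dynamic indexing is enabled
  typeName = '<yourTypeName>',
  dynamicIndex = 'true',
  indexField = 'eventTime',
  indexInterval = 'm',        -- one index per month
  mapping = '{"properties":{"sendTime":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"}}}'
);
```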

Sample code

The following sample code shows how to create an Elasticsearch result table with dynamic indexing enabled.
CREATE TABLE es_stream_sink(
  field1 LONG,
  field2 VARBINARY,
  field3 TIMESTAMP,
  PRIMARY KEY(field1)
)WITH(
  type = 'elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>',
  dynamicIndex = 'true',
  indexField = 'field3',
  indexInterval = 'd'
);
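
Once the result table is declared, data is written to it with an INSERT INTO statement. The following is a minimal sketch: source_stream is a hypothetical source table that is assumed to be declared elsewhere in the same job with fields of matching types.

```sql
-- Sketch only: source_stream is a hypothetical source table.
INSERT INTO es_stream_sink
SELECT
  field1,   -- PRIMARY KEY field; its values are used as document IDs
  field2,
  field3    -- indexField; its value determines which actual index receives the record
FROM source_stream;
```

Because indexInterval is set to d in the preceding table definition, records whose field3 values fall on different days are expected to be written to different actual indexes behind the same alias.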