This topic describes how to create an Elasticsearch result table in Realtime Compute for Apache Flink and the parameters in the WITH clause that you use when you create the table.
Notice: This topic applies only to Blink 3.2.2 and later.
DDL syntax
In Realtime Compute for Apache Flink, you can use Elasticsearch to store output data.
The following sample code shows how to create an Elasticsearch result table.
CREATE TABLE es_stream_sink(
field1 LONG,
field2 VARBINARY,
field3 VARCHAR,
PRIMARY KEY(field1)
)WITH(
type ='elasticsearch',
endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
accessId = '<yourUsername>',
accessKey = '<yourPassword>',
index = '<yourIndex>',
typeName = '<yourTypeName>'
);
Note
- Elasticsearch supports data updates based on the PRIMARY KEY field. You can use only one field as the PRIMARY KEY field.
- If you specify the PRIMARY KEY field, values in the field are used as document IDs.
- If you do not specify the PRIMARY KEY field, document IDs are randomly generated. For more information, see Index API.
- In full update mode, later documents overwrite earlier documents.
- In incremental update mode, the system updates only the fields for which you provide values.
- By default, all updates use the UPSERT syntax, which means to insert or update data.
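As a minimal sketch of how the PRIMARY KEY field drives upserts, the following statement writes into the es_stream_sink table defined above. The table name source_table is hypothetical and stands in for any upstream table with matching columns.

```sql
-- source_table is a hypothetical upstream table; replace it with your own source.
-- The value of field1 in each row is used as the document ID, so a later row
-- with the same field1 value updates the existing document instead of
-- creating a new one.
INSERT INTO es_stream_sink
SELECT
  field1,
  field2,
  field3
FROM source_table;
```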
Parameters in the WITH clause (general configurations)
Parameter | Description | Required | Default value |
---|---|---|---|
type | The type of the connector. | Yes | elasticsearch |
endPoint | The endpoint of an Elasticsearch cluster, such as http://127.0.0.1:9211. | Yes | N/A |
accessId | The AccessKey ID that is used to access the Elasticsearch cluster. Note: If you use the Kibana plug-in to access the Elasticsearch cluster, enter the Kibana logon ID. | Yes | N/A |
accessKey | The AccessKey secret that is used to access the Elasticsearch cluster. Note: If you use the Kibana plug-in to access the Elasticsearch cluster, enter the Kibana logon password. | Yes | N/A |
index | The name of the index. | Yes | N/A |
typeName | The type of a document. | Yes | _doc |
bufferSize | The maximum number of data records that can be stored in the buffer before data is deduplicated. | No | 1000 |
maxRetryTimes | The maximum number of retries for writing data to a table. | No | 30 |
timeout | The read timeout period. Unit: milliseconds. | No | 600000 |
discovery | Specifies whether node discovery is enabled. If this feature is enabled, the client refreshes the server list every 5 minutes. | No | false |
compression | Specifies whether to compress request bodies in the gzip format. | No | true |
multiThread | Specifies whether to enable multithreading for JestClient. | No | true |
ignoreWriteError | Specifies whether to ignore write exceptions. | No | false |
settings | The settings used to create indexes. | No | N/A |
updateMode | The update mode used after the primary key is specified, either full update or incremental update. | No | full |
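The optional parameters can be set alongside the required ones. The following sketch spells out a few of them with the default values listed in the table above. It assumes that optional values are passed as quoted strings, as in the sample DDL, which this topic does not state explicitly. The table name es_stream_sink_tuned is hypothetical.

```sql
CREATE TABLE es_stream_sink_tuned (   -- hypothetical table name
  field1 LONG,
  field2 VARBINARY,
  field3 VARCHAR,
  PRIMARY KEY (field1)
) WITH (
  type = 'elasticsearch',
  endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
  accessId = '<yourUsername>',
  accessKey = '<yourPassword>',
  index = '<yourIndex>',
  typeName = '<yourTypeName>',
  -- Optional parameters, shown here with their documented defaults.
  bufferSize = '1000',         -- records buffered before deduplication
  maxRetryTimes = '30',        -- maximum retries for writing data
  timeout = '600000',          -- read timeout in milliseconds
  ignoreWriteError = 'false',  -- do not ignore write exceptions
  updateMode = 'full'          -- later documents overwrite earlier ones
);
```

Because these are the defaults, omitting the optional parameters should give the same behavior. List them only when you intend to change a value.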
Parameters in the WITH clause (dynamic indexing)
Parameter | Description | Required | Remarks |
---|---|---|---|
dynamicIndex | Specifies whether to enable dynamic indexing. | No | Valid values: true and false. |
indexField | The field name of the index. | This parameter is required if the dynamicIndex parameter is set to true. | Only the TIMESTAMP (in seconds), DATE, and LONG data types are supported. |
indexInterval | The interval at which an index is changed. | This parameter is required if the dynamicIndex parameter is set to true. | Valid values: d, m, and w. |
mapping | The mappings between the types and formats of fields in a document, configured when dynamic indexing is enabled. For example, you can set the mapping between the type and format of the sendTime field. | No | This parameter is empty by default. |
Note
- Only Blink 2.2.7 and later support the dynamic indexing feature.
- After dynamic indexing is enabled, the index name in the basic configuration is used as the unified alias for indexes created subsequently. An alias can correspond to multiple indexes.
- Actual index names that correspond to different values of indexInterval:
  - d -> Alias "yyyyMMdd"
  - m -> Alias "yyyyMM"
  - w -> Alias "yyyyMMW"
- You can use Index API to change a single actual index, but the alias supports only the GET method. For more information about how to change the alias, see Index Aliases.
Sample code
The following sample code shows how to create a dynamic index.
CREATE TABLE es_stream_sink(
field1 LONG,
field2 VARBINARY,
field3 TIMESTAMP,
PRIMARY KEY(field1)
)WITH(
type ='elasticsearch',
endPoint = 'http://es-cn-mp****.public.elasticsearch.aliyuncs.com:****',
accessId = '<yourAccessId>',
accessKey = '<yourAccessSecret>',
index = '<yourIndex>',
typeName = '<yourTypeName>',
dynamicIndex = 'true',
indexField = 'field3',
indexInterval = 'd'
);