
DataHub: Synchronize data to Elasticsearch

Last Updated: Mar 31, 2023

Preparations

1. Create an index in Elasticsearch

DataHub allows you to synchronize data to indexes in clusters of Elasticsearch V5.X, V6.X, and V7.X.

DataHub can synchronize only data from topics of the TUPLE type to Elasticsearch. Before you start a DataConnector, make sure that the destination index already exists in Elasticsearch or that Elasticsearch is allowed to automatically create indexes. Otherwise, the DataConnector fails.

2. Prepare an account for data synchronization that is granted the required permissions

When you create a DataConnector to synchronize data to Elasticsearch, you must manually enter Elasticsearch information such as the endpoint, index name, account, and password. Make sure that the account information that you enter is valid. Otherwise, the DataConnector fails to be created.

Create a DataConnector

  1. In the left-side navigation pane of the DataHub console, click Project Manager. On the Project List page, find a project and click View in the Actions column. On the details page of the project, find a topic and click View in the Actions column.

  2. On the details page of the topic, click Connector in the upper-right corner.

  3. In the Create Connector panel, click ElasticSearch.


Parameter description

1. Endpoint

The endpoint of Elasticsearch. You must enter an internal endpoint and an internal port in the format of Internal endpoint:Internal port.

For example, if the internal endpoint is es-cn-xxx.elasticsearch.aliyuncs.com and the internal port is 9200, enter es-cn-xxx.elasticsearch.aliyuncs.com:9200.

2. Index

You can specify indexes in two ways: static indexes and dynamic indexes.

Static indexes

If you specify a static index, all data is written to that index. You can create the index in advance or allow Elasticsearch to automatically create it.

Dynamic indexes

If you specify dynamic indexes, you must allow Elasticsearch to automatically create indexes. Otherwise, data fails to be written. Index names can be generated based on a time period or on a field. Only one field can be used.

Supported time formats:

| Year | Month | Day | Week |
| --- | --- | --- | --- |
| %Y | %m | %d | %U |

Example 1: Create an index every day. Set the Index parameter to test_${%Y-%m-%d}. If the current date is March 31, 2021, data is written to the index test_2021-03-31.

Example 2: Create indexes based on a field. To create indexes based on the col1 field, set the Index parameter to test_${col1}. If one record has the value AAA in the col1 field and another record has the value BBB, the two records are written to different indexes: the former to test_AAA and the latter to test_BBB.
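The following Python sketch only illustrates how such templates expand; it is not DataHub code. It assumes that time-based placeholders follow standard strftime semantics and that a field-based placeholder is replaced with the value of that field in each record.

```python
from datetime import datetime, timezone

def resolve_index(template: str, record: dict) -> str:
    """Illustrative expansion of a dynamic index template such as
    test_${%Y-%m-%d} (time-based) or test_${col1} (field-based)."""
    start = template.index("${")
    end = template.index("}", start)
    placeholder = template[start + 2:end]
    if placeholder.startswith("%"):
        # Time-based index: expand the placeholder with strftime.
        value = datetime.now(timezone.utc).strftime(placeholder)
    else:
        # Field-based index: use the value of the named field.
        value = str(record[placeholder])
    return template[:start] + value + template[end + 1:]

print(resolve_index("test_${%Y-%m-%d}", {}))           # for example, test_2021-03-31
print(resolve_index("test_${col1}", {"col1": "AAA"}))  # test_AAA
```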

As the number of indexes in an Elasticsearch cluster grows, data writes slow down. Excessive indexes may cause timeouts when DataHub writes data to Elasticsearch. Therefore, when you use dynamic indexes, avoid creating an excessive number of indexes.

3. User and Password

The username and password that are used to access Elasticsearch.

4. Type Fields

The types that are generated when DataHub synchronizes data to Elasticsearch vary based on the version of the Elasticsearch cluster. In a cluster of Elasticsearch V5.X, you can create multiple types in one index. However, in a cluster of Elasticsearch V6.X, you can create only one type in one index. Therefore, the way in which DataHub synchronizes data to Elasticsearch varies. The Type Fields parameter is required.

  • When you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V5.X, the value of the field that you select is used as the type of a record. If multiple fields are selected, the values of the fields that are separated by vertical bars (|) are used together as the type of a record. The fields that you select from the Type Fields drop-down list cannot be null.

  • When you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V6.X, the name of the field that you select is used as the type of a record. If multiple fields are selected, the names of the fields that are separated by vertical bars (|) are used together as the type of a record. The cluster of Elasticsearch V6.X supports all field names.

Example:

DataHub schema: f1 string, f2 string, f3 string, f4 string
Data: ["test1","test2","test3",null]

| Type field | Type in Elasticsearch V5.X | Type in Elasticsearch V6.X |
| --- | --- | --- |
| f1 | test1 | f1 |
| f1,f3 | test1\|test3 | f1\|f3 |
| ff | The type fails to be created. | ff |
| f1,ff | The type fails to be created. | f1\|ff |
| f4 | The type is created, but the data fails to be synchronized because dirty data occurs. | The type is created and the data is synchronized. |

Note:

  • The type name cannot be customized when you create a DataConnector to synchronize data to a cluster of Elasticsearch V6.X. If you want to customize the type name, you can use DataHub SDKs to create a DataConnector.

  • Default types are used in a cluster of Elasticsearch V7.X. Therefore, you do not need to set the Type Fields parameter when you create a DataConnector to synchronize data to a cluster of Elasticsearch V7.X.
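The following Python sketch only illustrates the composition rule described above and in the preceding table; it is not DataHub code. For Elasticsearch V5.X the values of the selected fields are joined with vertical bars (|), and for V6.X the field names are joined instead.

```python
def compose_type(type_fields, schema, record, es_major_version):
    """Illustrative composition of the Elasticsearch type.

    type_fields: field names selected as Type Fields, for example ["f1", "f3"]
    schema:      all field names of the DataHub topic
    record:      one record as a list of values, aligned with schema
    """
    values = dict(zip(schema, record))
    if es_major_version == 5:
        # V5.X: join the *values* of the selected fields; null values are not allowed.
        parts = [values[name] for name in type_fields]
        if any(part is None for part in parts):
            raise ValueError("dirty data: a type field is null")
        return "|".join(parts)
    # V6.X: join the *names* of the selected fields.
    return "|".join(type_fields)

schema = ["f1", "f2", "f3", "f4"]
record = ["test1", "test2", "test3", None]
print(compose_type(["f1", "f3"], schema, record, 5))  # test1|test3
print(compose_type(["f1", "f3"], schema, record, 6))  # f1|f3
```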

5. ID Fields

You can generate IDs for the data written to Elasticsearch based on the data written to DataHub. You can also select no ID fields. In this case, Elasticsearch generates a unique ID for each record. When you create a DataConnector to synchronize data from DataHub to Elasticsearch, the value of the field that you select is used as the ID of a record. If multiple fields are selected, the values of the fields that are separated by vertical bars (|) are used together as the ID of a record. The fields that you select from the ID Fields drop-down list cannot be null.

Example:

DataHub schema: f1 string, f2 string, f3 string, f4 string
Data: ["test1","test2","test3",null]

| ID field | Data ID |
| --- | --- |
| None selected | Elasticsearch automatically generates a unique ID for each data record. |
| f1 | test1 |
| f1,f3 | test1\|test3 |
| ff | The ID fails to be generated. |
| f4 | The ID is generated, but the data record fails to be synchronized because dirty data occurs. |
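As an illustration only (not DataHub code), the following Python sketch mirrors the ID rule described above: if no ID fields are selected, Elasticsearch assigns its own ID; otherwise the values of the selected fields are joined with vertical bars (|), and a null value turns the record into dirty data.

```python
def compose_id(id_fields, schema, record):
    """Illustrative composition of the Elasticsearch document ID."""
    if not id_fields:
        # No ID fields selected: let Elasticsearch generate the _id.
        return None
    values = dict(zip(schema, record))
    parts = [values[name] for name in id_fields]
    if any(part is None for part in parts):
        raise ValueError("dirty data: an ID field is null")
    return "|".join(parts)

schema = ["f1", "f2", "f3", "f4"]
record = ["test1", "test2", "test3", None]
print(compose_id(["f1", "f3"], schema, record))  # test1|test3
print(compose_id([], schema, record))            # None: Elasticsearch assigns the ID
```

The Router attribute column described below follows the same value-joining rule.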

6. Router attribute column

You can generate routers for the data written to Elasticsearch based on the data written to DataHub. You can also select no router fields. In this case, the router feature of Elasticsearch is not used. When you create a DataConnector to synchronize data from DataHub to Elasticsearch, the value of the field that you select is used as the router of a data record. If multiple fields are selected, the values of the fields that are separated by vertical bars (|) are used together as the router of a data record. The fields that you select from the Router attribute column drop-down list cannot be null.

For examples, see the description of the ID Fields parameter.

7. Import Fields

The fields to be imported from DataHub to Elasticsearch. Fields that are not selected are not synchronized to Elasticsearch. Fields that are used as IDs, and fields that are used as types in a cluster of Elasticsearch V5.X, cannot be selected as fields to be imported. No examples are provided here because the imported fields alone do not determine the final data. The following section provides a complete data synchronization example.

8. Network Type

Select the network type based on the type of the Elasticsearch cluster. DataConnectors that synchronize data from DataHub on Alibaba Cloud support only the VPC network type because all Elasticsearch clusters on Alibaba Cloud are virtual private cloud (VPC)-connected clusters. If you select the VPC network type, you must enter information such as the VPC ID and the ID of the Elasticsearch cluster.


Note: When you set the Instance ID parameter, you must append -worker to the ID of the Elasticsearch cluster. For example, if the ID of the Elasticsearch cluster is es-cn-xxx, enter es-cn-xxx-worker.

Data write example

The following examples assume that the DataConnector has been created. If the DataConnector fails to be created, modify the configuration based on the preceding parameter descriptions.

The following table describes a topic schema in DataHub.

| Field name | Data type |
| --- | --- |
| f1 | BIGINT |
| f2 | STRING |
| f3 | BOOLEAN |
| f4 | DOUBLE |
| f5 | TIMESTAMP |
| f6 | DECIMAL |

Example 1:

  • Type Fields = f1 (This parameter is unavailable when you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V7.X.)

  • ID Fields = f2

  • Import Fields = f1,f2,f3,f4,f5,f6

Data = v1,v2,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | v1 | v2 | {f3:v3,f4:v4,f5:v5,f6:v6} |
| ES6 | f1 | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |
| ES7 | - | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |

Data = null,v2,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the type field is null. |
| ES6 | f1 | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |
| ES7 | - | v2 | {f1:v1,f3:v3,f4:v4,f5:v5,f6:v6} |

Data = v1,null,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the ID field is null. |
| ES6 | - | - | Dirty data occurs because the ID field is null. |
| ES7 | - | - | Dirty data occurs because the ID field is null. |
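To make these tables concrete, the following sketch uses the Python requests library to issue the kind of index request that corresponds to the ES7 row of the first table in Example 1: the document ID comes from f2, and the remaining imported fields form the document body. The endpoint, credentials, and index name are placeholders, and this is only an approximation of what the DataConnector does internally.

```python
import requests

# Placeholder endpoint, credentials, and index name.
es_endpoint = "http://localhost:9200"
auth = ("elastic", "your_password")
index = "test_topic"

record = {"f1": "v1", "f2": "v2", "f3": "v3", "f4": "v4", "f5": "v5", "f6": "v6"}

doc_id = record["f2"]                                  # ID Fields = f2
body = {k: v for k, v in record.items() if k != "f2"}  # the ID field is not written to the document body

# Elasticsearch V7.X: index the document under the default _doc type.
resp = requests.put(f"{es_endpoint}/{index}/_doc/{doc_id}", json=body, auth=auth)
print(resp.json())
```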

Example 2:

  • Type Fields = f1,f2 (This parameter is unavailable when you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V7.X.)

  • ID Fields = f3,f4

  • Import Fields = f1,f2,f3,f4,f5,f6

Data = v1,v2,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | v1\|v2 | v3\|v4 | {f5:v5,f6:v6} |
| ES6 | f1\|f2 | v3\|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
| ES7 | - | v3\|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |

Data = v1,null,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the type field is null. |
| ES6 | f1\|f2 | v3\|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
| ES7 | - | v3\|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |

Data = v1,v2,null,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the ID field is null. |
| ES6 | - | - | Dirty data occurs because the ID field is null. |
| ES7 | - | - | Dirty data occurs because the ID field is null. |

Example 3:

  • Type Fields = f1 (This parameter is unavailable when you create a DataConnector to synchronize data from DataHub to a cluster of Elasticsearch V7.X.)

  • ID Fields = f2

  • Router attribute column = f3

  • Import Fields = f1,f2,f3,f4,f5,f6

Data = v1,v2,v3,v4,v5,v6

| Elasticsearch version | type | id | router | data |
| --- | --- | --- | --- | --- |
| ES5 | v1 | v2 | v3 | {f4:v4,f5:v5,f6:v6} |
| ES6 | f1 | v2 | v3 | {f1:v1,f4:v4,f5:v5,f6:v6} |
| ES7 | - | v2 | v3 | {f1:v1,f4:v4,f5:v5,f6:v6} |

Data = null,v2,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the type field is null. |
| ES6 | f1 | v2 | {f1:v1,f4:v4,f5:v5,f6:v6} |
| ES7 | - | v2 | {f1:v1,f4:v4,f5:v5,f6:v6} |

Data = v1,null,v3,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the ID field is null. |
| ES6 | - | - | Dirty data occurs because the ID field is null. |
| ES7 | - | - | Dirty data occurs because the ID field is null. |

Data = v1,v2,null,v4,v5,v6

| Elasticsearch version | type | id | data |
| --- | --- | --- | --- |
| ES5 | - | - | Dirty data occurs because the router field is null. |
| ES6 | - | - | Dirty data occurs because the router field is null. |
| ES7 | - | - | Dirty data occurs because the router field is null. |

Example

This example shows the complete procedure for synchronizing data from DataHub to Elasticsearch V6.7. All operations related to Elasticsearch are performed by using the development tools in the Kibana console. For more information about other operations, see the Elasticsearch documentation.

1. Create an index in Elasticsearch

By default, Elasticsearch can automatically create an index. Therefore, you can skip this step. If you do not allow Elasticsearch to automatically create an index, you must manually create one. For more information about how to create an index, see the Elasticsearch documentation.
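If you choose to create the index manually, a minimal request such as the following can be used. This sketch uses the Python requests library with a placeholder endpoint, credentials, and index name (test_topic); it is not the only way to create the index.

```python
import requests

# Placeholder endpoint, credentials, and index name; replace with your own values.
es_endpoint = "http://localhost:9200"
auth = ("elastic", "your_password")
index = "test_topic"

# Create the index with default dynamic mappings so that DataHub can write
# records that follow the topic schema of this example.
resp = requests.put(
    f"{es_endpoint}/{index}",
    json={"settings": {"number_of_shards": 1, "number_of_replicas": 1}},
    auth=auth,
)
print(resp.json())
```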

2. Create a DataHub topic

Note: You can create DataConnectors to synchronize data to Elasticsearch only for the topics of the TUPLE type.

For more information, see the "Create a topic" section of the Manage topics topic.
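If you prefer to create the topic programmatically, the following sketch uses the open-source DataHub SDK for Python (pydatahub). The endpoint, credentials, project name, and topic name are placeholders, and method names may differ slightly across SDK versions.

```python
from datahub import DataHub
from datahub.models import RecordSchema, FieldType

# Placeholder credentials, endpoint, and names used only for illustration.
dh = DataHub("<access_id>", "<access_key>", "https://dh-cn-hangzhou.aliyuncs.com")

# Schema of the topic used in this example.
schema = RecordSchema.from_lists(
    ["f1", "f2", "f3", "f4", "f5", "f6"],
    [FieldType.BIGINT, FieldType.STRING, FieldType.BOOLEAN,
     FieldType.DOUBLE, FieldType.TIMESTAMP, FieldType.DECIMAL],
)

# Create a TUPLE topic with 1 shard and a 7-day lifecycle.
dh.create_tuple_topic("test_project", "test_topic", 1, 7, schema, "topic for the Elasticsearch example")
```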

View the created topic.


3. Create a DataConnector to synchronize data to Elasticsearch

In this example, when you create the DataConnector to synchronize data to Elasticsearch, f1 and f2 are used as the type fields, f3 and f4 as the ID fields, and all fields as the fields to be imported.


4. Write data to the DataHub topic

You can use a DataHub SDK or a DataHub plug-in to write data.
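For example, the following sketch uses pydatahub with placeholder values; the exact write methods may differ across SDK versions.

```python
from decimal import Decimal

from datahub import DataHub
from datahub.models import TupleRecord

dh = DataHub("<access_id>", "<access_key>", "https://dh-cn-hangzhou.aliyuncs.com")

# Reuse the schema of the topic created earlier.
schema = dh.get_topic("test_project", "test_topic").record_schema

# Build one TUPLE record that matches the schema f1..f6
# (TIMESTAMP values are microseconds since the epoch).
record = TupleRecord(schema=schema,
                     values=[1, "v2", True, 3.14, 1617148800000000, Decimal("1.23")])
record.shard_id = "0"

dh.put_records("test_project", "test_topic", [record])
```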

After a data record is written, sample the data and view the written data.

5. Verify the synchronized data

First, check the synchronization offset of the DataConnector that is used to synchronize data to Elasticsearch.

You can see that the synchronization offset and synchronization time of the DataConnector have changed. The synchronization time is the time when the data was written to DataHub, and the synchronization offset has changed to 1. The synchronization offset starts from 0, and an offset of 0 indicates that the first data record has been written.

Then, check whether data is synchronized to Elasticsearch. You can view the synchronized data in the Kibana console.
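For example, you can run a search in the Kibana console, or issue a request such as the following sketch (endpoint, credentials, and index name are placeholders):

```python
import requests

# Placeholder endpoint, credentials, and index name.
es_endpoint = "http://localhost:9200"
auth = ("elastic", "your_password")
index = "test_topic"

# Search the index and print the synchronized documents.
resp = requests.get(f"{es_endpoint}/{index}/_search",
                    json={"query": {"match_all": {}}}, auth=auth)
for hit in resp.json()["hits"]["hits"]:
    print(hit["_id"], hit["_source"])
```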
