This topic describes how to use the reindex API to migrate data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster. Related operations include index creation and data migration. The self-managed Elasticsearch cluster runs on Elastic Compute Service (ECS) instances.
Background information
- If the self-managed Elasticsearch cluster stores a large volume of data, we recommend that you use snapshots stored in Object Storage Service (OSS) to migrate the data.
- If you want to filter the source data during the migration, use Logstash.
Prerequisites
- The self-managed Elasticsearch cluster meets the following requirements:
- The ECS instances that host the self-managed Elasticsearch cluster are deployed in a virtual private cloud (VPC). You cannot use ECS instances that are connected to a VPC over a ClassicLink connection. The self-managed Elasticsearch cluster and the Alibaba Cloud Elasticsearch cluster are deployed in the same VPC.
- The IP addresses of the nodes in the Alibaba Cloud Elasticsearch cluster are added to the security groups of the ECS instances that host the self-managed Elasticsearch cluster, and port 9200 is enabled in the security group rules. You can query the IP addresses of the nodes in the Kibana console of the Alibaba Cloud Elasticsearch cluster.
- The self-managed Elasticsearch cluster is connected to the Alibaba Cloud Elasticsearch cluster. You can test the connectivity by running the curl -XGET http://<host>:9200 command on the server where you run scripts. Note: You can run all scripts provided in this topic on a server that is connected to both clusters over port 9200.
- A private connection is configured for the Alibaba Cloud Elasticsearch cluster if
the cluster is deployed in the new network architecture. To configure a private connection
for an Alibaba Cloud Elasticsearch cluster, perform the following steps:
- Create a Classic Load Balancer (CLB) instance that supports the PrivateLink service and resides in the same VPC as the Alibaba Cloud Elasticsearch cluster. For more information, see Step 1: Create a CLB instance that supports PrivateLink.
- Configure the CLB instance. For more information, see Step 2: Configure the CLB instance.
Note: You must add all the ECS instances that host the self-managed Elasticsearch cluster to the CLB instance as backend servers. In this topic, port 9200 is used as the listening port.
- Create an endpoint service. For more information, see Step 3: Create an endpoint service.
- Obtain the domain name of the endpoint that is used to access the endpoint service. For more information, see View the domain name of an endpoint.
Precautions
Scenario | Network architecture | Support for the reindex API | Solution |
---|---|---|---|
Use the reindex API to migrate data between Alibaba Cloud Elasticsearch clusters | Both clusters are deployed in the original network architecture. | Yes | For more information, see Use the reindex API to migrate data. |
Use the reindex API to migrate data between Alibaba Cloud Elasticsearch clusters | Both clusters are deployed in the new network architecture. | No | Use OSS or Logstash to migrate data between the clusters. For more information, see Use OSS to migrate data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster and Use Alibaba Cloud Logstash to migrate data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster. |
Use the reindex API to migrate data between Alibaba Cloud Elasticsearch clusters | One cluster is deployed in the original network architecture, and the other is deployed in the new network architecture. | No | Use OSS or Logstash to migrate data between the clusters. For more information, see the documents referenced in the preceding row. |
Migrate data from a self-managed Elasticsearch cluster that runs on ECS instances to an Alibaba Cloud Elasticsearch cluster | The Alibaba Cloud Elasticsearch cluster is deployed in the original network architecture. | Yes | For more information, see Use the reindex API to migrate data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster. |
Migrate data from a self-managed Elasticsearch cluster that runs on ECS instances to an Alibaba Cloud Elasticsearch cluster | The Alibaba Cloud Elasticsearch cluster is deployed in the new network architecture. | Yes | Use the PrivateLink service to establish a network connection between the Alibaba Cloud Elasticsearch cluster and the self-managed Elasticsearch cluster that runs on ECS instances. This way, the service account of Alibaba Cloud Elasticsearch can be used to access the self-managed Elasticsearch cluster. Then, use the domain name of the endpoint that you obtained and the reindex API to migrate data between the clusters (a hedged sketch follows this table). For more information, see Migrate data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster deployed in the new network architecture. Note: Only some regions support PrivateLink. For more information, see Regions and zones that support PrivateLink. If the zone where your Alibaba Cloud Elasticsearch cluster resides does not support PrivateLink, you cannot use the reindex API to migrate data between the two clusters. |
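For the scenario in the last row of the table, the reindex request runs on the Alibaba Cloud Elasticsearch cluster and pulls data from the self-managed cluster over the endpoint. The following command is a minimal sketch with placeholder credentials, hosts, and index name. It assumes that the endpoint domain name and port 9200 have also been added to the reindex.remote.whitelist setting described in Step 2:
curl -u <newClusterUser>:<newClusterPass> -XPOST "http://<newClusterHost>/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "http://<endpoint-domain-name>:9200",
            "username": "<oldClusterUser>",
            "password": "<oldClusterPass>"
        },
        "index": "<indexName>"
    },
    "dest": {
        "index": "<indexName>"
    }
}'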
- Alibaba Cloud Elasticsearch clusters deployed in the new network architecture reside in an exclusive VPC for Alibaba Cloud Elasticsearch. These clusters cannot access resources in other network environments. Alibaba Cloud Elasticsearch clusters deployed in the original network architecture reside in VPCs that are created by users. These clusters can still access resources in other network environments.
- The network architecture in the China (Zhangjiakou) region and the regions outside China was adjusted before October 2020. If you want to perform operations between a cluster that is created before October 2020 and a cluster that is created in October 2020 or later in such a region, submit a ticket to contact Alibaba Cloud technical support to check whether the network architecture supports the operations.
- Clusters created in other regions before October 2020 are deployed in the original network architecture, and those created in other regions in October 2020 or later are deployed in the new network architecture.
- To ensure data consistency, we recommend that you stop writing data to the self-managed Elasticsearch cluster before the migration. This way, you can continue to read data from the cluster during the migration. After the migration, you can read data from and write data to the Alibaba Cloud Elasticsearch cluster. If you do not stop writing data to the self-managed Elasticsearch cluster, we recommend that you configure loop execution for reindex operations in the code to shorten the time during which write operations are suspended. For more information, see the method used to migrate a large volume of data (without deletions and with update time) in the Step 3: Migrate data section.
- If you connect to the self-managed Elasticsearch cluster or the Alibaba Cloud Elasticsearch cluster by using its domain name, do not include a path in the URL. For example, a URL in the format of http://host:port/path is not supported.
Procedure
Step 1: Create indexes on the Alibaba Cloud Elasticsearch cluster
Create indexes on the Alibaba Cloud Elasticsearch cluster based on the index settings of the self-managed Elasticsearch cluster. You can also enable the Auto Indexing feature for the Alibaba Cloud Elasticsearch cluster. However, we recommend that you do not use this feature, because automatically created indexes may not use the settings and mappings that you expect. The following Python script reads the settings and mappings of each index from the self-managed Elasticsearch cluster and creates the indexes on the Alibaba Cloud Elasticsearch cluster with the number of replica shards set to 0:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# File name: indiceCreate.py
import sys
import base64
import time
import httplib
import json
## Specify the host of the self-managed Elasticsearch cluster.
oldClusterHost = "old-cluster.com"
## Specify the username of the self-managed Elasticsearch cluster. The field can be empty.
oldClusterUserName = "old-username"
## Specify the password of the self-managed Elasticsearch cluster. The field can be empty.
oldClusterPassword = "old-password"
## Specify the host of the Alibaba Cloud Elasticsearch cluster.
newClusterHost = "new-cluster.com"
## Specify the username of the Alibaba Cloud Elasticsearch cluster. The field can be empty.
newClusterUser = "new-username"
## Specify the password of the Alibaba Cloud Elasticsearch cluster. The field can be empty.
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    conn = httplib.HTTPConnection(host)
    headers = {}
    if (username != "") :
        base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
        headers["Authorization"] = "Basic %s" % base64string
    if "GET" == method:
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        conn.request(method=method, url=endpoint, headers=headers)
    else :
        headers["Content-Type"] = "application/json"
        conn.request(method=method, url=endpoint, body=params, headers=headers)
    response = conn.getresponse()
    res = response.read()
    return res

def httpGet(host, endpoint, username="", password=""):
    return httpRequest("GET", host, endpoint, "", username, password)

def httpPost(host, endpoint, params, username="", password=""):
    return httpRequest("POST", host, endpoint, params, username, password)

def httpPut(host, endpoint, params, username="", password=""):
    return httpRequest("PUT", host, endpoint, params, username, password)

def getIndices(host, username="", password=""):
    ## Obtain the names of all open indexes on the specified cluster.
    endpoint = "/_cat/indices"
    indicesResult = httpGet(host, endpoint, username, password)
    indicesList = indicesResult.split("\n")
    indexList = []
    for indices in indicesList:
        if (indices.find("open") > 0):
            indexList.append(indices.split()[2])
    return indexList

def getSettings(index, host, username="", password=""):
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print index + " Original settings: \n" + indexSettings
    settingsDict = json.loads(indexSettings)
    ## By default, the number of primary shards is the same as that of the index on the self-managed Elasticsearch cluster.
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## The default number of replica shards is 0.
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting

def getMapping(index, host, username="", password=""):
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print index + " Original mappings: \n" + indexMapping
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping

def createIndexStatement(oldIndexName):
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement

def createIndex(oldIndexName, newIndexName=""):
    if (newIndexName == "") :
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print "New index " + newIndexName + " settings and mappings: \n" + createstatement
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    print "New index " + newIndexName + " creation result: " + createResult

## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if (index.startswith(".")):
        systemIndex.append(index)
    else :
        createIndex(index, index)
if (len(systemIndex) > 0) :
    for index in systemIndex:
        print index + " may be a system index and is not recreated. You can manually recreate the index based on your business requirements."
Step 2: Configure a remote reindex whitelist for the Alibaba Cloud Elasticsearch cluster
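Add the hosts of the self-managed Elasticsearch cluster to the reindex.remote.whitelist configuration item of the Alibaba Cloud Elasticsearch cluster. Otherwise, the cluster rejects remote reindex requests that are sent to the self-managed cluster. The following line is a minimal sketch of the setting in the YML configuration of the cluster, assuming that the nodes of the self-managed Elasticsearch cluster reside in the 172.16.123.0/24 CIDR block and listen on port 9200 (the same example that appears in the comments of the circleReindex.sh script in Step 3):
reindex.remote.whitelist: 172.16.123.*:9200
The whitelist entries are in the host:port format without a scheme. Separate multiple entries with commas.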
Step 3: Migrate data
You can use one of the following methods to migrate data. Select a suitable method based on the volume of data that you want to migrate and your business requirements. In this example, data is migrated to an Alibaba Cloud Elasticsearch cluster that is deployed in the original network architecture.
- Migrate a small volume of data
Run the following script:
#!/bin/bash
# file: reindex.sh
indexName="The name of the index"
newClusterUser="The username of the Alibaba Cloud Elasticsearch cluster"
newClusterPass="The password of the Alibaba Cloud Elasticsearch cluster"
newClusterHost="The host of the Alibaba Cloud Elasticsearch cluster"
oldClusterUser="The username of the self-managed Elasticsearch cluster"
oldClusterPass="The password of the self-managed Elasticsearch cluster"
# You must configure the host of the self-managed Elasticsearch cluster in the format of [scheme]://[host]:[port]. Example: http://10.37.1.*:9200.
oldClusterHost="The host of the self-managed Elasticsearch cluster"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "match_all": {}
        }
    },
    "dest": {
        "index": "'${indexName}'"
    }
}'
- Migrate a large volume of data (without deletions and with update time)
To migrate a large volume of data without deletions, you can perform a rolling update to shorten the time during which write operations are suspended. The rolling update requires that your data schema has a time-series attribute that indicates the update time. You can stop writing data to the self-managed Elasticsearch cluster after data is migrated. Then, use the reindex API to perform a rolling update to synchronize the data that is updated during the migration. After the rolling update is complete, you can read data from and write data to the Alibaba Cloud Elasticsearch cluster.
#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# This is a script that uses the reindex API to remotely reindex data. Requirements:
# 1. Indexes are created on the Alibaba Cloud Elasticsearch cluster, or the Auto Indexing and dynamic mapping features are enabled for the cluster.
# 2. The following information is added to the YML file of the destination Elasticsearch cluster: reindex.remote.whitelist: 172.16.123.*:9200.
# 3. The host is configured in the format of [scheme]://[host]:[port].
USAGE="Usage: sh circleReindex.sh <count>
       count: the number of reindex operations that you can perform. A negative number indicates loop execution.
       Example:
         sh circleReindex.sh 1
         sh circleReindex.sh 5
         sh circleReindex.sh -1"
indexName="The name of the index"
newClusterUser="The username of the Alibaba Cloud Elasticsearch cluster"
newClusterPass="The password of the Alibaba Cloud Elasticsearch cluster"
oldClusterUser="The username of the self-managed Elasticsearch cluster"
oldClusterPass="The password of the self-managed Elasticsearch cluster"
## Example: http://myescluster.com
newClusterHost="The host of the Alibaba Cloud Elasticsearch cluster"
# You must configure the host of the self-managed Elasticsearch cluster in the format of [scheme]://[host]:[port]. Example: http://10.37.1.*:9200.
oldClusterHost="The host of the self-managed Elasticsearch cluster"
timeField="The update time of data"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
    reindexTimes=$[${reindexTimes} + 1]
    curTimestamp=`date +%s`
    ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
        "source": {
            "remote": {
                "host": "'${oldClusterHost}'",
                "username": "'${oldClusterUser}'",
                "password": "'${oldClusterPass}'"
            },
            "index": "'${indexName}'",
            "query": {
                "range" : {
                    "'${timeField}'" : {
                        "gte" : '${lastTimestamp}',
                        "lt" : '${curTimestamp}'
                    }
                }
            }
        },
        "dest": {
            "index": "'${indexName}'"
        }
    }'`
    lastTimestamp=${curTimestamp}
    echo "${reindexTimes} reindex operations are performed. The last reindex operation is completed at ${lastTimestamp}. Result: ${ret}."
    if [[ ${ret} == *error* ]]; then
        hasError=true
        echo "An unknown error occurred when you perform this operation. All subsequent operations are suspended."
    fi
}
function start() {
    ## A negative number indicates loop execution.
    if [[ $1 -lt 0 ]]; then
        while :
        do
            reIndexOP
        done
    elif [[ $1 -gt 0 ]]; then
        k=0
        while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
            reIndexOP
            let ++k
        done
    fi
}
## main
if [ $# -lt 1 ]; then
    echo "$USAGE"
    exit 1
fi
echo "Start the reindex operation for the ${indexName} index."
start $1
echo "${reindexTimes} reindex operations are performed."
- Migrate a large volume of data (without deletions and without an update time field)
You can use this method if no update time field is defined in the index mappings of the self-managed Elasticsearch cluster. In this case, you must add an update time field to the index mappings (a sketch of adding the field follows the script below). After the field is added, you can migrate the existing data by running the following script. Then, perform the rolling update that is described in the second data migration method to migrate the incremental data.
#!/bin/bash
# file: miss.sh
indexName="The name of the index"
newClusterUser="The username of the Alibaba Cloud Elasticsearch cluster"
newClusterPass="The password of the Alibaba Cloud Elasticsearch cluster"
newClusterHost="The host of the Alibaba Cloud Elasticsearch cluster"
oldClusterUser="The username of the self-managed Elasticsearch cluster"
oldClusterPass="The password of the self-managed Elasticsearch cluster"
# You must configure the host of the self-managed Elasticsearch cluster in the format of [scheme]://[host]:[port]. Example: http://10.37.1.*:9200.
oldClusterHost="The host of the self-managed Elasticsearch cluster"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "'${oldClusterHost}'",
            "username": "'${oldClusterUser}'",
            "password": "'${oldClusterPass}'"
        },
        "index": "'${indexName}'",
        "query": {
            "bool": {
                "must_not": {
                    "exists": {
                        "field": "'${timeField}'"
                    }
                }
            }
        }
    },
    "dest": {
        "index": "'${indexName}'"
    }
}'
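The script above and the subsequent rolling update both rely on an update time field, named updatetime in this example. If the field does not exist yet, the following command is a minimal, hypothetical sketch of adding it to the index mappings of the self-managed Elasticsearch cluster. The long type is used to store a Unix timestamp in seconds, which matches the range query in circleReindex.sh. On Elasticsearch versions earlier than 7.0, the mapping type name must be included in the endpoint, as shown here; on 7.0 and later, omit it:
curl -u <oldClusterUser>:<oldClusterPass> -XPUT "http://<oldClusterHost>/<indexName>/_mapping/<typeName>" -H "Content-Type: application/json" -d '{
    "properties": {
        "updatetime": {
            "type": "long"
        }
    }
}'
After the field is added, make sure that applications populate updatetime when they write or update documents in the self-managed Elasticsearch cluster.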
- Migrate data without suspending write operations
This data migration method will be available soon.
FAQ
- Problem: When I run the curl command, the system displays {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}. What do I do?
Solution: Add -H "Content-Type: application/json" to the curl command and try again. For example, you can run the following commands to query the indexes, settings, and mappings of the self-managed Elasticsearch cluster and then create an index on the Alibaba Cloud Elasticsearch cluster:
// Obtain all the indexes on the self-managed Elasticsearch cluster. If you do not have the required permissions, remove the "-u user:pass" parameter. Replace oldClusterHost with the host of the self-managed Elasticsearch cluster.
curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}'
// Obtain the settings and mappings of the index that you want to migrate based on the returned indexes. Replace indexName with the name of the index that you want to query.
curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true
// Create an index on the Alibaba Cloud Elasticsearch cluster based on the _settings and _mapping configurations that you obtained.
// Set number_of_shards to the number of primary shards of the index on the self-managed Elasticsearch cluster, such as 5. You can set the number of replica shards to 0 to accelerate data migration and change the number to 1 after data is migrated.
// Specify the mappings of the index on the self-managed Elasticsearch cluster. In this example, testtype indicates the type of the index.
// newClusterHost indicates the host of the Alibaba Cloud Elasticsearch cluster, and testindex indicates the name of the index that you want to create.
curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -H "Content-Type: application/json" -d '{
    "settings" : {
        "number_of_shards" : "5",
        "number_of_replicas" : "0"
    },
    "mappings" : {
        "testtype" : {
            "properties" : {
                "uid" : { "type" : "long" },
                "name" : { "type" : "text" },
                "create_time" : { "type" : "long" }
            }
        }
    }
}'
- Problem: What do I do if the source index stores large volumes of data and the data
migration is slow?
Solution:
- If you use the reindex API to migrate data, data is migrated in scroll mode in batches. To improve migration efficiency, you can increase the scroll batch size or configure a sliced scroll, which parallelizes the reindex process (a sketch of increasing the batch size follows this list). For more information, see the reindex API documentation.
- If the self-managed Elasticsearch cluster stores large volumes of data, we recommend that you use snapshots stored in Object Storage Service (OSS) to migrate data. For more information, see Use OSS to migrate data from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster.
- If the source index stores a large volume of data, you can set the number of replica shards to 0 and the refresh interval to -1 for the destination index before you migrate data. This accelerates the migration. After the data is migrated, restore the settings to their original values.
// Set the number of replica shards to 0 and disable the refresh feature to accelerate the data migration.
curl -u user:password -H "Content-Type: application/json" -XPUT 'http://<host:port>/indexName/_settings' -d '{
    "number_of_replicas" : 0,
    "refresh_interval" : "-1"
}'
// After data is migrated, set the number of replica shards to 1 and the refresh interval to 1s, which is the default value.
curl -u user:password -H "Content-Type: application/json" -XPUT 'http://<host:port>/indexName/_settings' -d '{
    "number_of_replicas" : 1,
    "refresh_interval" : "1s"
}'
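As mentioned in the first item of this list, you can increase the batch size that the reindex API uses when it scrolls over the source index. The following command is a minimal sketch with placeholder hosts and index name; the source.size field specifies the number of documents that are fetched per batch, and the value 5000 is illustrative:
curl -u user:password -XPOST "http://<newClusterHost>/_reindex?pretty" -H "Content-Type: application/json" -d '{
    "source": {
        "remote": {
            "host": "http://<oldClusterHost>:9200"
        },
        "index": "<indexName>",
        "size": 5000
    },
    "dest": {
        "index": "<indexName>"
    }
}'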