This guide describes how to migrate data from a self-managed Elasticsearch cluster (hosted on ECS) to an Alibaba Cloud Elasticsearch instance using the Reindex API. The Reindex API allows the destination cluster to "pull" data from a remote source cluster.
Background and selection guide
The Reindex API is ideal for migrating specific indices or small-to-medium data volumes. Depending on your instance architecture and data size, choose the appropriate tool:
Data Volume / Requirement | Recommended Method |
Small to Medium Volume | Reindex API (This guide) |
Large Volume (> 100GB) | |
Data Filtering/Transformation |
Prerequisites
Network connectivity:
The ECS instance (self-managed Elasticsearch) and the Alibaba Cloud Elasticsearch cluster must be in the same Virtual Private Cloud (VPC).
Security group: The ECS security group must allow inbound traffic on port 9200 from the Alibaba Cloud Elasticsearch node IP addresses (found in the Kibana console).
Cluster architecture check:
Check your Alibaba Cloud Elasticsearch cluster's Basic Information > Control Architecture Type to see its architecture:Basic Control Architecture (v2): Deployed in your VPC. Access is straightforward. See Procedure of this guide for instructions.
Cloud-native Control Architecture (v3): Deployed in a service VPC. You must use PrivateLink to bridge the network. See Migrate data from a self-managed Elasticsearch cluster to Alibaba Cloud Elasticsearch over a private connection.
Stability: Stop writing data to the source cluster during migration to ensure 100% data consistency.
Usage notes
Instances created before October 2020 (v2 architecture) cannot directly perform cross-cluster reindexing with instances created after October 2020 (v3 architecture) without using a proxy or PrivateLink. If they are on different architectures, use Logstash as an intermediary.
When you use a domain name to access a self-managed Elasticsearch or an Alibaba Cloud Elasticsearch cluster, you cannot use a URL that includes a path, such as
http://host:port/path.
Procedure
Step 1: Establish network connectivity (For v3 architecture only)
If your Alibaba Cloud Elasticsearch cluster uses the cloud-native control architecture (v3) architecture, you must create a private connection using PrivateLink. For more information, see Create private connections with PrivateLink and SLB.
Step 2: Create a destination index
You must create the index in the destination Alibaba Cloud Elasticsearch cluster with the same mappings and settings as the source.
Batch Script (Python 2.7): Use this script to replicate index structures (mappings/shards) from the old cluster to the new one.
In this example, replicas are set to 0 to speed up migration.
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# File name: indiceCreate.py
import sys
import base64
import time
import httplib
import json
## The host of the self-managed Elasticsearch cluster.
oldClusterHost = "old-cluster.com"
## The username for the self-managed Elasticsearch cluster. This can be empty.
oldClusterUserName = "old-username"
## The password for the self-managed Elasticsearch cluster. This can be empty.
oldClusterPassword = "old-password"
## The host of the Alibaba Cloud Elasticsearch cluster. You can obtain it from the Basic Information page of the Alibaba Cloud Elasticsearch instance.
newClusterHost = "new-cluster.com"
## The username for the Alibaba Cloud Elasticsearch cluster.
newClusterUser = "elastic"
## The password for the Alibaba Cloud Elasticsearch cluster.
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
conn = httplib.HTTPConnection(host)
headers = {}
if (username != "") :
'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
headers["Authorization"] = "Basic %s" % base64string;
if "GET" == method:
headers["Content-Type"] = "application/x-www-form-urlencoded"
conn.request(method=method, url=endpoint, headers=headers)
else :
headers["Content-Type"] = "application/json"
conn.request(method=method, url=endpoint, body=params, headers=headers)
response = conn.getresponse()
res = response.read()
return res
def httpGet(host, endpoint, username="", password=""):
return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
endpoint = "/_cat/indices"
indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
indicesList = indicesResult.split("\n")
indexList = []
for indices in indicesList:
if (indices.find("open") > 0):
indexList.append(indices.split()[2])
return indexList
def getSettings(index, host, username="", password=""):
endpoint = "/" + index + "/_settings"
indexSettings = httpGet(host, endpoint, username, password)
print index + " The original settings are as follows:\n" + indexSettings
settingsDict = json.loads(indexSettings)
## By default, the number of shards is the same as that of the index in the self-managed Elasticsearch cluster.
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
## By default, the number of replicas is 0.
number_of_replicas = DEFAULT_REPLICAS
newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
return newSetting
def getMapping(index, host, username="", password=""):
endpoint = "/" + index + "/_mapping"
indexMapping = httpGet(host, endpoint, username, password)
print index + " The original mapping is as follows:\n" + indexMapping
mappingDict = json.loads(indexMapping)
mappings = json.dumps(mappingDict[index]["mappings"])
newMapping = "\"mappings\" : " + mappings
return newMapping
def createIndexStatement(oldIndexName):
settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
return createstatement
def createIndex(oldIndexName, newIndexName=""):
if (newIndexName == "") :
newIndexName = oldIndexName
createstatement = createIndexStatement(oldIndexName)
print "The settings and mapping for the new index " + newIndexName + " are as follows:\n" + createstatement
endpoint = "/" + newIndexName
createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print "Result of creating the new index " + newIndexName + ": " + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
if (index.startswith(".")):
systemIndex.append(index)
else :
createIndex(index, index)
if (len(systemIndex) > 0) :
for index in systemIndex:
print index + " might be a system index and will not be re-created. Handle it separately if needed."Step 3: Configure the remote reindex whitelist
Alibaba Cloud Elasticsearch requires a whitelist to allow remote communication.
Log on to the Alibaba Cloud Elasticsearch console.
In the left navigation menu, choose Elasticsearch Clusters.
Navigate to the target cluster.
In the top navigation bar, select the resource group to which the cluster belongs and the region where the cluster resides.
On the Elasticsearch Clusters page, find the cluster and click its ID.
In the left navigation menu, choose .
In the YML File Configuration section, click Modify Configuration on the right.
In the YML File Configuration panel, modify Other Configurations to set the reindex whitelist. For more information, see Configure YML parameters.
v2 architecture: Specify a combination of a host and a port. Example:
reindex.remote.whitelist: ["10.0.xx.xx:9200","10.0.xx.xx:9200","10.0.xx.xx:9200","10.15.xx.xx:9200","10.15.xx.xx:9200","10.15.xx.xx:9200"]
v3 architecture: Specify a combination of the endpoint domain name and port that correspond to the instance. Example:
ep-bp1hfkx7coy8lvu4****-cn-hangzhou-i.epsrv-bp1zczi0fgoc5qtv****.cn-hangzhou.privatelink.aliyuncs.com:9200
Save and restart the cluster.
Step 4: Migrate data
Select the script that matches your data scenario.
Scenario A: Simple migration (small data)
Run this once for each index.
#!/bin/bash
# file:reindex.sh
indexName="Your index name"
newClusterUser="Username for the Alibaba Cloud Elasticsearch cluster"
newClusterPass="Password for the Alibaba Cloud Elasticsearch cluster"
newClusterHost="Host of the Alibaba Cloud Elasticsearch cluster"
oldClusterUser="Username for the self-managed Elasticsearch cluster"
oldClusterPass="Password for the self-managed Elasticsearch cluster"
# The host of the self-managed Elasticsearch cluster must be in the format of [scheme]://[host]:[port], for example, http://10.37.*.*:9200.
oldClusterHost="Host of the self-managed Elasticsearch cluster"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"match_all": {}
}
},
"dest": {
"index": "'${indexName}'"
}
}'Scenario B: Incremental migration (Large data with timestamp)
If you have an update_time field, use this loop script to migrate data in chunks. This minimizes downtime by allowing you to sync the "delta" (new changes) after the initial bulk move.
#!/bin/bash
# file: circleReindex.sh
# CONTROLLING STARTUP:
# This is a script for remote reindexing. Requirements:
# 1. The index has been created in the Alibaba Cloud Elasticsearch cluster, or the cluster supports automatic creation and dynamic mapping.
# 2. An IP address whitelist must be configured in the YML file of the Alibaba Cloud Elasticsearch cluster, for example, reindex.remote.whitelist: 172.16.**.**:9200.
# 3. The host must be in the format of [scheme]://[host]:[port].
USAGE="Usage: sh circleReindex.sh <count>
count: The number of executions. A negative number indicates a loop for incremental execution. A positive number indicates a one-time or multiple executions.
Example:
sh circleReindex.sh 1
sh circleReindex.sh 5
sh circleReindex.sh -1"
indexName="Your index name"
newClusterUser="Username for the Alibaba Cloud Elasticsearch cluster"
newClusterPass="Password for the Alibaba Cloud Elasticsearch cluster"
oldClusterUser="Username for the self-managed Elasticsearch cluster"
oldClusterPass="Password for the self-managed Elasticsearch cluster"
## http://myescluster.com
newClusterHost="Host of the Alibaba Cloud Elasticsearch cluster"
# The host of the self-managed Elasticsearch cluster must be in the format of [scheme]://[host]:[port], for example, http://10.37.*.*:9200.
oldClusterHost="Host of the self-managed Elasticsearch cluster"
timeField="Update time field"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
reindexTimes=$[${reindexTimes} + 1]
curTimestamp=`date +%s`
ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"range" : {
"'${timeField}'" : {
"gte" : '${lastTimestamp}',
"lt" : '${curTimestamp}'
}
}
}
},
"dest": {
"index": "'${indexName}'"
}
}'`
lastTimestamp=${curTimestamp}
echo "The ${reindexTimes}th reindex. The update deadline for this execution is ${lastTimestamp}. Result: ${ret}"
if [[ ${ret} == *error* ]]; then
hasError=true
echo "An exception occurred during this execution. Subsequent operations are interrupted. Please check."
fi
}
function start() {
## If the number is negative, the loop runs continuously.
if [[ $1 -lt 0 ]]; then
while :
do
reIndexOP
done
elif [[ $1 -gt 0 ]]; then
k=0
while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
reIndexOP
let ++k
done
fi
}
## main
if [ $# -lt 1 ]; then
echo "$USAGE"
exit 1
fi
echo "Start the reindex operation for the index ${indexName}."
start $1
echo "A total of ${reindexTimes} reindex operations were performed."Scenario C: Incremental migration (Large data without timestamp)
Modify the upstream service code to add an update_time field. After you add the field, first migrate the historical data. Then, use the scroll migration method described in Scenario B: Incremental migration (Large data with timestamp) .
#!/bin/bash
# file:miss.sh
indexName="Your index name"
newClusterUser="Username for the Alibaba Cloud Elasticsearch cluster"
newClusterPass="Password for the Alibaba Cloud Elasticsearch cluster"
newClusterHost="Host of the Alibaba Cloud Elasticsearch cluster"
oldClusterUser="Username for the self-managed Elasticsearch cluster"
oldClusterPass="Password for the self-managed Elasticsearch cluster"
# The host of the self-managed Elasticsearch cluster must be in the format of [scheme]://[host]:[port], for example, http://10.37.*.*:9200
oldClusterHost="Host of the self-managed Elasticsearch cluster"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"bool": {
"must_not": {
"exists": {
"field": "'${timeField}'"
}
}
}
}
},
"dest": {
"index": "'${indexName}'"
}
}'FAQ and troubleshooting
Q: When I run the curl command, the error message
{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}is returned.A: Add
-H "Content-Type: application/json"to the curl command and try again.// Get information about all indexes in the self-managed Elasticsearch cluster. If you do not have permissions, you can remove the "-u user:pass" parameter. oldClusterHost is the host of the self-managed Elasticsearch cluster. Replace it with your actual host. curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}' // Based on the returned index list, obtain the settings and mapping of the user index to be migrated. Replace indexName with the name of the user index you want to query. curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true // Based on the obtained _settings and _mapping information of the corresponding index, create the corresponding index in the Alibaba Cloud Elasticsearch cluster. You can set the number of replicas to 0 to speed up data synchronization. After the data migration is complete, reset the number of replicas to 1. // newClusterHost is the host of the Alibaba Cloud Elasticsearch cluster, testindex is the name of the created index, and testtype is the type of the corresponding index. curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{ "testindex" : { "settings" : { "number_of_shards" : "5", // Assume that the number of shards for the corresponding index in the self-managed Elasticsearch cluster is 5. "number_of_replicas" : "0" // Set the number of replicas for the index to 0. } }, "mappings" : { // Assume that the mappings for the corresponding index in the self-managed Elasticsearch cluster are configured as follows. "testtype" : { "properties" : { "uid" : { "type" : "long" }, "name" : { "type" : "text" }, "create_time" : { "type" : "long" } } } } } }'Q: How can I speed up the migration?
A:
Disable replicas: Set
number_of_replicas: 0on the destination index before starting.Disable refresh: Set
refresh_interval: -1on the destination index.Slicing: Use the
slicesparameter in the Reindex API to parallelize the process:POST _reindex?slices=5&refreshFor more information, see reindex API.