Before migrating data between Elasticsearch clusters, create the required indexes on the destination cluster with the same mappings and settings as the source. Skipping this step causes Elasticsearch to infer field types through dynamic mapping, which can lead to data loss, field type mismatches, and degraded query performance.
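To see why dynamic mapping is risky, consider a simplified sketch of Elasticsearch's default inference rules (illustrative only; the real rules also handle dates, nested objects, and multi-fields, and strings actually become `text` with a `.keyword` subfield). The field names and sample values below are hypothetical:

```python
# Simplified stand-in for Elasticsearch's default dynamic mapping rules,
# to illustrate how inferred types can diverge from the source mapping.
def infer_dynamic_type(value):
    if isinstance(value, bool):
        return 'boolean'
    if isinstance(value, int):
        return 'long'
    if isinstance(value, float):
        return 'float'
    if isinstance(value, str):
        # By default, JSON strings become text (with a keyword subfield),
        # not plain keyword as a hand-written mapping might specify.
        return 'text'
    return 'object'

# If the source maps annual_rate as keyword, a document like this would
# make dynamic mapping infer text for it on an unprepared destination:
doc = {'productName': 'bond A', 'annual_rate': '3.22%'}
inferred = {field: infer_dynamic_type(value) for field, value in doc.items()}
print(inferred)  # {'productName': 'text', 'annual_rate': 'text'}
```

A `keyword` field that silently becomes `text` changes how filters, sorts, and aggregations behave, which is exactly the mismatch that pre-creating the indexes avoids.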
This guide shows how to use Python scripts to copy the following from a source cluster to a destination cluster:
- Index lifecycle management (ILM) policies
- Index templates
- Index mappings and settings
Run the scripts in this order: ILM policies first, then index templates, then index mappings and settings. Index templates and ILM policies must exist before indexes are created against them.
Prerequisites
Before you begin, make sure you have:
- Two Alibaba Cloud Elasticsearch clusters (both V7.10), one as the source and one as the destination. See Create an Alibaba Cloud Elasticsearch cluster.
- An Elastic Compute Service (ECS) instance with Python 3.6.8 installed. See Get started with Linux instances.
- Network connectivity between the ECS instance and both clusters, with the ECS instance IP address added to the public or private IP address whitelists of both clusters.
For production environments, connect over an internal network to keep data off the public internet.
The scripts in this guide target Elasticsearch V7.10, which does not support multi-type indexes. If your clusters run an earlier version, modify the scripts accordingly.
Script configuration
Each script uses the same config dictionary. Update the following fields before running any script:
| Field | Description | Example |
|---|---|---|
| old_cluster_host | Source cluster host and port | es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200 |
| old_cluster_user | Source cluster username | yourusername |
| old_cluster_password | Source cluster password | yourpassword |
| old_cluster_protocol | Source cluster protocol (http or https) | http |
| new_cluster_host | Destination cluster host and port. Find this on the Basic Information page in the Elasticsearch console. | es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200 |
| new_cluster_user | Destination cluster username | yourusername |
| new_cluster_password | Destination cluster password | yourpassword |
| new_cluster_protocol | Destination cluster protocol (http or https) | http |
| default_replicas | Replica shard count to apply on the destination cluster. This overrides the source cluster's replica count. | 1 |
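Before running any script, a quick sanity check of the config values can catch obvious mistakes such as a missing port or an unsupported protocol. This helper is not part of the sync scripts; it is a hypothetical sketch that only validates the dictionary's shape, not actual connectivity:

```python
# Hypothetical helper: basic shape checks on the shared config dictionary.
def validate_config(config):
    errors = []
    for side in ('old', 'new'):
        host = config.get(f'{side}_cluster_host', '')
        if ':' not in host:
            errors.append(f"{side}_cluster_host should include a port, e.g. host:9200")
        if config.get(f'{side}_cluster_protocol') not in ('http', 'https'):
            errors.append(f"{side}_cluster_protocol must be 'http' or 'https'")
    if not isinstance(config.get('default_replicas'), int) or config['default_replicas'] < 0:
        errors.append("default_replicas must be a non-negative integer")
    return errors

# Example with placeholder hosts; an empty list means the checks pass.
sample = {
    'old_cluster_host': 'source.example.com:9200',
    'old_cluster_protocol': 'http',
    'new_cluster_host': 'dest.example.com:9200',
    'new_cluster_protocol': 'https',
    'default_replicas': 1,
}
print(validate_config(sample))  # []
```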
Synchronize ILM policies
ILM policies define how indexes age through phases—such as hot and delete—and what actions to take at each phase. Synchronize ILM policies before index templates, because templates can reference ILM policies.
Create test data on the source cluster
Run the following command on the source cluster to create a test ILM policy:
```
PUT _ilm/policy/product
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "1GB",
            "max_age": "1d",
            "max_docs": 1000
          }
        }
      },
      "delete": {
        "min_age": "2h",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
Run the sync script
Run the following script on the ECS instance. It fetches all ILM policies from the source cluster and creates them on the destination cluster. The script removes version, modified_date, and modified_date_string before writing to the destination, because those are read-only metadata generated by Elasticsearch and cannot be set on creation.
```python
import requests

# Update these values for your environment.
config = {
    # Source cluster
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    'old_cluster_user': 'yourusername',
    'old_cluster_password': 'yourpassword',
    'old_cluster_protocol': 'http',  # http or https
    # Destination cluster
    # Find the host on the Basic Information page in the Elasticsearch console.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    'new_cluster_user': 'yourusername',
    'new_cluster_password': 'yourpassword',
    'new_cluster_protocol': 'http',  # http or https
    'default_replicas': 1,
}

def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        raise
    except ValueError:
        print("Invalid JSON response:")
        print(response.text)
        raise

def get_ilm_policies():
    endpoint = "/_ilm/policy"
    return send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])

def create_ilm_policy(policy_name, policy_body):
    # Remove read-only metadata before writing to the destination cluster.
    policy_body.pop('version', None)
    policy_body.pop('modified_date', None)
    policy_body.pop('modified_date_string', None)
    endpoint = f"/_ilm/policy/{policy_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
    print(f"Policy {policy_name} created with result: {create_result}")

def main():
    policies = get_ilm_policies()
    for policy_name, policy_body in policies.items():
        create_ilm_policy(policy_name, policy_body)

if __name__ == '__main__':
    main()
```
Verify the result
Run the following command on the destination cluster to confirm the ILM policy was created:
GET _ilm/policy/product
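Beyond spot-checking one policy by name, you can compare a fetched source policy against its destination copy. The read-only metadata will differ between clusters, so strip it first, as the sync script does. A minimal sketch with inline sample bodies (in practice you would fetch both with the script's `send_http_request` helper; the sample values below are hypothetical):

```python
# Sketch: compare a source policy to its destination copy, ignoring the
# read-only metadata that Elasticsearch generates per cluster.
READ_ONLY_KEYS = ('version', 'modified_date', 'modified_date_string')

def policies_match(source_policy, dest_policy):
    strip = lambda p: {k: v for k, v in p.items() if k not in READ_ONLY_KEYS}
    return strip(source_policy) == strip(dest_policy)

# Sample bodies as GET _ilm/policy/<name> might return them on each cluster.
source = {'version': 3, 'modified_date': '2023-01-01T00:00:00.000Z',
          'policy': {'phases': {'delete': {'min_age': '2h', 'actions': {'delete': {}}}}}}
dest = {'version': 1, 'modified_date': '2023-06-01T00:00:00.000Z',
        'policy': {'phases': {'delete': {'min_age': '2h', 'actions': {'delete': {}}}}}}
print(policies_match(source, dest))  # True: same policy despite different metadata
```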
Synchronize index templates
Index templates automatically apply mappings and settings to new indexes whose names match a specified pattern. Synchronize templates after ILM policies and before creating any indexes that depend on them.
Create test data on the source cluster
Run the following command on the source cluster to create a test index template:
```
PUT _template/product
{
  "index_patterns": ["product_*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productName": {
        "type": "text"
      },
      "annual_rate": {
        "type": "keyword"
      },
      "describe": {
        "type": "text"
      }
    }
  }
}
```
Run the sync script
Run the following script on the ECS instance. It fetches all index templates from the source cluster and creates them on the destination cluster.
```python
import requests

# Update these values for your environment.
config = {
    # Source cluster
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    'old_cluster_user': 'yourusername',
    'old_cluster_password': 'yourpassword',
    'old_cluster_protocol': 'http',  # http or https
    # Destination cluster
    # Find the host on the Basic Information page in the Elasticsearch console.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    'new_cluster_user': 'yourusername',
    'new_cluster_password': 'yourpassword',
    'new_cluster_protocol': 'http',  # http or https
    'default_replicas': 1,
}

def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        raise
    except ValueError:
        print("Invalid JSON response:")
        print(response.text)
        raise

def get_index_templates():
    endpoint = "/_template"
    return send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])

def create_index_template(template_name, template_body):
    endpoint = f"/_template/{template_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
    print(f"Template {template_name} created with result: {create_result}")

def main():
    templates = get_index_templates()
    for template_name, template_body in templates.items():
        create_index_template(template_name, template_body)

if __name__ == '__main__':
    main()
```
Verify the result
Run the following command on the destination cluster to confirm the index template was created:
GET _template/product
Synchronize index mappings and settings
Index settings define the primary shard and replica shard configuration for an index. This section copies those settings along with field mappings from the source cluster to the destination cluster.
Create test data on the source cluster
Run the following command on the source cluster to create a test index:
```
PUT /product_info
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productName": {
        "type": "text"
      },
      "annual_rate": {
        "type": "keyword"
      },
      "describe": {
        "type": "text"
      }
    }
  }
}
```
The three fields demonstrate common mapping types:
- productName: text type, full-text indexed and analyzed for search
- annual_rate: keyword type, stored as-is for exact-match filtering and aggregation
- describe: text type, full-text indexed and analyzed for search
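The practical difference between text and keyword can be sketched outside Elasticsearch: text fields are analyzed into lowercase terms before indexing, so a single query term can match part of a value, while keyword fields match only the exact, unmodified string. A deliberately simplified illustration (real analyzers do much more than lowercasing and splitting; the sample values are hypothetical):

```python
import re

# Rough stand-in for the default standard analyzer: lowercase word terms.
def analyze(value):
    return re.findall(r'\w+', value.lower())

def text_match(indexed_value, query_term):
    # text: a query term matches any analyzed token of the stored value.
    return query_term.lower() in analyze(indexed_value)

def keyword_match(indexed_value, query_term):
    # keyword: the whole stored string must match exactly.
    return indexed_value == query_term

print(text_match('High Yield Bond', 'bond'))     # True: token match
print(keyword_match('High Yield Bond', 'bond'))  # False: not the exact string
print(keyword_match('3.22%', '3.22%'))           # True: exact match
```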
Run the sync script
Run the following script on the ECS instance. It reads all open indexes from the source cluster (skipping system indexes that start with .) and recreates them on the destination cluster with the same mappings and shard count. The replica count on the destination is set by default_replicas rather than copied from the source.
```python
import requests

# Update these values for your environment.
config = {
    # Source cluster
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    'old_cluster_user': 'yourusername',
    'old_cluster_password': 'yourpassword',
    'old_cluster_protocol': 'http',  # http or https
    # Destination cluster
    # Find the host on the Basic Information page in the Elasticsearch console.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    'new_cluster_user': 'yourusername',
    'new_cluster_password': 'yourpassword',
    'new_cluster_protocol': 'http',  # http or https
    # Replica count to apply on the destination cluster.
    'default_replicas': 1,
}

def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        raise
    except ValueError:
        print("Invalid JSON response:")
        print(response.text)
        raise

def get_indices():
    endpoint = "/_cat/indices?format=json"
    indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return [index['index'] for index in indices_result if index['status'] == 'open']

def get_index_settings(index):
    endpoint = f"/{index}/_settings"
    index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    settings = index_settings[index]['settings']['index']
    # Keep the source shard count; apply the configured replica count.
    return {'settings': {
        'number_of_shards': settings.get('number_of_shards'),
        'number_of_replicas': config['default_replicas'],
    }}

def get_index_mapping(index):
    endpoint = f"/{index}/_mapping"
    index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return {'mappings': index_mapping[index]['mappings']}

def create_index(old_index_name, new_index_name=""):
    if not new_index_name:
        new_index_name = old_index_name
    settings = get_index_settings(old_index_name)
    mappings = get_index_mapping(old_index_name)
    body = {**settings, **mappings}
    endpoint = f"/{new_index_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    print(f"Index {new_index_name} created with result: {create_result}")

def main():
    for index in get_indices():
        if not index.startswith("."):  # Skip system indexes.
            create_index(index)

if __name__ == '__main__':
    main()
```
Verify the result
Run the following command on the destination cluster to confirm the index was created with the correct mappings and settings:
GET product_info
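To check every migrated index at once rather than one by one, you can diff the field types between the two clusters. The sketch below operates on the mappings object of an index (the value under `<index>.mappings` in a `GET /<index>/_mapping` response); in practice you would fetch both sides with the script's `send_http_request` helper, and the sample values here are hypothetical:

```python
# Sketch: diff two mappings objects to confirm every source field arrived
# with the same type on the destination cluster.
def diff_field_types(source_mappings, dest_mappings):
    src = source_mappings.get('properties', {})
    dst = dest_mappings.get('properties', {})
    mismatches = {}
    for field, spec in src.items():
        dest_spec = dst.get(field)
        if dest_spec is None:
            mismatches[field] = ('missing', spec.get('type'))
        elif dest_spec.get('type') != spec.get('type'):
            # (destination type, expected source type)
            mismatches[field] = (dest_spec.get('type'), spec.get('type'))
    return mismatches

source = {'properties': {'productName': {'type': 'text'}, 'annual_rate': {'type': 'keyword'}}}
dest = {'properties': {'productName': {'type': 'text'}, 'annual_rate': {'type': 'text'}}}
print(diff_field_types(source, dest))  # {'annual_rate': ('text', 'keyword')}
```

An empty result means every source field exists on the destination with the same type; any entry points at a field that dynamic mapping or a partial sync got wrong.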