Elasticsearch: Use Python scripts to synchronize index mappings and settings, an index template, and an ILM policy

Last Updated: Mar 15, 2024

When you migrate data between Elasticsearch clusters, for example from a self-managed Elasticsearch cluster to an Alibaba Cloud Elasticsearch cluster, make sure that the indexes in the destination cluster have the same mappings as those in the source cluster. If an index does not exist in the destination cluster when data is written to it, Elasticsearch creates the index automatically and infers the field types by using dynamic mapping. The inferred types may differ from the source mappings, which can cause data loss, format errors, or degraded query performance. Therefore, before you migrate data, create the required indexes in the destination cluster and define their mappings and settings. This topic describes how to use Python scripts to synchronize predefined index mappings and settings, index templates, and index lifecycle management (ILM) policies from one Elasticsearch cluster to another.

Prerequisites

  1. Two Alibaba Cloud Elasticsearch clusters are created. For more information, see Create an Alibaba Cloud Elasticsearch cluster.

    One cluster is used as the source cluster and the other as the destination cluster. Both clusters run V7.10.

    Note

    In this example, the Python scripts are written for Elasticsearch V7.10 clusters. Index mappings in V7.10 may differ from those in earlier versions. For example, earlier versions such as V5.x support indexes that contain multiple mapping types, but V7.10 does not, so the scripts in this example cannot create multi-type indexes. If your clusters run a version earlier than V7.10, modify the scripts based on your business requirements.

  2. An Elastic Compute Service (ECS) instance is created, and a Python environment is prepared. For more information, see Get started with Linux instances.

    In this example, Python 3.6.8 is used and the requests module is installed (for example, by running pip3 install requests). The scripts use f-strings, which require Python 3.6 or later. If you use another Python version, adjust the scripts based on the documentation of the requests module for that version.

  3. Network connections are established between the Elasticsearch clusters and the ECS instance, and the public or private IP address of the ECS instance is added to the public or private IP address whitelists of the Elasticsearch clusters.

    Note

    To ensure data security in the production environment, we recommend that you establish network connections between the Elasticsearch clusters and the ECS instance over an internal network.
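The scripts in this topic assume V7.x mapping responses. If the source cluster runs an earlier version such as V6.x, GET /<index>/_mapping returns the field definitions nested under the mapping type name (for example, _doc), and a V7.10 cluster typically rejects that typed body in a create-index request. The following is a minimal sketch, not part of the original scripts, of how such a response could be unwrapped. The function name to_typeless_mappings and the ROOT_MAPPING_KEYS set are hypothetical and only illustrate the idea; the set of root-level parameters is an assumption and is not exhaustive.

```python
# Root-level mapping parameters that appear in typeless (V7.x) mappings.
# Assumption for illustration: this set is not exhaustive.
ROOT_MAPPING_KEYS = {
    'properties', 'dynamic', 'dynamic_templates', 'date_detection',
    'numeric_detection', 'dynamic_date_formats', '_source', '_routing', '_meta',
}


def to_typeless_mappings(mapping_response, index):
    """Hypothetical helper: return {'mappings': ...} without a mapping type level.

    mapping_response is the parsed JSON of GET /<index>/_mapping.
    """
    mappings = mapping_response[index]['mappings']
    # Typeless responses already contain root-level parameters such as 'properties'.
    if not mappings or any(key in ROOT_MAPPING_KEYS for key in mappings):
        return {'mappings': mappings}
    # Typed responses from earlier versions contain one key per mapping type.
    if len(mappings) > 1:
        raise ValueError(
            f"Index {index} has multiple mapping types {sorted(mappings)}; "
            "split it into one index per type before you migrate it."
        )
    (type_mapping,) = mappings.values()
    return {'mappings': type_mapping}
```

The return value has the same shape as the return value of get_index_mapping in the index synchronization script, so a helper like this could be applied to the source response before the create-index request is sent. Indexes with several mapping types (V5.x and earlier) cannot be converted this way and are reported as errors instead.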

Synchronize mappings and settings of an index

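The script in this section synchronizes the mappings and settings of all open indexes in the source Elasticsearch cluster, except system indexes whose names start with a period (.). For settings, only the number of primary shards (number_of_shards) is copied from each source index. The number of replica shards (number_of_replicas) is set to the default_replicas value in the script configuration, and other index settings, such as custom analyzers, are not synchronized.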
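Because only the shard settings are copied, a mapping that references a custom analyzer defined in the source index settings cannot be created in the destination cluster without extra work. The following minimal sketch lists the top-level index settings that the script would leave behind, so that you can decide which ones to add manually. The function settings_not_synchronized and both setting sets are hypothetical; the list of generated settings is an assumption based on typical GET /<index>/_settings responses and is not exhaustive.

```python
# Settings that Elasticsearch generates for every index and that cannot be
# specified when you create an index. Assumption: this set is not exhaustive.
GENERATED_SETTINGS = {'uuid', 'creation_date', 'provided_name', 'version'}

# Settings that the synchronization script copies or sets explicitly.
COPIED_SETTINGS = {'number_of_shards', 'number_of_replicas'}


def settings_not_synchronized(settings_response, index):
    """Hypothetical helper: list index settings that the script does not copy.

    settings_response is the parsed JSON of GET /<index>/_settings from the
    source cluster. Only top-level keys under 'index' are reported.
    """
    index_settings = settings_response[index]['settings']['index']
    ignored = COPIED_SETTINGS | GENERATED_SETTINGS
    return sorted(key for key in index_settings if key not in ignored)
```

For an index that defines custom analyzers and a refresh interval, the helper would report analysis and refresh_interval, which you would then need to add to the create-index request yourself.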

  1. Prepare test data.

    Run the following command in the source Elasticsearch cluster to create a test index named product_info:

    PUT /product_info
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "productName": {
            "type": "text"
          },
          "annual_rate": {
            "type": "keyword"
          },
          "describe": {
            "type": "text"
          }
        }
      }
    }
  2. Run the following script on the ECS instance. The script creates each open index of the source cluster, including product_info, in the destination cluster with the same mappings and number of primary shards. Indexes whose names start with a period (.) are skipped.

    import requests
    
    # Modify the following configurations based on the environment that you use.
    config = {
        # The host of the source Elasticsearch cluster.
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # The username that you can use to access the source Elasticsearch cluster.
        'old_cluster_user': 'yourusername',
        # The password that you can use to access the source Elasticsearch cluster.
        'old_cluster_password': 'yourpassword',
        # The protocol that is used by the source Elasticsearch cluster. Valid values: http and https.
        'old_cluster_protocol': 'http',
        # The host of the destination Elasticsearch cluster. You can obtain the host on the Basic Information page of the destination Elasticsearch cluster in the Elasticsearch console.
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # The username that you can use to access the destination Elasticsearch cluster.
        'new_cluster_user': 'yourusername',
        # The password that you can use to access the destination Elasticsearch cluster.
        'new_cluster_password': 'yourpassword',
        # The protocol that is used by the destination Elasticsearch cluster. Valid values: http and https.
        'new_cluster_protocol': 'http',
        # The number of replica shards for each index that is created in the destination Elasticsearch cluster.
        'default_replicas': 1,
    }
    
    # Send an HTTP request and return the parsed JSON response. Return None if the request fails.
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers, timeout=60)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # Print the error message and return None so that the caller can skip this item.
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
            return None
        except ValueError:
            # The response is not in the JSON format. Print the response and re-raise the exception.
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # Query the names of all open indexes in the source Elasticsearch cluster.
    def get_indices():
        endpoint = "/_cat/indices?format=json"
        indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) or []
        return [index['index'] for index in indices_result if index['status'] == 'open']
    
    # Query the number of primary shards of an index. The number of replica shards is set to default_replicas.
    def get_index_settings(index):
        endpoint = f"/{index}/_settings"
        index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        if index_settings is None:
            return None
        settings = index_settings[index]['settings']['index']
        shards_replicas_settings = {
            'number_of_shards': settings.get('number_of_shards'),
            'number_of_replicas': config['default_replicas']
        }
        return {'settings': shards_replicas_settings}
    
    # Query the mappings of an index.
    def get_index_mapping(index):
        endpoint = f"/{index}/_mapping"
        index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        if index_mapping is None:
            return None
        return {'mappings': index_mapping[index]['mappings']}
    
    # Create an index in the destination Elasticsearch cluster based on the settings and mappings of a source index.
    def create_index(old_index_name, new_index_name=""):
        if not new_index_name:
            new_index_name = old_index_name
    
        settings = get_index_settings(old_index_name)
        mappings = get_index_mapping(old_index_name)
        if settings is None or mappings is None:
            print(f"Skipped index {old_index_name}: failed to query its settings or mappings.")
            return
        body = {**settings, **mappings}
    
        endpoint = f"/{new_index_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
        if create_result is not None:
            print(f"Index {new_index_name} created with result: {create_result}")
    
    # Specify the main function.
    def main():
        index_list = get_indices()
        for index in index_list:
            if not index.startswith("."):  # Ignore system indexes.
                create_index(index)
    
    
    if __name__ == '__main__':
        main()
    
  3. Verify the synchronization result.

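    Run the following command in the destination Elasticsearch cluster to view the mappings and settings of the synchronized index. In the response, number_of_shards should be 3, number_of_replicas should be 1, and the productName, annual_rate, and describe fields should have the same types as in the source index.

    GET /product_info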
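Checking one index at a time becomes tedious when many indexes are synchronized. The following minimal sketch compares the field types of an index in the two clusters and reports every field that differs, for example an annual_rate field that dynamic mapping created as text with a keyword sub-field instead of keyword. The functions flatten_fields and mapping_differences are hypothetical; they expect the parsed responses of GET /<index>/_mapping from each cluster, which you could collect with a request function such as send_http_request in the script above.

```python
def flatten_fields(properties, prefix=""):
    """Map each field path, such as 'user.name', to its field type."""
    fields = {}
    for name, spec in properties.items():
        path = f"{prefix}{name}"
        # Object fields are returned without a 'type' key, so default to 'object'.
        fields[path] = spec.get('type', 'object')
        # Object and nested fields keep their sub-fields under 'properties'.
        if 'properties' in spec:
            fields.update(flatten_fields(spec['properties'], prefix=f"{path}."))
        # Multi-fields, such as a keyword sub-field of a text field, live under 'fields'.
        for sub_name, sub_spec in spec.get('fields', {}).items():
            fields[f"{path}.{sub_name}"] = sub_spec.get('type', 'object')
    return fields


def mapping_differences(source_mapping, dest_mapping, index):
    """Return (path, source_type, destination_type) for each field whose type differs.

    Both arguments are parsed responses of GET /<index>/_mapping.
    None means that the field does not exist in that cluster.
    """
    src = flatten_fields(source_mapping[index]['mappings'].get('properties', {}))
    dst = flatten_fields(dest_mapping[index]['mappings'].get('properties', {}))
    return [
        (path, src.get(path), dst.get(path))
        for path in sorted(src.keys() | dst.keys())
        if src.get(path) != dst.get(path)
    ]
```

An empty list means that every field has the same type in both clusters. The sketch compares only field types; other mapping parameters, such as analyzers or formats, are not checked.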

Synchronize an index template

  1. Prepare test data.

    Run the following command in the source Elasticsearch cluster to create an index template:

    PUT _template/product
    {
      "index_patterns": ["product_*"],
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "productName": {
            "type": "text"
          },
          "annual_rate": {
            "type": "keyword"
          },
          "describe": {
            "type": "text"
          }
        }
      }
    }
  2. Run the following script on the ECS instance to synchronize the index templates. The script copies all legacy index templates, which are created by using the _template API, except templates whose names start with a period (.). If a template with the same name exists in the destination cluster, it is overwritten. Composable index templates, which are created by using the _index_template API in V7.8 and later, are not synchronized by this script.

    import requests
    
    # Modify the following configurations based on the environment that you use.
    config = {
        # The host of the source Elasticsearch cluster.
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # The username that you can use to access the source Elasticsearch cluster.
        'old_cluster_user': 'yourusername',
        # The password that you can use to access the source Elasticsearch cluster.
        'old_cluster_password': 'yourpassword',
        # The protocol that is used by the source Elasticsearch cluster. Valid values: http and https.
        'old_cluster_protocol': 'http',
        # The host of the destination Elasticsearch cluster. You can obtain the host on the Basic Information page of the destination Elasticsearch cluster in the Elasticsearch console.
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # The username that you can use to access the destination Elasticsearch cluster.
        'new_cluster_user': 'yourusername',
        # The password that you can use to access the destination Elasticsearch cluster.
        'new_cluster_password': 'yourpassword',
        # The protocol that is used by the destination Elasticsearch cluster. Valid values: http and https.
        'new_cluster_protocol': 'http',
    }
    
    # Send an HTTP request and return the parsed JSON response. Return None if the request fails.
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers, timeout=60)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # Print the error message and return None so that the caller can skip this item.
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
            return None
        except ValueError:
            # The response is not in the JSON format. Print the response and re-raise the exception.
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # Query all legacy index templates of the source Elasticsearch cluster.
    def get_index_templates():
        endpoint = "/_template"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result or {}
    
    # Create an index template in the destination Elasticsearch cluster. A template with the same name is overwritten.
    def create_index_template(template_name, template_body):
        endpoint = f"/_template/{template_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
        if create_result is not None:
            print(f"Template {template_name} created with result: {create_result}")
    
    # Specify the main function.
    def main():
        # Synchronize the index templates of the source Elasticsearch cluster.
        templates = get_index_templates()
        for template_name, template_body in templates.items():
            if not template_name.startswith("."):  # Ignore system templates.
                create_index_template(template_name, template_body)
    
    if __name__ == '__main__':
        main()
  3. Verify the synchronization result.

    Run the following command in the destination Elasticsearch cluster to query information about the index template that is synchronized:

    GET _template/product
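PUT _template/<name> replaces a destination template that has the same name. Before you run the synchronization script against a destination cluster that already contains templates, a dry run can show what would change. The following minimal sketch splits the source templates into those that would be created, those that would overwrite a different destination template, those that are already identical, and those that would be skipped. The function plan_template_sync and the rule of skipping names that start with a period (.) are assumptions for illustration; both arguments are the parsed responses of GET /_template from each cluster.

```python
def plan_template_sync(source_templates, dest_templates):
    """Hypothetical dry run: classify each source template before it is copied.

    Both arguments are parsed responses of GET /_template.
    """
    plan = {'create': [], 'overwrite': [], 'unchanged': [], 'skip': []}
    for name in sorted(source_templates):
        if name.startswith('.'):
            # Treat templates whose names start with a period as system templates.
            plan['skip'].append(name)
        elif name not in dest_templates:
            plan['create'].append(name)
        elif dest_templates[name] != source_templates[name]:
            # PUT _template/<name> would replace the existing destination template.
            plan['overwrite'].append(name)
        else:
            plan['unchanged'].append(name)
    return plan
```

Reviewing the overwrite list first helps you avoid replacing a template that was intentionally changed in the destination cluster.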

Synchronize an ILM policy

  1. Prepare test data.

    Run the following command in the source Elasticsearch cluster to create an ILM policy:

    PUT _ilm/policy/product
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_size": "1GB",
                "max_age": "1d",
                "max_docs": 1000
              }
            }
          },
          "delete": {
            "min_age": "2h",
            "actions": {
              "delete": {}
            }
          }
        }
      }
    }
  2. Run the following script on the ECS instance to synchronize the ILM policies:

    import requests
    
    # Modify the following configurations based on the environment that you use.
    config = {
        # The host of the source Elasticsearch cluster.
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # The username that you can use to access the source Elasticsearch cluster.
        'old_cluster_user': 'yourusername',
        # The password that you can use to access the source Elasticsearch cluster.
        'old_cluster_password': 'yourpassword',
        # The protocol that is used by the source Elasticsearch cluster. Valid values: http and https.
        'old_cluster_protocol': 'http',
        # The host of the destination Elasticsearch cluster. You can obtain the host on the Basic Information page of the destination Elasticsearch cluster in the Elasticsearch console.
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # The username that you can use to access the destination Elasticsearch cluster.
        'new_cluster_user': 'yourusername',
        # The password that you can use to access the destination Elasticsearch cluster.
        'new_cluster_password': 'yourpassword',
        # The protocol that is used by the destination Elasticsearch cluster. Valid values: http and https.
        'new_cluster_protocol': 'http',
    }
    
    # Send an HTTP request and return the parsed JSON response. Return None if the request fails.
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers, timeout=60)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # Print the error message and return None so that the caller can skip this item.
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
            return None
        except ValueError:
            # The response is not in the JSON format. Print the response and re-raise the exception.
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # Query all ILM policies of the source Elasticsearch cluster.
    def get_ilm_policies():
        endpoint = "/_ilm/policy"
        policies_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return policies_result or {}
    
    # Create an ILM policy in the destination Elasticsearch cluster.
    def create_ilm_policy(policy_name, policy_body):
        # The GET response also contains metadata such as version and modified_date.
        # The create request expects only the policy definition, so send only that part.
        body = {'policy': policy_body['policy']}
    
        endpoint = f"/_ilm/policy/{policy_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
        if create_result is not None:
            print(f"Policy {policy_name} created with result: {create_result}")
    
    # Specify the main function.
    def main():
        # Synchronize the ILM policies of the source Elasticsearch cluster.
        policies = get_ilm_policies()
        for policy_name, policy_body in policies.items():
            create_ilm_policy(policy_name, policy_body)
    
    if __name__ == '__main__':
        main()
    
  3. Verify the synchronization result.

    Run the following command in the destination Elasticsearch cluster to query information about the ILM policy that is synchronized:

    GET _ilm/policy/product
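The modified_date value, and possibly the version value, of a synchronized policy differs from the value in the source cluster, so comparing the raw GET responses would report every policy as changed. The following minimal sketch compares only the policy definitions and lists the source policies that are missing or different in the destination cluster. The function names are hypothetical; both arguments are the parsed responses of GET /_ilm/policy from each cluster.

```python
def policy_definitions(policies_response):
    """Map each policy name to its definition, dropping metadata such as version and modified_date."""
    return {name: body['policy'] for name, body in policies_response.items()}


def unsynchronized_policies(source_response, dest_response):
    """Hypothetical helper: list source policies that are missing or different in the destination.

    Both arguments are parsed responses of GET /_ilm/policy.
    """
    src = policy_definitions(source_response)
    dst = policy_definitions(dest_response)
    return sorted(name for name, policy in src.items() if dst.get(name) != policy)
```

An empty list indicates that every source policy exists in the destination cluster with the same phases and actions.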