すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:Pythonスクリプトを使用して、インデックスマッピングと設定、インデックステンプレート、およびILMポリシーを同期する

最終更新日:Jan 11, 2025

セルフマネージドElasticsearchクラスターからAlibaba Cloud Elasticsearchクラスターへのデータ移行など、Elasticsearchクラスター間でデータを移行する場合、2つのクラスターのマッピングが同じであることを確認する必要があります。これは、自動マッピングによって発生する可能性のあるデータ損失、フォーマットエラー、クエリパフォーマンスの低下などの問題を防ぐことができます。Elasticsearchクラスター間でデータを移行する前に、必要なインデックスを手動で作成し、インデックスのマッピングと設定を定義する必要があります。このトピックでは、Pythonスクリプトを使用して、事前定義されたインデックスマッピングと設定、インデックステンプレート、およびインデックスライフサイクル管理(ILM)ポリシーをElasticsearchクラスターから別のElasticsearchクラスターに同期する方法について説明します。

前提条件

  1. 2つのAlibaba Cloud Elasticsearchクラスターが作成されています。詳細については、Alibaba Cloud Elasticsearchクラスターを作成するをご参照ください。

    Elasticsearchクラスターは、ソースクラスターとデスティネーションクラスターとして個別に使用され、両方のクラスターのバージョンはV7.10です。

    説明

    この例では、Elasticsearch V7.10クラスター用のPythonスクリプトを使用しています。Elasticsearch V7.10クラスターのインデックスのマッピングは、V7.10より前のElasticsearchクラスターのマッピングとは異なる場合があります。たとえば、Elasticsearch V7.10クラスターは、V7.10より前のElasticsearchクラスターでサポートされているマルチタイプインデックスをサポートしておらず、この例で提供されているPythonスクリプトを使用してElasticsearch V7.10クラスターのマルチタイプインデックスを作成することはできません。V7.10より前のElasticsearchクラスターを使用する場合は、ビジネス要件に基づいて、この例で提供されているPythonスクリプトを変更する必要があります。

  2. Elastic Compute Service(ECS)インスタンスが作成され、Python環境が準備されています。詳細については、Linuxインスタンスの使用を開始するをご参照ください。

    この例では、Python 3.6.8環境が準備されています。別のバージョンのPython環境を使用する場合は、そのバージョンのAPI操作のPython requestsモジュールに基づいて構成を変更できます。

  3. ElasticsearchクラスターとECSインスタンス間にネットワーク接続が確立され、ECSインスタンスのパブリックまたはプライベートIPアドレスがElasticsearchクラスターのパブリックまたはプライベートIPアドレスホワイトリストに追加されます。

    説明

    本番環境でのデータセキュリティを確保するために、内部ネットワークを介してElasticsearchクラスターとECSインスタンス間のネットワーク接続を確立することをお勧めします。

インデックスのマッピングと設定を同期する

ソースElasticsearchクラスター内のインデックスのマッピングと設定を同期します。インデックスの設定は、インデックスのプライマリシャードとレプリカシャードの構成を定義します。

  1. テストデータを準備します。

    ソースElasticsearchクラスターで次のコマンドを実行して、クラスターにインデックスを作成します。

    PUT /product_info
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
          "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. ECSインスタンスで次のスクリプトを実行して、作成されたインデックスのマッピングと設定を同期します。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 使用する環境に基づいて、以下の構成を変更します。
    config = {
        # ソースElasticsearchクラスターのホスト。
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # ソースElasticsearchクラスターにアクセスするために使用できるユーザー名。
        'old_cluster_user': 'yourusername',
        # ソースElasticsearchクラスターにアクセスするために使用できるパスワード。
        'old_cluster_password': 'yourpassward',
        # ソースElasticsearchクラスターで使用されるプロトコル。有効な値:httpまたはhttps。
        'old_cluster_protocol': 'http',
        # デスティネーションElasticsearchクラスターのホスト。ホストは、ElasticsearchコンソールのデスティネーションElasticsearchクラスターの[基本情報]ページで取得できます。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # デスティネーションElasticsearchクラスターにアクセスするために使用できるユーザー名。
        'new_cluster_user': 'yourusername',
        # デスティネーションElasticsearchクラスターにアクセスするために使用できるパスワード。
        'new_cluster_password': 'yourpassward',
        # デスティネーションElasticsearchクラスターで使用されるプロトコル。有効な値:httpおよびhttps。
        'new_cluster_protocol': 'http',
        # デスティネーションElasticsearchクラスターのレプリカシャードのデフォルト数。
        'default_replicas': 1,
    }
    
    # HTTPリクエストの共通関数を指定します。
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # エラーメッセージを出力します。
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # レスポンスがJSON形式でない場合は、関連するエラーメッセージを出力し、レスポンスを返します。
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # すべてのインデックスをクエリします。
    def get_indices():
        endpoint = "/_cat/indices?format=json"
        indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        index_list = [index['index'] for index in indices_result if index['status'] == 'open']
        return index_list
    
    # インデックスの設定をクエリします。
    def get_index_settings(index):
        endpoint = f"/{index}/_settings"
        index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        settings = index_settings[index]['settings']['index']
        shards_replicas_settings = {
            'number_of_shards': settings.get('number_of_shards'),
            'number_of_replicas': config['default_replicas']
        }
        return {'settings': shards_replicas_settings}
    
    # インデックスのマッピングをクエリします。
    def get_index_mapping(index):
        endpoint = f"/{index}/_mapping"
        index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return {'mappings': index_mapping[index]['mappings']}
    
    
    # 新しいインデックスを作成します。
    def create_index(old_index_name, new_index_name=""):
        if not new_index_name:
            new_index_name = old_index_name
    
        settings = get_index_settings(old_index_name)
        mappings = get_index_mapping(old_index_name)
        body = {**settings, **mappings}
    
        endpoint = f"/{new_index_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    
        print(f"Index {new_index_name} created with result: {create_result}")
    
    
    # メイン関数を指定します。
    def main():
        index_list = get_indices()
        for index in index_list:
            if not index.startswith("."):  # システムインデックスは無視します。
                create_index(index)
    
    
    if __name__ == '__main__':
        main()
    
  3. 同期結果を確認します。

    デスティネーションElasticsearchクラスターで次のコマンドを実行して、同期されたマッピングと設定を表示します。

    GET _cat/indices/product_info

インデックステンプレートを同期する

  1. テストデータを準備します。

    ソースElasticsearchクラスターで次のコマンドを実行して、インデックステンプレートを作成します。

    PUT _template/product
    {
      "index_patterns": ["product_*"],
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. ECSインスタンスで次のスクリプトを実行して、インデックステンプレートを同期します。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 使用する環境に基づいて、以下の構成を変更します。
    config = {
        # ソースElasticsearchクラスターのホスト。
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # ソースElasticsearchクラスターにアクセスするために使用できるユーザー名。
        'old_cluster_user': 'yourusername',
        # ソースElasticsearchクラスターにアクセスするために使用できるパスワード。
        'old_cluster_password': 'yourpassward',
        # ソースElasticsearchクラスターで使用されるプロトコル。有効な値:httpまたはhttps。
        'old_cluster_protocol': 'http',
        # デスティネーションElasticsearchクラスターのホスト。ホストは、ElasticsearchコンソールのデスティネーションElasticsearchクラスターの[基本情報]ページで取得できます。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # デスティネーションElasticsearchクラスターにアクセスするために使用できるユーザー名。
        'new_cluster_user': 'yourusername',
        # デスティネーションElasticsearchクラスターにアクセスするために使用できるパスワード。
        'new_cluster_password': 'yourpassward',
        # デスティネーションElasticsearchクラスターで使用されるプロトコル。有効な値:httpおよびhttps。
        'new_cluster_protocol': 'http',
        # デスティネーションElasticsearchクラスターのレプリカシャードのデフォルト数。
        'default_replicas': 1,
    }
    
    
    # HTTPリクエストの共通関数を指定します。
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # エラーメッセージを出力します。
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # レスポンスがJSON形式でない場合は、関連するエラーメッセージを出力し、レスポンスを返します。
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    
    # ソースElasticsearchクラスターのすべてのインデックステンプレートをクエリします。
    def get_index_templates():
        endpoint = "/_template"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # デスティネーションElasticsearchクラスターのインデックステンプレートを作成します。
    def create_index_template(template_name, template_body):
        endpoint = f"/_template/{template_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
        print(f"Template {template_name} created with result: {create_result}")
    
    
    # メイン関数を指定します。
    def main():
    
        # ソースElasticsearchクラスターのインデックステンプレートを同期します。
        templates = get_index_templates()
        for template_name, template_body in templates.items():
            create_index_template(template_name, template_body)
    
    if __name__ == '__main__':
        main()
    
  3. 同期結果を確認します。

    デスティネーションElasticsearchクラスターで次のコマンドを実行して、同期されたインデックステンプレートに関する情報をクエリします。

    GET _template/product

ILMポリシーを同期する

  1. テストデータを準備します。

    ソースElasticsearchクラスターで次のコマンドを実行して、ILMポリシーを作成します。

    PUT _ilm/policy/product
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_size": "1GB",
                "max_age": "1d",
                "max_docs": 1000
              }
            }
          },
          "delete": {
            "min_age": "2h",
            "actions": {
              "delete": {}
            }
          }
        }
      }
    }
  1. ECSインスタンスで次のスクリプトを実行して、ILMポリシーを同期します。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 使用する環境に基づいて、以下の構成を変更します。
    config = {
        # ソースElasticsearchクラスターのホスト。
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # ソースElasticsearchクラスターにアクセスするために使用できるユーザー名。
        'old_cluster_user': 'yourusername',
        # ソースElasticsearchクラスターにアクセスするために使用できるパスワード。
        'old_cluster_password': 'yourpassward',
        # ソースElasticsearchクラスターで使用されるプロトコル。有効な値:httpまたはhttps。
        'old_cluster_protocol': 'http',
        # デスティネーションElasticsearchクラスターのホスト。ホストは、ElasticsearchコンソールのデスティネーションElasticsearchクラスターの[基本情報]ページで取得できます。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # デスティネーションElasticsearchクラスターにアクセスするために使用できるユーザー名。
        'new_cluster_user': 'yourusername',
        # デスティネーションElasticsearchクラスターにアクセスするために使用できるパスワード。
        'new_cluster_password': 'yourpassward',
        # デスティネーションElasticsearchクラスターで使用されるプロトコル。有効な値:httpおよびhttps。
        'new_cluster_protocol': 'http',
        # デスティネーションElasticsearchクラスターのレプリカシャードのデフォルト数。
        'default_replicas': 1,
    }
    
    # HTTPリクエストの共通関数を指定します。
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # エラーメッセージを出力します。
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # レスポンスがJSON形式でない場合は、関連するエラーメッセージを出力し、レスポンスを返します。
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # ソースElasticsearchクラスターのすべてのILMポリシーをクエリします。
    def get_ilm_polices():
        endpoint = "/_ilm/policy"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # デスティネーションElasticsearchクラスターのILMポリシーを作成します。
    def create_ilm_policy(policy_name, policy_body):
        policy_body.pop('version', None) # version 情報を削除します。
        policy_body.pop('modified_date', None) # modified_date 情報を削除します。
        policy_body.pop('modified_date_string', None) # modified_date_string 情報を削除します。
    
    
        endpoint = f"/_ilm/policy/{policy_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
        print(f"Policy {policy_name} created with result: {create_result}")
    
    # メイン関数を指定します。
    def main():
    
        # ソースElasticsearchクラスターのILMポリシーを同期します。
        policies = get_ilm_polices()
        for policy_name, policy_body in policies.items():
            create_ilm_policy(policy_name, policy_body)
    if __name__ == '__main__':
        main()
    
  2. 同期結果を確認します。

    デスティネーションElasticsearchクラスターで次のコマンドを実行して、同期されたILMポリシーに関する情報をクエリします。

    GET _ilm/policy/product