セルフマネージドElasticsearchクラスターからAlibaba Cloud Elasticsearchクラスターへのデータ移行など、Elasticsearchクラスター間でデータを移行する場合、2つのクラスターのマッピングが同じであることを確認する必要があります。これは、自動マッピングによって発生する可能性のあるデータ損失、フォーマットエラー、クエリパフォーマンスの低下などの問題を防ぐことができます。Elasticsearchクラスター間でデータを移行する前に、必要なインデックスを手動で作成し、インデックスのマッピングと設定を定義する必要があります。このトピックでは、Pythonスクリプトを使用して、事前定義されたインデックスマッピングと設定、インデックステンプレート、およびインデックスライフサイクル管理(ILM)ポリシーをElasticsearchクラスターから別のElasticsearchクラスターに同期する方法について説明します。
前提条件
2つのAlibaba Cloud Elasticsearchクラスターが作成されています。詳細については、Alibaba Cloud Elasticsearchクラスターを作成するをご参照ください。
Elasticsearchクラスターは、ソースクラスターとデスティネーションクラスターとして個別に使用され、両方のクラスターのバージョンはV7.10です。
説明この例では、Elasticsearch V7.10クラスター用のPythonスクリプトを使用しています。Elasticsearch V7.10クラスターのインデックスのマッピングは、V7.10より前のElasticsearchクラスターのマッピングとは異なる場合があります。たとえば、Elasticsearch V7.10クラスターは、V7.10より前のElasticsearchクラスターでサポートされているマルチタイプインデックスをサポートしておらず、この例で提供されているPythonスクリプトを使用してElasticsearch V7.10クラスターのマルチタイプインデックスを作成することはできません。V7.10より前のElasticsearchクラスターを使用する場合は、ビジネス要件に基づいて、この例で提供されているPythonスクリプトを変更する必要があります。
Elastic Compute Service(ECS)インスタンスが作成され、Python環境が準備されています。詳細については、Linuxインスタンスの使用を開始するをご参照ください。
この例では、Python 3.6.8環境が準備されています。別のバージョンのPython環境を使用する場合は、そのバージョンのAPI操作のPython requestsモジュールに基づいて構成を変更できます。
ElasticsearchクラスターとECSインスタンス間にネットワーク接続が確立され、ECSインスタンスのパブリックまたはプライベートIPアドレスがElasticsearchクラスターのパブリックまたはプライベートIPアドレスホワイトリストに追加されます。
説明本番環境でのデータセキュリティを確保するために、内部ネットワークを介してElasticsearchクラスターとECSインスタンス間のネットワーク接続を確立することをお勧めします。
インデックスのマッピングと設定を同期する
ソースElasticsearchクラスター内のインデックスのマッピングと設定を同期します。インデックスの設定は、インデックスのプライマリシャードとレプリカシャードの構成を定義します。
テストデータを準備します。
ソースElasticsearchクラスターで次のコマンドを実行して、クラスターにインデックスを作成します。
PUT /product_info { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "productName": { "type": "text" }, "annual_rate":{ "type":"keyword" }, "describe": { "type": "text" } } } }ECSインスタンスで次のスクリプトを実行して、作成されたインデックスのマッピングと設定を同期します。
import requests from requests.auth import HTTPBasicAuth # 使用する環境に基づいて、以下の構成を変更します。 config = { # ソースElasticsearchクラスターのホスト。 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # ソースElasticsearchクラスターにアクセスするために使用できるユーザー名。 'old_cluster_user': 'yourusername', # ソースElasticsearchクラスターにアクセスするために使用できるパスワード。 'old_cluster_password': 'yourpassward', # ソースElasticsearchクラスターで使用されるプロトコル。有効な値:httpまたはhttps。 'old_cluster_protocol': 'http', # デスティネーションElasticsearchクラスターのホスト。ホストは、ElasticsearchコンソールのデスティネーションElasticsearchクラスターの[基本情報]ページで取得できます。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # デスティネーションElasticsearchクラスターにアクセスするために使用できるユーザー名。 'new_cluster_user': 'yourusername', # デスティネーションElasticsearchクラスターにアクセスするために使用できるパスワード。 'new_cluster_password': 'yourpassward', # デスティネーションElasticsearchクラスターで使用されるプロトコル。有効な値:httpおよびhttps。 'new_cluster_protocol': 'http', # デスティネーションElasticsearchクラスターのレプリカシャードのデフォルト数。 'default_replicas': 1, } # HTTPリクエストの共通関数を指定します。 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # エラーメッセージを出力します。 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # レスポンスがJSON形式でない場合は、関連するエラーメッセージを出力し、レスポンスを返します。 print("Invalid JSON response:") print(response.text) raise # すべてのインデックスをクエリします。 def get_indices(): endpoint = "/_cat/indices?format=json" indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) index_list = [index['index'] for index in indices_result if index['status'] == 'open'] return index_list # インデックスの設定をクエリします。 def get_index_settings(index): endpoint = f"/{index}/_settings" index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) settings = index_settings[index]['settings']['index'] shards_replicas_settings = { 'number_of_shards': settings.get('number_of_shards'), 'number_of_replicas': config['default_replicas'] } return {'settings': shards_replicas_settings} # インデックスのマッピングをクエリします。 def get_index_mapping(index): endpoint = f"/{index}/_mapping" index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return {'mappings': index_mapping[index]['mappings']} # 新しいインデックスを作成します。 def create_index(old_index_name, new_index_name=""): if not new_index_name: new_index_name = old_index_name settings = get_index_settings(old_index_name) mappings = get_index_mapping(old_index_name) body = {**settings, **mappings} endpoint = f"/{new_index_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body) print(f"Index {new_index_name} created with result: {create_result}") # メイン関数を指定します。 def main(): index_list = get_indices() for index in index_list: if not index.startswith("."): # システムインデックスは無視します。 create_index(index) if __name__ == '__main__': main()同期結果を確認します。
デスティネーションElasticsearchクラスターで次のコマンドを実行して、同期されたマッピングと設定を表示します。
GET _cat/indices/product_info
インデックステンプレートを同期する
テストデータを準備します。
ソースElasticsearchクラスターで次のコマンドを実行して、インデックステンプレートを作成します。
PUT _template/product { "index_patterns": ["product_*"], "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "productName": { "type": "text" }, "annual_rate":{ "type":"keyword" }, "describe": { "type": "text" } } } }ECSインスタンスで次のスクリプトを実行して、インデックステンプレートを同期します。
import requests from requests.auth import HTTPBasicAuth # 使用する環境に基づいて、以下の構成を変更します。 config = { # ソースElasticsearchクラスターのホスト。 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # ソースElasticsearchクラスターにアクセスするために使用できるユーザー名。 'old_cluster_user': 'yourusername', # ソースElasticsearchクラスターにアクセスするために使用できるパスワード。 'old_cluster_password': 'yourpassward', # ソースElasticsearchクラスターで使用されるプロトコル。有効な値:httpまたはhttps。 'old_cluster_protocol': 'http', # デスティネーションElasticsearchクラスターのホスト。ホストは、ElasticsearchコンソールのデスティネーションElasticsearchクラスターの[基本情報]ページで取得できます。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # デスティネーションElasticsearchクラスターにアクセスするために使用できるユーザー名。 'new_cluster_user': 'yourusername', # デスティネーションElasticsearchクラスターにアクセスするために使用できるパスワード。 'new_cluster_password': 'yourpassward', # デスティネーションElasticsearchクラスターで使用されるプロトコル。有効な値:httpおよびhttps。 'new_cluster_protocol': 'http', # デスティネーションElasticsearchクラスターのレプリカシャードのデフォルト数。 'default_replicas': 1, } # HTTPリクエストの共通関数を指定します。 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # エラーメッセージを出力します。 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # レスポンスがJSON形式でない場合は、関連するエラーメッセージを出力し、レスポンスを返します。 print("Invalid JSON response:") print(response.text) raise # ソースElasticsearchクラスターのすべてのインデックステンプレートをクエリします。 def get_index_templates(): endpoint = "/_template" templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return templates_result # デスティネーションElasticsearchクラスターのインデックステンプレートを作成します。 def create_index_template(template_name, template_body): endpoint = f"/_template/{template_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body) print(f"Template {template_name} created with result: {create_result}") # メイン関数を指定します。 def main(): # ソースElasticsearchクラスターのインデックステンプレートを同期します。 templates = get_index_templates() for template_name, template_body in templates.items(): create_index_template(template_name, template_body) if __name__ == '__main__': main()同期結果を確認します。
デスティネーションElasticsearchクラスターで次のコマンドを実行して、同期されたインデックステンプレートに関する情報をクエリします。
GET _template/product
ILMポリシーを同期する
テストデータを準備します。
ソースElasticsearchクラスターで次のコマンドを実行して、ILMポリシーを作成します。
PUT _ilm/policy/product { "policy": { "phases": { "hot": { "actions": { "rollover": { "max_size": "1GB", "max_age": "1d", "max_docs": 1000 } } }, "delete": { "min_age": "2h", "actions": { "delete": {} } } } } }
ECSインスタンスで次のスクリプトを実行して、ILMポリシーを同期します。
import requests from requests.auth import HTTPBasicAuth # 使用する環境に基づいて、以下の構成を変更します。 config = { # ソースElasticsearchクラスターのホスト。 'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200', # ソースElasticsearchクラスターにアクセスするために使用できるユーザー名。 'old_cluster_user': 'yourusername', # ソースElasticsearchクラスターにアクセスするために使用できるパスワード。 'old_cluster_password': 'yourpassward', # ソースElasticsearchクラスターで使用されるプロトコル。有効な値:httpまたはhttps。 'old_cluster_protocol': 'http', # デスティネーションElasticsearchクラスターのホスト。ホストは、ElasticsearchコンソールのデスティネーションElasticsearchクラスターの[基本情報]ページで取得できます。 'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200', # デスティネーションElasticsearchクラスターにアクセスするために使用できるユーザー名。 'new_cluster_user': 'yourusername', # デスティネーションElasticsearchクラスターにアクセスするために使用できるパスワード。 'new_cluster_password': 'yourpassward', # デスティネーションElasticsearchクラスターで使用されるプロトコル。有効な値:httpおよびhttps。 'new_cluster_protocol': 'http', # デスティネーションElasticsearchクラスターのレプリカシャードのデフォルト数。 'default_replicas': 1, } # HTTPリクエストの共通関数を指定します。 def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'): url = f"{protocol}://{host}{endpoint}" auth = (username, password) if username and password else None headers = {'Content-Type': 'application/json'} if method != 'GET' else None try: response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers) response.raise_for_status() return response.json() except requests.HTTPError as e: # エラーメッセージを出力します。 print(f"HTTP Error: {e.response.status_code} for {url}") print(e.response.text) except ValueError as e: # レスポンスがJSON形式でない場合は、関連するエラーメッセージを出力し、レスポンスを返します。 print("Invalid JSON response:") print(response.text) raise # ソースElasticsearchクラスターのすべてのILMポリシーをクエリします。 def get_ilm_polices(): endpoint = "/_ilm/policy" templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol']) return templates_result # デスティネーションElasticsearchクラスターのILMポリシーを作成します。 def create_ilm_policy(policy_name, policy_body): policy_body.pop('version', None) # version 情報を削除します。 policy_body.pop('modified_date', None) # modified_date 情報を削除します。 policy_body.pop('modified_date_string', None) # modified_date_string 情報を削除します。 endpoint = f"/_ilm/policy/{policy_name}" create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body) print(f"Policy {policy_name} created with result: {create_result}") # メイン関数を指定します。 def main(): # ソースElasticsearchクラスターのILMポリシーを同期します。 policies = get_ilm_polices() for policy_name, policy_body in policies.items(): create_ilm_policy(policy_name, policy_body) if __name__ == '__main__': main()同期結果を確認します。
デスティネーションElasticsearchクラスターで次のコマンドを実行して、同期されたILMポリシーに関する情報をクエリします。
GET _ilm/policy/product