Elasticsearch クラスター間でデータを移行する前に、送信先クラスターに、ソースクラスターと同一のマッピングおよび設定を持つ必要なインデックスを作成します。この手順を省略すると、Elasticsearch が動的マッピングによってフィールドタイプを推論し、データ損失、フィールドタイプの不一致、クエリパフォーマンスの低下を引き起こす可能性があります。
本ガイドでは、Python スクリプトを使用して、以下の項目をソースクラスターから送信先クラスターへコピーする方法について説明します。
-
インデックスライフサイクル管理 (ILM) ポリシー
-
インデックステンプレート
-
インデックスマッピングおよび設定
スクリプトは以下の順序で実行してください:最初に ILM ポリシー、次にインデックステンプレート、最後にインデックスマッピングおよび設定です。インデックスを作成する前に、インデックステンプレートおよび ILM ポリシーが既に存在している必要があります。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
-
Alibaba Cloud Elasticsearch クラスターを 2 つ(ともに V7.10)用意し、一方をソース、もう一方を送信先として使用します。詳細については、「Alibaba Cloud Elasticsearch クラスターの作成」をご参照ください。
-
Python 3.6.8 がインストール済みの Elastic Compute Service (ECS) インスタンス。詳細については、「Linux インスタンスの使い始め」をご参照ください。
-
ECS インスタンスと両方のクラスター間のネットワーク接続性。また、ECS インスタンスの IP アドレスを、両方のクラスターのパブリックまたはプライベート IP アドレスホワイトリストに追加済みであること。
本番環境では、データがパブリックインターネットを経由しないよう、内部ネットワーク経由で接続することを推奨します。
本ガイドのスクリプトは Elasticsearch V7.10 を対象としており、マルチタイプインデックスをサポートしていません。クラスターがそれより古いバージョンを実行している場合は、スクリプトを適宜変更してください。
スクリプトの構成
各スクリプトは、同じ config 辞書を使用します。任意のスクリプトを実行する前に、以下のフィールドを更新してください。
| フィールド | 説明 | 例 |
|---|---|---|
old_cluster_host |
ソースクラスターのホストおよびポート | es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200 |
old_cluster_user |
ソースクラスターのユーザー名 | yourusername |
old_cluster_password |
ソースクラスターのパスワード | yourpassward |
old_cluster_protocol |
ソースクラスターのプロトコル(http または https) |
http |
new_cluster_host |
送信先クラスターのホストおよびポート。Elasticsearch コンソールの [基本情報] ページで確認できます。 | es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200 |
new_cluster_user |
送信先クラスターのユーザー名 | yourusername |
new_cluster_password |
送信先クラスターのパスワード | yourpassward |
new_cluster_protocol |
送信先クラスターのプロトコル(http または https) |
http |
default_replicas |
送信先クラスターに適用するレプリカシャード数。この値は、ソースクラスターのレプリカ数をオーバーライドします。 | 1 |
ILM ポリシーの同期
ILM ポリシーは、インデックスが「ホット」や「削除」などのフェーズを経てライフサイクルを進める方法、および各フェーズで実行する操作を定義します。インデックステンプレートは ILM ポリシーを参照できるため、ILM ポリシーの同期はインデックステンプレートの同期よりも先に実行してください。
ソースクラスターへのテストデータの作成
以下のコマンドをソースクラスターで実行し、テスト用の ILM ポリシーを作成します。
PUT _ilm/policy/product
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "1GB",
"max_age": "1d",
"max_docs": 1000
}
}
},
"delete": {
"min_age": "2h",
"actions": {
"delete": {}
}
}
}
}
}
同期スクリプトの実行
以下のスクリプトを ECS インスタンスで実行します。このスクリプトは、ソースクラスターからすべての ILM ポリシーをフェッチし、送信先クラスターに作成します。Elasticsearch が生成する読み取り専用のメタデータである version、modified_date、modified_date_string は、送信先クラスターへの書き込み前に削除されます(これらのフィールドは作成時に設定できません)。
import requests
from requests.auth import HTTPBasicAuth
# 環境に合わせてこれらの値を更新してください。
config = {
# ソースクラスター
'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
'old_cluster_user': 'yourusername',
'old_cluster_password': 'yourpassward',
'old_cluster_protocol': 'http', # http または https
# 送信先クラスター
# ホストは、Elasticsearch コンソールの [基本情報] ページで確認できます。
'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
'new_cluster_user': 'yourusername',
'new_cluster_password': 'yourpassward',
'new_cluster_protocol': 'http', # http または https
'default_replicas': 1,
}
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
print(f"HTTP エラー: {e.response.status_code} for {url}")
print(e.response.text)
except ValueError as e:
print("無効な JSON 応答:")
print(response.text)
raise
def get_ilm_polices():
endpoint = "/_ilm/policy"
templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return templates_result
def create_ilm_policy(policy_name, policy_body):
# 送信先クラスターへの書き込み前に、読み取り専用のメタデータを削除します。
policy_body.pop('version', None)
policy_body.pop('modified_date', None)
policy_body.pop('modified_date_string', None)
endpoint = f"/_ilm/policy/{policy_name}"
create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
print(f"ポリシー {policy_name} を作成しました。結果: {create_result}")
def main():
policies = get_ilm_polices()
for policy_name, policy_body in policies.items():
create_ilm_policy(policy_name, policy_body)
if __name__ == '__main__':
main()
結果の確認
以下のコマンドを送信先クラスターで実行し、ILM ポリシーが正しく作成されたことを確認します。
GET _ilm/policy/product
インデックステンプレートの同期
インデックステンプレートは、名前が指定されたパターンに一致する新しいインデックスに対して、自動的にマッピングおよび設定を適用します。ILM ポリシーの同期後にテンプレートを同期し、依存関係のあるインデックスを作成する前に完了させてください。
ソースクラスターへのテストデータの作成
以下のコマンドをソースクラスターで実行し、テスト用のインデックステンプレートを作成します。
PUT _template/product
{
"index_patterns": ["product_*"],
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"productName": {
"type": "text"
},
"annual_rate": {
"type": "keyword"
},
"describe": {
"type": "text"
}
}
}
}
同期スクリプトの実行
以下のスクリプトを ECS インスタンスで実行します。このスクリプトは、ソースクラスターからすべてのインデックステンプレートをフェッチし、送信先クラスターに作成します。
import requests
from requests.auth import HTTPBasicAuth
# 環境に合わせてこれらの値を更新してください。
config = {
# ソースクラスター
'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
'old_cluster_user': 'yourusername',
'old_cluster_password': 'yourpassward',
'old_cluster_protocol': 'http', # http または https
# 送信先クラスター
# ホストは、Elasticsearch コンソールの [基本情報] ページで確認できます。
'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
'new_cluster_user': 'yourusername',
'new_cluster_password': 'yourpassward',
'new_cluster_protocol': 'http', # http または https
'default_replicas': 1,
}
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
print(f"HTTP エラー: {e.response.status_code} for {url}")
print(e.response.text)
except ValueError as e:
print("無効な JSON 応答:")
print(response.text)
raise
def get_index_templates():
endpoint = "/_template"
templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return templates_result
def create_index_template(template_name, template_body):
endpoint = f"/_template/{template_name}"
create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
print(f"テンプレート {template_name} を作成しました。結果: {create_result}")
def main():
templates = get_index_templates()
for template_name, template_body in templates.items():
create_index_template(template_name, template_body)
if __name__ == '__main__':
main()
結果の確認
以下のコマンドを送信先クラスターで実行し、インデックステンプレートが正しく作成されたことを確認します。
GET _template/product
インデックスマッピングおよび設定の同期
インデックス設定は、インデックスのプライマリシャードおよびレプリカシャードの構成を定義します。本セクションでは、ソースクラスターから送信先クラスターへ、これらの設定およびフィールドマッピングをコピーします。
ソースクラスターへのテストデータの作成
以下のコマンドをソースクラスターで実行し、テスト用のインデックスを作成します。
PUT /product_info
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"productName": {
"type": "text"
},
"annual_rate": {
"type": "keyword"
},
"describe": {
"type": "text"
}
}
}
}
上記の 3 つのフィールドは、一般的なマッピングタイプを示しています。
-
productName—text型:検索のために全文インデックス化および解析される -
annual_rate—keyword型:完全一致によるフィルタリングおよび集約のためにそのまま格納される -
describe—text型:検索のために全文インデックス化および解析される
同期スクリプトの実行
以下のスクリプトを ECS インスタンスで実行します。このスクリプトは、ソースクラスターからすべてのオープン状態のインデックスを読み取り(. で始まるシステムインデックスはスキップ)、送信先クラスターに、同一のマッピングおよびシャード数で再作成します。送信先クラスターのレプリカ数は、default_replicas で指定された値が適用され、ソースクラスターからのコピーは行われません。
import requests
from requests.auth import HTTPBasicAuth
# 環境に合わせてこれらの値を更新してください。
config = {
# ソースクラスター
'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
'old_cluster_user': 'yourusername',
'old_cluster_password': 'yourpassward',
'old_cluster_protocol': 'http', # http または https
# 送信先クラスター
# ホストは、Elasticsearch コンソールの [基本情報] ページで確認できます。
'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
'new_cluster_user': 'yourusername',
'new_cluster_password': 'yourpassward',
'new_cluster_protocol': 'http', # http または https
# 送信先クラスターに適用するレプリカ数。
'default_replicas': 1,
}
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
print(f"HTTP エラー: {e.response.status_code} for {url}")
print(e.response.text)
except ValueError as e:
print("無効な JSON 応答:")
print(response.text)
raise
def get_indices():
endpoint = "/_cat/indices?format=json"
indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
index_list = [index['index'] for index in indices_result if index['status'] == 'open']
return index_list
def get_index_settings(index):
endpoint = f"/{index}/_settings"
index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
settings = index_settings[index]['settings']['index']
shards_replicas_settings = {
'number_of_shards': settings.get('number_of_shards'),
'number_of_replicas': config['default_replicas']
}
return {'settings': shards_replicas_settings}
def get_index_mapping(index):
endpoint = f"/{index}/_mapping"
index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return {'mappings': index_mapping[index]['mappings']}
def create_index(old_index_name, new_index_name=""):
if not new_index_name:
new_index_name = old_index_name
settings = get_index_settings(old_index_name)
mappings = get_index_mapping(old_index_name)
body = {**settings, **mappings}
endpoint = f"/{new_index_name}"
create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
print(f"インデックス {new_index_name} を作成しました。結果: {create_result}")
def main():
index_list = get_indices()
for index in index_list:
if not index.startswith("."): # システムインデックスをスキップします。
create_index(index)
if __name__ == '__main__':
main()
結果の確認
以下のコマンドを送信先クラスターで実行し、インデックスが正しいマッピングおよび設定で作成されたことを確認します。
GET _cat/indices/product_info