このトピックでは、reindex 機能を使用して、ECS インスタンス上の自己管理型 Elasticsearch (ES) クラスターから Alibaba Cloud ES インスタンスにデータを移行する方法について説明します。このプロセスには、インデックスの作成とデータの移行が含まれます。
背景情報
reindex を使用したデータ移行は、シングルゾーンインスタンスでのみサポートされています。マルチゾーンインスタンスを使用する場合は、次のいずれかのソリューションを使用して、自己管理型 ES クラスターから Alibaba Cloud にデータを移行します。
ソースデータ量が大きい場合は、OSS スナップショットメソッドを使用します。詳細については、「高度な機能: OSS を使用して自己管理型 ES クラスターを Alibaba Cloud ES に移行する」をご参照ください。
ソースデータをフィルター処理するには、Logstash 移行ソリューションを使用します。詳細については、「Logstash を使用して自己管理型 Elasticsearch クラスターから Alibaba Cloud にデータを移行する」をご参照ください。
前提条件
次の操作を完了していること。
シングルゾーンの Alibaba Cloud ES インスタンスを作成します。
詳細については、「Alibaba Cloud Elasticsearch インスタンスの作成」をご参照ください。
自己管理型 ES クラスターと移行するデータを準備します。
自己管理型 ES クラスターがない場合は、Alibaba Cloud ECS インスタンスを使用して構築できます。詳細については、「Elasticsearch のインストールと実行」をご参照ください。自己管理型 ES クラスターは、次の条件を満たす必要があります。
クラスターが配置されている ECS インスタンスのネットワークタイプは、VPC (virtual private cloud) である必要があります。ClassicLink を使用して接続された ECS インスタンスはサポートされていません。ECS インスタンスは、Alibaba Cloud ES インスタンスと同じ VPC にある必要があります。
ECS インスタンスのセキュリティグループは、Alibaba Cloud ES インスタンスのノードの IP アドレスからのアクセスを制限してはなりません。ノードの IP アドレスは Kibana コンソールで表示できます。ポート 9200 も開いている必要があります。
自己管理型クラスターは、Alibaba Cloud ES インスタンスに接続できる必要があります。スクリプトを実行するマシンで、
curl -XGET http://<host>:9200コマンドを実行して接続を確認します。説明このトピックのスクリプトは、自己管理型 ES クラスターと Alibaba Cloud ES クラスターの両方のポート 9200 にアクセスできる任意のマシンで実行できます。
制限事項
Alibaba Cloud ES は、基本制御 (v2) アーキテクチャとクラウドネイティブ新制御 (v3) アーキテクチャの 2 つのデプロイメントモードを提供します。デプロイメントモードは、インスタンスの基本情報セクションで確認できます。

クラウドネイティブ新制御 (v3) アーキテクチャを使用するクラスターの場合、クラスター間の reindex 操作には、PrivateLink を使用して Alibaba Cloud ES クラスターへのプライベート接続を確立する必要があります。ビジネスシナリオに基づいて、次の表からソリューションを選択してください。
シナリオ | ES クラスターのネットワークアーキテクチャ | ソリューション |
Alibaba Cloud ES クラスター間のデータ移行 | 両方の ES クラスターが基本制御 (v2) アーキテクチャを使用している。 | reindex メソッド: Alibaba Cloud ES クラスター間のクラスター間 reindex。 |
ES クラスターの 1 つがクラウドネイティブ新制御 (v3) アーキテクチャを使用している。 説明 もう一方の ES クラスターは、クラウドネイティブ新制御 (v3) アーキテクチャまたは基本制御 (v2) アーキテクチャのいずれかを使用できます。 | ||
ECS インスタンス上の自己管理型 ES クラスターから Alibaba Cloud ES クラスターへのデータ移行 | Alibaba Cloud ES クラスターが基本制御 (v2) アーキテクチャを使用している。 | reindex メソッド: reindex を使用して自己管理型 Elasticsearch クラスターから Alibaba Cloud Elasticsearch にデータを移行する。 |
Alibaba Cloud ES クラスターがクラウドネイティブ新制御 (v3) アーキテクチャを使用している。 | reindex メソッド: インスタンスへのプライベート接続を確立して、自己管理型 Elasticsearch クラスターから Alibaba Cloud にデータを移行する。 |
注意事項
2020 年 10 月、Alibaba Cloud Elasticsearch はネットワークアーキテクチャを調整しました。この調整前に使用されていたアーキテクチャは旧ネットワークアーキテクチャと呼ばれ、調整後に使用されるアーキテクチャは新ネットワークアーキテクチャと呼ばれます。新ネットワークアーキテクチャを使用するインスタンスは、旧ネットワークアーキテクチャを使用するインスタンスとのクラスター間 reindex、クラスター間検索、クラスター間レプリケーションなどの相互運用性操作をサポートしていません。相互運用性を有効にするには、インスタンスが同じネットワークアーキテクチャを使用していることを確認してください。中国 (張家口) リージョンおよび中国国外のリージョンでは、ネットワークアーキテクチャの調整時期は固定されていません。ネットワークの相互運用性を確認するには、Alibaba Cloud Elasticsearch のテクニカルサポートにお問い合わせください。
クラウドネイティブ新制御 (v3) アーキテクチャを使用する Alibaba Cloud ES インスタンスは、Alibaba Cloud サービスアカウントに属する VPC にデプロイされ、他のネットワーク環境のリソースにアクセスできません。対照的に、基本制御 (v2) アーキテクチャを使用するインスタンスはユーザーの VPC にデプロイされ、ネットワークアクセスは影響を受けません。
データ整合性を確保するため、移行前にセルフマネージド ES クラスターへのデータの書き込みと更新を停止することを推奨します。これにより、読み取り操作は影響を受けません。移行が完了したら、読み取りおよび書き込み操作を Alibaba Cloud ES クラスターに切り替えることができます。書き込み操作を停止できない場合は、スクリプトを使用してループタスクを設定し、書き込みサービスのダウンタイムを削減することを推奨します。詳細については、ステップ 4: データの移行の「大量のデータの移行 (削除なし、更新時間フィールドあり)」セクションをご参照ください。
ドメイン名を使用して自己管理型 ES または Alibaba Cloud ES クラスターにアクセスする場合、
http://host:port/pathのように path を含む URL は使用できません。
手順
手順 1: エンドポイントドメイン名を取得する (任意)
お使いの Alibaba Cloud ES インスタンスがクラウドネイティブの新しいコントロール (v3) アーキテクチャを使用している場合、PrivateLink を使用して、ECS インスタンス上の自己管理 ES クラスターの VPC を Alibaba Cloud サービスアカウントの VPC に接続する必要があります。その後、後続の構成で使用するエンドポイントドメイン名を取得する必要があります。詳細については、「インスタンスの非公開接続を構成する」をご参照ください。
手順 2: 移行先インデックスを作成する
開始する前に、Alibaba Cloud ES クラスターにインデックスを作成します。新しいインデックスは、自己管理型 ES クラスターから移行するインデックスと同じ構成を使用する必要があります。または、Alibaba Cloud ES クラスターの自動インデックス作成機能を有効にすることもできますが、この方法は推奨されません。
次の例は、Alibaba Cloud ES クラスターにインデックスをバッチで作成するために使用できる Python 2 スクリプトを示しています。これらのインデックスは、自己管理型 ES クラスターから移行するインデックスと同じ構成を持つ必要があります。デフォルトでは、新しいインデックスのレプリカ数は 0 に設定されます。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# ファイル名: indiceCreate.py
import sys
import base64
import time
import httplib
import json
## 自己管理型 Elasticsearch クラスターのホスト。
oldClusterHost = "old-cluster.com"
## 自己管理型 Elasticsearch クラスターのユーザー名。空にすることもできます。
oldClusterUserName = "old-username"
## 自己管理型 Elasticsearch クラスターのパスワード。空にすることもできます。
oldClusterPassword = "old-password"
## Alibaba Cloud Elasticsearch クラスターのホスト。Alibaba Cloud Elasticsearch インスタンスの [基本情報] ページから取得できます。
newClusterHost = "new-cluster.com"
## Alibaba Cloud Elasticsearch クラスターのユーザー名。
newClusterUser = "elastic"
## Alibaba Cloud Elasticsearch クラスターのパスワード。
newClusterPassword = "new-password"
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
conn = httplib.HTTPConnection(host)
headers = {}
if (username != "") :
'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
headers["Authorization"] = "Basic %s" % base64string;
if "GET" == method:
headers["Content-Type"] = "application/x-www-form-urlencoded"
conn.request(method=method, url=endpoint, headers=headers)
else :
headers["Content-Type"] = "application/json"
conn.request(method=method, url=endpoint, body=params, headers=headers)
response = conn.getresponse()
res = response.read()
return res
def httpGet(host, endpoint, username="", password=""):
return httpRequest("GET", host, endpoint, "", username, password)
def httpPost(host, endpoint, params, username="", password=""):
return httpRequest("POST", host, endpoint, params, username, password)
def httpPut(host, endpoint, params, username="", password=""):
return httpRequest("PUT", host, endpoint, params, username, password)
def getIndices(host, username="", password=""):
endpoint = "/_cat/indices"
indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
indicesList = indicesResult.split("\n")
indexList = []
for indices in indicesList:
if (indices.find("open") > 0):
indexList.append(indices.split()[2])
return indexList
def getSettings(index, host, username="", password=""):
endpoint = "/" + index + "/_settings"
indexSettings = httpGet(host, endpoint, username, password)
print index + " 元の設定は次のとおりです:\n" + indexSettings
settingsDict = json.loads(indexSettings)
## デフォルトでは、シャード数は自己管理型 Elasticsearch クラスターのインデックスのシャード数と同じです。
number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
## デフォルトでは、レプリカ数は 0 です。
number_of_replicas = DEFAULT_REPLICAS
newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
return newSetting
def getMapping(index, host, username="", password=""):
endpoint = "/" + index + "/_mapping"
indexMapping = httpGet(host, endpoint, username, password)
print index + " 元のマッピングは次のとおりです:\n" + indexMapping
mappingDict = json.loads(indexMapping)
mappings = json.dumps(mappingDict[index]["mappings"])
newMapping = "\"mappings\" : " + mappings
return newMapping
def createIndexStatement(oldIndexName):
settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
return createstatement
def createIndex(oldIndexName, newIndexName=""):
if (newIndexName == "") :
newIndexName = oldIndexName
createstatement = createIndexStatement(oldIndexName)
print "新しいインデックス " + newIndexName + " の設定とマッピングは次のとおりです:\n" + createstatement
endpoint = "/" + newIndexName
createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
print "新しいインデックス " + newIndexName + " の作成結果: " + createResult
## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
if (index.startswith(".")):
systemIndex.append(index)
else :
createIndex(index, index)
if (len(systemIndex) > 0) :
for index in systemIndex:
print index + " はシステムインデックスである可能性があるため、再作成されません。必要に応じて別途処理してください。"手順 3: reindex ホワイトリストを設定する
Alibaba Cloud Elasticsearch コンソールにログインします。
左側のナビゲーションウィンドウで、[Elasticsearch クラスター] をクリックします。
目的のクラスターに移動します。
上部のナビゲーションバーで、クラスターが属するリソースグループとクラスターが存在するリージョンを選択します。
[Elasticsearch クラスター] ページで、クラスターを見つけてその ID をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
YML 設定 セクションで、右側の 設定の更新 をクリックします。
YML 設定 パネルで、その他の設定 を変更して reindex ホワイトリストを設定します。
次の例は、サンプル構成を示しています。
reindex.remote.whitelist: ["10.0.xx.xx:9200","10.0.xx.xx:9200","10.0.xx.xx:9200","10.15.xx.xx:9200","10.15.xx.xx:9200","10.15.xx.xx:9200"]
reindex ホワイトリストを設定するには、reindex.remote.whitelist パラメーターを使用して自己管理型 ES クラスターのエンドポイントを指定します。これにより、エンドポイントが Alibaba Cloud ES クラスターのリモートアクセスホワイトリストに追加されます。構成ルールは、Alibaba Cloud ES クラスターのネットワークアーキテクチャによって異なります。
基本制御 (v2) アーキテクチャの場合、host と port の組み合わせを指定します。複数のホスト構成を区切るにはカンマを使用します。例: otherhost:9200,another:9200,127.0.10.**:9200,localhost:**。プロトコル情報は認識されません。
クラウドネイティブ新制御 (v3) アーキテクチャの場合、インスタンスに対応する エンドポイントドメイン名 と ポート の組み合わせを指定します。例: ep-bp1hfkx7coy8lvu4****-cn-hangzhou-i.epsrv-bp1zczi0fgoc5qtv****.cn-hangzhou.privatelink.aliyuncs.com:9200。
説明パラメーターの詳細については、「YML パラメーターの設定」をご参照ください。
この操作は、インスタンスを再起動します。確認してから操作を行ってください。 オプションを選択し、OK ボタンをクリックします。
OK をクリックすると、Elasticsearch インスタンスが再起動します。再起動中、[タスクリスト] で進捗を監視できます。インスタンスが再起動すると、構成は完了です。
手順 4: データを移行する
このセクションでは、基本制御 (v2) アーキテクチャを持つインスタンスを例として使用し、3 つのデータ移行メソッドを提供します。データ量とビジネス要件に基づいてメソッドを選択してください。
データ量が少ない場合
次のスクリプトを使用できます。
#!/bin/bash
# ファイル: reindex.sh
indexName="インデックス名"
newClusterUser="Alibaba Cloud Elasticsearch クラスターのユーザー名"
newClusterPass="Alibaba Cloud Elasticsearch クラスターのパスワード"
newClusterHost="Alibaba Cloud Elasticsearch クラスターのホスト"
oldClusterUser="自己管理型 Elasticsearch クラスターのユーザー名"
oldClusterPass="自己管理型 Elasticsearch クラスターのパスワード"
# 自己管理型 Elasticsearch クラスターのホストは、[scheme]://[host]:[port] の形式である必要があります (例: http://10.37.*.*:9200)。
oldClusterHost="自己管理型 Elasticsearch クラスターのホスト"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d'{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"match_all": {}
}
},
"dest": {
"index": "'${indexName}'"
}
}'データ量が多く、削除操作がなく、更新時間がある場合
データ量が大きく、削除操作が実行されない場合は、scroll 移行を使用して書き込みサービスのダウンタイムを短縮できます。scroll 移行には、新しいデータを追跡するための更新時間フィールドなどのフィールドが必要です。初期データ移行が完了したら、すべての書き込み操作を停止します。次に、reindex 操作を最後に 1 回実行して、最新の更新を移行します。最後に、読み取りおよび書き込み操作を Alibaba Cloud ES クラスターに切り替えます。
#!/bin/bash
# ファイル: circleReindex.sh
# 起動の制御:
# これはリモート reindex 用のスクリプトです。要件:
# 1. Alibaba Cloud Elasticsearch クラスターにインデックスが作成されているか、クラスターが自動作成と動的マッピングをサポートしていること。
# 2. Alibaba Cloud Elasticsearch クラスターの YML ファイルに IP アドレスホワイトリストが設定されていること (例: reindex.remote.whitelist: 172.16.**.**:9200)。
# 3. ホストは [scheme]://[host]:[port] の形式であること。
USAGE="使用法: sh circleReindex.sh <count>
count: 実行回数。負数は増分実行のループを示します。正数は 1 回または複数回の実行を示します。
例:
sh circleReindex.sh 1
sh circleReindex.sh 5
sh circleReindex.sh -1"
indexName="インデックス名"
newClusterUser="Alibaba Cloud Elasticsearch クラスターのユーザー名"
newClusterPass="Alibaba Cloud Elasticsearch クラスターのパスワード"
oldClusterUser="自己管理型 Elasticsearch クラスターのユーザー名"
oldClusterPass="自己管理型 Elasticsearch クラスターのパスワード"
## http://myescluster.com
newClusterHost="Alibaba Cloud Elasticsearch クラスターのホスト"
# 自己管理型 Elasticsearch クラスターのホストは、[scheme]://[host]:[port] の形式である必要があります (例: http://10.37.*.*:9200)。
oldClusterHost="自己管理型 Elasticsearch クラスターのホスト"
timeField="更新時間フィールド"
reindexTimes=0
lastTimestamp=0
curTimestamp=`date +%s`
hasError=false
function reIndexOP() {
reindexTimes=$[${reindexTimes} + 1]
curTimestamp=`date +%s`
ret=`curl -u ${newClusterUser}:${newClusterPass} -XPOST "${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"range" : {
"'${timeField}'" : {
"gte" : '${lastTimestamp}',
"lt" : '${curTimestamp}'
}
}
}
},
"dest": {
"index": "'${indexName}'"
}
}'`
lastTimestamp=${curTimestamp}
echo "${reindexTimes} 回目の reindex。この実行の更新期限は ${lastTimestamp} です。結果: ${ret}"
if [[ ${ret} == *error* ]]; then
hasError=true
echo "この実行中に例外が発生しました。後続の操作は中断されます。確認してください。"
fi
}
function start() {
## 数が負の場合、ループは継続的に実行されます。
if [[ $1 -lt 0 ]]; then
while :
do
reIndexOP
done
elif [[ $1 -gt 0 ]]; then
k=0
while [[ k -lt $1 ]] && [[ ${hasError} == false ]]; do
reIndexOP
let ++k
done
fi
}
## main
if [ $# -lt 1 ]; then
echo "$USAGE"
exit 1
fi
echo "インデックス ${indexName} の reindex 操作を開始します。"
start $1
echo "合計 ${reindexTimes} 回の reindex 操作が実行されました。"データ量が多く、削除操作がなく、更新時間がない場合
データ量が大きく、インデックスマッピングに更新時間フィールドが含まれていない場合は、アップストリームサービスのコードを変更してこのフィールドを追加する必要があります。フィールドを追加した後、まず既存データを移行します。次に、前のセクションで説明した scroll 移行メソッドを使用します。
#!/bin/bash
# ファイル: miss.sh
indexName="インデックス名"
newClusterUser="Alibaba Cloud Elasticsearch クラスターのユーザー名"
newClusterPass="Alibaba Cloud Elasticsearch クラスターのパスワード"
newClusterHost="Alibaba Cloud Elasticsearch クラスターのホスト"
oldClusterUser="自己管理型 Elasticsearch クラスターのユーザー名"
oldClusterPass="自己管理型 Elasticsearch クラスターのパスワード"
# 自己管理型 Elasticsearch クラスターのホストは、[scheme]://[host]:[port] の形式である必要があります (例: http://10.37.*.*:9200)
oldClusterHost="自己管理型 Elasticsearch クラスターのホスト"
timeField="updatetime"
curl -u ${newClusterUser}:${newClusterPass} -XPOST "http://${newClusterHost}/_reindex?pretty" -H "Content-Type: application/json" -d '{
"source": {
"remote": {
"host": "'${oldClusterHost}'",
"username": "'${oldClusterUser}'",
"password": "'${oldClusterPass}'"
},
"index": "'${indexName}'",
"query": {
"bool": {
"must_not": {
"exists": {
"field": "'${timeField}'"
}
}
}
}
},
"dest": {
"index": "'${indexName}'"
}
}'よくある質問
問題: curl コマンドを実行すると、エラーメッセージ
{"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}が返されます。解決策:
-H "Content-Type: application/json"を curl コマンドに追加して、再試行してください。// 自己管理型 Elasticsearch クラスター内のすべてのインデックスに関する情報を取得します。権限がない場合は、"-u user:pass" パラメーターを削除できます。oldClusterHost は自己管理型 Elasticsearch クラスターのホストです。実際のホストに置き換えてください。 curl -u user:pass -XGET http://oldClusterHost/_cat/indices | awk '{print $3}' // 返されたインデックスリストに基づいて、移行するユーザーインデックスの設定とマッピングを取得します。indexName をクエリするユーザーインデックスの名前に置き換えます。 curl -u user:pass -XGET http://oldClusterHost/indexName/_settings,_mapping?pretty=true // 対応するインデックスの取得した _settings および _mapping 情報に基づいて、Alibaba Cloud Elasticsearch クラスターに対応するインデックスを作成します。レプリカ数を 0 に設定して、データ同期を高速化できます。データ移行が完了したら、レプリカ数を 1 にリセットします。 // newClusterHost は Alibaba Cloud Elasticsearch クラスターのホスト、testindex は作成されたインデックスの名前、testtype は対応するインデックスのタイプです。 curl -u user:pass -XPUT http://<newClusterHost>/<testindex> -d '{ "testindex" : { "settings" : { "number_of_shards" : "5", // 自己管理型 Elasticsearch クラスターの対応するインデックスのシャード数が 5 であると仮定します。 "number_of_replicas" : "0" // インデックスのレプリカ数を 0 に設定します。 } }, "mappings" : { // 自己管理型 Elasticsearch クラスターの対応するインデックスのマッピングが次のように設定されていると仮定します。 "testtype" : { "properties" : { "uid" : { "type" : "long" }, "name" : { "type" : "text" }, "create_time" : { "type" : "long" } } } } } }'問題: 単一インデックスのデータ量が大きく、データ同期速度が遅いです。どうすればよいですか?
解決策:
reindex 機能は scroll メソッドに基づいて実装されています。scroll サイズを増やすか、scroll スライスを設定して効率を向上させることができます。詳細については、「reindex API」ドキュメントをご参照ください。
ソースデータ量が大きい場合は、OSS スナップショットメソッドを使用します。詳細については、「高度な機能: OSS を使用して自己管理型 ES クラスターを Alibaba Cloud ES に移行する」をご参照ください。
単一インデックスのデータ量が大きい場合は、移行前に移行先インデックスのレプリカ数を 0 に、更新間隔を -1 に設定してデータ同期を高速化できます。データ移行が完了したら、これらの設定を元の値に戻します。
// インデックスデータを移行する前に、レプリカ数を 0 に設定し、リフレッシュを無効にしてデータ移行を高速化できます。 curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' { "number_of_replicas" : 0, "refresh_interval" : "-1" }' // インデックスデータが移行された後、レプリカ数を 1 に、更新間隔を 1s にリセットできます (1s はデフォルト値です)。 curl -u user:password -XPUT 'http://<host:port>/indexName/_settings' -d' { "number_of_replicas" : 1, "refresh_interval" : "1s" }'