Sebelum memigrasikan data antar kluster Elasticsearch, buat indeks yang diperlukan di kluster tujuan dengan pemetaan dan pengaturan yang sama seperti di sumber. Melewatkan langkah ini menyebabkan Elasticsearch melakukan inferensi tipe bidang melalui dynamic mapping, yang dapat mengakibatkan kehilangan data, ketidaksesuaian tipe bidang, dan penurunan performa kueri.
Panduan ini menjelaskan cara menggunakan skrip Python untuk menyalin hal-hal berikut dari kluster sumber ke kluster tujuan:
-
Kebijakan manajemen siklus hidup indeks (ILM)
-
Templat indeks
-
Pemetaan dan pengaturan indeks
Jalankan skrip dalam urutan berikut: kebijakan ILM terlebih dahulu, lalu templat indeks, kemudian pemetaan dan pengaturan indeks. Templat indeks dan kebijakan ILM harus sudah tersedia sebelum indeks dibuat berdasarkan keduanya.
Prasyarat
Sebelum memulai, pastikan Anda telah memenuhi persyaratan berikut:
-
Dua kluster Alibaba Cloud Elasticsearch (keduanya versi 7.10), satu sebagai sumber dan satu sebagai tujuan. Lihat Buat kluster Alibaba Cloud Elasticsearch.
-
Instance Elastic Compute Service (ECS) dengan Python 3.6.8 terinstal. Lihat Memulai instance Linux.
-
Konektivitas jaringan antara instance ECS dan kedua kluster, dengan alamat IP instance ECS ditambahkan ke daftar putih alamat IP publik atau privat kedua kluster.
Untuk lingkungan produksi, lakukan koneksi melalui jaringan internal agar data tidak melewati internet publik.
Skrip dalam panduan ini ditujukan untuk Elasticsearch versi 7.10, yang tidak mendukung indeks multi-tipe. Jika kluster Anda menjalankan versi sebelumnya, sesuaikan skrip tersebut.
Konfigurasi skrip
Setiap skrip menggunakan dictionary config yang sama. Perbarui bidang-bidang berikut sebelum menjalankan skrip apa pun:
| Bidang | Deskripsi | Contoh |
|---|---|---|
old_cluster_host |
Host dan port kluster sumber | es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200 |
old_cluster_user |
Username kluster sumber | yourusername |
old_cluster_password |
Password kluster sumber | yourpassward |
old_cluster_protocol |
Protokol kluster sumber (http atau https) |
http |
new_cluster_host |
Host dan port kluster tujuan. Temukan informasi ini di halaman Informasi Dasar di Konsol Elasticsearch. | es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200 |
new_cluster_user |
Username kluster tujuan | yourusername |
new_cluster_password |
Password kluster tujuan | yourpassward |
new_cluster_protocol |
Protokol kluster tujuan (http atau https) |
http |
default_replicas |
Jumlah shard replika yang diterapkan di kluster tujuan. Nilai ini menggantikan jumlah replika dari kluster sumber. | 1 |
Menyinkronkan kebijakan ILM
Kebijakan ILM menentukan bagaimana indeks berpindah melalui berbagai fase—seperti hot dan delete—serta aksi yang harus diambil pada setiap fase. Sinkronkan kebijakan ILM sebelum templat indeks karena templat dapat mereferensikan kebijakan ILM.
Buat data uji di kluster sumber
Jalankan perintah berikut di kluster sumber untuk membuat kebijakan ILM uji:
PUT _ilm/policy/product
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "1GB",
"max_age": "1d",
"max_docs": 1000
}
}
},
"delete": {
"min_age": "2h",
"actions": {
"delete": {}
}
}
}
}
}
Jalankan skrip sinkronisasi
Jalankan skrip berikut di instance ECS. Skrip ini mengambil semua kebijakan ILM dari kluster sumber dan membuatnya di kluster tujuan. Skrip ini menghapus version, modified_date, dan modified_date_string sebelum menulis ke tujuan karena nilai-nilai tersebut merupakan metadata read-only yang dihasilkan oleh Elasticsearch dan tidak dapat ditetapkan saat pembuatan.
import requests
from requests.auth import HTTPBasicAuth
# Perbarui nilai-nilai berikut sesuai lingkungan Anda.
config = {
# Kluster sumber
'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
'old_cluster_user': 'yourusername',
'old_cluster_password': 'yourpassward',
'old_cluster_protocol': 'http', # http atau https
# Kluster tujuan
# Temukan host di halaman Informasi Dasar di Konsol Elasticsearch.
'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
'new_cluster_user': 'yourusername',
'new_cluster_password': 'yourpassward',
'new_cluster_protocol': 'http', # http atau https
'default_replicas': 1,
}
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
print(f"HTTP Error: {e.response.status_code} untuk {url}")
print(e.response.text)
except ValueError as e:
print("Respons JSON tidak valid:")
print(response.text)
raise
def get_ilm_polices():
endpoint = "/_ilm/policy"
templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return templates_result
def create_ilm_policy(policy_name, policy_body):
# Hapus metadata read-only sebelum menulis ke kluster tujuan.
policy_body.pop('version', None)
policy_body.pop('modified_date', None)
policy_body.pop('modified_date_string', None)
endpoint = f"/_ilm/policy/{policy_name}"
create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
print(f"Kebijakan {policy_name} dibuat dengan hasil: {create_result}")
def main():
policies = get_ilm_polices()
for policy_name, policy_body in policies.items():
create_ilm_policy(policy_name, policy_body)
if __name__ == '__main__':
main()
Verifikasi hasil
Jalankan perintah berikut di kluster tujuan untuk memastikan kebijakan ILM telah dibuat:
GET _ilm/policy/product
Menyinkronkan templat indeks
Templat indeks secara otomatis menerapkan pemetaan dan pengaturan ke indeks baru yang namanya sesuai dengan pola tertentu. Sinkronkan templat setelah kebijakan ILM dan sebelum membuat indeks apa pun yang bergantung padanya.
Buat data uji di kluster sumber
Jalankan perintah berikut di kluster sumber untuk membuat templat indeks uji:
PUT _template/product
{
"index_patterns": ["product_*"],
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"productName": {
"type": "text"
},
"annual_rate": {
"type": "keyword"
},
"describe": {
"type": "text"
}
}
}
}
Jalankan skrip sinkronisasi
Jalankan skrip berikut di instance ECS. Skrip ini mengambil semua templat indeks dari kluster sumber dan membuatnya di kluster tujuan.
import requests
from requests.auth import HTTPBasicAuth
# Perbarui nilai-nilai berikut sesuai lingkungan Anda.
config = {
# Kluster sumber
'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
'old_cluster_user': 'yourusername',
'old_cluster_password': 'yourpassward',
'old_cluster_protocol': 'http', # http atau https
# Kluster tujuan
# Temukan host di halaman Informasi Dasar di Konsol Elasticsearch.
'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
'new_cluster_user': 'yourusername',
'new_cluster_password': 'yourpassward',
'new_cluster_protocol': 'http', # http atau https
'default_replicas': 1,
}
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
print(f"HTTP Error: {e.response.status_code} untuk {url}")
print(e.response.text)
except ValueError as e:
print("Respons JSON tidak valid:")
print(response.text)
raise
def get_index_templates():
endpoint = "/_template"
templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return templates_result
def create_index_template(template_name, template_body):
endpoint = f"/_template/{template_name}"
create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
print(f"Templat {template_name} dibuat dengan hasil: {create_result}")
def main():
templates = get_index_templates()
for template_name, template_body in templates.items():
create_index_template(template_name, template_body)
if __name__ == '__main__':
main()
Verifikasi hasil
Jalankan perintah berikut di kluster tujuan untuk memastikan templat indeks telah dibuat:
GET _template/product
Menyinkronkan pemetaan dan pengaturan indeks
Pengaturan indeks menentukan konfigurasi shard utama dan shard replika untuk suatu indeks. Bagian ini menyalin pengaturan tersebut beserta pemetaan bidang dari kluster sumber ke kluster tujuan.
Buat data uji di kluster sumber
Jalankan perintah berikut di kluster sumber untuk membuat indeks uji:
PUT /product_info
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"productName": {
"type": "text"
},
"annual_rate": {
"type": "keyword"
},
"describe": {
"type": "text"
}
}
}
}
Ketiga bidang tersebut menunjukkan tipe pemetaan umum:
-
productName— tipetext, diindeks teks penuh dan dianalisis untuk pencarian -
annual_rate— tipekeyword, disimpan apa adanya untuk penyaringan dan agregasi berbasis kecocokan eksak -
describe— tipetext, diindeks teks penuh dan dianalisis untuk pencarian
Jalankan skrip sinkronisasi
Jalankan skrip berikut di instance ECS. Skrip ini membaca semua indeks terbuka dari kluster sumber (melewatkan indeks sistem yang diawali dengan .) dan membuat ulangnya di kluster tujuan dengan pemetaan dan jumlah shard yang sama. Jumlah replika di tujuan ditetapkan oleh default_replicas, bukan disalin dari sumber.
import requests
from requests.auth import HTTPBasicAuth
# Perbarui nilai-nilai berikut sesuai lingkungan Anda.
config = {
# Kluster sumber
'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
'old_cluster_user': 'yourusername',
'old_cluster_password': 'yourpassward',
'old_cluster_protocol': 'http', # http atau https
# Kluster tujuan
# Temukan host di halaman Informasi Dasar di Konsol Elasticsearch.
'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
'new_cluster_user': 'yourusername',
'new_cluster_password': 'yourpassward',
'new_cluster_protocol': 'http', # http atau https
# Jumlah replika yang diterapkan di kluster tujuan.
'default_replicas': 1,
}
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
url = f"{protocol}://{host}{endpoint}"
auth = (username, password) if username and password else None
headers = {'Content-Type': 'application/json'} if method != 'GET' else None
try:
response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
print(f"HTTP Error: {e.response.status_code} untuk {url}")
print(e.response.text)
except ValueError as e:
print("Respons JSON tidak valid:")
print(response.text)
raise
def get_indices():
endpoint = "/_cat/indices?format=json"
indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
index_list = [index['index'] for index in indices_result if index['status'] == 'open']
return index_list
def get_index_settings(index):
endpoint = f"/{index}/_settings"
index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
settings = index_settings[index]['settings']['index']
shards_replicas_settings = {
'number_of_shards': settings.get('number_of_shards'),
'number_of_replicas': config['default_replicas']
}
return {'settings': shards_replicas_settings}
def get_index_mapping(index):
endpoint = f"/{index}/_mapping"
index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
return {'mappings': index_mapping[index]['mappings']}
def create_index(old_index_name, new_index_name=""):
if not new_index_name:
new_index_name = old_index_name
settings = get_index_settings(old_index_name)
mappings = get_index_mapping(old_index_name)
body = {**settings, **mappings}
endpoint = f"/{new_index_name}"
create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
print(f"Indeks {new_index_name} dibuat dengan hasil: {create_result}")
def main():
index_list = get_indices()
for index in index_list:
if not index.startswith("."): # Lewati indeks sistem.
create_index(index)
if __name__ == '__main__':
main()
Verifikasi hasil
Jalankan perintah berikut di kluster tujuan untuk memastikan indeks telah dibuat dengan pemetaan dan pengaturan yang benar:
GET _cat/indices/product_info