All Products
Search
Document Center

Elasticsearch:Gunakan skrip untuk menyinkronkan skema indeks, templat, atau ILM

Last Updated:Mar 27, 2026

Sebelum memigrasikan data antar kluster Elasticsearch, buat indeks yang diperlukan di kluster tujuan dengan pemetaan dan pengaturan yang sama seperti di sumber. Melewatkan langkah ini menyebabkan Elasticsearch melakukan inferensi tipe bidang melalui dynamic mapping, yang dapat mengakibatkan kehilangan data, ketidaksesuaian tipe bidang, dan penurunan performa kueri.

Panduan ini menjelaskan cara menggunakan skrip Python untuk menyalin hal-hal berikut dari kluster sumber ke kluster tujuan:

  • Kebijakan manajemen siklus hidup indeks (ILM)

  • Templat indeks

  • Pemetaan dan pengaturan indeks

Jalankan skrip dalam urutan berikut: kebijakan ILM terlebih dahulu, lalu templat indeks, kemudian pemetaan dan pengaturan indeks. Templat indeks dan kebijakan ILM harus sudah tersedia sebelum indeks dibuat berdasarkan keduanya.

Prasyarat

Sebelum memulai, pastikan Anda telah memenuhi persyaratan berikut:

  • Dua kluster Alibaba Cloud Elasticsearch (keduanya versi 7.10), satu sebagai sumber dan satu sebagai tujuan. Lihat Buat kluster Alibaba Cloud Elasticsearch.

  • Instance Elastic Compute Service (ECS) dengan Python 3.6.8 terinstal. Lihat Memulai instance Linux.

  • Konektivitas jaringan antara instance ECS dan kedua kluster, dengan alamat IP instance ECS ditambahkan ke daftar putih alamat IP publik atau privat kedua kluster.

Untuk lingkungan produksi, lakukan koneksi melalui jaringan internal agar data tidak melewati internet publik.
Skrip dalam panduan ini ditujukan untuk Elasticsearch versi 7.10, yang tidak mendukung indeks multi-tipe. Jika kluster Anda menjalankan versi sebelumnya, sesuaikan skrip tersebut.

Konfigurasi skrip

Setiap skrip menggunakan dictionary config yang sama. Perbarui bidang-bidang berikut sebelum menjalankan skrip apa pun:

Bidang Deskripsi Contoh
old_cluster_host Host dan port kluster sumber es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200
old_cluster_user Username kluster sumber yourusername
old_cluster_password Password kluster sumber yourpassward
old_cluster_protocol Protokol kluster sumber (http atau https) http
new_cluster_host Host dan port kluster tujuan. Temukan informasi ini di halaman Informasi Dasar di Konsol Elasticsearch. es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200
new_cluster_user Username kluster tujuan yourusername
new_cluster_password Password kluster tujuan yourpassward
new_cluster_protocol Protokol kluster tujuan (http atau https) http
default_replicas Jumlah shard replika yang diterapkan di kluster tujuan. Nilai ini menggantikan jumlah replika dari kluster sumber. 1

Menyinkronkan kebijakan ILM

Kebijakan ILM menentukan bagaimana indeks berpindah melalui berbagai fase—seperti hot dan delete—serta aksi yang harus diambil pada setiap fase. Sinkronkan kebijakan ILM sebelum templat indeks karena templat dapat mereferensikan kebijakan ILM.

Buat data uji di kluster sumber

Jalankan perintah berikut di kluster sumber untuk membuat kebijakan ILM uji:

PUT _ilm/policy/product
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "1GB",
            "max_age": "1d",
            "max_docs": 1000
          }
        }
      },
      "delete": {
        "min_age": "2h",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Jalankan skrip sinkronisasi

Jalankan skrip berikut di instance ECS. Skrip ini mengambil semua kebijakan ILM dari kluster sumber dan membuatnya di kluster tujuan. Skrip ini menghapus version, modified_date, dan modified_date_string sebelum menulis ke tujuan karena nilai-nilai tersebut merupakan metadata read-only yang dihasilkan oleh Elasticsearch dan tidak dapat ditetapkan saat pembuatan.

import requests
from requests.auth import HTTPBasicAuth

# Perbarui nilai-nilai berikut sesuai lingkungan Anda.
config = {
    # Kluster sumber
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    'old_cluster_user': 'yourusername',
    'old_cluster_password': 'yourpassward',
    'old_cluster_protocol': 'http',          # http atau https

    # Kluster tujuan
    # Temukan host di halaman Informasi Dasar di Konsol Elasticsearch.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    'new_cluster_user': 'yourusername',
    'new_cluster_password': 'yourpassward',
    'new_cluster_protocol': 'http',          # http atau https

    'default_replicas': 1,
}

def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} untuk {url}")
        print(e.response.text)
    except ValueError as e:
        print("Respons JSON tidak valid:")
        print(response.text)
        raise

def get_ilm_polices():
    endpoint = "/_ilm/policy"
    templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return templates_result

def create_ilm_policy(policy_name, policy_body):
    # Hapus metadata read-only sebelum menulis ke kluster tujuan.
    policy_body.pop('version', None)
    policy_body.pop('modified_date', None)
    policy_body.pop('modified_date_string', None)

    endpoint = f"/_ilm/policy/{policy_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
    print(f"Kebijakan {policy_name} dibuat dengan hasil: {create_result}")

def main():
    policies = get_ilm_polices()
    for policy_name, policy_body in policies.items():
        create_ilm_policy(policy_name, policy_body)

if __name__ == '__main__':
    main()

Verifikasi hasil

Jalankan perintah berikut di kluster tujuan untuk memastikan kebijakan ILM telah dibuat:

GET _ilm/policy/product

Menyinkronkan templat indeks

Templat indeks secara otomatis menerapkan pemetaan dan pengaturan ke indeks baru yang namanya sesuai dengan pola tertentu. Sinkronkan templat setelah kebijakan ILM dan sebelum membuat indeks apa pun yang bergantung padanya.

Buat data uji di kluster sumber

Jalankan perintah berikut di kluster sumber untuk membuat templat indeks uji:

PUT _template/product
{
  "index_patterns": ["product_*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productName": {
        "type": "text"
      },
      "annual_rate": {
        "type": "keyword"
      },
      "describe": {
        "type": "text"
      }
    }
  }
}

Jalankan skrip sinkronisasi

Jalankan skrip berikut di instance ECS. Skrip ini mengambil semua templat indeks dari kluster sumber dan membuatnya di kluster tujuan.

import requests
from requests.auth import HTTPBasicAuth

# Perbarui nilai-nilai berikut sesuai lingkungan Anda.
config = {
    # Kluster sumber
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    'old_cluster_user': 'yourusername',
    'old_cluster_password': 'yourpassward',
    'old_cluster_protocol': 'http',          # http atau https

    # Kluster tujuan
    # Temukan host di halaman Informasi Dasar di Konsol Elasticsearch.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    'new_cluster_user': 'yourusername',
    'new_cluster_password': 'yourpassward',
    'new_cluster_protocol': 'http',          # http atau https

    'default_replicas': 1,
}

def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} untuk {url}")
        print(e.response.text)
    except ValueError as e:
        print("Respons JSON tidak valid:")
        print(response.text)
        raise

def get_index_templates():
    endpoint = "/_template"
    templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return templates_result

def create_index_template(template_name, template_body):
    endpoint = f"/_template/{template_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
    print(f"Templat {template_name} dibuat dengan hasil: {create_result}")

def main():
    templates = get_index_templates()
    for template_name, template_body in templates.items():
        create_index_template(template_name, template_body)

if __name__ == '__main__':
    main()

Verifikasi hasil

Jalankan perintah berikut di kluster tujuan untuk memastikan templat indeks telah dibuat:

GET _template/product

Menyinkronkan pemetaan dan pengaturan indeks

Pengaturan indeks menentukan konfigurasi shard utama dan shard replika untuk suatu indeks. Bagian ini menyalin pengaturan tersebut beserta pemetaan bidang dari kluster sumber ke kluster tujuan.

Buat data uji di kluster sumber

Jalankan perintah berikut di kluster sumber untuk membuat indeks uji:

PUT /product_info
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productName": {
        "type": "text"
      },
      "annual_rate": {
        "type": "keyword"
      },
      "describe": {
        "type": "text"
      }
    }
  }
}

Ketiga bidang tersebut menunjukkan tipe pemetaan umum:

  1. productName — tipe text, diindeks teks penuh dan dianalisis untuk pencarian

  2. annual_rate — tipe keyword, disimpan apa adanya untuk penyaringan dan agregasi berbasis kecocokan eksak

  3. describe — tipe text, diindeks teks penuh dan dianalisis untuk pencarian

Jalankan skrip sinkronisasi

Jalankan skrip berikut di instance ECS. Skrip ini membaca semua indeks terbuka dari kluster sumber (melewatkan indeks sistem yang diawali dengan .) dan membuat ulangnya di kluster tujuan dengan pemetaan dan jumlah shard yang sama. Jumlah replika di tujuan ditetapkan oleh default_replicas, bukan disalin dari sumber.

import requests
from requests.auth import HTTPBasicAuth

# Perbarui nilai-nilai berikut sesuai lingkungan Anda.
config = {
    # Kluster sumber
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    'old_cluster_user': 'yourusername',
    'old_cluster_password': 'yourpassward',
    'old_cluster_protocol': 'http',          # http atau https

    # Kluster tujuan
    # Temukan host di halaman Informasi Dasar di Konsol Elasticsearch.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    'new_cluster_user': 'yourusername',
    'new_cluster_password': 'yourpassward',
    'new_cluster_protocol': 'http',          # http atau https

    # Jumlah replika yang diterapkan di kluster tujuan.
    'default_replicas': 1,
}

def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} untuk {url}")
        print(e.response.text)
    except ValueError as e:
        print("Respons JSON tidak valid:")
        print(response.text)
        raise

def get_indices():
    endpoint = "/_cat/indices?format=json"
    indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    index_list = [index['index'] for index in indices_result if index['status'] == 'open']
    return index_list

def get_index_settings(index):
    endpoint = f"/{index}/_settings"
    index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    settings = index_settings[index]['settings']['index']
    shards_replicas_settings = {
        'number_of_shards': settings.get('number_of_shards'),
        'number_of_replicas': config['default_replicas']
    }
    return {'settings': shards_replicas_settings}

def get_index_mapping(index):
    endpoint = f"/{index}/_mapping"
    index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return {'mappings': index_mapping[index]['mappings']}

def create_index(old_index_name, new_index_name=""):
    if not new_index_name:
        new_index_name = old_index_name

    settings = get_index_settings(old_index_name)
    mappings = get_index_mapping(old_index_name)
    body = {**settings, **mappings}

    endpoint = f"/{new_index_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)

    print(f"Indeks {new_index_name} dibuat dengan hasil: {create_result}")

def main():
    index_list = get_indices()
    for index in index_list:
        if not index.startswith("."):  # Lewati indeks sistem.
            create_index(index)

if __name__ == '__main__':
    main()

Verifikasi hasil

Jalankan perintah berikut di kluster tujuan untuk memastikan indeks telah dibuat dengan pemetaan dan pengaturan yang benar:

GET _cat/indices/product_info