全部产品
Search
文档中心

Elasticsearch:Gunakan skrip Python untuk menyinkronkan pemetaan dan pengaturan indeks, templat indeks, serta kebijakan ILM

更新时间:Jul 06, 2025

Saat memigrasikan data antar kluster Elasticsearch, seperti dari kluster yang dikelola sendiri ke kluster Elasticsearch Alibaba Cloud, pastikan pemetaan kedua kluster identik. Hal ini mencegah masalah seperti kehilangan data, kesalahan format, dan penurunan kinerja kueri akibat pemetaan otomatis. Sebelum migrasi, buat indeks yang diperlukan dan definisikan pemetaan serta pengaturannya secara manual. Topik ini menjelaskan cara menggunakan skrip Python untuk menyinkronkan pemetaan dan pengaturan indeks, templat indeks, serta kebijakan manajemen siklus hidup indeks (ILM) dari satu kluster Elasticsearch ke kluster lainnya.

Prasyarat

  1. Dua kluster Elasticsearch Alibaba Cloud telah dibuat. Untuk informasi lebih lanjut, lihat Buat kluster Elasticsearch Alibaba Cloud.

    Kluster Elasticsearch digunakan sebagai kluster sumber dan tujuan, dengan versi V7.10.

    Catatan

    Contoh ini menggunakan skrip Python untuk kluster Elasticsearch V7.10. Pemetaan indeks pada V7.10 mungkin berbeda dari versi sebelumnya. Misalnya, V7.10 tidak mendukung indeks multi-tipe yang tersedia di versi sebelumnya. Jika Anda menggunakan versi sebelum V7.10, modifikasi skrip sesuai kebutuhan bisnis Anda.

  2. Sebuah instance Elastic Compute Service (ECS) telah dibuat, dan lingkungan Python telah dipersiapkan. Untuk informasi lebih lanjut, lihat Memulai dengan instance Linux.

    Contoh ini menggunakan lingkungan Python 3.6.8. Jika Anda ingin menggunakan versi lain, sesuaikan konfigurasi berdasarkan modul permintaan Python dari Operasi API untuk versi tersebut.

  3. Koneksi jaringan telah dibangun antara kluster Elasticsearch dan instance ECS, serta alamat IP publik atau privat dari instance ECS telah ditambahkan ke daftar putih alamat IP kluster Elasticsearch.

    Catatan

    Untuk memastikan keamanan data dalam lingkungan produksi, disarankan membangun koneksi jaringan melalui jaringan internal.

Menyinkronkan pemetaan dan pengaturan indeks

Sinkronkan pemetaan dan pengaturan indeks di kluster Elasticsearch sumber. Pengaturan indeks mencakup konfigurasi shard utama dan shard replika.

  1. Persiapkan data uji.

    Jalankan perintah berikut di kluster Elasticsearch sumber untuk membuat indeks:

    PUT /product_info
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
          "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. Jalankan skrip berikut di instance ECS untuk menyinkronkan pemetaan dan pengaturan indeks:

    import requests
    from requests.auth import HTTPBasicAuth
    
    # Modifikasi konfigurasi berikut berdasarkan lingkungan yang Anda gunakan.
    config = {
        # Host kluster Elasticsearch sumber.
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # Nama pengguna yang dapat Anda gunakan untuk mengakses kluster Elasticsearch sumber.
        'old_cluster_user': 'yourusername',
        # Kata sandi yang dapat Anda gunakan untuk mengakses kluster Elasticsearch sumber.
        'old_cluster_password': 'yourpassward',
        # Protokol yang digunakan oleh kluster Elasticsearch sumber. Nilai valid: http atau https.
        'old_cluster_protocol': 'http',
        # Host kluster Elasticsearch tujuan. Anda dapat memperoleh host di halaman Informasi Dasar kluster Elasticsearch tujuan di konsol Elasticsearch.
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # Nama pengguna yang dapat Anda gunakan untuk mengakses kluster Elasticsearch tujuan.
        'new_cluster_user': 'yourusername',
        # Kata sandi yang dapat Anda gunakan untuk mengakses kluster Elasticsearch tujuan.
        'new_cluster_password': 'yourpassward',
        # Protokol yang digunakan oleh kluster Elasticsearch tujuan. Nilai valid: http dan https.
        'new_cluster_protocol': 'http',
        # Jumlah default shard replika untuk kluster Elasticsearch tujuan.
        'default_replicas': 1,
    }
    
    # Tentukan fungsi umum untuk permintaan HTTP.
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # Cetak pesan kesalahan.
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # Jika respons tidak dalam format JSON, cetak pesan kesalahan terkait dan kembalikan respons.
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # Kueri semua indeks.
    def get_indices():
        endpoint = "/_cat/indices?format=json"
        indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        index_list = [index['index'] for index in indices_result if index['status'] == 'open']
        return index_list
    
    # Kueri pengaturan indeks.
    def get_index_settings(index):
        endpoint = f"/{index}/_settings"
        index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        settings = index_settings[index]['settings']['index']
        shards_replicas_settings = {
            'number_of_shards': settings.get('number_of_shards'),
            'number_of_replicas': config['default_replicas']
        }
        return {'settings': shards_replicas_settings}
    
    # Kueri pemetaan indeks.
    def get_index_mapping(index):
        endpoint = f"/{index}/_mapping"
        index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return {'mappings': index_mapping[index]['mappings']}
    
    # Buat indeks baru.
    def create_index(old_index_name, new_index_name=""):
        if not new_index_name:
            new_index_name = old_index_name
    
        settings = get_index_settings(old_index_name)
        mappings = get_index_mapping(old_index_name)
        body = {**settings, **mappings}
    
        endpoint = f"/{new_index_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    
        print(f"Indeks {new_index_name} dibuat dengan hasil: {create_result}")
    
    # Tentukan fungsi utama.
    def main():
        index_list = get_indices()
        for index in index_list:
            if not index.startswith("."):  # Abaikan indeks sistem.
                create_index(index)
    
    
    if __name__ == '__main__':
        main()
    
  3. Verifikasi hasil sinkronisasi.

    Jalankan perintah berikut di kluster Elasticsearch tujuan untuk melihat pemetaan dan pengaturan yang disinkronkan:

    GET _cat/indices/product_info

Menyinkronkan templat indeks

  1. Persiapkan data uji.

    Jalankan perintah berikut di kluster Elasticsearch sumber untuk membuat templat indeks:

    PUT _template/product
    {
      "index_patterns": ["product_*"],
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. Jalankan skrip berikut di instance ECS untuk menyinkronkan templat indeks:

    import requests
    from requests.auth import HTTPBasicAuth
    
    # Modifikasi konfigurasi berikut berdasarkan lingkungan yang Anda gunakan.
    config = {
        # Host kluster Elasticsearch sumber.
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # Nama pengguna yang dapat Anda gunakan untuk mengakses kluster Elasticsearch sumber.
        'old_cluster_user': 'yourusername',
        # Kata sandi yang dapat Anda gunakan untuk mengakses kluster Elasticsearch sumber.
        'old_cluster_password': 'yourpassward',
        # Protokol yang digunakan oleh kluster Elasticsearch sumber. Nilai valid: http atau https.
        'old_cluster_protocol': 'http',
        # Host kluster Elasticsearch tujuan. Anda dapat memperoleh host di halaman Informasi Dasar kluster Elasticsearch tujuan di konsol Elasticsearch.
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # Nama pengguna yang dapat Anda gunakan untuk mengakses kluster Elasticsearch tujuan.
        'new_cluster_user': 'yourusername',
        # Kata sandi yang dapat Anda gunakan untuk mengakses kluster Elasticsearch tujuan.
        'new_cluster_password': 'yourpassward',
        # Protokol yang digunakan oleh kluster Elasticsearch tujuan. Nilai valid: http dan https.
        'new_cluster_protocol': 'http',
        # Jumlah default shard replika untuk kluster Elasticsearch tujuan.
        'default_replicas': 1,
    }
    
    # Tentukan fungsi umum untuk permintaan HTTP.
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # Cetak pesan kesalahan.
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # Jika respons tidak dalam format JSON, cetak pesan kesalahan terkait dan kembalikan respons.
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # Kueri semua templat indeks kluster Elasticsearch sumber.
    def get_index_templates():
        endpoint = "/_template"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # Buat templat indeks untuk kluster Elasticsearch tujuan.
    def create_index_template(template_name, template_body):
        endpoint = f"/_template/{template_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
        print(f"Templat {template_name} dibuat dengan hasil: {create_result}")
    
    # Tentukan fungsi utama.
    def main():
    
        # Sinkronkan templat indeks kluster Elasticsearch sumber.
        templates = get_index_templates()
        for template_name, template_body in templates.items():
            create_index_template(template_name, template_body)
    
    if __name__ == '__main__':
        main()
  3. Verifikasi hasil sinkronisasi.

    Jalankan perintah berikut di kluster Elasticsearch tujuan untuk menanyakan informasi tentang templat indeks yang disinkronkan:

    GET _template/product

Menyinkronkan kebijakan ILM

  1. Persiapkan data uji.

    Jalankan perintah berikut di kluster Elasticsearch sumber untuk membuat kebijakan ILM:

    PUT _ilm/policy/product
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_size": "1GB",
                "max_age": "1d",
                "max_docs": 1000
              }
            }
          },
          "delete": {
            "min_age": "2h",
            "actions": {
              "delete": {}
            }
          }
        }
      }
    }
  1. Jalankan skrip berikut di instance ECS untuk menyinkronkan kebijakan ILM:

    import requests
    from requests.auth import HTTPBasicAuth
    
    # Modifikasi konfigurasi berikut berdasarkan lingkungan yang Anda gunakan.
    config = {
        # Host kluster Elasticsearch sumber.
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # Nama pengguna yang dapat Anda gunakan untuk mengakses kluster Elasticsearch sumber.
        'old_cluster_user': 'yourusername',
        # Kata sandi yang dapat Anda gunakan untuk mengakses kluster Elasticsearch sumber.
        'old_cluster_password': 'yourpassward',
        # Protokol yang digunakan oleh kluster Elasticsearch sumber. Nilai valid: http atau https.
        'old_cluster_protocol': 'http',
        # Host kluster Elasticsearch tujuan. Anda dapat memperoleh host di halaman Informasi Dasar kluster Elasticsearch tujuan di konsol Elasticsearch.
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # Nama pengguna yang dapat Anda gunakan untuk mengakses kluster Elasticsearch tujuan.
        'new_cluster_user': 'yourusername',
        # Kata sandi yang dapat Anda gunakan untuk mengakses kluster Elasticsearch tujuan.
        'new_cluster_password': 'yourpassward',
        # Protokol yang digunakan oleh kluster Elasticsearch tujuan. Nilai valid: http dan https.
        'new_cluster_protocol': 'http',
        # Jumlah default shard replika untuk kluster Elasticsearch tujuan.
        'default_replicas': 1,
    }
    
    # Tentukan fungsi umum untuk permintaan HTTP.
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # Cetak pesan kesalahan.
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # Jika respons tidak dalam format JSON, cetak pesan kesalahan terkait dan kembalikan respons.
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # Kueri semua kebijakan ILM kluster Elasticsearch sumber.
    def get_ilm_polices():
        endpoint = "/_ilm/policy"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # Buat kebijakan ILM untuk kluster Elasticsearch tujuan.
    def create_ilm_policy(policy_name, policy_body):
        policy_body.pop('version', None)
        policy_body.pop('modified_date', None)
        policy_body.pop('modified_date_string', None)
    
        endpoint = f"/_ilm/policy/{policy_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
        print(f"Kebijakan {policy_name} dibuat dengan hasil: {create_result}")
    
    # Tentukan fungsi utama.
    def main():
    
        # Sinkronkan kebijakan ILM kluster Elasticsearch sumber.
        policies = get_ilm_polices()
        for policy_name, policy_body in policies.items():
            create_ilm_policy(policy_name, policy_body)
    if __name__ == '__main__':
        main()
    
  2. Verifikasi hasil sinkronisasi.

    Jalankan perintah berikut di kluster Elasticsearch tujuan untuk menanyakan informasi tentang kebijakan ILM yang disinkronkan:

    GET _ilm/policy/product