すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:Logstash を使用してセルフマネージド型 Elasticsearch から Alibaba Cloud Elasticsearch にデータを移行

最終更新日:Mar 01, 2026

Elastic Compute Service (ECS) インスタンスにセルフマネージド型 Logstash インスタンスをデプロイし、セルフマネージド型 Elasticsearch クラスターから Alibaba Cloud Elasticsearch クラスターに完全なデータまたは増分データを移行します。

適切な移行方法の選択

方法最適な用途トレードオフ
Logstash増分同期、継続的なレプリケーション、選択的なインデックス移行実行中の Logstash インスタンスが必要。パイプライン処理のオーバーヘッドが発生
スナップショット大規模なデータセットの 1 回限りの完全移行共有ストレージが必要。完全なインデックスを復元 (ドキュメントレベルの増分同期なし)
Reindex API小規模なデータセットまたはクロスバージョン移行大量のデータには低速。ネットワークに依存

前提条件

開始する前に、以下を確認してください。

  • 移行するデータを含むセルフマネージド型 Elasticsearch クラスター (ソース)

  • 作成および実行中の Alibaba Cloud Elasticsearch クラスター (宛先)。詳細については、「Alibaba Cloud Elasticsearch クラスターの作成」をご参照ください。

  • 宛先クラスターと同じ Virtual Private Cloud (VPC) 内にあり、パブリック IPv4 アドレスが割り当てられた ECS インスタンス

  • ECS インスタンスにインストールされたセルフマネージド型 Logstash

  • Logstash と両方の Elasticsearch クラスター間のネットワーク接続性の確認

  • ポート 5601 (Kibana アクセス用) でのトラフィックを許可するセキュリティグループのインバウンドルール

重要

ECS インスタンスは、Alibaba Cloud Elasticsearch クラスターと同じ VPC 内に存在する必要があります。Logstash は、ソースクラスターと宛先クラスターの両方に接続できる必要があります。

ステップ 1: 環境設定

このセクションでは、単一の ECS インスタンスにセルフマネージド型 Elasticsearch クラスター、Kibana、および Logstash をデプロイする手順を説明します。これらのコンポーネントがすでに実行されている場合は、「ステップ 2」に進んでください。

環境仕様の例

次の表は、このドキュメントで使用されている構成の例を示しています。

Alibaba Cloud Elasticsearch クラスター

構成
リージョン中国 (杭州)
エディションとバージョンV7.10.0、Standard Edition
ゾーンとノード3 つのゾーン、3 つのデータノード
単一ノードのスペック4 vCPU、16 GiB メモリ、100 GiB ストレージの Enhanced SSD (ESSD)

ECS インスタンス

設定
リージョン中国 (杭州)
仕様4 vCPU、16 GiB メモリ
イメージCentOS 7.9 64 ビット (パブリックイメージ)
システムディスクESSD、100 GiB
ネットワークAlibaba Cloud Elasticsearch クラスターと同じ VPC。 [パブリック IPv4 アドレスの割り当て] を選択済み。トラフィック課金、ピーク帯域幅 100 Mbit/s
セキュリティグループポート 5601 (Kibana) のトラフィックを許可するインバウンドルール。権限付与オブジェクトとして、ご利用のクライアント IP アドレスを追加
重要
  • クライアントが自宅またはオフィスの LAN 上にある場合は、ローカル IP アドレスではなく、パブリックエグレス IP アドレスを追加してください。パブリック IP を確認するには、cip.cc にアクセスしてください。

  • 0.0.0.0/0 を権限付与オブジェクトとして追加すると、すべてのパブリック IPv4 アドレスからのアクセスが許可されます。セキュリティリスクがあるため、本番環境ではこれを避けてください。

ECS インスタンスの作成の詳細については、「カスタム起動タブでのインスタンス作成」をご参照ください。

Alibaba Cloud Elasticsearch クラスターの作成

上記の仕様でクラスターを作成するか、要件に合わせて調整してください。詳細な手順については、「Alibaba Cloud Elasticsearch クラスターの作成」をご参照ください。

セルフマネージド型 Elasticsearch のデプロイ

この例では、単一ノードの Elasticsearch 7.6.2 クラスターを ECS インスタンスにデプロイします。

  1. ECS インスタンスに接続します。詳細については、「パスワードまたはキーを使用して Linux インスタンスに接続」をご参照ください。

  2. ルートユーザーとして elastic という名前のユーザーを作成します。

       useradd elastic
  3. elastic ユーザーのパスワードを設定します。プロンプトが表示されたらパスワードを入力して確認します。

       passwd elastic
  4. elastic ユーザーに切り替えます。

       su -l elastic
  5. Elasticsearch パッケージをダウンロードして展開します。

       wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
       tar -zvxf elasticsearch-7.6.2-linux-x86_64.tar.gz
  6. Elasticsearch を起動します。

       cd elasticsearch-7.6.2
       ./bin/elasticsearch -d
  7. Elasticsearch が実行中であることを確認します。正常な応答には、バージョン番号とメッセージ "You Know, for Search" が含まれています。

       cd ~
       curl localhost:9200

Kibana のデプロイとテストデータの追加

この例では、Kibana 7.6.2 を同じ ECS インスタンスにデプロイします。

Kibana はルートではなく、一般ユーザーとして実行してください。
  1. Kibana パッケージをダウンロードして展開します。

       wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
       tar -zvxf kibana-7.6.2-linux-x86_64.tar.gz
  2. すべての IP アドレスからの接続を許可するように Kibana を設定します。config/ ディレクトリにある kibana.yml ファイルを開き、次の行を追加します。追加:

       cd kibana-7.6.2-linux-x86_64
       vi config/kibana.yml
       server.host: "0.0.0.0"
  3. Kibana を起動します。

       sudo nohup ./bin/kibana &
  4. ブラウザで Kibana コンソールを開きます。

       http://<ECS-public-IP>:5601/app/kibana#/home
  5. Kibana ホームページで、[サンプルデータを試す] をクリックします。[サンプルデータ] タブで、サンプル Web ログカードの [データを追加] をクリックして、テストデータを読み込みます。

Logstash のデプロイ

この例では、Logstash 7.10.0 を同じ ECS インスタンスにデプロイします。

Logstash はルートではなく、一般ユーザーとして実行してください。
  1. Logstash パッケージをダウンロードして展開します。

       cd ~
       wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.0-linux-x86_64.tar.gz
       tar -zvxf logstash-7.10.0-linux-x86_64.tar.gz
  2. JVM ヒープメモリを増やします。デフォルトのヒープサイズは 1 GiB です。データ移行を高速化するために、ECS インスタンスのスペックに適した値を設定します。ヒープサイズを設定します (例: 16 GiB インスタンスの場合は 8 GiB)。

       cd logstash-7.10.0
       sudo vi config/jvm.options
       -Xms8g
       -Xmx8g
  3. バッチサイズを増やします。pipelines.yml 構成ファイルを編集し、pipeline.batch.size をデフォルトの 125 から 5000 に変更します。これにより、Logstash はバッチあたり 5 MiB から 15 MiB のデータを書き込むことができ、移行が高速化されます。

       vi config/pipelines.yml
  4. Logstash が正しく動作することを確認します: Hello world! と入力して Enter キーを押します。Logstash が動作している場合、ターミナルに Hello world! が出力されます。Ctrl+C を押して停止します。

       bin/logstash -e 'input { stdin { } } output { stdout {} }'

ステップ 2: (オプション) インデックスメタデータの移行

Logstash がデータを移行すると、Alibaba Cloud Elasticsearch クラスターの自動インデックス作成機能によってインデックスが自動的に作成されます。ただし、自動作成されたインデックスは、ソースインデックスとは異なる設定とマッピングを持つ場合があります。元のインデックス構造を保持するには、データを移行する前に宛先クラスターにインデックスを手動で作成します。

次の Python スクリプトは、ソースクラスターからインデックス設定とマッピングを読み取り、宛先クラスターに一致するインデックスを作成します。

  1. 一般ユーザーとして ECS インスタンスに接続します。

  2. Python スクリプトファイルを作成します。

       sudo vi indiceCreate.py
  3. 次のスクリプトを貼り付けます。ソースクラスターと宛先クラスターの両方のホスト、ユーザー名、パスワードの値を置き換えてください。このスクリプトの主な動作:

    • ソースインデックスのプライマリシャードの数を保持します。

    • 初期インデックス作成を高速化するために、レプリカシャードの数を 0 (DEFAULT_REPLICAS = 0) に設定します。移行後にレプリカを増やしてください。

    • システムインデックス (名前が . で始まるもの) をスキップします。必要に応じて、これらを手動で再作成してください。

       #!/usr/bin/python
       # -*- coding: UTF-8 -*-
       # File name: indiceCreate.py
       import sys
       import base64
       import time
       import httplib
       import json
       ## Source Elasticsearch cluster
       oldClusterHost = "<source-es-host>:9200"
       ## Source cluster username (leave blank if not required)
       oldClusterUserName = "elastic"
       ## Source cluster password (leave blank if not required)
       oldClusterPassword = "<source-es-password>"
       ## Destination Elasticsearch cluster (find on the Basic Information page)
       newClusterHost = "<destination-es-endpoint>:9200"
       ## Destination cluster username
       newClusterUser = "elastic"
       ## Destination cluster password
       newClusterPassword = "<destination-es-password>"
       DEFAULT_REPLICAS = 0
       def httpRequest(method, host, endpoint, params="", username="", password=""):
           conn = httplib.HTTPConnection(host)
           headers = {}
           if (username != "") :
               'Hello {name}, your age is {age} !'.format(name = 'Tom', age = '20')
               base64string = base64.encodestring('{username}:{password}'.format(username = username, password = password)).replace('\n', '')
               headers["Authorization"] = "Basic %s" % base64string;
           if "GET" == method:
               headers["Content-Type"] = "application/x-www-form-urlencoded"
               conn.request(method=method, url=endpoint, headers=headers)
           else :
               headers["Content-Type"] = "application/json"
               conn.request(method=method, url=endpoint, body=params, headers=headers)
           response = conn.getresponse()
           res = response.read()
           return res
       def httpGet(host, endpoint, username="", password=""):
           return httpRequest("GET", host, endpoint, "", username, password)
       def httpPost(host, endpoint, params, username="", password=""):
           return httpRequest("POST", host, endpoint, params, username, password)
       def httpPut(host, endpoint, params, username="", password=""):
           return httpRequest("PUT", host, endpoint, params, username, password)
       def getIndices(host, username="", password=""):
           endpoint = "/_cat/indices"
           indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)
           indicesList = indicesResult.split("\n")
           indexList = []
           for indices in indicesList:
               if (indices.find("open") > 0):
                   indexList.append(indices.split()[2])
           return indexList
       def getSettings(index, host, username="", password=""):
           endpoint = "/" + index + "/_settings"
           indexSettings = httpGet(host, endpoint, username, password)
           print (index + "  Original settings: \n" + indexSettings)
           settingsDict = json.loads(indexSettings)
           ## Number of primary shards matches the source index
           number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
           ## Default replica count is 0
           number_of_replicas = DEFAULT_REPLICAS
           newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
           return newSetting
       def getMapping(index, host, username="", password=""):
           endpoint = "/" + index + "/_mapping"
           indexMapping = httpGet(host, endpoint, username, password)
           print (index + " Original mappings: \n" + indexMapping)
           mappingDict = json.loads(indexMapping)
           mappings = json.dumps(mappingDict[index]["mappings"])
           newMapping = "\"mappings\" : " + mappings
           return newMapping
       def createIndexStatement(oldIndexName):
           settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
           mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
           createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
           return createstatement
       def createIndex(oldIndexName, newIndexName=""):
           if (newIndexName == "") :
               newIndexName = oldIndexName
           createstatement = createIndexStatement(oldIndexName)
           print ("New index " + newIndexName + " Index settings and mappings: \n" + createstatement)
           endpoint = "/" + newIndexName
           createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
           print ("New index " + newIndexName + " Creation result: " + createResult)
       ## main
       indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
       systemIndex = []
       for index in indexList:
           if (index.startswith(".")):
               systemIndex.append(index)
           else :
               createIndex(index, index)
       if (len(systemIndex) > 0) :
           for index in systemIndex:
               print (index + " It may be a system index and will not be recreated. You can manually recreate the index based on your business requirements.")
  4. スクリプトを実行します。

       sudo /usr/bin/python indiceCreate.py
  5. 作成されたインデックスを確認します。Alibaba Cloud Elasticsearch クラスターの Kibana コンソールにログインし (「Kibana コンソールへのログイン」をご参照ください)、以下を実行します。

       GET /_cat/indices?v

ステップ 3: 完全データの移行

ソースクラスターでデータの書き込みまたは更新が継続的に発生する場合は、まず完全なデータ移行を実行します。これにより、増分同期を開始する前に、宛先クラスターに完全なベースラインが確立されます。

データ精度を向上させるには、複数の Logstash パイプラインを作成し、異なるインデックスを個別に移行してください。
Logstash の構成フォーマットはバージョン 8.5 で変更されました。このセクションでは、7.x と 8.x の両方の構成を提供します。
  1. ECS インスタンスに接続します。

  2. Logstash 構成ファイルを作成します。

       cd logstash-7.10.0/config
       vi es2es_all.conf
  3. Logstash のバージョンに応じた構成を追加します。

    説明
    • Logstash 8.5 の構成パラメーターは異なります。このセクションでは、7.x と 8.x の両方の構成を提供します。

    • データ精度を向上させるには、複数の Logstash パイプラインを作成し、異なるインデックスを個別に移行してください。

    Logstash 7.x の構成

       input{
           elasticsearch{
               # Source Elasticsearch host
               hosts =>  ["http://<source-es-host>:9200"]
               # Source cluster credentials
               user => "<source-es-username>"
               password => "<source-es-password>"
               # Indices to migrate (supports wildcards)
               index => "kibana_sample_data_*"
               # Performance settings
               docinfo=>true
               slices => 5
               size => 5000
           }
       }
    
       filter {
         # Remove fields added by Logstash
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
    
       output{
           elasticsearch{
               # Destination cluster endpoint (find on the Basic Information page)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Destination cluster credentials
               user => "elastic"
               password => "<destination-es-password>"
               # Preserve original index names
               index => "%{[@metadata][_index]}"
               # Preserve original document types (7.x only)
               document_type => "%{[@metadata][_type]}"
               # Preserve original document IDs (remove for better performance)
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }

    Logstash 8.x の構成

       input{
           elasticsearch{
               # Source Elasticsearch host
               hosts =>  ["http://<source-es-host>:9200"]
               # Source cluster credentials
               user => "elastic"
               password => "<source-es-password>"
               # Indices to migrate
               index => "<index-name>"
               # Performance settings
               docinfo => true
               size => 10000
               docinfo_target => "[@metadata]"
           }
       }
    
       filter {
         # Remove fields added by Logstash
         mutate {
           remove_field => ["@timestamp","@version"]
         }
       }
    
       output{
           elasticsearch{
               # Destination cluster endpoint (find on the Basic Information page)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Destination cluster credentials
               user => "elastic"
               password => "<destination-es-password>"
               # Preserve original index names
               index => "%{[@metadata][_index]}"
               # Preserve original document IDs (remove for better performance)
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }

    重複データの防止: Elasticsearch 入力プラグインはすべてのデータを読み取った後、Logstash プロセスを停止します。Logstash はプロセスを自動的に再起動するため、単一のパイプラインで重複書き込みが発生する可能性があります。これを防ぐには、schedule パラメーターを cron 式とともに使用して、特定の時間にパイプラインを実行します。たとえば、3 月 5 日の 13:20 にパイプラインを 1 回実行するには、次のようにします。cron 構文の詳細については、Logstash ドキュメントの「スケジューリング」をご参照ください。

    Logstash 8.x では、document_type 設定は削除され、docinfo_target => "[@metadata]" が必須です。
       schedule => "20 13 5 3 *"
  4. Logstash インストールディレクトリに移動します。

       cd ~/logstash-7.10.0/config
       vi es2es_kibana_sample_data_logs.conf
  5. 移行を開始します。

       nohup bin/logstash -f config/es2es_all.conf >/dev/null 2>&1 &

ステップ 4: 増分データの移行

完全なデータ移行後、継続的な変更をキャプチャするために増分同期を設定します。増分パイプラインは、時間範囲クエリと cron スケジュールを使用して、ソースクラスターから新しいデータを定期的にプルします。

このステップは一般ユーザーとして実行してください。
  1. 増分移行構成ファイルを作成します。

       cd ~/logstash-7.10.0/config
       vi es2es_kibana_sample_data_logs.conf
  2. 次の構成を追加します。この例では、過去 5 分間に更新されたデータをクエリし、毎分実行します。Logstash 7.x の構成:

    Logstash 8.x の場合は、document_type => "%{[@metadata][_type]}" 設定を削除してください。
    重要

    - Logstash のタイムスタンプは UTC です。ローカルタイムゾーンが UTC + 08:00 の場合は、それに応じてタイムスタンプを変換してください。query パラメーターの @timestamp 範囲フィルターは UTC を使用します。 - ソースインデックスに時間フィールドが含まれていない場合は、ingest パイプライン_ingest.timestamp パラメーターを使用して 1 つ追加してください。

       input{
           elasticsearch{
               # Source Elasticsearch host
               hosts =>  ["http://<source-es-host>:9200"]
               # Source cluster credentials
               user => "<source-es-username>"
               password => "<source-es-password>"
               # Index to sync
               index => "kibana_sample_data_logs"
               # Query for incremental data (last 5 minutes)
               query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
               # Run every minute
               schedule => "* * * * *"
               scroll => "5m"
               docinfo=>true
               size => 5000
           }
       }
    
       filter {
         # Remove fields added by Logstash
         mutate {
           remove_field => ["@timestamp", "@version"]
         }
       }
    
       output{
           elasticsearch{
               # Destination cluster endpoint (find on the Basic Information page)
               hosts => ["http://<destination-es-endpoint>:9200"]
               # Destination cluster credentials
               user => "elastic"
               password => "<destination-es-password>"
               # Preserve original index names
               index => "%{[@metadata][_index]}"
               # Preserve original document types (remove for 8.x)
               document_type => "%{[@metadata][_type]}"
               # Preserve original document IDs
               document_id => "%{[@metadata][_id]}"
               ilm_enabled => false
               manage_template => false
           }
       }
  3. Logstash インストールディレクトリに移動します。

       cd ~/logstash-7.10.0
  4. 増分移行を開始します。

       sudo nohup bin/logstash -f config/es2es_kibana_sample_data_logs.conf >/dev/null 2>&1 &
  5. Alibaba Cloud Elasticsearch クラスターで増分データを確認します。Kibana コンソールで、最近更新されたレコードをクエリします。

       GET kibana_sample_data_logs/_search
       {
         "query": {
           "range": {
             "@timestamp": {
               "gte": "now-5m",
               "lte": "now/m"
             }
           }
         },
         "sort": [
           {
             "@timestamp": {
               "order": "desc"
             }
           }
         ]
       }

ステップ 5: 移行結果の確認

完全なデータ移行の確認

  1. ソースクラスターの Kibana コンソールで、インデックスのドキュメント数をチェックします。

       GET _cat/indices?v
  2. Alibaba Cloud Elasticsearch クラスターの Kibana コンソールで、同じクエリを実行します。

       GET _cat/indices?v
  3. 各インデックスの docs.count の値を比較します。完全なデータ移行が成功した場合、ソースクラスターと宛先クラスター間でドキュメント数が一致します。

増分データ移行の確認

  1. ソースクラスターの Kibana コンソールで、最新の更新されたレコードをクエリします。

       GET kibana_sample_data_logs/_search
       {
         "query": {
           "range": {
             "@timestamp": {
               "gte": "now-5m",
               "lte": "now/m"
             }
           }
         },
         "sort": [
           {
             "@timestamp": {
               "order": "desc"
             }
           }
         ]
       }
  2. Alibaba Cloud Elasticsearch クラスターの Kibana コンソールで同じクエリを実行します。増分データ移行が成功した場合、ソースクラスターと宛先クラスター間で最近のレコードが一致します。

パフォーマンスチューニング

移行スループットを最適化するために、次のパラメーターを調整します。

パラメーター場所デフォルト推奨効果
pipeline.batch.sizeconfig/pipelines.yml1255000バッチあたりのイベント数。値が大きいほどスループットは向上しますが、より多くのメモリを使用します
-Xms / -Xmxconfig/jvm.options1g利用可能なメモリの 50% (例: 16 GiB インスタンスの場合は 8g)JVM ヒープサイズ。ヒープが大きいほど、より大きなバッチをサポートします
sizeパイプライン構成 (入力)10005000--10000ソースクラスターからのスクロールリクエストあたりのドキュメント数
slicesパイプライン構成 (入力)なし (無効)5並列スクロールスライス。スライススクロールを有効にして読み取り並列度を向上させます
scrollパイプライン構成 (入力)1m5mスクロールコンテキストのタイムアウト。低速ネットワークまたは大規模インデックスの場合に増加します
-Xms-Xmx を同じ値に設定して、JVM ガベージコレクションの一時停止を回避してください。システムの総メモリの 50% を超えないようにしてください。

認証情報のセキュリティ

認証情報を Logstash 構成ファイルに直接埋め込むことは避けてください。次のいずれかの代替方法を使用してください。

  • Logstash キーストア: 機密値を Logstash キーストアに保存し、構成ファイルで ${KEY_NAME} 構文で参照します。

  • 環境変数: 認証情報を環境変数としてエクスポートし、構成ファイルで参照します。

詳細については、Logstash ドキュメントの「シークレットキーストア」をご参照ください。

トラブルシューティング

Logstash がソースクラスターに接続できない

症状: Logstash ログに接続拒否またはタイムアウトエラーが表示される。

ソリューション:

  1. ソース Elasticsearch のホストとポートが正しいことを確認します。

  2. ECS セキュリティグループがソースクラスターへのアウトバウンドトラフィックを許可していることを確認します。

  3. ECS インスタンスから curl <source-es-host>:9200 を使用して接続性をテストします。

Logstash が宛先クラスターに接続できない

症状: Alibaba Cloud Elasticsearch クラスターへの書き込み時に認証失敗または接続タイムアウトが発生する。

ソリューション:

  1. Alibaba Cloud Elasticsearch コンソールの [基本情報] ページで、送信先クラスターエンドポイントを確認してください。

  2. ECS インスタンスが宛先クラスターと同じ VPC 内にあることを確認します。

  3. ユーザー名とパスワードが正しいことを確認します。

移行後にドキュメント数が一致しない

症状: 宛先クラスターのドキュメント数がソースクラスターよりも少ない。

ソリューション:

  1. Logstash ログでエラーを確認します。tail -f logs/logstash-plain.log

  2. パイプライン構成のインデックス名がソースインデックスと一致することを確認します。

  3. ワイルドカードパターンを使用している場合、意図したすべてのインデックスと一致することを確認します。

  4. 完全な移行パイプラインを再実行します。document_id が設定されている場合、Logstash はドキュメント ID を使用して重複を防止します。

完全な移行後の重複データ

症状: 宛先クラスターのドキュメント数がソースクラスターよりも多い。

ソリューション: これは、Logstash が完了後にパイプラインを自動的に再起動するときに発生します。schedule パラメーターを cron 式とともに使用して、パイプラインを 1 回だけ実行します (例: schedule => "20 13 5 3 *")。

増分移行におけるタイムゾーンの不一致

症状: 増分クエリが予期しない結果を返すか、最近のデータを見逃す。

ソリューション: Logstash のタイムスタンプは UTC を使用します。ソースデータが異なるタイムゾーン (例: UTC + 08:00) を使用している場合、query パラメーターの @timestamp 範囲をそれに応じて調整してください。

インデックスマッピングの競合

症状: 宛先クラスターでデータをインデックス作成する際に、フィールドタイプ競合に関するエラーが発生する。

ソリューション:

  1. データを移行する前に、インデックスメタデータ移行スクリプト (ステップ 2) を実行します。

  2. 宛先クラスターのマッピングがソースと一致することを確認します。GET /<index-name>/_mapping

  3. 競合が続く場合、宛先クラスターで自動作成されたインデックスを削除し、正しいマッピングで再作成します。

移行後のチェックリスト

移行完了後、以下を確認してください。

  • [ ] 移行されたすべてのインデックスについて、ソースと宛先間でドキュメント数が一致していること

  • [ ] サンプルドキュメントのコンテンツが同一であること (いくつかのドキュメントをスポットチェック)

  • [ ] 宛先クラスターのインデックスマッピングが正しいこと

  • [ ] Logstash ログ (logs/logstash-plain.log) にエラーがないこと

  • [ ] アプリケーションエンドポイントが Alibaba Cloud Elasticsearch クラスターを指すように更新されていること

  • [ ] 新しいクラスターへのアプリケーション接続性が確認されていること

  • [ ] 切り替え後に増分同期パイプラインが停止されていること

  • [ ] ECS インスタンス上の Logstash パイプラインがクリーンアップされていること

  • [ ] 宛先クラスターがエラーとパフォーマンスについて監視されていること

参考文献