すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:MongoDB から Alibaba Cloud Elasticsearch にデータをリアルタイムで同期するために Monstache を使用する

最終更新日:Jan 11, 2025

Alibaba Cloud Elasticsearch では、MongoDB データベースに格納されているビジネスデータのセマンティクスを分析し、分析結果を大きなチャートに表示できます。このトピックでは、MongoDB データベースから Alibaba Cloud Elasticsearch クラスターにデータをリアルタイムで同期するために Monstache を使用する方法について説明します。また、データの分析方法と分析結果の表示方法についても説明します。

背景情報

このトピックの例では、人気映画のデータを解析して統計を収集する方法を示します。次の操作を実行できます。

  • Monstache を使用して、完全または増分データを迅速に同期およびサブスクライブする。

  • MongoDB データベースから Elasticsearch クラスターの新しいバージョンにデータをリアルタイムで同期する。

  • Monstache の一般的な構成パラメーターについて理解する。

メリット

  • MongoDB データベース、Alibaba Cloud Elasticsearch クラスター、および Monstache は仮想プライベートクラウド(VPC)にデプロイされます。データは内部ネットワークを介して安全かつ高速に転送できます。

  • Monstache は、MongoDB オペログに基づいてデータをリアルタイムで同期およびサブスクライブします。これにより、MongoDB データベースと Elasticsearch クラスターの新しいバージョン間でデータを同期できます。 Monstache は、MongoDB の変更ストリームと集約パイプラインをサポートしています。 Monstache の機能の詳細については、「機能」をご参照ください。

  • Monstache は論理削除と物理削除をサポートしています。また、データベースとコレクションを削除するためにも使用できます。Elasticsearch クラスターと MongoDB データベース間のデータ整合性をリアルタイムで確保できます。

手順

  1. 手順 1:準備を行う

    同じ VPC に ApsaraDB for MongoDB インスタンス、Alibaba Cloud Elasticsearch クラスター、および Elastic Compute Service(ECS)インスタンスを作成します。 ECS インスタンスは Monstache をインストールするために使用されます。

    説明

    インストールした Monstache のバージョンが ApsaraDB for MongoDB インスタンスのバージョンおよび Elasticsearch クラスターのバージョンと互換性があることを確認する必要があります。 Monstache のバージョン互換性については、「Monstache バージョン」をご参照ください。

  2. 手順 2:Monstache 環境を構築する

    ECS インスタンスに Monstache をインストールします。 Monstache をインストールする前に、Go 環境変数を構成していることを確認してください。

  3. 手順 3:リアルタイムデータ同期タスクを構成する

    Monstache のデフォルト構成ファイルの情報を変更します。情報には、ApsaraDB for MongoDB インスタンスと Elasticsearch クラスターのエンドポイント、同期するコレクション、Elasticsearch クラスターのユーザー名とパスワードが含まれます。上記の情報を変更した後、Monstache を実行して、ApsaraDB for MongoDB データベースから Elasticsearch クラスターにデータをリアルタイムで同期します。

  4. 手順 4:データ同期結果を表示する

    ApsaraDB for MongoDB データベースのデータを追加、更新、または削除します。次に、データがリアルタイムで同期されているかどうかを確認します。

  5. 手順 5:Kibana コンソールでデータを分析し、分析結果を表示する

    Kibana コンソールで、データを分析し、分析結果を円グラフに表示します。

手順 1:準備を行う

  1. Elasticsearch クラスターを作成し、クラスターの自動インデックス作成機能を有効にします。

    詳細については、「Alibaba Cloud Elasticsearch クラスターを作成する」および「YML ファイルを構成する」をご参照ください。この例では、Standard Edition の Elasticsearch V6.7 クラスターを使用します。

  2. ApsaraDB for MongoDB インスタンスを作成し、テストデータを準備します。

    詳細については、「レプリカセットインスタンスのクイックスタート」をご参照ください。このトピックでは、V4.2 の ApsaraDB for MongoDB レプリカセットインスタンスを使用します。次の図は、テストデータの一部を示しています。 Test data

    重要

    使用する ApsaraDB for MongoDB インスタンスは、レプリカセットインスタンスまたはシャーディングクラスタインスタンスである必要があります。

  3. ECS インスタンスを作成します。

    詳細については、「ウィザードを使用してインスタンスを作成する」をご参照ください。 ECS インスタンスは Monstache をインストールするために使用されます。Linux オペレーティングシステムを実行し、Elasticsearch クラスターと同じ VPC に存在する必要があります。

手順 2:Monstache 環境を構築する

  1. ECS インスタンスに接続します。

    詳細については、「パスワードまたはキーを使用して Linux インスタンスに接続する」をご参照ください。

    説明

    この例では、共通ユーザーを使用します。

  2. SDK for Go をインストールし、環境変数を構成します。

    説明

    Monstache ベースのデータ同期は、Go プログラミング言語に依存しています。したがって、Monstache をインストールする前に、ECS インスタンスで Go 環境を準備する必要があります。

    1. SDK for Go のインストールパッケージをダウンロードして解凍します。

      wget https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz
      tar -xzf go1.14.4.linux-amd64.tar.gz
    2. 環境変数を構成します。

      vim ~/.bash_profile コマンドを実行して、環境変数の構成ファイルを開きます。次に、次の内容をファイルに追加します。 GOPROXY は、Alibaba Cloud SDK for Go のモジュールのプロキシを指定します。

      export GOROOT=/home/test1/go
      export GOPATH=/home/go/
      export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
      export GOPROXY=https://mirrors.aliyun.com/goproxy/
    3. 環境変数を適用します。

      source ~/.bash_profile
  3. Monstache をインストールします。

    1. GitHub リポジトリからインストールパッケージをダウンロードします。

      git clone https://github.com/rwynn/monstache.git
      説明

      git: command not found というエラーメッセージが表示された場合は、インストールパッケージをダウンロードする前に sudo yum install -y git コマンドを実行して GitHub リポジトリをインストールする必要があります。

    2. monstache ディレクトリに移動します。

      cd monstache
    3. バージョンを切り替えます。

      この例では、rel5 を使用します。

      git checkout rel5
    4. Monstache をインストールします。

      sudo go install
    5. Monstache のバージョンを表示します。

      monstache -v

      コマンドが正常に実行されると、次の結果が返されます。

      5.5.5

手順 3:リアルタイムデータ同期タスクを構成する

Monstache は、構成に TOML 形式を使用します。ほとんどの場合、Monstache はデフォルトポートを使用してオンプレミスホスト上の Elasticsearch クラスターと ApsaraDB for MongoDB インスタンスに接続し、ApsaraDB for MongoDB インスタンスのオペログを追跡します。 Monstache の実行中、ApsaraDB for MongoDB データベースのデータに対するすべての変更は Elasticsearch クラスターに同期されます。

この例では、ApsaraDB for MongoDB と Alibaba Cloud Elasticsearch を使用しており、同期するオブジェクトを指定する必要があります。したがって、Monstache のデフォルト構成ファイルを変更する必要があります。この例で同期されるオブジェクトは、mydb データベースの hotmovies コレクションと col コレクションです。構成ファイルを変更するには、次の手順を実行します。

  1. Monstache インストールディレクトリに移動し、構成ファイルを作成して、ファイルを編集します。

    cd monstache
    vim config.toml
  2. 構成ファイルを変更します。

    次のコードは、構成ファイルを変更する方法の例を示しています。詳細については、「Monstache の使用方法」をご参照ください。

    # 接続設定
    
    // 以下の URL を使用して MongoDB に接続します
    mongo-url = "mongodb://<your_mongodb_user>:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717"
    // 次のノード URL で Elasticsearch REST API に接続します
    elasticsearch-urls = ["http://es-cn-mp91kzb8m00******.elasticsearch.aliyuncs.com:9200"]
    
    # 頻繁に必要な設定
    
    // 変更イベントをリッスンして同期するだけでなく、コレクションからインデックスをシードする必要がある場合
    // MongoDB から Elasticsearch にコレクション全体またはビューをコピーできます
    direct-read-namespaces = ["mydb.hotmovies","mydb.col"]
    
    // レガシーオペログテーリングの代わりに MongoDB 変更ストリームを使用する場合、change-stream-namespaces を使用します
    // 変更ストリームには、少なくとも MongoDB API 3.6 以降が必要です
    // MongoDB 4 以降がある場合、データベース全体またはデプロイメント全体の変更をリッスンできます
    // この場合、デプロイメントをターゲットにしない限り、通常は構成で正規表現を使用してコレクションをフィルタリングする必要はありません。
    // データベース全体をリッスンするには、データベース名のみを使用します。デプロイメントの場合は、空の文字列を使用します。
    //change-stream-namespaces = ["mydb.col"]
    
    # 追加設定
    
    // MongoDB のすべてのコレクションではなく、少数の変更のみをリッスンする場合
    // 例:mydb.mycollection からの挿入、更新、削除、およびドロップのみをリッスンする
    // この設定はコピーを開始しません。変更イベントリスナーのフィルターにすぎません
    //namespace-regex = '^mydb\.col$'
    // Elasticsearch へのリクエストを圧縮する
    //gzip = true
    // インデックス作成統計を生成する
    //stats = true
    // Elasticsearch にインデックス統計を作成する
    //index-stats = true
    // MongoDB への接続に次の PEM ファイルを使用する
    //mongo-pem-file = "/path/to/mongoCert.pem"
    // PEM 検証を無効にする
    //mongo-validate-pem-file = false
    // Elasticsearch ベーシック認証に次のユーザー名を使用する
    elasticsearch-user = "elastic"
    // Elasticsearch ベーシック認証に次のパスワードを使用する
    elasticsearch-password = "<your_es_password>"
    // 4 つの Go ルーチンを同時に使用して Elasticsearch にドキュメントをプッシュする
    elasticsearch-max-conns = 4
    // Elasticsearch への接続に次の PEM ファイルを使用する
    //elasticsearch-pem-file = "/path/to/elasticCert.pem"
    // Elasticsearch への接続を検証する
    //elastic-validate-pem-file = true
    // MongoDB のドロップされたコレクションを Elasticsearch のインデックス削除として伝播する
    dropped-collections = true
    // MongoDB のドロップされたデータベースを Elasticsearch のインデックス削除として伝播する
    dropped-databases = true
    // MongoDB オペログの先頭から処理を開始しない
    // replay を true に設定すると、以前に同期していた場合、ログにバージョン競合メッセージが表示されることがあります。これは、Elasticsearch に既に存在する古いドキュメントを新しいバージョンで再生していることを意味します。 Elasticsearch は、古いドキュメントが新しいドキュメントを上書きするのを防ぎます。
    //replay = false
    // 前回の実行で保存されたタイムスタンプから処理を再開する
    resume = true
    // 進捗タイムスタンプが保存されていることを検証しない
    //resume-write-unsafe = false
    // 再開状態が保存される名前をオーバーライドする
    //resume-name = "default"
    // デフォルト戦略(タイムスタンプ)の代わりにカスタム再開戦略(トークン)を使用する
    // トークンは MongoDB API 3.6 以降で動作しますが、タイムスタンプは MongoDB API 4.0 以降でのみ動作します
    resume-strategy = 0
    // 名前空間が次のパターンに一致するドキュメントを除外する
    //namespace-exclude-regex = '^mydb\.ignorecollection$'
    // GridFS ファイルコンテンツのインデックス作成をオンにする
    //index-files = true
    // GridFS コンテンツの検索結果の強調表示をオンにする
    //file-highlighting = true
    // 次のコレクションに挿入された GridFS ファイルのインデックスを作成する
    //file-namespaces = ["users.fs.files"]
    // リクエストトレースを含む詳細情報を表示する
    verbose = true
    // クラスタリングモードを有効にする
    cluster-name = 'es-cn-mp91kzb8m00******'
    // フル同期後に終了せず、オペログのテーリングを続行する
    //exit-after-direct-reads = false
    [[mapping]]
    namespace = "mydb.hotmovies"
    index = "hotmovies"
    type = "movies"
    
    [[mapping]]
    namespace = "mydb.col"
    index = "mydbcol"
    type = "collection"

    パラメーター

    説明

    mongo-url

    ApsaraDB for MongoDB インスタンスのプライマリノードの接続文字列。接続文字列は、ApsaraDB for MongoDB コンソールの ApsaraDB for MongoDB インスタンスの詳細ページから取得できます。 <your_mongodb_user> は、ApsaraDB for MongoDB インスタンスでアクセスする ApsaraDB for MongoDB データベースのユーザー名を指定します。 <your_mongodb_password> は、ApsaraDB for MongoDB データベースのパスワードを指定します。

    ApsaraDB for MongoDB インスタンスのプライマリノードの接続文字列を取得する前に、Monstache がインストールされている ECS インスタンスのプライベート IP アドレスを ApsaraDB for MongoDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「シャーディングクラスタインスタンスのホワイトリストを構成する」をご参照ください。

    elasticsearch-urls

    Elasticsearch クラスターにアクセスするために使用される URL。http://<Elasticsearch クラスターの内部エンドポイント>:9200 の形式で URL を指定します。エンドポイントは、Elasticsearch コンソールの Elasticsearch クラスターの基本情報ページで取得できます。詳細については、「クラスターの基本情報を表示する」をご参照ください。

    direct-read-namespaces

    データを同期するコレクション。詳細については、「direct-read-namespaces」をご参照ください。この例では、mydb データベースの hotmovies コレクションと col コレクションからデータが同期されます。

    change-stream-namespaces

    ApsaraDB for MongoDB の変更ストリーム機能を使用する場合は、このパラメーターを構成する必要があります。このパラメーターを構成すると、オペログトラッキングは無効になります。詳細については、「change-stream-namespaces」をご参照ください。

    namespace-regex

    監視するコレクションを指定するために使用される正規表現。正規表現を指定すると、システムは正規表現に一致するコレクションのデータの変更を監視できます。

    elasticsearch-user

    Elasticsearch クラスターにアクセスするために使用されるユーザー名。デフォルトのユーザー名は elastic です。

    重要

    システムのセキュリティを確保するために、実際のビジネスシナリオでは elastic アカウントを使用しないことをお勧めします。カスタムアカウントを使用できます。カスタムアカウントを使用する前に、アカウントのロールを作成し、ロールに必要な権限を付与してから、アカウントにロールを割り当てる必要があります。詳細については、「Elasticsearch X-Pack が提供する RBAC メカニズムを使用してアクセス制御を実装する」をご参照ください。

    elasticsearch-password

    Elasticsearch クラスターにアクセスするために使用されるパスワード。elastic ユーザー名に対応するパスワードは、Elasticsearch クラスターの作成時に指定されます。パスワードを忘れた場合は、リセットできます。パスワードをリセットする場合の注意事項と手順については、「Elasticsearch クラスターのアクセスパスワードをリセットする」をご参照ください。

    elasticsearch-max-conns

    Elasticsearch クラスターに接続するために使用されるスレッドの数。デフォルト値は 4 です。この値は、4 つの Go スレッドが同時に使用されて Elasticsearch クラスターにデータが同期されることを示します。

    dropped-collections

    ApsaraDB for MongoDB データベースのコレクションが削除された場合に、Elasticsearch クラスター内のマップされたインデックスを削除するかどうかを指定します。デフォルト値は true で、ApsaraDB for MongoDB データベースのコレクションが削除された場合に、Elasticsearch クラスター内のマップされたインデックスを削除することを示します。

    dropped-databases

    ApsaraDB for MongoDB データベースが削除された場合に、Elasticsearch クラスター内のマップされたインデックスを削除するかどうかを指定します。デフォルト値は true で、ApsaraDB for MongoDB データベースが削除された場合に、Elasticsearch クラスター内のマップされたインデックスを削除することを示します。

    resume

    Monstache が Elasticsearch クラスターに同期された操作のタイムスタンプを monstache.monstache コレクションに書き込むことを許可するかどうかを指定します。デフォルト値は false です。このパラメーターを true に設定すると、Monstache は Elasticsearch クラスターに同期された操作のタイムスタンプを monstache.monstache コレクションに書き込みます。 Monstache が失敗した場合、タイムスタンプを使用して同期タスクを再開できます。これにより、データの損失を防ぎます。 Monstache が cluster-name パラメーターを構成して起動すると、resume パラメーターは自動的に true に設定されます。詳細については、「resume」をご参照ください。

    resume-strategy

    再開ポリシー。このパラメーターは、resume パラメーターが true に設定されている場合にのみ有効です。詳細については、「resume-strategy」をご参照ください。

    verbose

    デバッグログを有効にするかどうかを指定します。デフォルト値は false で、デバッグログが無効になっていることを示します。

    cluster-name

    Elasticsearch クラスターの名前。このパラメーターを構成すると、Monstache は高可用性モードで実行されます。同じクラスター名を持つプロセスは互いに連携します。詳細については、「cluster-name」をご参照ください。

    mapping

    Elasticsearch クラスターのインデックスのマッピング。デフォルトでは、ApsaraDB for MongoDB データベースから Elasticsearch クラスターにデータが同期されると、インデックス名は データベース名.コレクション名 形式のコレクション名に自動的にマップされます。このパラメーターを構成することで、インデックス名を変更できます。詳細については、「インデックスマッピング」をご参照ください。

    説明

    Monstache は多数のパラメーターをサポートしています。上記の表では、リアルタイムデータ同期に使用される一部のパラメーターのみについて説明しています。複雑なデータ同期に使用されるパラメーターの構成方法については、「Monstache config」および「詳細設定」をご参照ください。

  3. Monstache を実行します。

    monstache -f config.toml
    説明

    -f パラメーターは、Monstache を明示的に実行するために使用されます。この場合、システムは Elasticsearch クラスターのリクエストトラッキングを含むすべてのデバッグ操作をログに記録します。

手順 4:データ同期結果を表示する

  1. ApsaraDB for MongoDB インスタンスのデータ管理(DMS)コンソールと Elasticsearch クラスターの Kibana コンソールにログオンします。同期前の ApsaraDB for MongoDB データベースのドキュメント数と同期後の Elasticsearch クラスターのドキュメント数をクエリします。

    説明
    • ApsaraDB for MongoDB インスタンス

      db.hotmovies.find().count()

      コマンドが正常に実行されると、次の結果が返されます。

      [
      10000
      ]
    • Elasticsearch クラスター

      GET hotmovies/_count

      コマンドが正常に実行されると、次の結果が返されます。結果は、同期前の ApsaraDB for MongoDB データベースのドキュメント数と同期後の Elasticsearch クラスターのドキュメント数が 10,000 であることを示しています。

      {
        "count" : 10000,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        }
      }
  2. ApsaraDB for MongoDB データベースにデータを挿入し、データが Elasticsearch クラスターに同期されているかどうかを確認します。

    • ApsaraDB for MongoDB インスタンス

      db.hotmovies.insert({id: 11003,title: "Beauty",overview: "How a group of IT women with high IQ become outstanding",original_language:"cn",release_date:"2020-06-17",popularity:67.654,vote_count:65487,vote_average:9.9})
      db.hotmovies.insert({id: 11004,title: "Heroic Programmers",overview: "How a group of IT men with high IQ become outstanding",original_language:"cn",release_date:"2020-06-15",popularity:77.654,vote_count:85487,vote_average:11.9})
    • Elasticsearch クラスター

      GET hotmovies/_search
      {
        "query": {
          "bool": {
            "should": [
              {"term":{"id":"11003"}},
              null
            ]
          }
        }
      }

      コマンドが正常に実行されると、図に示されている結果が返されます。Insert data

  3. ApsaraDB for MongoDB データベースのデータを更新します。次に、Elasticsearch クラスターのデータも更新されているかどうかを確認します。

    • ApsaraDB for MongoDB インスタンス

      db.hotmovies.update({'title':'Beauty'},{$set:{'title':'Beautiful Programmers'}})
    • Elasticsearch クラスター

      GET hotmovies/_search
      {
        "query": {
          "match": {
            "id":"11003"
          }
        }
      }

      コマンドが正常に実行されると、図に示されている結果が返されます。Update data

  4. ApsaraDB for MongoDB データベースからデータを削除します。次に、Elasticsearch クラスターからもデータが削除されているかどうかを確認します。

    • ApsaraDB for MongoDB インスタンス

      db.hotmovies.remove({id: 11003})
      db.hotmovies.remove({id: 11004})
    • Elasticsearch クラスター

      GET hotmovies/_search
      {
        "query": {
          "bool": {
            "should": [
              {"term":{"id":"11003"}},
              null
            ]
          }
        }
      }

      コマンドが正常に実行されると、図に示されている結果が返されます。Remove data

手順 5:Kibana コンソールでデータを分析し、分析結果を表示する

  1. Elasticsearch クラスターの Kibana コンソールにログオンし、指示に従って Kibana コンソールのホームページに移動します。

    Kibana コンソールへのログオン方法の詳細については、「Kibana コンソールにログオンする」をご参照ください。

    説明

    この例では、Elasticsearch V6.7.0 クラスターを使用します。他のバージョンのクラスターでの操作は異なる場合があります。コンソールでの実際の操作が優先されます。

  2. インデックスパターンを作成します。

    Create an index pattern

    1. 左側のナビゲーションペインで、[管理] をクリックします。
    2. [kibana] セクションで、[インデックスパターン] をクリックします。
    3. [インデックスパターンの作成] をクリックします。
    4. [インデックスパターン] を指定し、[次の手順] をクリックします。
    5. [タイムフィルターフィールド名] を構成します。この例では、タイムフィルターフィールド名は [タイムフィルターを使用しない] に設定されています。

    6. [インデックスパターンの作成] をクリックします。
  3. チャートを構成します。

    次の例は、人気映画トップ 10 の円グラフを構成する方法を示しています。

    1. 左側のナビゲーションペインで、[ビジュアライズ] をクリックします。

    2. 検索ボックスの横にある [+] をクリックします。

    3. [新しいビジュアライゼーション] ダイアログボックスで、[円] をクリックします。

      Create a pie chart

    4. [hotmovies] インデックスパターンをクリックします。

      Click the index pattern

    5. 次の図に基づいて、[メトリック] セクションと [バケット] セクションのパラメーターを構成します。

      Pie chart configuration

    6. Run icon アイコンをクリックして構成を適用し、データを表示します。

      Display results in a pie chart

FAQ

  • 問題

    Elasticsearch クラスターの高可用性と高並列機能を構成した後、データ損失が発生します。どうすればよいですか?

  • 解決策

    Elasticsearch クラスターが正常な状態かどうかを確認します。

    • Elasticsearch クラスターが正常な状態である場合は、Monstache が想定どおりに動作するかどうかを確認します。詳細については、「オープンソース Monstache」の関連ドキュメントをご参照ください。

    • Elasticsearch クラスターが異常な状態である場合は、「FAQ」を参照して、Elasticsearch クラスターで発生している問題を特定し、トラブルシューティングを行ってください。 Elasticsearch クラスターの並列スレッド数を減らし、データ損失の問題が解決するかどうかを確認します。