すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:Logstash を使用して PolarDB-X 1.0 から Elasticsearch にデータをリアルタイムで同期する

最終更新日:Jan 11, 2025

ビジネスデータが PolarDB-X 1.0 に保存されており、そのデータに対して全文検索やセマンティック分析を実行する場合、Alibaba Cloud Elasticsearch と Alibaba Cloud Logstash を使用できます。このトピックでは、Alibaba Cloud Logstash を使用して PolarDB-X 1.0 から Alibaba Cloud Elasticsearch にデータをリアルタイムで同期する方法について説明します。

背景情報

Alibaba Cloud Logstash は、データの収集、変換、最適化、および生成に使用できる強力なデータ処理ツールです。 Alibaba Cloud Logstash は logstash-input-jdbc プラグインを提供します。このプラグインは、Logstash クラスターにデフォルトでインストールされており、削除することはできません。このプラグインは、PolarDB-X 1.0 の複数のデータレコードを一度にクエリし、それらのデータレコードを Alibaba Cloud Elasticsearch に同期できます。このプラグインは、ラウンドロビン方式を使用して、PolarDB-X 1.0 に最近挿入または更新されたデータレコードを定期的に識別し、それらのデータレコードを Alibaba Cloud Elasticsearch に同期します。完全なデータを同期し、数秒の遅延を許容できる場合、または特定のデータを一度にクエリして同期する場合は、Alibaba Cloud Logstash を使用できます。

前提条件

PolarDB-X 1.0 インスタンス、Alibaba Cloud Elasticsearch クラスター、および Alibaba Cloud Logstash クラスターが作成されています。さらに、PolarDB-X 1.0 インスタンスにデータベースが作成されています。 PolarDB-X 1.0 インスタンス、Elasticsearch クラスター、および Logstash クラスターは、同じ仮想プライベートクラウド(VPC)内に作成することをお勧めします。

  • PolarDB-X 1.0 インスタンスを作成し、そのインスタンスにデータベースを作成する方法の詳細については、「PolarDB-X 1.0 インスタンスの作成」をご参照ください。

  • Alibaba Cloud Elasticsearch クラスターを作成する方法の詳細については、「Alibaba Cloud Elasticsearch クラスターの作成」をご参照ください。この例では、Standard Edition の Elasticsearch V6.7 クラスターが作成されます。

  • Alibaba Cloud Logstash クラスターを作成する方法の詳細については、「Alibaba Cloud Logstash クラスターの作成」をご参照ください。

    説明

    Logstash を使用してインターネットからデータを収集したり、収集したデータをインターネットに転送したりする場合は、ネットワークアドレス変換(NAT)ゲートウェイを設定し、そのゲートウェイを使用して Logstash クラスターをインターネットに接続する必要があります。詳細については、「インターネット経由のデータ転送のための NAT ゲートウェイの設定」をご参照ください。

制限事項

  • Elasticsearch クラスターの _id フィールドの値は、PolarDB-X 1.0 データベースの id フィールドの値と同じである必要があります。

    この条件により、データ同期タスクは、PolarDB-X 1.0 データベースのデータレコードと Elasticsearch クラスターのドキュメント間のマッピングを確立できます。 PolarDB-X 1.0 データベースのデータレコードを更新すると、データ同期タスクは更新されたデータレコードを使用して、Elasticsearch クラスター内で同じ ID を持つドキュメントを上書きします。

    説明

    本質的に、Elasticsearch の更新操作は、元のドキュメントを削除し、新しいドキュメントをインデックス付けします。したがって、上書き操作は、データ同期タスクによって実行される 更新操作 と同じくらい効率的です。

  • PolarDB-X 1.0 データベースにデータレコードを挿入または更新する場合、データレコードには、データレコードが挿入または更新された時刻を示すフィールドが含まれている必要があります。

    logstash-input-jdbc プラグインは、ラウンドロビンを実行するたびに、ラウンドロビン内の最後のデータレコードが PolarDB-X 1.0 データベースに挿入または更新された時刻を記録します。 Logstash は、次の要件を満たすデータレコードのみを PolarDB-X 1.0 データベースから同期します。データレコードが PolarDB-X 1.0 データベースに挿入または更新された時刻が、前のラウンドロビン内の最後のデータレコードが PolarDB-X 1.0 データベースに挿入または更新された時刻よりも後であること。

    重要

    PolarDB-X 1.0 データベースのデータレコードを削除した場合、logstash-input-jdbc プラグインは、同じ ID を持つドキュメントを Elasticsearch クラスターから削除できません。 Elasticsearch クラスターからドキュメントを削除するには、Elasticsearch クラスターで関連コマンドを実行する必要があります。

手順

手順 1:準備を行う

  1. PolarDB-X 1.0 データベースにテーブルを作成し、テーブルにテストデータを準備します。

    この例では、次のステートメントを使用してテーブルを作成します。

    CREATE table food(
      id int PRIMARY key AUTO_INCREMENT,
      name VARCHAR (32),
      insert_time DATETIME,
      update_time DATETIME
    );

    次のステートメントを使用して、テーブルにデータを挿入します。

    INSERT INTO food values(null,'Chocolates',now(),now());
    INSERT INTO food values(null,'Yogurt',now(),now());
    INSERT INTO food values(null,'Ham sausage',now(),now());
    // チョコレート、ヨーグルト、ハムソーセージを挿入
  2. Elasticsearch クラスターの自動インデックス作成機能を有効にします。詳細については、「Elasticsearch クラスターへのアクセスと設定」をご参照ください。

  3. Logstash クラスターで、PolarDB-X 1.0 データベースのバージョンと互換性のあるバージョンの SQL JDBC ドライバーをアップロードします。詳細については、「サードパーティライブラリの構成」をご参照ください。この例では、mysql-connector-java-5.1.35 ドライバーを使用します。

    説明

    この例では、MySQL JDBC ドライバーを使用して PolarDB-X 1.0 データベースに接続します。 PolarDB JDBC ドライバーを使用して PolarDB-X 1.0 データベースに接続することもできます。ただし、PolarDB-X 2.0 データベースの場合、PolarDB JDBC ドライバーは機能しない可能性があります。 MySQL JDBC ドライバーを使用することをお勧めします。

  4. Elasticsearch コンソールの Logstash クラスターの [基本情報] ページで、Logstash クラスター内のノードの IP アドレスを取得します。次に、PolarDB-X 1.0 インスタンスの IP アドレスホワイトリストに IP アドレスを追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

手順 2:Logstash パイプラインを設定する

  1. Alibaba Cloud Elasticsearch コンソールの [Logstash クラスター] ページ に移動します。

  2. 目的のクラスターに移動します。

    1. 上部のナビゲーションバーで、クラスターが存在するリージョンを選択します。

    2. [logstash クラスター] ページで、クラスターを見つけて ID をクリックします。

  3. 左側のナビゲーションペインで、[パイプライン] をクリックします。
  4. [パイプライン] ページで、[パイプラインの作成] をクリックします。

  5. [タスクの作成] ページで、[パイプライン ID] フィールドにパイプライン ID を入力し、[設定の設定] フィールドに必要な設定を入力します。

    この例では、[設定の設定] フィールドに次の設定を入力します。

    input {
      jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/ssd/1/share/<Logstash cluster ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar"
        jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
        jdbc_user => "db_user"
        jdbc_password => "db_password"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select * from food where update_time >= :sql_last_value" // 更新時間が最後の値以上の場合のfoodテーブルからすべての列を選択
        schedule => "* * * * *" // 毎分実行
        record_last_run => true
        last_run_metadata_path => "/ssd/1/<Logstash cluster ID>/logstash/data/last_run_metadata_update_time.txt"
        clean_run => false
        tracking_column_type => "timestamp"
        use_column_value => true
        tracking_column => "update_time"
      }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"
        user => "elastic"
        password => "es_password"
        index => "drds_test"
        document_id => "%{id}"
      }
    }
    説明

    上記のコードの <Logstash cluster ID> は、使用する Logstash クラスターの ID に置き換える必要があります。 ID の取得方法の詳細については、「[Logstash クラスター] ページの概要」をご参照ください。

    表 1. 入力部分のパラメーター

    パラメーター

    説明

    jdbc_driver_class

    JDBC ドライバーのクラス。

    jdbc_driver_library

    JDBC ドライバーファイルのパス。詳細については、「サードパーティライブラリの構成」をご参照ください。

    jdbc_connection_string

    PolarDB-X 1.0 データベースへの接続に使用する JDBC 接続文字列。 JDBC 接続文字列には、PolarDB-X 1.0 データベースのエンドポイント、ポート番号、および名前が含まれます。

    jdbc_user

    PolarDB-X 1.0 データベースへのアクセスに使用するユーザー名。

    jdbc_password

    PolarDB-X 1.0 データベースへのアクセスに使用するパスワード。

    jdbc_paging_enabled

    ページングを有効にするかどうかを指定します。デフォルト値:false。

    jdbc_page_size

    1 ページあたりのエントリ数。

    statement

    SQL ステートメント。

    schedule

    SQL ステートメントを実行する間隔。値 * * * * * は、SQL ステートメントが毎分実行されることを示します。

    record_last_run

    最後の実行結果を記録するかどうかを指定します。このパラメーターを true に設定すると、最後の実行結果の tracking_column の値が、last_run_metadata_path パラメーターを使用して指定されたパスにあるファイルに保存されます。

    last_run_metadata_path

    最後の実行時間が含まれるファイルのパス。ファイルパスはバックエンドで提供されます。パスは /ssd/1/<Logstash cluster ID>/logstash/data/ 形式です。パスを指定すると、Logstash はパスにファイルを自動的に生成しますが、ファイル内のデータを表示することはできません。

    clean_run

    last_run_metadata_path パラメーターで指定されたパスをクリアするかどうかを指定します。デフォルト値:false。このパラメーターを true に設定すると、各クエリはデータベースの最初のエントリから開始されます。

    use_column_value

    特定の列の値を記録するかどうかを指定します。

    tracking_column_type

    値を追跡する列のタイプ。デフォルト値:numeric。

    tracking_column

    値を追跡する列。値は昇順にソートする必要があります。ほとんどの場合、この列はテーブルの主キーです。

    表 2. 出力部分のパラメーター

    パラメーター

    説明

    hosts

    Elasticsearch クラスターへのアクセスに使用する URL。 URL は http://<Elasticsearch クラスターの内部エンドポイント>:9200 の形式で指定します。内部エンドポイントは、クラスターの [基本情報] ページから取得できます。詳細については、「クラスターの基本情報の表示」をご参照ください。

    user

    Elasticsearch クラスターへのアクセスに使用するユーザー名。デフォルトのユーザー名は elastic です。

    password

    elastic アカウントのパスワード。 elastic アカウントのパスワードは、Elasticsearch クラスターの作成時に指定します。パスワードを忘れた場合は、リセットできます。パスワードをリセットする手順と注意事項の詳細については、「Elasticsearch クラスターのアクセスパスワードのリセット」をご参照ください。

    index

    Elasticsearch クラスターのインデックスの名前。

    document_id

    Elasticsearch クラスターのドキュメントの ID。このパラメーターを %{id} に設定します。これは、ドキュメントの ID が PolarDB-X 1.0 データベースのデータレコードの ID と同じであることを示します。

    重要
    • 上記の設定はテストデータに基づいています。ビジネス要件に基づいてパイプラインを設定できます。入力プラグインでサポートされているその他のパラメーターの詳細については、「Logstash Jdbc input plugin」をご参照ください。

    • 設定に last_run_metadata_path のようなパラメーターが含まれている場合、ファイルパスは Alibaba Cloud Logstash によって提供される必要があります。 /ssd/1/<Logstash cluster ID>/logstash/data/ 形式のパスがバックエンドで提供され、テストに使用できます。システムはこのパスのデータを削除しません。このパスを使用する場合は、ディスクに十分なストレージ容量があることを確認してください。パスを指定すると、Logstash はパスにファイルを自動的に生成しますが、ファイル内のデータを表示することはできません。

    • セキュリティのために、パイプラインを設定するときに JDBC ドライバーを指定する場合は、allowLoadLocalInfile=false&autoDeserialize=falsejdbc_connection_string パラメーターの最後に追加する必要があります。例:jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<Database name>?allowLoadLocalInfile=false&autoDeserialize=false"。そうしないと、Logstash パイプラインの設定ファイルを追加するときに、チェックの失敗を示すエラーメッセージが表示されます。

    [設定の設定] フィールドのパラメーターの設定方法の詳細については、「Logstash 設定ファイル」をご参照ください。

  6. [次へ] をクリックして、パイプラインパラメーターを設定します。

    管道参数配置

    パラメーター

    説明

    パイプラインワーカー

    パイプラインのフィルタープラグインと出力プラグインを並列で実行するワーカースレッドの数。イベントのバックログが存在する場合、または一部の CPU リソースが使用されていない場合は、CPU 使用率を最大化するためにスレッド数を増やすことをお勧めします。このパラメーターのデフォルト値は vCPU の数です。

    パイプラインバッチサイズ

    単一のワーカースレッドがフィルタープラグインと出力プラグインの実行を試みる前に、入力プラグインから収集できるイベントの最大数。このパラメーターを大きな値に設定すると、単一のワーカースレッドはより多くのイベントを収集できますが、より多くのメモリを消費します。ワーカースレッドがより多くのイベントを収集するのに十分なメモリを確保するには、LS_HEAP_SIZE 変数を指定して Java 仮想マシン(JVM)ヒープサイズを増やします。デフォルト値:125。

    パイプラインバッチ遅延

    イベントの待機時間。この時間は、小さなバッチをパイプラインワーカースレッドに割り当てる前、およびパイプラインイベントのバッチタスクを作成した後に発生します。デフォルト値:50。単位:ミリ秒。

    キュータイプ

    イベントをバッファリングするための内部キューモデル。有効な値:

    • メモリ:従来のメモリベースのキュー。これがデフォルト値です。

    • 永続化:ディスクベースの ACKed キュー。永続キューです。

    キュー最大バイト数

    値はディスクの合計容量よりも小さくなければなりません。デフォルト値:1024。単位:MB。

    キューチェックポイント書き込み

    永続キューが有効になっている場合に、チェックポイントが適用される前に書き込まれるイベントの最大数。値 0 は制限がないことを示します。デフォルト値:1024。

    警告

    パラメーターを設定した後、設定を保存してパイプラインをデプロイする必要があります。これにより、Logstash クラスターが再起動されます。続行する前に、再起動がビジネスに影響を与えないことを確認してください。

  7. [保存] または [保存してデプロイ] をクリックします。

    • [保存]:このボタンをクリックすると、システムはパイプライン設定を保存し、クラスターの変更をトリガーします。ただし、設定は有効になりません。[保存] をクリックした後、[パイプライン] ページが表示されます。 [パイプライン] ページで、作成したパイプラインを見つけて、[アクション] 列の [今すぐデプロイ] をクリックします。その後、システムは Logstash クラスターを再起動して、設定を有効にします。

    • [保存してデプロイ]:このボタンをクリックすると、システムは Logstash クラスターを再起動して、設定を有効にします。

手順 3:結果を確認する

  1. Elasticsearch クラスターの Kibana コンソールにログインします。

    詳細については、「Kibana コンソールへのログイン」をご参照ください。

  2. 左側のナビゲーションペインで、[開発ツール] をクリックします。

  3. 表示されたページの [コンソール] タブで、次のコマンドを実行して、同期されたデータが格納されているインデックスの数を確認します。

    GET drds_test/_count
    {
      "query": {"match_all": {}}
    }

    コマンドが正常に実行されると、次の結果が返されます。

    {
      "count" : 3,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      }
    }
  4. テーブルのデータを更新し、テーブルにデータを挿入します。

    UPDATE food SET name='Chocolates',update_time=now() where id = 1; // id が 1 のレコードの名前を Chocolates に更新し、更新時間を現在時刻に設定
    INSERT INTO food values(null,'Egg',now(),now()); // 卵を挿入
  5. 更新および挿入されたデータを表示します。

    • name の値が Chocolates であるデータレコードをクエリします。

      GET drds_test/_search
      {
        "query": {
          "match": {
            "name": "Chocolates"
         }}
      }

      コマンドが正常に実行されると、次の図に示す結果が返されます。Returned result

    • すべてのデータをクエリします。

      GET drds_test/_search
      {
        "query": {
          "match_all": {}
        }
      }

      コマンドが正常に実行されると、次の図に示す結果が返されます。Returned result

FAQ

Logstash を使用したデータ転送に関する FAQ