すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:PolarDB for MySQL から Elasticsearch へのデータ同期

最終更新日:Mar 27, 2026

PolarDB for MySQL の遅いクエリがアプリケーションのパフォーマンスに影響を与える場合、Data Transmission Service (DTS) を使用して本番データを Alibaba Cloud Elasticsearch クラスターにリアルタイムでレプリケートできます。Elasticsearch にデータが取り込まれた後は、ソースデータベースに負荷をかけることなく、高速な全文検索および分析を実行できます。

仕組み

DTS は PolarDB for MySQL のバイナリログ (binlog) から行レベルの変更をキャプチャし、Elasticsearch にストリーミングします。同期は次の 2 つのフェーズで順次実行されます。

フェーズ トリガー DTS の動作
完全データ同期 タスク開始時 ソーステーブルのスナップショットを読み取り、既存のすべての行を Elasticsearch にインデックスします。
増分データ同期 完全同期完了後 バイナリログからの INSERT、DELETE、UPDATE 操作をリアルタイムで再生します。

DTS は増分同期にバイナリログに依存するため、タスク開始前に PolarDB クラスターでバイナリロギングを有効にしておく必要があります。DTS は DDL 操作(例:DROP COLUMN や RENAME TABLE)をキャプチャしません。キャプチャされるのは DML 操作(INSERT、DELETE、UPDATE)のみです。この違いは重要です。同期中にソーステーブルに対して DDL 操作を実行すると、Elasticsearch インデックスが同期外れとなり、手動での回復が必要になります。回復手順については、「制限事項」をご参照ください。

前提条件

作業を開始する前に、以下の要件を満たしていることを確認してください。

  • バイナリロギングが有効化された PolarDB for MySQL 8.0.1 Enterprise Edition クラスター

  • Auto Indexing が有効化された Alibaba Cloud Elasticsearch V7.10 クラスター

  • 新しい DTS コンソールへのアクセス権

制限事項

制限事項 影響 回復方法
DDL 操作はレプリケートされない タスク実行中にソーステーブルに対して DDL 文(カラムの削除や名前変更など)を実行すると、Elasticsearch インデックスが同期外れとなります。これにより、送信先でデータ損失やクエリエラーが発生する可能性があります。 同期タスクから該当テーブルを削除し、Elasticsearch から対応するインデックスを削除した後、再度テーブルを追加します。「データ同期タスクからオブジェクトを削除する」および「データ同期タスクにオブジェクトを追加する」をご参照ください。
カラムの追加には追加手順が必要 カラムの追加は DDL 操作です。Elasticsearch のマッピングを事前に更新せずに ALTER TABLE を実行すると、新しいフィールドがインデックスに表示されず、送信先で不完全なデータとなる可能性があります。 まず Elasticsearch のインデックスマッピングを更新して新しいフィールドを含めます。その後、ソースで ALTER TABLE を実行します。DDL 完了後、同期タスクを一時停止して再開します。
完全同期はソースおよび送信先の負荷を増加させる DTS は初期の完全同期中に PolarDB から読み取り、同時に Elasticsearch に書き込みます。これにより、両方のリソースがすでに高負荷状態にある場合、アプリケーションのパフォーマンスが低下する可能性があります。 ソースおよび送信先の CPU 使用率が 30 % 未満となる非ピーク時間帯に完全同期を実行してください。ピーク時間帯に完全同期が失敗した場合は、タスクを再起動します。ピーク時間帯中の増分同期はレプリケーションラグを引き起こす可能性がありますが、失敗することはありません。

同期の設定と実行

この例では、ソースとして PolarDB for MySQL 8.0.1 Enterprise Edition クラスターを、送信先として Alibaba Cloud Elasticsearch V7.10 クラスターを使用します。

ステップ 1:ソースデータベースの準備

  1. PolarDB for MySQL 8.0.1 Enterprise Edition クラスターを作成します。詳細については、「Enterprise Edition クラスターの購入」をご参照ください。

  2. クラスターでバイナリロギングを有効にします。増分同期にはバイナリロギングが必要です。DTS はバイナリログを読み取って INSERT、DELETE、UPDATE の変更をリアルタイムでキャプチャします。詳細については、「バイナリロギングの有効化」をご参照ください。

    image

  3. test_polardb という名前のデータベースアカウントおよびデータベースを作成します。詳細については、「データベースアカウントの作成と管理」および「データベース管理操作」をご参照ください。

  4. test_polardb データベース内に product という名前のテーブルを作成します。

    CREATE TABLE `product` (
        `id` bigint(32) NOT NULL AUTO_INCREMENT,
        `name` varchar(32) NULL,
        `price` varchar(32) NULL,
        `code` varchar(32) NULL,
        `color` varchar(32) NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
  5. テストデータをテーブルに挿入します。

    INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (1,'mobile phone A','2000','amp','golden');
    INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (2,'mobile phone B','2200','bmp','white');
    INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (3,'mobile phone C','2600','cmp','black');
    INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (4,'mobile phone D','2700','dmp','red');
    INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (5,'mobile phone E','2800','emp','silvery');

ステップ 2:送信先 Elasticsearch クラスターの準備

  1. Alibaba Cloud Elasticsearch V7.10 クラスターを作成します。詳細については、「Alibaba Cloud Elasticsearch クラスターの作成」をご参照ください。

  2. クラスターで Auto Indexing を有効にします。DTS はデータ書き込み時に自動的に Elasticsearch インデックスを作成します。この機能を利用するには Auto Indexing を有効にする必要があります。詳細については、「YML ファイルの構成」をご参照ください。

    image

ステップ 3:DTS 同期タスクの作成

  1. 新しい DTS コンソールのデータ同期ページに移動します。

  2. [タスクの作成] をクリックします。

  3. タスクを構成します。各パラメーターの詳細については、「PolarDB for MySQL クラスターからのデータ同期」をご参照ください。

    1. ソース(PolarDB for MySQL クラスター)および送信先(Elasticsearch クラスター)を構成します。主なパラメーターは以下のとおりです。

      パラメーター 設定場所 入力内容
      ソースタイプ ソース構成 PolarDB for MySQL
      送信先タイプ 送信先構成 Alibaba Cloud Elasticsearch
      クラスターエンドポイントおよび認証情報 ソースおよび送信先セクション ご利用の PolarDB クラスターおよび Elasticsearch クラスターの接続詳細

      [接続テストと次へ] をクリックして接続を検証します。

      image

    2. 同期するオブジェクトを選択します。test_polardb データベースおよび product テーブルを選択します。image

    3. 高度な設定はデフォルトのままにします。

    4. データ検証 サブステップで、[_routing ポリシーを適用しない] を選択します。> 注: Elasticsearch V7.X クラスターの場合、[_routing ポリシーを適用しない] を選択する必要があります。

  4. タスクを保存し、事前チェックを完了させた後、DTS インスタンスを購入して同期を開始します。DTS インスタンスの購入後、完全データ同期が自動的に開始されます。データ同期ページで進捗を監視してください。完全同期完了後、DTS は増分同期に切り替わり、バイナリログの変更をリアルタイムで Elasticsearch にストリーミングし始めます。

    image

同期結果の確認(任意)

Kibana コンソールを使用して、PolarDB からのデータが正しくレプリケートされたことを確認します。

  1. Elasticsearch クラスターの Kibana コンソールにログインします。詳細については、「Kibana コンソールへのログイン」をご参照ください。

  2. 左上隅で 菜单.png > [Management] > [Dev Tools] を選択し、[Console] タブをクリックします。

  3. 完全同期の結果を確認します。次のコマンドを実行します。

    GET /product/_search

    応答には、ステップ 1 で挿入した 5 件のレコードすべてが表示されるはずです。

    {
      "took" : 3,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "mobile phone C",
              "price" : "2600",
              "code" : "cmp",
              "color" : "black"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "mobile phone E",
              "price" : "2800",
              "code" : "emp",
              "color" : "silvery"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "mobile phone D",
              "price" : "2700",
              "code" : "dmp",
              "color" : "red"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "mobile phone B",
              "price" : "2200",
              "code" : "bmp",
              "color" : "white"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "mobile phone A",
              "price" : "2000",
              "code" : "amp",
              "color" : "golden"
            }
          }
        ]
      }
    }
  4. 増分同期を確認します。ソーステーブルに新しいレコードを挿入します。

    INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (6,'mobile phone F','2750','fmp','white');

    レコードがレプリケートされた後、再度 GET /product/_search を実行します。応答には、新しいレコードを含む合計 6 件のヒットが表示されるはずです。

    {
      "took" : 439,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "mobile phone C",
              "price" : "2600",
              "code" : "cmp",
              "color" : "black"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "mobile phone E",
              "price" : "2800",
              "code" : "emp",
              "color" : "silvery"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "mobile phone D",
              "price" : "2700",
              "code" : "dmp",
              "color" : "red"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "mobile phone B",
              "price" : "2200",
              "code" : "bmp",
              "color" : "white"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "6",
            "_score" : 1.0,
            "_source" : {
              "code" : "fmp",
              "color" : "white",
              "price" : "2750",
              "name" : "mobile phone F",
              "id" : 6
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "mobile phone A",
              "price" : "2000",
              "code" : "amp",
              "color" : "golden"
            }
          }
        ]
      }
    }

次のステップ