すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:PolarDB for MySQL データベースから Alibaba Cloud Elasticsearch に DTS を使用してデータを同期する

最終更新日:Jan 11, 2025

PolarDB for MySQL データベースを使用しているときにクエリが遅い場合、データ伝送サービス (DTS) を使用して、データベースから Alibaba Cloud Elasticsearch クラスタに本番データをリアルタイムで同期できます。その後、クラスタ内の同期済みデータを検索および分析できます。このソリューションは、リレーショナルデータベースからのリアルタイム同期で高パフォーマンスが求められるシナリオに適しています。

背景情報

次のクラウドサービスが使用されます。

  • DTS は、データ移行、データサブスクリプション、リアルタイムデータ同期を統合したデータ伝送サービスです。DTS を使用すると、INSERT、DELETE、UPDATE などの SQL ステートメントによって生成されたデータを同期できます。 詳細については、「DTS」および「データ同期シナリオの概要」をご参照ください。

  • PolarDB は、Alibaba Cloud によって開発された次世代のリレーショナルデータベースサービスです。MySQL、PostgreSQL、Oracle データベースエンジンと互換性があります。PolarDB クラスタは最大 100 TB のストレージスペースを提供でき、最大 16 ノードまでスケーリングできます。PolarDB は、企業の多様な要件を満たすために、ストレージとコンピューティングにおいて優れたパフォーマンスを提供します。詳細については、「PolarDB for MySQL の概要」をご参照ください。

  • Elasticsearch は、Lucene ベースの分散型リアルタイム検索および分析エンジンです。ほぼリアルタイムで大規模なデータセットを保存、クエリ、分析できます。ほとんどの場合、複雑なクエリと高いアプリケーションパフォーマンスに対応するための基本エンジンまたはテクノロジーとして使用されます。詳細については、「Alibaba Cloud Elasticsearch とは」をご参照ください。

注意事項

  • DTS は、DDL 操作によって生成されたデータ変更を同期しません。データ同期中にソースデータベースのテーブルで DDL 操作が実行された場合は、次の操作を実行する必要があります。データ同期タスクからテーブルを削除し、Elasticsearch クラスタからテーブルのインデックスを削除してから、テーブルをデータ同期タスクに再度追加します。詳細については、「データ同期タスクからオブジェクトを削除する」および「データ同期タスクにオブジェクトを追加する」をご参照ください。

  • ソーステーブルに列を追加する場合は、テーブルに対応するインデックスのマッピングを変更します。次に、ソーステーブルで関連する DDL 操作を実行し、データ同期タスクを一時停止してから、タスクを再開します。

  • DTS は、初期のフルデータ同期中にソースとデスティネーションの読み取りおよび書き込みリソースを使用します。これにより、ソースとデスティネーションの負荷が増加する可能性があります。ソースまたはデスティネーションのパフォーマンスが良くない場合、ソースまたはデスティネーションの仕様が低い場合、またはデータ量が大きい場合、ソースまたはデスティネーションが使用できなくなる可能性があります。たとえば、ソースで多数のスロー SQL クエリが実行されている場合、1 つ以上のテーブルにプライマリキーがない場合、またはデスティネーションでデッドロックが発生した場合、DTS は多数の読み取りおよび書き込みリソースを占有します。この問題を防ぐには、データ同期前に、データ同期がソースとデスティネーションのパフォーマンスに及ぼす影響を評価する必要があります。オフピーク時にデータを同期することをお勧めします。たとえば、ソースとデスティネーションの CPU 使用率が 30% 未満のときにデータを同期できます。

    • ピーク時にフルデータを同期すると、同期が失敗する可能性があります。この場合は、同期タスクを再起動します。

    • ピーク時に増分データを同期すると、データ同期の遅延が発生する可能性があります。

プロセス

データを同期するには、次の手順を実行します。

  1. 準備を行う: 同期するデータをソース PolarDB for MySQL データベースに追加し、Alibaba Cloud Elasticsearch クラスタを作成し、Elasticsearch クラスタの自動インデックス作成機能を有効にします。

  2. データ同期タスクを作成して実行する: DTS コンソールでデータ同期タスクを作成して実行します。その後、フルデータと増分データが自動的に同期されます。

手順 1: 準備を行う

この例では、エンタープライズエディションの PolarDB for MySQL 8.0.1 クラスタと Alibaba Cloud Elasticsearch V7.10 クラスタが準備されています。

ソースデータベースと同期するデータを準備する

  1. エンタープライズエディションの PolarDB for MySQL 8.0.1 クラスタを作成します。詳細については、「エンタープライズエディションクラスタを購入する」をご参照ください。

  2. PolarDB for MySQL クラスタでバイナリロギング機能が有効になっています。詳細については、「バイナリロギングを有効にする」をご参照ください。

    image

  3. test_polardb という名前のアカウントとデータベースを作成します。 詳細については、「データベースアカウントを作成および管理する」および「データベース管理操作」をご参照ください。

  4. test_polardb データベースに、product という名前のテーブルを作成し、テーブルにデータを挿入します。

    • テーブルを作成する

      CREATE TABLE `product` (
          `id` bigint(32) NOT NULL AUTO_INCREMENT,
          `name` varchar(32) NULL,
          `price` varchar(32) NULL,
          `code` varchar(32) NULL,
          `color` varchar(32) NULL,
          PRIMARY KEY (`id`)
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
      // テーブルを作成する
    • テーブルにテストデータを挿入する

      INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (1,'mobile phone A','2000','amp','golden');
      INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (2,'mobile phone B','2200','bmp','white');
      INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (3,'mobile phone C','2600','cmp','black');
      INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (4,'mobile phone D','2700','dmp','red');
      INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (5,'mobile phone E','2800','emp','silvery');
      // テストデータを挿入する

デスティネーション Elasticsearch クラスタを準備する

  1. Alibaba Cloud Elasticsearch V7.10 クラスタを作成します。詳細については、「Alibaba Cloud Elasticsearch クラスタを作成する」をご参照ください。

  2. Elasticsearch クラスタの自動インデックス作成機能を有効にします。詳細については、「YML ファイルを設定する」をご参照ください。

    image

手順 2: データ同期タスクを作成して実行する

  1. 新しい DTS コンソールのデータ同期ページに移動します。

  2. [タスクの作成] をクリックします。

  3. 表示されるページで、プロンプトに従ってデータ同期タスクを作成および設定します。

    以下の手順に関連するパラメータについては、「PolarDB for MySQL クラスタからデータを同期する」をご参照ください。

    1. ソースとデスティネーションを設定します。ページの下部にある [接続テストと続行] をクリックします。

      image

    2. データを同期するオブジェクトを設定します。

      image

    3. 詳細設定を行います。この例では、デフォルトの詳細設定が使用されます。

    4. [データ検証] サブステップで、[_routing ポリシーをテーブルに適用しない] を選択します。

      説明

      デスティネーション Elasticsearch クラスタが V7.X の場合は、[_routing ポリシーをテーブルに適用しない] を選択する必要があります。

  4. 設定が完了したら、データ同期タスクを保存し、タスクの事前チェックを実行し、DTS インスタンスを購入してデータ同期タスクを開始します。

    DTS インスタンスが購入されると、データ同期タスクの実行が開始されます。データ同期ページでタスクのデータ同期進捗状況を表示できます。フルデータが同期されると、Elasticsearch クラスタでフルデータを表示できます。

    image

手順 3: (オプション) データ同期結果を確認する

  1. Elasticsearch クラスタの Kibana コンソールにログインします。

    詳細については、「Kibana コンソールにログインする」をご参照ください。

  2. Kibana コンソールの左上隅で、菜单.png > [管理] > [開発ツール] を選択します。表示されるページで、[コンソール] タブをクリックします。

  3. フルデータの同期結果を確認します。

    次のコマンドを実行します。

    GET /product/_search
    // フルデータの同期結果を確認する

    コマンドが正常に実行されると、次の結果が返されます。

    {
      "took" : 3,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "mobile phone C",
              "price" : "2600",
              "code" : "cmp",
              "color" : "black"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "mobile phone E",
              "price" : "2800",
              "code" : "emp",
              "color" : "silvery"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "mobile phone D",
              "price" : "2700",
              "code" : "dmp",
              "color" : "red"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "mobile phone B",
              "price" : "2200",
              "code" : "bmp",
              "color" : "white"
            }
          },
          {
            "_index" : "product",
            "_type" : "product",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "mobile phone A",
              "price" : "2000",
              "code" : "amp",
              "color" : "golden"
            }
          }
        ]
      }
    }
  4. 増分データの同期結果を確認します。

    1. 次のステートメントを実行して、ソーステーブルにデータレコードを挿入します。

      INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (6,'mobile phone F','2750','fmp','white');
      // データレコードを挿入する
    2. データレコードが同期された後、GET /product/_search コマンドを実行します。

      コマンドが正常に実行されると、次の結果が返されます。

      {
        "took" : 439,
        "timed_out" : false,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        },
        "hits" : {
          "total" : {
            "value" : 6,
            "relation" : "eq"
          },
          "max_score" : 1.0,
          "hits" : [
            {
              "_index" : "product",
              "_type" : "product",
              "_id" : "3",
              "_score" : 1.0,
              "_source" : {
                "id" : 3,
                "name" : "mobile phone C",
                "price" : "2600",
                "code" : "cmp",
                "color" : "black"
              }
            },
            {
              "_index" : "product",
              "_type" : "product",
              "_id" : "5",
              "_score" : 1.0,
              "_source" : {
                "id" : 5,
                "name" : "mobile phone E",
                "price" : "2800",
                "code" : "emp",
                "color" : "silvery"
              }
            },
            {
              "_index" : "product",
              "_type" : "product",
              "_id" : "4",
              "_score" : 1.0,
              "_source" : {
                "id" : 4,
                "name" : "mobile phone D",
                "price" : "2700",
                "code" : "dmp",
                "color" : "red"
              }
            },
            {
              "_index" : "product",
              "_type" : "product",
              "_id" : "2",
              "_score" : 1.0,
              "_source" : {
                "id" : 2,
                "name" : "mobile phone B",
                "price" : "2200",
                "code" : "bmp",
                "color" : "white"
              }
            },
            {
              "_index" : "product",
              "_type" : "product",
              "_id" : "6",
              "_score" : 1.0,
              "_source" : {
                "code" : "fmp",
                "color" : "white",
                "price" : "2750",
                "name" : "mobile phone F",
                "id" : 6
              }
            },
            {
              "_index" : "product",
              "_type" : "product",
              "_id" : "1",
              "_score" : 1.0,
              "_source" : {
                "id" : 1,
                "name" : "mobile phone A",
                "price" : "2000",
                "code" : "amp",
                "color" : "golden"
              }
            }
          ]
        }
      }