すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:MySQL から Alibaba Cloud Elasticsearch にデータをリアルタイムで同期するために DTS を使用する

最終更新日:Jan 11, 2025

ApsaraDB RDS for MySQL データベースのプロダクションデータを Alibaba Cloud Elasticsearch を使用して検索および分析する場合、データ伝送サービス (DTS) を使用してデータを Elasticsearch クラスターにリアルタイムで同期できます。同期は、リアルタイム同期タスクに基づいて実装されます。このソリューションは、リアルタイム同期の高パフォーマンスが要求されるシナリオに適しています。このトピックでは、ApsaraDB RDS for MySQL データベースから Alibaba Cloud Elasticsearch クラスターにデータをリアルタイムで同期するためのリアルタイム同期タスクを作成する方法について説明します。また、フルデータと増分データの同期結果を確認する方法についても説明します。

背景情報

  • DTS は、データ移行、データサブスクリプション、およびリアルタイムデータ同期を統合したデータ伝送サービスです。詳細については、「DTS」をご参照ください。DTS は、挿入、削除、および更新操作によって生成されたデータ変更の同期をサポートしています。DTS がデータを同期できるデータソースのバージョンについては、データ同期シナリオの概要 をご参照ください。

  • DTS を使用して、MySQL から Elasticsearch にフルデータまたは増分データを同期できます。このソリューションは、リレーショナルデータベースからのリアルタイム同期の高パフォーマンスが必要な場合、またはリレーショナルデータベースから Alibaba Cloud Elasticsearch クラスターにフルデータまたは増分データを同期する必要がある場合に適しています。

注意事項

  • DTS は、DDL 操作によって生成されたデータ変更を同期しません。データ同期中にソースデータベースのテーブルで DDL 操作が実行された場合は、次の操作を実行する必要があります。データ同期タスクからテーブルを削除し、Elasticsearch クラスターからテーブルのインデックスを削除してから、テーブルをデータ同期タスクに再度追加します。詳細については、データ同期タスクからオブジェクトを削除する および データ同期タスクにオブジェクトを追加する をご参照ください。

  • ソーステーブルに列を追加する場合は、テーブルに対応するインデックスのマッピングを変更します。次に、ソーステーブルで関連する DDL 操作を実行し、データ同期タスクを一時停止してから、タスクを再開します。

  • DTS は、初期フルデータ同期中にソースとデスティネーションの読み取りおよび書き込みリソースを使用します。これにより、ソースとデスティネーションの負荷が増加する可能性があります。ソースまたはデスティネーションのパフォーマンスが良くない場合、ソースまたはデスティネーションの仕様が低い場合、またはデータ量が多い場合、ソースまたはデスティネーションが使用できなくなる可能性があります。たとえば、ソースで多数のスロー SQL クエリが実行されている場合、1 つ以上のテーブルにプライマリキーがない場合、またはデスティネーションでデッドロックが発生した場合、DTS は多数の読み取りおよび書き込みリソースを占有します。この問題を防ぐために、データ同期前に、データ同期がソースとデスティネーションのパフォーマンスに与える影響を評価する必要があります。オフピーク時にデータを同期することをお勧めします。たとえば、ソースとデスティネーションの CPU 使用率が 30% 未満のときにデータを同期できます。

    • ピーク時にフルデータを同期すると、同期が失敗する可能性があります。この場合は、同期タスクを再起動してください。

    • ピーク時に増分データを同期すると、データ同期の遅延が発生する可能性があります。

  • ApsaraDB RDS for MySQL と Elasticsearch は、異なるデータ型をサポートしています。初期スキーマ同期中に、DTS はデスティネーションでサポートされているデータ型に基づいて、ソースフィールドとデスティネーションフィールド間のマッピングを確立します。詳細については、スキーマ同期のデータ型マッピング をご参照ください。

プロセス

  1. 準備を行う: 同期するデータをソース ApsaraDB RDS for MySQL データベースに追加し、Alibaba Cloud Elasticsearch クラスターを作成し、Elasticsearch クラスターの自動インデックス作成機能を有効にします。

  2. データ同期タスクを作成して実行する: DTS コンソールでデータ同期タスクを作成して実行します。

  3. データ同期結果を確認する: Elasticsearch クラスターの Kibana コンソールにログインして、フルデータの同期結果を確認します。次に、ソース ApsaraDB RDS for MySQL データベースにデータを追加し、Elasticsearch クラスターの Kibana コンソールにログインして、増分データの同期結果を確認します。

手順

手順 1: 準備を行う

この例では、MySQL 8.0 を実行する ApsaraDB RDS インスタンスと Alibaba Cloud Elasticsearch V7.10 クラスターを準備します。

ソースデータベースと同期するデータを準備する

  1. MySQL 8.0 を実行する ApsaraDB RDS インスタンスを作成します。詳細については、ApsaraDB RDS for MySQL インスタンスを作成する をご参照ください。

  2. test_mysql という名前のアカウントとデータベースを作成します。詳細については、アカウントとデータベースを作成する をご参照ください。

  3. test_mysql データベースに、es_test という名前のテーブルを作成し、テーブルにデータを挿入します。この例では、次のステートメントを実行してテーブルを作成し、テーブルにデータを挿入します。

    -- テーブルを作成する
    CREATE TABLE `es_test` (
        `id` bigint(32) NOT NULL,
        `name` varchar(32) NULL,
        `age` bigint(32) NULL,
        `hobby` varchar(32) NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
    
    -- データを挿入する
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

デスティネーション Elasticsearch クラスターを準備する

  1. Alibaba Cloud Elasticsearch V7.10 クラスターを作成します。詳細については、Alibaba Cloud Elasticsearch クラスターを作成する をご参照ください。

  2. Elasticsearch クラスターの自動インデックス作成機能を有効にします。詳細については、YML ファイルを設定する をご参照ください。

    image

手順 2: データ同期タスクを作成して実行する

  1. 新しい DTS コンソールのデータ同期ページ に移動します。

  2. [タスクの作成] をクリックします。

  3. 表示されるページで、プロンプトに従ってデータ同期タスクを作成および設定します。

    説明

    以下の手順に含まれるパラメーターについては、ApsaraDB RDS for MySQL インスタンスから Elasticsearch クラスターにデータを同期する をご参照ください。

    1. ソースとデスティネーションを設定します。ページの下部にある [接続テストと続行] をクリックします。

      image

    2. データを同期するオブジェクトを設定します。

      image

    3. 詳細設定を行います。この例では、デフォルトの詳細設定を使用します。

    4. [データ検証] サブステップで、[_routing ポリシーをテーブルに適用しない] を選択します。

      説明

      デスティネーション Elasticsearch クラスターが V7.X の場合は、[_routing ポリシーをテーブルに適用しない] を選択する必要があります。

  4. 設定が完了したら、データ同期タスクを保存し、タスクの事前チェックを実行し、DTS インスタンスを購入してデータ同期タスクを開始します。

    DTS インスタンスが購入されると、データ同期タスクの実行が開始されます。[データ同期] ページでタスクのデータ同期進捗状況を確認できます。フルデータが同期されると、Elasticsearch クラスターでフルデータを確認できます。

    image

手順 3: (オプション) データ同期結果を確認する

  1. Elasticsearch クラスターの Kibana コンソールにログインします。

    詳細については、Kibana コンソールにログインする をご参照ください。

  2. Kibana コンソールの左上隅で、菜单.png > [管理] > [開発ツール] を選択します。表示されるページで、[コンソール] タブをクリックします。

  3. フルデータの同期結果を確認します。

    次のコマンドを実行します。

    GET /es_test/_search

    コマンドが正常に実行されると、次の結果が返されます。

    {
      "took" : 10,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "user3",
              "age" : 43,
              "hobby" : "game"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "user5",
              "age" : 42,
              "hobby" : "basketball"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "user4",
              "age" : 24,
              "hobby" : "run"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "user2",
              "age" : 23,
              "hobby" : "sport"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "user1",
              "age" : 22,
              "hobby" : "music"
            }
          }
        ]
      }
    }
  4. 増分データの同期結果を確認します。

    1. 次のステートメントを実行して、ソーステーブルにデータレコードを挿入します。

      INSERT INTO `test_mysql`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');
    2. データレコードが同期された後、GET /es_test/_search コマンドを実行します。

      コマンドが正常に実行されると、次の結果が返されます。

      {
        "took" : 541,
        "timed_out" : false,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        },
        "hits" : {
          "total" : {
            "value" : 6,
            "relation" : "eq"
          },
          "max_score" : 1.0,
          "hits" : [
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "3",
              "_score" : 1.0,
              "_source" : {
                "id" : 3,
                "name" : "user3",
                "age" : 43,
                "hobby" : "game"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "5",
              "_score" : 1.0,
              "_source" : {
                "id" : 5,
                "name" : "user5",
                "age" : 42,
                "hobby" : "basketball"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "4",
              "_score" : 1.0,
              "_source" : {
                "id" : 4,
                "name" : "user4",
                "age" : 24,
                "hobby" : "run"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "2",
              "_score" : 1.0,
              "_source" : {
                "id" : 2,
                "name" : "user2",
                "age" : 23,
                "hobby" : "sport"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "6",
              "_score" : 1.0,
              "_source" : {
                "name" : "user6",
                "id" : 6,
                "age" : 30,
                "hobby" : "dance"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "1",
              "_score" : 1.0,
              "_source" : {
                "id" : 1,
                "name" : "user1",
                "age" : 22,
                "hobby" : "music"
              }
            }
          ]
        }
      }