PolarDB for MySQL の遅いクエリがアプリケーションのパフォーマンスに影響を与える場合、Data Transmission Service (DTS) を使用して本番データを Alibaba Cloud Elasticsearch クラスターにリアルタイムでレプリケートできます。Elasticsearch にデータが取り込まれた後は、ソースデータベースに負荷をかけることなく、高速な全文検索および分析を実行できます。
仕組み
DTS は PolarDB for MySQL のバイナリログ (binlog) から行レベルの変更をキャプチャし、Elasticsearch にストリーミングします。同期は次の 2 つのフェーズで順次実行されます。
| フェーズ | トリガー | DTS の動作 |
|---|---|---|
| 完全データ同期 | タスク開始時 | ソーステーブルのスナップショットを読み取り、既存のすべての行を Elasticsearch にインデックスします。 |
| 増分データ同期 | 完全同期完了後 | バイナリログからの INSERT、DELETE、UPDATE 操作をリアルタイムで再生します。 |
DTS は増分同期にバイナリログに依存するため、タスク開始前に PolarDB クラスターでバイナリロギングを有効にしておく必要があります。DTS は DDL 操作(例:DROP COLUMN や RENAME TABLE)をキャプチャしません。キャプチャされるのは DML 操作(INSERT、DELETE、UPDATE)のみです。この違いは重要です。同期中にソーステーブルに対して DDL 操作を実行すると、Elasticsearch インデックスが同期外れとなり、手動での回復が必要になります。回復手順については、「制限事項」をご参照ください。
前提条件
作業を開始する前に、以下の要件を満たしていることを確認してください。
-
バイナリロギングが有効化された PolarDB for MySQL 8.0.1 Enterprise Edition クラスター
-
Auto Indexing が有効化された Alibaba Cloud Elasticsearch V7.10 クラスター
-
新しい DTS コンソールへのアクセス権
制限事項
| 制限事項 | 影響 | 回復方法 |
|---|---|---|
| DDL 操作はレプリケートされない | タスク実行中にソーステーブルに対して DDL 文(カラムの削除や名前変更など)を実行すると、Elasticsearch インデックスが同期外れとなります。これにより、送信先でデータ損失やクエリエラーが発生する可能性があります。 | 同期タスクから該当テーブルを削除し、Elasticsearch から対応するインデックスを削除した後、再度テーブルを追加します。「データ同期タスクからオブジェクトを削除する」および「データ同期タスクにオブジェクトを追加する」をご参照ください。 |
| カラムの追加には追加手順が必要 | カラムの追加は DDL 操作です。Elasticsearch のマッピングを事前に更新せずに ALTER TABLE を実行すると、新しいフィールドがインデックスに表示されず、送信先で不完全なデータとなる可能性があります。 | まず Elasticsearch のインデックスマッピングを更新して新しいフィールドを含めます。その後、ソースで ALTER TABLE を実行します。DDL 完了後、同期タスクを一時停止して再開します。 |
| 完全同期はソースおよび送信先の負荷を増加させる | DTS は初期の完全同期中に PolarDB から読み取り、同時に Elasticsearch に書き込みます。これにより、両方のリソースがすでに高負荷状態にある場合、アプリケーションのパフォーマンスが低下する可能性があります。 | ソースおよび送信先の CPU 使用率が 30 % 未満となる非ピーク時間帯に完全同期を実行してください。ピーク時間帯に完全同期が失敗した場合は、タスクを再起動します。ピーク時間帯中の増分同期はレプリケーションラグを引き起こす可能性がありますが、失敗することはありません。 |
同期の設定と実行
この例では、ソースとして PolarDB for MySQL 8.0.1 Enterprise Edition クラスターを、送信先として Alibaba Cloud Elasticsearch V7.10 クラスターを使用します。
ステップ 1:ソースデータベースの準備
-
PolarDB for MySQL 8.0.1 Enterprise Edition クラスターを作成します。詳細については、「Enterprise Edition クラスターの購入」をご参照ください。
-
クラスターでバイナリロギングを有効にします。増分同期にはバイナリロギングが必要です。DTS はバイナリログを読み取って INSERT、DELETE、UPDATE の変更をリアルタイムでキャプチャします。詳細については、「バイナリロギングの有効化」をご参照ください。

-
test_polardbという名前のデータベースアカウントおよびデータベースを作成します。詳細については、「データベースアカウントの作成と管理」および「データベース管理操作」をご参照ください。 -
test_polardbデータベース内にproductという名前のテーブルを作成します。CREATE TABLE `product` ( `id` bigint(32) NOT NULL AUTO_INCREMENT, `name` varchar(32) NULL, `price` varchar(32) NULL, `code` varchar(32) NULL, `color` varchar(32) NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; -
テストデータをテーブルに挿入します。
INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (1,'mobile phone A','2000','amp','golden'); INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (2,'mobile phone B','2200','bmp','white'); INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (3,'mobile phone C','2600','cmp','black'); INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (4,'mobile phone D','2700','dmp','red'); INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (5,'mobile phone E','2800','emp','silvery');
ステップ 2:送信先 Elasticsearch クラスターの準備
-
Alibaba Cloud Elasticsearch V7.10 クラスターを作成します。詳細については、「Alibaba Cloud Elasticsearch クラスターの作成」をご参照ください。
-
クラスターで Auto Indexing を有効にします。DTS はデータ書き込み時に自動的に Elasticsearch インデックスを作成します。この機能を利用するには Auto Indexing を有効にする必要があります。詳細については、「YML ファイルの構成」をご参照ください。

ステップ 3:DTS 同期タスクの作成
-
新しい DTS コンソールのデータ同期ページに移動します。
-
[タスクの作成] をクリックします。
-
タスクを構成します。各パラメーターの詳細については、「PolarDB for MySQL クラスターからのデータ同期」をご参照ください。
-
ソース(PolarDB for MySQL クラスター)および送信先(Elasticsearch クラスター)を構成します。主なパラメーターは以下のとおりです。
パラメーター 設定場所 入力内容 ソースタイプ ソース構成 PolarDB for MySQL 送信先タイプ 送信先構成 Alibaba Cloud Elasticsearch クラスターエンドポイントおよび認証情報 ソースおよび送信先セクション ご利用の PolarDB クラスターおよび Elasticsearch クラスターの接続詳細 [接続テストと次へ] をクリックして接続を検証します。

-
同期するオブジェクトを選択します。
test_polardbデータベースおよびproductテーブルを選択します。
-
高度な設定はデフォルトのままにします。
-
データ検証 サブステップで、[_routing ポリシーを適用しない] を選択します。> 注: Elasticsearch V7.X クラスターの場合、[_routing ポリシーを適用しない] を選択する必要があります。
-
-
タスクを保存し、事前チェックを完了させた後、DTS インスタンスを購入して同期を開始します。DTS インスタンスの購入後、完全データ同期が自動的に開始されます。データ同期ページで進捗を監視してください。完全同期完了後、DTS は増分同期に切り替わり、バイナリログの変更をリアルタイムで Elasticsearch にストリーミングし始めます。

同期結果の確認(任意)
Kibana コンソールを使用して、PolarDB からのデータが正しくレプリケートされたことを確認します。
-
Elasticsearch クラスターの Kibana コンソールにログインします。詳細については、「Kibana コンソールへのログイン」をご参照ください。
-
左上隅で
> [Management] > [Dev Tools] を選択し、[Console] タブをクリックします。 -
完全同期の結果を確認します。次のコマンドを実行します。
GET /product/_search応答には、ステップ 1 で挿入した 5 件のレコードすべてが表示されるはずです。
{ "took" : 3, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 5, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "product", "_type" : "product", "_id" : "3", "_score" : 1.0, "_source" : { "id" : 3, "name" : "mobile phone C", "price" : "2600", "code" : "cmp", "color" : "black" } }, { "_index" : "product", "_type" : "product", "_id" : "5", "_score" : 1.0, "_source" : { "id" : 5, "name" : "mobile phone E", "price" : "2800", "code" : "emp", "color" : "silvery" } }, { "_index" : "product", "_type" : "product", "_id" : "4", "_score" : 1.0, "_source" : { "id" : 4, "name" : "mobile phone D", "price" : "2700", "code" : "dmp", "color" : "red" } }, { "_index" : "product", "_type" : "product", "_id" : "2", "_score" : 1.0, "_source" : { "id" : 2, "name" : "mobile phone B", "price" : "2200", "code" : "bmp", "color" : "white" } }, { "_index" : "product", "_type" : "product", "_id" : "1", "_score" : 1.0, "_source" : { "id" : 1, "name" : "mobile phone A", "price" : "2000", "code" : "amp", "color" : "golden" } } ] } } -
増分同期を確認します。ソーステーブルに新しいレコードを挿入します。
INSERT INTO `test_polardb`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (6,'mobile phone F','2750','fmp','white');レコードがレプリケートされた後、再度
GET /product/_searchを実行します。応答には、新しいレコードを含む合計 6 件のヒットが表示されるはずです。{ "took" : 439, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 6, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "product", "_type" : "product", "_id" : "3", "_score" : 1.0, "_source" : { "id" : 3, "name" : "mobile phone C", "price" : "2600", "code" : "cmp", "color" : "black" } }, { "_index" : "product", "_type" : "product", "_id" : "5", "_score" : 1.0, "_source" : { "id" : 5, "name" : "mobile phone E", "price" : "2800", "code" : "emp", "color" : "silvery" } }, { "_index" : "product", "_type" : "product", "_id" : "4", "_score" : 1.0, "_source" : { "id" : 4, "name" : "mobile phone D", "price" : "2700", "code" : "dmp", "color" : "red" } }, { "_index" : "product", "_type" : "product", "_id" : "2", "_score" : 1.0, "_source" : { "id" : 2, "name" : "mobile phone B", "price" : "2200", "code" : "bmp", "color" : "white" } }, { "_index" : "product", "_type" : "product", "_id" : "6", "_score" : 1.0, "_source" : { "code" : "fmp", "color" : "white", "price" : "2750", "name" : "mobile phone F", "id" : 6 } }, { "_index" : "product", "_type" : "product", "_id" : "1", "_score" : 1.0, "_source" : { "id" : 1, "name" : "mobile phone A", "price" : "2000", "code" : "amp", "color" : "golden" } } ] } }
次のステップ
-
Elasticsearch クラスター内のデータを検索および分析するには、「Alibaba Cloud Elasticsearch とは」をご参照ください。
-
実行中の同期タスクからオブジェクトを削除するには、「データ同期タスクからオブジェクトを削除する」をご参照ください。
-
実行中の同期タスクにオブジェクトを再度追加するには、「データ同期タスクにオブジェクトを追加する」をご参照ください。