このトピックでは、Data Transmission Service (DTS) を使用して、PolarDB-X 2.0 インスタンスから Elasticsearch インスタンスにデータを同期する方法について説明します。
前提条件
-
ソース PolarDB-X 2.0 インスタンスを作成済みであること。
-
ターゲット Elasticsearch インスタンスを作成済みであること。詳細については、「Alibaba Cloud Elasticsearch インスタンスの作成」をご参照ください。
-
サポートされているソースデータベースとターゲットデータベースのバージョンについては、「同期シナリオの概要」をご参照ください。
-
ターゲット Elasticsearch インスタンスのストレージ容量は、ソース PolarDB-X 2.0 インスタンスのストレージ容量よりも大きい必要があります。
注意事項
スキーマ同期中、DTS はソースデータベースからターゲットデータベースに外部キーを同期します。
完全データ同期および増分データ同期中、DTS はセッションレベルで制約チェックと外部キーのカスケード操作を一時的に無効にします。タスクの実行中にソースデータベースでカスケード更新または削除操作が実行されると、データの不整合が発生する可能性があります。
タイプ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
その他の注意事項 | DTS は、バイナリログのオフセットを進めるために、ソースデータベースの `dts_health_check`.`ha_health_check` テーブルを定期的に更新します。 |
課金
同期タイプ | 価格 |
スキーマ同期と完全データ同期 | 無料です。 |
増分データ同期 | 有料です。詳細については、「課金の概要」をご参照ください。 |
サポートされる同期トポロジー
-
一方向 1 対 1 同期
-
一方向 1 対多同期
-
一方向多対 1 同期
これらの同期トポロジーの説明と注意事項については、「データ同期トポロジーの概要」をご参照ください。
サポートされる SQL 操作
|
操作タイプ |
SQL 操作 |
|
DML |
INSERT、UPDATE、DELETE 説明
UPDATE 文を使用してフィールドを削除することはできません。 |
データ型のマッピング
ソースデータベースと Elasticsearch インスタンスではサポートされるデータ型が異なるため、データ型を常に直接マッピングできるわけではありません。初期スキーマ同期中、DTS はターゲット Elasticsearch インスタンスがサポートする型に基づいてデータ型をマッピングします。詳細については、「初期スキーマ同期のデータ型マッピング」をご参照ください。
説明DTS はスキーマ移行中に
mappingパラメーターをdynamicに設定しません。このパラメーターの動作は、ご利用の Elasticsearch インスタンスの設定に依存します。ソースデータが JSON 形式の場合、テーブル内のすべての行で同じキーの値が同じデータ型であることを確認してください。そうしないと、DTS が同期エラーを報告する可能性があります。詳細については、「dynamic」をご参照ください。次の表に、Elasticsearch とリレーショナルデータベースのマッピングを示します。
Elasticsearch
リレーショナルデータベース
インデックス
データベース
タイプ
テーブル
ドキュメント
行
フィールド
列
マッピング
データベーススキーマ
操作手順
ターゲットリージョンの同期タスクリストページに移動します。次の 2 つの方法のいずれかを使用できます。
DTS コンソールから
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、同期インスタンスが配置されているリージョンを選択します。
DMS コンソールから
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS インターフェイスのレイアウトとスタイルをカスタマイズする」をご参照ください。
Data Management (DMS) にログインします。
トップメニューバーで、 を選択します。
データ同期タスク の右側で、同期インスタンスが配置されているリージョンを選択します。
タスクの作成 をクリックして、タスク設定ページを開きます。
-
ソースデータベースとターゲットデータベースを設定します。
警告ソースインスタンスとターゲットインスタンスを選択した後、ページの上部に表示される制限を注意深くお読みください。制限に従わない場合、タスクが失敗したり、データの不整合が発生したりする可能性があります。
カテゴリ
構成
説明
なし
タスク名
DTS は自動的にタスク名を生成します。簡単に識別できるように、わかりやすい名前を指定することを推奨します。名前は一意である必要はありません。
ソースデータベース
データベースタイプ
[PolarDB-X 2.0] を選択します。
アクセス方法
[クラウドインスタンス] を選択します。
インスタンスリージョン
ソース PolarDB-X 2.0 インスタンスが存在するリージョンを選択します。
Alibaba Cloud アカウント間でデータを複製
この例では、現在の Alibaba Cloud アカウントに属するデータベースインスタンスを使用します。× を選択します。
インスタンス ID
ソース PolarDB-X 2.0 インスタンスの ID を選択します。
データベースアカウント
ソース PolarDB-X 2.0 インスタンスのデータベースアカウントを入力します。アカウントには、同期するオブジェクトに対する REPLICATION SLAVE、REPLICATION CLIENT、および SELECT 権限が必要です。
説明権限の付与方法の詳細については、「データ同期中のアカウント権限の問題」をご参照ください。
データベースパスワード
データベースアカウントに対応するパスワードを入力します。
宛先データベース
データベースタイプ
[Elasticsearch] を選択します。
アクセス方法
[クラウドインスタンス] を選択します。
インスタンスリージョン
ターゲット Elasticsearch インスタンスが存在するリージョンを選択します。
タイプ
必要に応じて [クラスター] または [サーバーレス] を選択します。
[インスタンス ID]
ターゲット Elasticsearch インスタンスの ID を選択します。
データベースアカウント
ターゲット Elasticsearch インスタンスのデータベースアカウントを入力します。アカウントには読み取りおよび書き込み権限が必要で、通常は elastic です。
データベースパスワード
データベースアカウントに対応するパスワードを入力します。
暗号化
必要に応じてHTTP または HTTPS を選択します。
設定が完了したら、ページ下部の 接続をテストして続行 をクリックします。
説明DTS サーバーからのアクセスを許可するために、DTS サーバーの IP アドレス CIDR ブロックがソースデータベースとターゲットデータベースのセキュリティ設定に追加されていることを確認してください。これは自動または手動で行うことができます。詳細については、「DTS サーバーの IP アドレス CIDR ブロックをホワイトリストに追加する」をご参照ください。
ソースデータベースまたはターゲットデータベースが自己管理データベースである場合 (アクセス方法 が Alibaba Cloud インスタンス ではない場合)、DTS サーバーの CIDR ブロック ダイアログボックスで 接続テスト をクリックする必要もあります。
-
タスクオブジェクトを設定します。
-
オブジェクト設定 ページで、同期するオブジェクトを設定します。
構成
説明
同期タイプ
増分データ同期 が選択されています。デフォルトでは、スキーマ同期 と 完全データ同期 も選択する必要があります。事前チェックが完了すると、DTS は選択したオブジェクトの完全データ同期をソースインスタンスからターゲットクラスターに実行します。これは、後続の増分データ同期のベースラインデータとして機能します。
インデックス名
-
[テーブル名]
[テーブル名] を選択すると、ターゲット Elasticsearch インスタンスで作成されるインデックス名はテーブル名と一致します。この例では、インデックス名は order です。
-
データベース テーブル
[データベース_テーブル] を選択すると、ターゲット Elasticsearch インスタンスで作成されるインデックス名は database_table になります。この例では、インデックス名は dtstest_order です。
競合するテーブルの処理モード
エラーの事前チェックと報告:ターゲットデータベースに同じ名前のテーブルが存在するかどうかを確認します。同じ名前のテーブルが存在しない場合、事前チェックは成功します。同じ名前のテーブルが存在する場合、事前チェックは失敗し、データ同期タスクは開始されません。
説明ターゲットデータベースで同じ名前のテーブルを削除または名前変更できない場合は、別のテーブル名にマッピングできます。詳細については、「テーブル名と列名のマッピング」をご参照ください。
エラーを無視して続行:ターゲットデータベースでの重複テーブル名のチェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスにリスクをもたらす可能性があります。例:
テーブルスキーマが同じで、ターゲットデータベースのレコードがソースデータベースのレコードと同じプライマリキーまたは一意キーの値を持つ場合:
完全同期中、DTS はターゲットクラスターのレコードを保持します。ソースデータベースからの対応するレコードは同期されません。
増分同期中、ソースデータベースからのレコードはターゲットデータベースのレコードを上書きします。
テーブルスキーマが異なる場合、初期データ同期が失敗する可能性があります。これにより、一部の列データのみが同期されるか、同期が完全に失敗する可能性があります。注意して進めてください。
移行先インスタンスでのオブジェクト名の大文字化
ターゲットインスタンスに同期されるデータベース、テーブル、および列オブジェクト名の大文字/小文字の区別ポリシーを設定できます。デフォルトでは、DTS のデフォルトポリシー が選択されています。ソースデータベースとターゲットデータベースのデフォルトポリシーを使用することもできます。詳細については、「ターゲットオブジェクト名の大文字/小文字の区別ポリシー」をご参照ください。
ソースオブジェクト
ソースオブジェクト ボックスで、同期するオブジェクトをクリックし、
をクリックして 選択中のオブジェクト ボックスに移動します。説明データベース、テーブル、および列を同期オブジェクトとして選択できます。テーブルまたは列を選択した場合、ビュー、トリガー、ストアドプロシージャなどの他のオブジェクトはターゲットデータベースに同期されません。
選択中のオブジェクト
ターゲットインスタンスで単一の同期オブジェクトの名前を変更するには、選択中のオブジェクト ボックスでオブジェクトを右クリックします。オブジェクトの名前変更の詳細については、「単一のデータベース、テーブル、または列のマッピング」をご参照ください。
ターゲットインスタンスで複数の同期オブジェクトの名前をバッチで変更するには、選択中のオブジェクト ボックスの右上隅にある 一括編集 をクリックします。詳細については、「データベース、テーブル、および列のバッチマッピング」をご参照ください。
説明-
インデックス名とタイプ名では、特殊文字としてアンダースコア (_) のみがサポートされます。
-
データベースまたはテーブルレベルで同期する SQL 操作を選択するには、[選択済みオブジェクト] でオブジェクトを右クリックし、表示されるダイアログボックスで同期する SQL 操作を選択します。
-
WHERE 句を使用してデータをフィルター処理するには、[選択済みオブジェクト] でテーブルを右クリックし、表示されるダイアログボックスでフィルター条件を設定します。詳細については、「SQL 条件を使用したタスクデータのフィルター処理」をご参照ください。
-
オブジェクト名マッピング機能を使用すると、このオブジェクトに依存する他のオブジェクトの同期が失敗する可能性があります。
-
-
詳細設定へ をクリックして、詳細パラメーターを設定します。
構成
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、DTS は共有クラスターでタスクをスケジュールするため、クラスターを選択する必要はありません。より安定したパフォーマンスを得るために、専用クラスターを購入して DTS 同期タスクを実行できます。詳細については、「DTS 専用クラスターとは」をご参照ください。
失敗した接続の再試行時間
同期タスクが開始された後、ソースデータベースまたはターゲットデータベースへの接続が失敗した場合、DTS はエラーを報告し、すぐに接続のリトライを開始します。デフォルトのリトライ時間は 720 分です。10 分から 1,440 分の範囲でカスタムのリトライ時間を指定することもできます。30 分以上に設定することを推奨します。指定された時間内に DTS がデータベースに正常に再接続した場合、同期タスクは自動的に再開されます。そうでない場合、タスクは失敗します。
説明同じソースまたはターゲットを共有する複数の DTS インスタンス (たとえば、インスタンス A とインスタンス B) があり、インスタンス A のネットワークリトライ時間を 30 分、インスタンス B を 60 分に設定した場合、両方に対して短い方の 30 分が使用されます。
DTS は接続リトライ期間中のタスク実行時間に対して課金するため、ビジネスニーズに基づいてリトライ時間をカスタマイズするか、ソースデータベースとターゲットデータベースのインスタンスがリリースされた後、できるだけ早く DTS インスタンスをリリースすることを推奨します。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
同期タスクが開始された後、ソースデータベースまたはターゲットデータベースでその他の非接続性の問題 (DDL または DML 実行例外など) が発生した場合、DTS はエラーを報告し、すぐに連続的なリトライ操作を開始します。デフォルトのリトライ時間は 10 分です。1 分から 1,440 分の範囲でカスタムのリトライ時間を指定することもできます。10 分以上に設定することを推奨します。設定されたリトライ時間内に関連する操作が成功した場合、同期タスクは自動的に再開されます。そうでない場合、タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 の値は、失敗した接続の再試行時間 の値より小さくする必要があります。
完全同期レートを制限するかどうか
完全同期段階では、DTS はソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを消費し、データベースの負荷を増加させる可能性があります。ソースデータベースとターゲットデータベースの負荷を軽減するために、1 秒あたりのソースデータベースのクエリ率 QPS、1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメーターを設定することで、完全同期タスクのレート制限を設定できます。
説明この設定項目は、同期タイプ が 完全データ同期 に設定されている場合にのみ利用可能です。
同期インスタンスの実行後に 完全同期レートを調整することもできます。
増分同期率を制限するかどうか
増分同期タスクのレート制限も設定できます。ターゲットデータベースへの圧力を軽減するために、1 秒あたりの増分同期の行数 RPS と 1 秒あたりの増分同期データ量 (MB) BPS を設定します。
環境タグ
必要に応じてインスタンスを識別するための環境タグを選択します。この例では選択は不要です。
シャード構成
ターゲット Elasticsearch インスタンスのインデックスの最大シャード設定に基づいて、インデックスのプライマリシャードとレプリカシャードの数を設定します。
文字列インデックス
ターゲット Elasticsearch インスタンスで文字列をどのようにインデックス付けするかを指定します。
-
analyzed:インデックス付けの前に文字列を分析します。特定のアナライザも選択する必要があります。アナライザの種類と機能については、「Analyzers」をご参照ください。
-
not analyzed:分析しません。元の値を直接インデックス付けします。
-
no:インデックス付けしません。
タイムゾーン
時間タイプのデータ (DATETIME や TIMESTAMP など) をターゲット Elasticsearch インスタンスに同期する場合、含めるタイムゾーンを選択します。
説明ターゲットインスタンスの時間タイプのデータにタイムゾーンが不要な場合は、同期前にターゲットインスタンスでこの時間タイプのデータのドキュメントタイプ (type) を設定してください。
DOCID
デフォルトでは、DOCID はテーブルのプライマリキーです。テーブルにプライマリキーがない場合、DOCID は Elasticsearch によって自動生成される ID 列です。
順方向および逆方向タスクのハートビートテーブル SQL を削除
DTS インスタンスの実行中にハートビート SQL 情報をソースデータベースに書き込むかどうかを選択します。
○:ハートビート SQL 情報はソースデータベースに書き込まれません。これにより、DTS インスタンスが遅延を報告する可能性があります。
×:ハートビート SQL 情報をソースデータベースに書き込みます。これにより、ソースデータベースの物理バックアップやクローニングなどの機能に干渉する可能性があります。
ETL 機能の設定
抽出・変換・書き出し (ETL) 機能を有効にするかどうかを選択します。詳細については、「ETL とは」をご参照ください。有効な値:
-
○:ETL 機能を有効にします。コードエディタにデータ処理文を入力します。詳細については、「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。
-
×:ETL 機能を無効にします。
監視アラート
アラートを設定するかどうかを指定します。同期が失敗した場合、または遅延が指定されたしきい値を超えた場合に、アラート連絡先に通知が送信されます。
×:アラートは設定されません。
○:アラートしきい値を設定し、アラート通知を指定してアラートを設定します。詳細については、「タスク設定中のモニタリングとアラートの設定」をご参照ください。
設定が完了したら、ページ下部の 次:データベースおよびテーブルのフィールド設定 をクリックして、ターゲット Elasticsearch で同期するテーブルの _routing ポリシーと _id の値を設定します。
タイプ
説明
_routing の設定
_routing を設定すると、ドキュメントをターゲット Elasticsearch インスタンスの特定のシャードにルーティングして保存できます。詳細については、「_routing」をご参照ください。
○ を選択した場合、ルーティングにカスタム列を使用できます。
× を選択した場合、_id がルーティングに使用されます。
説明ターゲット Elasticsearch インスタンスがバージョン 7.x の場合、[いいえ] を選択する必要があります。
_id の値
[テーブルのプライマリキー列]
複合プライマリキーは単一の列にマージされます。
ビジネスプライマリキー
[ビジネスプライマリキー] を選択した場合、対応する [ビジネスプライマリキー列] も設定する必要があります。
-
タスクを保存し、事前チェックを実行します。
このインスタンスを設定するための API パラメーターを表示するには、次:タスク設定の保存と事前チェック ボタンにカーソルを合わせ、バブル内の OpenAPI パラメーターのプレビュー をクリックします。
API パラメーターの表示が完了したら、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。
説明同期タスクが開始される前に、DTS は事前チェックを実行します。タスクは、すべての事前チェック項目が成功した後にのみ開始できます。
事前チェックが失敗した場合は、失敗した項目の 詳細を表示 をクリックします。プロンプトに従って問題を修正し、再度事前チェックを実行します。
事前チェックで警告が返された場合:
チェック項目が失敗し、無視できない場合は、項目の横にある 詳細を表示 をクリックします。指示に従って問題を修正し、再度事前チェックを実行します。
無視できるチェック項目については、アラートの詳細を確認、無視、OK、再度事前チェックを実行 を順にクリックして警告をスキップし、事前チェックを再実行できます。警告項目を無視することを選択した場合、データの不整合などの問題が発生し、ビジネスにリスクをもたらす可能性があります。
-
インスタンスを購入します。
成功率 が 100% になったら、次:インスタンスの購入 をクリックします。
購入 ページで、データ同期インスタンスの課金方法とリンク仕様を選択します。次の表に、これらのパラメーターの詳細を示します。
カテゴリ
パラメーター
説明
新しいインスタンスクラス
課金方法
サブスクリプション:インスタンス作成時にお支払いいただきます。長期的なニーズに適しており、従量課金よりもコスト効率が高いです。サブスクリプション期間が長いほど、割引率が高くなります。
従量課金:時間単位で課金されます。短期的なニーズに適しています。使用後すぐにインスタンスをリリースしてコストを節約できます。
リソースグループ構成
インスタンスが属するリソースグループ。デフォルトはデフォルトのリソースグループです。詳細については、「Resource Management とは」をご参照ください。
リンク仕様
DTS は、異なるパフォーマンスレベルの同期仕様を提供します。同期リンク仕様は同期レートに影響します。ビジネスシナリオに基づいて仕様を選択できます。詳細については、「データ同期リンクの仕様」をご参照ください。
サブスクリプション期間
サブスクリプションモードでは、サブスクリプションインスタンスの期間と数量を選択します。1 か月から 9 か月までの月次サブスクリプション、または 1、2、3、5 年の年次サブスクリプションを選択できます。
説明このオプションは、課金方法が サブスクリプション の場合にのみ利用可能です。
設定が完了したら、Data Transmission Service (従量課金) 利用規約 を読んで選択します。
購入して起動 をクリックします。OK ダイアログボックスで、[OK] をクリックします。
[データ同期] ページでタスクの進捗状況を確認できます。
同期されたインデックスとデータの表示
データ同期タスクが 実行中 状態になったら、Kibana を使用して Elasticsearch インスタンスに接続し、作成されたインデックスと同期されたデータがビジネスの期待を満たしていることを確認します。ログイン手順については、「Kibana コンソールへのログイン」をご参照ください。
結果が期待どおりでない場合は、インデックスとそのデータを削除し、データ同期タスクを再設定してください。