Data Transmission Service (DTS) は、MongoDB から Kafka クラスタへのデータ移行をサポートしています。このトピックでは、ApsaraDB for MongoDB インスタンス (レプリカセットアーキテクチャ) から Message Queue for Apache Kafka インスタンスにデータを移行する方法について説明します。
前提条件
移行先の Message Queue for Apache Kafka インスタンス が作成されている。
説明ソースデータベースと宛先データベースのサポートされているバージョンについては、「データ移行シナリオの概要」をご参照ください。
データを受信するために、移行先の Message Queue for Apache Kafka インスタンスに トピック が作成されている。
ソースデータベースが ApsaraDB for MongoDB シャードクラスターである場合は、すべてのシャードノードのエンドポイントを申請する必要があります。シャードクラスターインスタンス内のシャードノードは、同じアカウントパスワードとエンドポイントを共有する必要があります。エンドポイントの申請方法の詳細については、「シャードまたは ConfigServer コンポーネントのエンドポイントを申請する」をご参照ください。
注意事項
種類 | 説明 |
ソースデータベースの制限 |
|
その他 |
|
料金
移行タイプ | リンク構成料金 | データ転送コスト |
フルデータ移行 | 無料。 | この例では、データ転送コストは発生しません。宛先データベースの アクセス方法が パブリック IP アドレスの場合、データ転送コスト が発生します。 |
増分データ移行 | 課金されます。詳細については、「請求の概要」をご参照ください。 |
移行タイプ
移行タイプ | 説明 |
完全移行 | ソース ApsaraDB for MongoDB インスタンスの既存データはすべて、宛先 Kafka インスタンスに移行されます。 説明 DTS は、DATABASE オブジェクトと COLLECTION オブジェクトからのデータ移行をサポートしています。 |
増分移行 | フルデータ移行が完了した後、ソース ApsaraDB for MongoDB インスタンスからの増分データが宛先 Kafka インスタンスに移行されます。 oplog を使用する増分データ移行は、タスク開始後に作成されたデータベースをサポートしていません。次のタイプの増分更新がサポートされています。
changeStream を使用する次のタイプの増分更新がサポートされています。
|
データベースアカウントに必要な権限
データベース | 完全移行 | 増分移行 | アカウントの作成と承認方法 |
ソース ApsaraDB for MongoDB | 移行対象のデータベースと config データベースに対する読み取り権限。 | ソースデータベース、admin データベース、および local データベースに対する読み取り権限。 |
手順
次のいずれかの方法を使用して [データ移行] ページに移動し、データ移行インスタンスが存在するリージョンを選択します。
DTS コンソール
DTS コンソール にログインします。
左側のナビゲーションウィンドウで、データの移行 をクリックします。
ページの左上隅で、データ移行インスタンスが存在するリージョンを選択します。
DMS コンソール
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」と「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
DMS コンソール にログインします。
上部のナビゲーションバーで、ポインタを に合わせます。
[データ移行タスク] の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。
タスクの作成 をクリックして、タスク構成ページに移動します。
ソースデータベースと宛先データベースを構成します。次の表にパラメータを示します。
カテゴリ
パラメータ
説明
N/A
タスク名
DTS タスクの名前。DTS はタスク名を自動的に生成します。タスクを簡単に識別できる説明的な名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。
移行元データベース
既存の接続情報の選択
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメータを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できない場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
MongoDB を選択します。
アクセス方法
Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソース ApsaraDB for MongoDB インスタンスが存在するリージョンを選択します。
Alibaba Cloud アカウント間でデータを複製
この例では、現在の Alibaba Cloud アカウントのデータベースインスタンスが使用されます。× を選択します。
アーキテクチャ
この例では、レプリカセット を選択します。
説明ソース ApsaraDB for MongoDB インスタンスが シャードクラスター の場合は、Shardアカウント と Shardパスワード も指定する必要があります。
移行方法
実際の状況に応じて、増分データ移行の方法を選択します。
Oplog (推奨):
ソースデータベースで Oplog が有効になっている場合、このオプションがサポートされます。
説明ローカルの自己管理 MongoDB と ApsaraDB for MongoDB では、デフォルトで Oplog が有効になっています。この方法を使用して増分データを移行する場合、増分移行タスクの遅延は小さくなります (ログのプル速度が速くなります)。そのため、Oplog を選択することをお勧めします。
ChangeStream: ソースデータベースで Change Streams (Change Streams) が有効になっている場合、このオプションがサポートされます。
説明ソースデータベースが Amazon DocumentDB (非エラスティッククラスター) の場合、ChangeStream のみサポートされます。
ソースデータベースの アーキテクチャ が シャードクラスター として選択されている場合、Shardアカウント と Shardパスワード を入力する必要はありません。
インスタンス ID
ソース ApsaraDB for MongoDB インスタンスの ID を選択します。
認証データベース
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントが属するデータベースの名前を入力します。データベースを変更していない場合は、デフォルト値 admin を入力します。
データベースアカウント
ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントを入力します。アカウントに必要な権限については、「データベースアカウントに必要な権限」をご参照ください。
データベースのパスワード
データベースへのアクセスに使用するパスワード。
暗号化
ソースデータベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、非暗号化、SSL 暗号化、または Mongo Atlas SSL を選択できます。暗号化 パラメータで使用可能なオプションは、アクセス方法 パラメータと アーキテクチャ パラメータで選択した値によって決まります。DTS コンソールに表示されるオプションが優先されます。
説明[アーキテクチャ] パラメータが [シャードクラスター] に設定され、ApsaraDB for MongoDB データベースの [移行方法] パラメータが Oplog に設定されている場合、[暗号化] パラメータ SSL 暗号化 は使用できません。
ソースデータベースが レプリカセット アーキテクチャを使用する自己管理 MongoDB データベースで、アクセス方法 パラメータが Alibaba Cloud インスタンス に設定されておらず、[暗号化] パラメータが SSL 暗号化 に設定されている場合、認証局 (CA) 証明書をアップロードしてソースデータベースへの接続を検証できます。
移行先データベース
既存の接続情報の選択
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメータを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できない場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
Kafka を選択します。
アクセス方法
Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
宛先 Kafka インスタンスが存在するリージョンを選択します。
Kafka インスタンス ID
宛先 Kafka インスタンスの ID を選択します。
暗号化
ビジネス要件とセキュリティ要件に基づいて、非暗号化 または SCRAM-SHA-256 を選択します。
トピック
ドロップダウンリストから、データを受信するトピックを選択します。
DDL 情報を格納するトピック
ドロップダウンリストから、DDL 情報を格納するトピックを選択します。
説明トピックを選択しない場合、DDL 情報は トピック に選択したトピックに格納されます。
Kafka スキーマレジストリの使用
Kafka Schema Registry は、メタデータのサービスレイヤーを提供します。Avro スキーマを格納および取得するための RESTful API 操作を提供します。
×: Kafka Schema Registry を使用しません。
○: Kafka Schema Registry を使用します。Kafka Schema Registry の Avro スキーマに登録されている URL または IP アドレスを入力する必要があります。
ページの下部にある [接続テストと続行] をクリックします。
説明DTS サーバーの CIDR ブロックをソースデータベースと宛先データベースのセキュリティ設定に自動または手動で追加して、DTS サーバーからのアクセスを許可できることを確認してください。詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。
ソースデータベースまたは宛先データベースが自己管理データベースで、その アクセス方法 が Alibaba Cloud インスタンス に設定されていない場合は、DTS サーバーの CIDR ブロック ダイアログボックスの 接続テスト をクリックします。
移行するオブジェクトを構成します。
オブジェクト設定 ページで、移行するオブジェクトを構成します。
パラメータ
説明
移行タイプ
フルデータ移行のみを実行するには、[フルデータ移行] のみを選択します。
データ移行中のサービス継続性を確保するには、[フルデータ移行] と [増分データ移行] を選択します。
説明[増分データ移行] を選択しない場合は、データ移行中にソースデータベースにデータを書き込まないことをお勧めします。これにより、ソースデータベースと宛先データベース間でデータの整合性が確保されます。
競合するテーブルの処理モード
エラーの事前チェックと報告: 宛先データベースにソースデータベースのコレクションと同じ名前のコレクションが含まれているかどうかを確認します。ソースデータベースと宛先データベースに同じ名前のコレクションが含まれていない場合、事前チェックは合格です。そうでない場合、事前チェック中にエラーが返され、データ移行タスクを開始できません。
説明ソースデータベースと宛先データベースに同じ名前のコレクションが含まれており、宛先データベースのコレクションを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、宛先データベースに移行されるコレクションの名前を変更できます。詳細については、「オブジェクト名をマッピングする」をご参照ください。
エラーを無視して続行: ソースデータベースと宛先データベースの同じコレクション名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの整合性が保証されず、ビジネスが潜在的なリスクにさらされる可能性があります。
DTS は、宛先データベースのデータレコードと同じプライマリキーを持つデータレコードを移行しません。
データの初期化に失敗したり、特定の列のみが移行されたり、データ移行タスクが失敗したりする可能性があります。
Kafka のデータ形式
Canal JSON のみサポートされます。
説明Kafka が受信したデータは、3 つのシナリオ に分類できます。
Kafka 圧縮形式
要件に基づいて Kafka メッセージ圧縮形式を選択します。
[LZ4] (デフォルト): 低い圧縮率と高い圧縮速度を提供します。
[GZIP]: 高い圧縮率と遅い圧縮速度を提供します。
説明より多くの CPU リソースを使用します。
[Snappy]: 圧縮率と速度のバランスを提供します。
Kafka パーティションへのデータ転送ポリシー
ビジネス要件に基づいて 戦略 を選択します。
メッセージ肯定応答メカニズム
ビジネス要件に基づいて メッセージ確認応答メカニズム を選択します。
移行先インスタンスでのオブジェクト名の大文字化
宛先インスタンスのデータベース名とコレクション名の大文字と小文字。デフォルトでは、[DTS デフォルトポリシー] が選択されています。別のオプションを選択して、オブジェクト名の大文字と小文字がソースデータベースまたは宛先データベースと同じになるようにすることができます。詳細については、「宛先インスタンスのオブジェクト名の大文字と小文字を指定する」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択します。
アイコンをクリックして、選択中のオブジェクト セクションにオブジェクトを追加します。説明コレクションレベルでオブジェクトを選択できます。
選択中のオブジェクト
この例では、追加の構成は必要ありません。
マッピング機能を使用して、宛先 Kafka インスタンスのソースデータベースのコレクションに マッピング情報 を設定できます。
次へ:詳細設定 をクリックして、詳細設定を構成します。
パラメータ
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、専用クラスターを指定しない場合、DTS はデータ移行タスクを共有クラスターにスケジュールします。データ移行タスクの安定性を向上させるには、専用クラスターを購入します。詳細については、「DTS 専用クラスターとは」をご参照ください。
失敗した接続の再試行時間
接続失敗時のリトライ時間の範囲。データ移行タスクの開始後にソースデータベースまたは宛先データベースに接続できない場合、DTS はリトライ時間の範囲内で直ちに接続を再試行します。有効値: 10 ~ 1,440。単位: 分。デフォルト値: 720。このパラメータには 30 より大きい値を設定することをお勧めします。指定されたリトライ時間の範囲内で DTS がソースデータベースと宛先データベースに再接続された場合、DTS はデータ移行タスクを再開します。そうでない場合、データ移行タスクは失敗します。
説明同じソースデータベースまたは宛先データベースを共有する複数のデータ移行タスクに異なるリトライ時間の範囲を指定した場合、後で指定した値が優先されます。
DTS が接続を再試行すると、DTS インスタンスの料金が発生します。ビジネス要件に基づいてリトライ時間の範囲を指定することをお勧めします。また、ソースデータベースと宛先インスタンスが解放された後、できるだけ早く DTS インスタンスを解放することもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題のリトライ時間の範囲。たとえば、データ移行タスクの開始後に DDL 操作または DML 操作の実行に失敗した場合、DTS はリトライ時間の範囲内で直ちに操作を再試行します。有効値: 1 ~ 1440。単位: 分。デフォルト値: 10。このパラメータには 10 より大きい値を設定することをお勧めします。指定されたリトライ時間の範囲内で失敗した操作が正常に実行された場合、DTS はデータ移行タスクを再開します。そうでない場合、データ移行タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメータの値は、失敗した接続の再試行時間 パラメータの値よりも小さくなければなりません。
更新操作後にドキュメント全体を取得するかどうか
増分データ移行中に、更新操作後のドキュメントの完全なデータを宛先データベースに移行するかどうかを指定します。
説明このパラメータは、移行方法 が ChangeStream に設定され、移行タイプ に 増分データ移行 が含まれている場合にのみ使用できます。
○: 更新操作後のドキュメントの完全なデータを移行します。
×: 更新されたフィールドのみを移行します。
完全移行率を制限するかどうか
フルデータ移行のスロットリングを有効にするかどうかを指定します。フルデータ移行中は、DTS はソースデータベースと宛先データベースの読み取りリソースと書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。ビジネス要件に基づいて、フルデータ移行のスロットリングを有効にすることができます。スロットリングを構成するには、1 秒あたりのソースデータベースのクエリ率 QPS、1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメータを構成する必要があります。これにより、宛先データベースサーバーの負荷が軽減されます。
説明移行タイプ パラメータで 完全データ移行 を選択した場合にのみ、このパラメータを構成できます。
同じテーブル内のプライマリキー_id のデータ型が一意かどうか
移行するデータの同じコレクション内で、プライマリキー
_idのデータ型が一意かどうか。説明これは、移行タイプ が 完全データ移行 に設定されている場合にのみ構成できます。
○: 一意。フル移行フェーズでは、DTS はソースデータベースから移行されるデータのプライマリキーのデータ型をスキャンしません。
×: 一意ではない。フル移行フェーズでは、DTS はソースデータベースから移行されるデータのプライマリキーのデータ型をスキャンします。
増分移行率を制限するかどうか
増分データ移行のスロットリングを有効にするかどうかを指定します。スロットリングを構成するには、1 秒あたりの増分移行の行数 RPS と 1 秒あたりの増分移行データ量 (MB) BPS パラメータを構成する必要があります。これにより、宛先データベースサーバーの負荷が軽減されます。
説明移行タイプ パラメータで 増分データ移行 を選択した場合にのみ、このパラメータを構成できます。
環境タグ
DTS インスタンスを識別するために使用される環境タグ。ビジネス要件に基づいて環境タグを選択できます。この例では、環境タグは選択されていません。
ETL の設定
抽出、変換、書き出し (ETL) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効値:
[はい]: ETL 機能を構成します。コードエディタにデータ処理ステートメントを入力できます。詳細については、「データ移行タスクまたはデータ同期タスクで ETL を構成する」をご参照ください。
[いいえ]: ETL 機能を構成しません。
監視アラート
データ移行タスクのアラートを構成するかどうかを指定します。タスクが失敗した場合、または移行レイテンシが指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。有効値:
[いいえ]: アラートを構成しません。
[はい]: アラートを構成します。この場合、アラートしきい値と アラート通知設定 も構成する必要があります。詳細については、「監視とアラート」トピックの DTS タスクを作成するときに監視とアラートを構成する セクションをご参照ください。
タスク設定を保存し、事前チェックを実行します。
DTS タスクを構成するために関連 API 操作を呼び出すときに指定するパラメータを表示するには、次:タスク設定の保存と事前チェック にポインタを合わせ、OpenAPI パラメーターのプレビュー をクリックします。
パラメータを表示する必要がない場合、またはすでに表示している場合は、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。
説明データ移行タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後でのみ、データ移行タスクを開始できます。
タスクが事前チェックに合格しない場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。その後、事前チェックを再実行します。
事前チェック中に項目のアラートがトリガーされた場合:
アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして、問題をトラブルシューティングします。その後、事前チェックを再実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。[詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。次に、[再チェック] をクリックして、事前チェックを再実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
インスタンスを購入します。
[成功率] が [100%] になるまで待ちます。次に、[次へ: インスタンスの購入] をクリックします。
[インスタンスの購入] ページで、データ移行インスタンスの [インスタンスクラス] パラメータを構成します。次の表にパラメータを示します。
セクション
パラメータ
説明
新しいインスタンスクラス
[リソースグループ]
データ移行インスタンスが属するリソースグループ。デフォルト値: [デフォルトリソースグループ]。詳細については、「リソース管理とは」をご参照ください。
インスタンスクラス
DTS は、移行速度が異なるインスタンスクラスを提供します。ビジネスシナリオに基づいてインスタンスクラスを選択できます。詳細については、「データ移行インスタンスのインスタンスクラス」をご参照ください。
チェックボックスをオンにして、[Data Transmission Service (従量課金制) サービス規約] を読んで同意します。
[購入して開始] をクリックします。表示されるメッセージで、[OK]をクリックします。
[データ移行] ページでタスクの進捗状況を確認できます。
説明データ移行タスクを増分データの移行に使用できない場合、タスクは自動的に停止します。[完了] が [ステータス] セクションに表示されます。
データ移行タスクを増分データの移行に使用できる場合、タスクは自動的に停止しません。増分データ移行タスクは停止または完了しません。[実行中] が [ステータス] セクションに表示されます。
マッピング情報
選択中のオブジェクト セクションで、宛先トピック名 (コレクションレベル) にポインタを合わせます。
宛先トピック名の後に表示される 編集 をクリックします。
表示される テーブルの編集 ダイアログボックスで、マッピング情報を構成します。
パラメータ
説明
テーブル名
ソースコレクションの移行先の宛先トピックの名前。デフォルトでは、移行先データベース セクションの ソースデータベースとターゲットデータベースの設定 ステップで トピック に選択したトピックが使用されます。
重要入力するトピック名は、宛先 Kafka インスタンスに存在する必要があります。そうでない場合、データ移行タスクは失敗します。
テーブル名 を変更すると、データは指定したトピックに書き込まれます。
フィルタリング条件
詳細については、「フィルター条件を構成する」をご参照ください。
パーティション数
宛先トピックにデータが書き込まれるときのパーティション数。
[OK] をクリックします。
データ配信シナリオ
シナリオ 1: oplog を使用して増分データを移行する
メインインスタンス構成
移行方法 は Oplog に設定され、移行タイプ には 増分データ移行 が含まれます。
データ配信例
ソースデータベースの増分変更のタイプ | ソースデータベースの増分変更ステートメント | 宛先トピックが受信したデータ |
| | |
| | |
| | |
| | |
| | |
| |
シナリオ 2: ChangeStream を使用して増分データを移行する (更新されたフィールドのみを移行する)
メインインスタンス構成
移行方法 は ChangeStream に設定され、移行タイプ には 増分データ移行 が含まれ、[更新後に完全なドキュメントを取得する] は × に設定されています。
データ配信例
ソースデータベースの増分変更のタイプ | ソースデータベースの増分変更ステートメント | 宛先トピックが受信したデータ |
| | |
| | |
| | |
| | |
| | |
| |
シナリオ 3: ChangeStream を使用して増分データを移行する (更新操作後にドキュメントの完全なデータを移行する)
メインインスタンス構成
移行方法 は ChangeStream に設定され、移行タイプ には 増分データ移行 が含まれ、[更新後に完全なドキュメントを取得する] は ○ に設定されています。
データ配信例
ソースデータベースの増分変更のタイプ | ソースデータベースの増分変更ステートメント | 宛先トピックが受信したデータ |
| | |
| | |
| | |
| | |
| | |
| |
特別なケース
注記
更新イベントの fullDocument フィールドが 欠落している 場合、データ配信結果は oplog を使用して増分データを移行する 場合と同じです。
例
ソースデータベースの基本データ | ソースデータベースの増分変更ステートメント | 宛先トピックが受信したデータ |
| |