すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ApsaraDB for MongoDB を Message Queue for Apache Kafka に移行する

最終更新日:Apr 13, 2025

Data Transmission Service (DTS) は、MongoDB から Kafka クラスタへのデータ移行をサポートしています。このトピックでは、ApsaraDB for MongoDB インスタンス (レプリカセットアーキテクチャ) から Message Queue for Apache Kafka インスタンスにデータを移行する方法について説明します。

前提条件

  • 移行先の Message Queue for Apache Kafka インスタンス が作成されている。

    説明

    ソースデータベースと宛先データベースのサポートされているバージョンについては、「データ移行シナリオの概要」をご参照ください。

  • データを受信するために、移行先の Message Queue for Apache Kafka インスタンスに トピック が作成されている。

  • ソースデータベースが ApsaraDB for MongoDB シャードクラスターである場合は、すべてのシャードノードのエンドポイントを申請する必要があります。シャードクラスターインスタンス内のシャードノードは、同じアカウントパスワードとエンドポイントを共有する必要があります。エンドポイントの申請方法の詳細については、「シャードまたは ConfigServer コンポーネントのエンドポイントを申請する」をご参照ください。

注意事項

種類

説明

ソースデータベースの制限

  • ソースデータベースがデプロイされているサーバーには、十分なアウトバウンド帯域幅が必要です。そうでない場合、データ移行速度が低下します。

  • コレクションを移行対象のオブジェクトとして選択し、宛先データベース内のコレクションを変更する場合 (コレクションの名前変更など)、1 つのデータ移行タスクで最大 1,000 個のコレクションを移行できます。 1,000 個を超えるコレクションを移行するタスクを実行すると、リクエストエラーが発生します。この場合は、複数タスクを構成してコレクションを移行することをお勧めします。

  • 増分データを移行する場合は、操作ログを有効にする必要があります。そうでない場合、事前チェック中にエラーが発生し、移行タスクが開始できません。

    説明

    ソースデータベースの操作ログは、少なくとも 7 日間保持する必要があります。そうでない場合、DTS は操作ログの取得に失敗する可能性があり、タスクが失敗したり、データの不整合やデータ損失が発生したりする可能性があります。上記要件に基づいて操作ログの保存期間を設定してください。そうでない場合、DTS のサービスレベルアグリーメント (SLA) はサービスの信頼性またはパフォーマンスを保証しません。

  • ソースデータベースが ApsaraDB for MongoDB シャードクラスターインスタンスの場合、インスタンス内の Mongos ノード数は 10 を超えることはできません。また、ソース ApsaraDB for MongoDB シャードクラスターインスタンスに孤立ドキュメントが含まれていないことを確認する必要があります。そうでない場合、データの不整合が発生し、タスクが失敗する可能性があります。詳細については、MongoDB ドキュメントと FAQ トピックのシャードクラスターアーキテクチャにデプロイされた MongoDB データベースの孤立ドキュメントを削除するにはどうすればよいですか?セクションをご参照ください。

  • ソースデータベースで実行される操作の制限:

    • フルデータ移行中は、データベースまたはコレクションのスキーマを変更しないでください。そうでない場合、データ移行タスクは失敗します。

    • フルデータ移行のみを実行する場合は、データ移行中にソースデータベースにデータを書き込まないでください。そうでない場合、ソースデータベースと宛先データベース間でデータの不整合が発生します。

その他

  • コレクションレベルの移行のみがサポートされています。

  • DTS は、admin データベースまたは local データベースからのデータ移行をサポートしていません。

  • 移行する 1 つのデータが 10 MB を超えると、タスクは失敗します。

  • ソースデータベースがシャードクラスター MongoDB インスタンスの場合:

    • フルデータ移行中は、各サブタスクが増分データ移行のフェーズに達するまで、ソース MongoDB データベースのバランサーを無効にしてください。そうでない場合、データの不整合が発生する可能性があります。MongoDB バランサーの詳細については、「ApsaraDB for MongoDB バランサーを管理する」をご参照ください。

    • 増分データ移行方法が Oplog の場合、DTS はソースデータベースの異なるシャードから宛先 Kafka インスタンスへのデータの書き込み順序を保証できません。

  • DTS はトランザクション情報を保持しません。ソースデータベースのトランザクションは、個別のレコードとして宛先データベースに移行されます。

  • DTS タスク中に宛先 Kafka インスタンスのブローカーノードが増減した場合、DTS タスクを再起動する必要があります。

  • ApsaraMQ for Kafka インスタンスのブローカー数が 3 より大きい場合は、Express Connect、VPN Gateway、または Smart Access Gateway を使用して DTS に接続する必要があります。

  • DTS がソースと宛先に接続できることを確認してください。たとえば、データベースインスタンスのセキュリティ設定と、自己管理 Kafka 構成ファイル server.properties 内の listeners パラメータと advertised.listeners パラメータは、DTS からのアクセスを制限しません。

  • フルデータ移行タスク中は、DTS はソースデータベースと宛先データベースの読み取りリソースと書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。そのため、DTS インスタンスを開始する前にソースデータベースと宛先データベースのパフォーマンスを評価し、オフピーク時 (たとえば、ソースデータベースと宛先データベースの CPU 負荷が 30% 未満の場合) にデータ移行を行うことをお勧めします。

  • DTS は、過去 7 日間に失敗した DTS インスタンスの回復を試みます。ワークロードを宛先インスタンスに切り替える前に、DTS インスタンスを解放または完了して、DTS インスタンスが自動的に回復された後に宛先データベースのデータが上書きされないようにしてください。

  • DTS は、宛先データベースの最後に移行されたデータのタイムスタンプとソースデータベースの現在のタイムスタンプに基づいて、増分データ移行のレイテンシを計算します。ソースデータベースで長時間更新操作が実行されない場合、移行レイテンシが不正確になる可能性があります。タスクに大きなレイテンシが表示される場合は、ソースデータベースで更新操作を実行してレイテンシ情報を更新できます。

  • DTS タスクの実行に失敗した場合、DTS テクニカルサポートは 8 時間以内にタスクの復元を試みます。復元中に、タスクが再起動され、タスクのパラメータが変更される可能性があります。

    説明

    タスクのパラメータのみが変更される可能性があります。データベースのパラメータは変更されません。変更される可能性のあるパラメータには、「インスタンスパラメータを変更する」トピックの DTS インスタンスのパラメータを変更するセクションのパラメータが挙げられますが、これらに限定されません。

料金

移行タイプ

リンク構成料金

データ転送コスト

フルデータ移行

無料。

この例では、データ転送コストは発生しません。宛先データベースの アクセス方法パブリック IP アドレスの場合、データ転送コスト が発生します。

増分データ移行

課金されます。詳細については、「請求の概要」をご参照ください。

移行タイプ

移行タイプ

説明

完全移行

ソース ApsaraDB for MongoDB インスタンスの既存データはすべて、宛先 Kafka インスタンスに移行されます。

説明

DTS は、DATABASE オブジェクトと COLLECTION オブジェクトからのデータ移行をサポートしています。

増分移行

フルデータ移行が完了した後、ソース ApsaraDB for MongoDB インスタンスからの増分データが宛先 Kafka インスタンスに移行されます。

oplog を使用する

増分データ移行は、タスク開始後に作成されたデータベースをサポートしていません。次のタイプの増分更新がサポートされています。

  • CREATE COLLECTION, INDEX

  • DROP DATABASE, COLLECTION, INDEX

  • RENAME COLLECTION

  • コレクション内のドキュメントに対する挿入、更新、削除操作。

changeStream を使用する

次のタイプの増分更新がサポートされています。

  • DROP DATABASE, COLLECTION

  • RENAME COLLECTION

  • コレクション内のドキュメントに対する挿入、更新、削除操作。

データベースアカウントに必要な権限

データベース

完全移行

増分移行

アカウントの作成と承認方法

ソース ApsaraDB for MongoDB

移行対象のデータベースと config データベースに対する読み取り権限。

ソースデータベース、admin データベース、および local データベースに対する読み取り権限。

アカウント管理

手順

  1. 次のいずれかの方法を使用して [データ移行] ページに移動し、データ移行インスタンスが存在するリージョンを選択します。

    DTS コンソール

    1. DTS コンソール にログインします。

    2. 左側のナビゲーションウィンドウで、データの移行 をクリックします。

    3. ページの左上隅で、データ移行インスタンスが存在するリージョンを選択します。

    DMS コンソール

    説明

    実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」と「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。

    1. DMS コンソール にログインします。

    2. 上部のナビゲーションバーで、ポインタを [データ + AI] > DTS (DTS) > データ移行 に合わせます。

    3. [データ移行タスク] の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。

  2. タスクの作成 をクリックして、タスク構成ページに移動します。

  3. ソースデータベースと宛先データベースを構成します。次の表にパラメータを示します。

    カテゴリ

    パラメータ

    説明

    N/A

    タスク名

    DTS タスクの名前。DTS はタスク名を自動的に生成します。タスクを簡単に識別できる説明的な名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。

    移行元データベース

    既存の接続情報の選択

    • DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメータを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。

      説明

      DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。

    • インスタンスを DTS に登録できない場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。

    データベースタイプ

    MongoDB を選択します。

    アクセス方法

    Alibaba Cloud インスタンス を選択します。

    インスタンスのリージョン

    ソース ApsaraDB for MongoDB インスタンスが存在するリージョンを選択します。

    Alibaba Cloud アカウント間でデータを複製

    この例では、現在の Alibaba Cloud アカウントのデータベースインスタンスが使用されます。× を選択します。

    アーキテクチャ

    この例では、レプリカセット を選択します。

    説明

    ソース ApsaraDB for MongoDB インスタンスが シャードクラスター の場合は、ShardアカウントShardパスワード も指定する必要があります。

    移行方法

    実際の状況に応じて、増分データ移行の方法を選択します。

    • Oplog (推奨):

      ソースデータベースで Oplog が有効になっている場合、このオプションがサポートされます。

      説明

      ローカルの自己管理 MongoDB と ApsaraDB for MongoDB では、デフォルトで Oplog が有効になっています。この方法を使用して増分データを移行する場合、増分移行タスクの遅延は小さくなります (ログのプル速度が速くなります)。そのため、Oplog を選択することをお勧めします。

    • ChangeStream: ソースデータベースで Change Streams (Change Streams) が有効になっている場合、このオプションがサポートされます。

      説明
      • ソースデータベースが Amazon DocumentDB (非エラスティッククラスター) の場合、ChangeStream のみサポートされます。

      • ソースデータベースの アーキテクチャシャードクラスター として選択されている場合、ShardアカウントShardパスワード を入力する必要はありません。

    インスタンス ID

    ソース ApsaraDB for MongoDB インスタンスの ID を選択します。

    認証データベース

    ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントが属するデータベースの名前を入力します。データベースを変更していない場合は、デフォルト値 admin を入力します。

    データベースアカウント

    ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントを入力します。アカウントに必要な権限については、「データベースアカウントに必要な権限」をご参照ください。

    データベースのパスワード

    データベースへのアクセスに使用するパスワード。

    暗号化

    ソースデータベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、非暗号化SSL 暗号化、または Mongo Atlas SSL を選択できます。暗号化 パラメータで使用可能なオプションは、アクセス方法 パラメータと アーキテクチャ パラメータで選択した値によって決まります。DTS コンソールに表示されるオプションが優先されます。

    説明
    • [アーキテクチャ] パラメータが [シャードクラスター] に設定され、ApsaraDB for MongoDB データベースの [移行方法] パラメータが Oplog に設定されている場合、[暗号化] パラメータ SSL 暗号化 は使用できません。

    • ソースデータベースが レプリカセット アーキテクチャを使用する自己管理 MongoDB データベースで、アクセス方法 パラメータが Alibaba Cloud インスタンス に設定されておらず、[暗号化] パラメータが SSL 暗号化 に設定されている場合、認証局 (CA) 証明書をアップロードしてソースデータベースへの接続を検証できます。

    移行先データベース

    既存の接続情報の選択

    • DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメータを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。

      説明

      DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。

    • インスタンスを DTS に登録できない場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。

    データベースタイプ

    Kafka を選択します。

    アクセス方法

    Alibaba Cloud インスタンス を選択します。

    インスタンスのリージョン

    宛先 Kafka インスタンスが存在するリージョンを選択します。

    Kafka インスタンス ID

    宛先 Kafka インスタンスの ID を選択します。

    暗号化

    ビジネス要件とセキュリティ要件に基づいて、非暗号化 または SCRAM-SHA-256 を選択します。

    トピック

    ドロップダウンリストから、データを受信するトピックを選択します。

    DDL 情報を格納するトピック

    ドロップダウンリストから、DDL 情報を格納するトピックを選択します。

    説明

    トピックを選択しない場合、DDL 情報は トピック に選択したトピックに格納されます。

    Kafka スキーマレジストリの使用

    Kafka Schema Registry は、メタデータのサービスレイヤーを提供します。Avro スキーマを格納および取得するための RESTful API 操作を提供します。

    • ×: Kafka Schema Registry を使用しません。

    • : Kafka Schema Registry を使用します。Kafka Schema Registry の Avro スキーマに登録されている URL または IP アドレスを入力する必要があります。

  4. ページの下部にある [接続テストと続行] をクリックします。

    説明
    • DTS サーバーの CIDR ブロックをソースデータベースと宛先データベースのセキュリティ設定に自動または手動で追加して、DTS サーバーからのアクセスを許可できることを確認してください。詳細については、「DTS サーバーの CIDR ブロックを追加する」をご参照ください。

    • ソースデータベースまたは宛先データベースが自己管理データベースで、その アクセス方法Alibaba Cloud インスタンス に設定されていない場合は、DTS サーバーの CIDR ブロック ダイアログボックスの 接続テスト をクリックします。

  5. 移行するオブジェクトを構成します。

    1. オブジェクト設定 ページで、移行するオブジェクトを構成します。

      パラメータ

      説明

      移行タイプ

      • フルデータ移行のみを実行するには、[フルデータ移行] のみを選択します。

      • データ移行中のサービス継続性を確保するには、[フルデータ移行][増分データ移行] を選択します。

      説明

      [増分データ移行] を選択しない場合は、データ移行中にソースデータベースにデータを書き込まないことをお勧めします。これにより、ソースデータベースと宛先データベース間でデータの整合性が確保されます。

      競合するテーブルの処理モード

      • エラーの事前チェックと報告: 宛先データベースにソースデータベースのコレクションと同じ名前のコレクションが含まれているかどうかを確認します。ソースデータベースと宛先データベースに同じ名前のコレクションが含まれていない場合、事前チェックは合格です。そうでない場合、事前チェック中にエラーが返され、データ移行タスクを開始できません。

        説明

        ソースデータベースと宛先データベースに同じ名前のコレクションが含まれており、宛先データベースのコレクションを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、宛先データベースに移行されるコレクションの名前を変更できます。詳細については、「オブジェクト名をマッピングする」をご参照ください。

      • エラーを無視して続行: ソースデータベースと宛先データベースの同じコレクション名の事前チェックをスキップします。

        警告

        エラーを無視して続行 を選択すると、データの整合性が保証されず、ビジネスが潜在的なリスクにさらされる可能性があります。

        • DTS は、宛先データベースのデータレコードと同じプライマリキーを持つデータレコードを移行しません。

        • データの初期化に失敗したり、特定の列のみが移行されたり、データ移行タスクが失敗したりする可能性があります。

      Kafka のデータ形式

      Canal JSON のみサポートされます。

      説明

      Kafka が受信したデータは、3 つのシナリオ に分類できます。

      Kafka 圧縮形式

      要件に基づいて Kafka メッセージ圧縮形式を選択します。

      • [LZ4] (デフォルト): 低い圧縮率と高い圧縮速度を提供します。

      • [GZIP]: 高い圧縮率と遅い圧縮速度を提供します。

        説明

        より多くの CPU リソースを使用します。

      • [Snappy]: 圧縮率と速度のバランスを提供します。

      Kafka パーティションへのデータ転送ポリシー

      ビジネス要件に基づいて 戦略 を選択します。

      メッセージ肯定応答メカニズム

      ビジネス要件に基づいて メッセージ確認応答メカニズム を選択します。

      移行先インスタンスでのオブジェクト名の大文字化

      宛先インスタンスのデータベース名とコレクション名の大文字と小文字。デフォルトでは、[DTS デフォルトポリシー] が選択されています。別のオプションを選択して、オブジェクト名の大文字と小文字がソースデータベースまたは宛先データベースと同じになるようにすることができます。詳細については、「宛先インスタンスのオブジェクト名の大文字と小文字を指定する」をご参照ください。

      ソースオブジェクト

      ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択します。向右小箭头 アイコンをクリックして、選択中のオブジェクト セクションにオブジェクトを追加します。

      説明

      コレクションレベルでオブジェクトを選択できます。

      選択中のオブジェクト

      この例では、追加の構成は必要ありません。

      マッピング機能を使用して、宛先 Kafka インスタンスのソースデータベースのコレクションに マッピング情報 を設定できます。

    2. 次へ:詳細設定 をクリックして、詳細設定を構成します。

      パラメータ

      説明

      タスクのスケジュールに使用する専用クラスターの選択

      デフォルトでは、専用クラスターを指定しない場合、DTS はデータ移行タスクを共有クラスターにスケジュールします。データ移行タスクの安定性を向上させるには、専用クラスターを購入します。詳細については、「DTS 専用クラスターとは」をご参照ください。

      失敗した接続の再試行時間

      接続失敗時のリトライ時間の範囲。データ移行タスクの開始後にソースデータベースまたは宛先データベースに接続できない場合、DTS はリトライ時間の範囲内で直ちに接続を再試行します。有効値: 10 ~ 1,440。単位: 分。デフォルト値: 720。このパラメータには 30 より大きい値を設定することをお勧めします。指定されたリトライ時間の範囲内で DTS がソースデータベースと宛先データベースに再接続された場合、DTS はデータ移行タスクを再開します。そうでない場合、データ移行タスクは失敗します。

      説明
      • 同じソースデータベースまたは宛先データベースを共有する複数のデータ移行タスクに異なるリトライ時間の範囲を指定した場合、後で指定した値が優先されます。

      • DTS が接続を再試行すると、DTS インスタンスの料金が発生します。ビジネス要件に基づいてリトライ時間の範囲を指定することをお勧めします。また、ソースデータベースと宛先インスタンスが解放された後、できるだけ早く DTS インスタンスを解放することもできます。

      移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。

      その他の問題のリトライ時間の範囲。たとえば、データ移行タスクの開始後に DDL 操作または DML 操作の実行に失敗した場合、DTS はリトライ時間の範囲内で直ちに操作を再試行します。有効値: 1 ~ 1440。単位: 分。デフォルト値: 10。このパラメータには 10 より大きい値を設定することをお勧めします。指定されたリトライ時間の範囲内で失敗した操作が正常に実行された場合、DTS はデータ移行タスクを再開します。そうでない場合、データ移行タスクは失敗します。

      重要

      移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメータの値は、失敗した接続の再試行時間 パラメータの値よりも小さくなければなりません。

      更新操作後にドキュメント全体を取得するかどうか

      増分データ移行中に、更新操作後のドキュメントの完全なデータを宛先データベースに移行するかどうかを指定します。

      説明

      このパラメータは、移行方法ChangeStream に設定され、移行タイプ増分データ移行 が含まれている場合にのみ使用できます。

      • : 更新操作後のドキュメントの完全なデータを移行します。

      • ×: 更新されたフィールドのみを移行します。

      完全移行率を制限するかどうか

      フルデータ移行のスロットリングを有効にするかどうかを指定します。フルデータ移行中は、DTS はソースデータベースと宛先データベースの読み取りリソースと書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。ビジネス要件に基づいて、フルデータ移行のスロットリングを有効にすることができます。スロットリングを構成するには、1 秒あたりのソースデータベースのクエリ率 QPS1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメータを構成する必要があります。これにより、宛先データベースサーバーの負荷が軽減されます。

      説明

      移行タイプ パラメータで 完全データ移行 を選択した場合にのみ、このパラメータを構成できます。

      同じテーブル内のプライマリキー_id のデータ型が一意かどうか

      移行するデータの同じコレクション内で、プライマリキー _id のデータ型が一意かどうか。

      説明

      これは、移行タイプ完全データ移行 に設定されている場合にのみ構成できます。

      • : 一意。フル移行フェーズでは、DTS はソースデータベースから移行されるデータのプライマリキーのデータ型をスキャンしません。

      • ×: 一意ではない。フル移行フェーズでは、DTS はソースデータベースから移行されるデータのプライマリキーのデータ型をスキャンします。

      増分移行率を制限するかどうか

      増分データ移行のスロットリングを有効にするかどうかを指定します。スロットリングを構成するには、1 秒あたりの増分移行の行数 RPS1 秒あたりの増分移行データ量 (MB) BPS パラメータを構成する必要があります。これにより、宛先データベースサーバーの負荷が軽減されます。

      説明

      移行タイプ パラメータで 増分データ移行 を選択した場合にのみ、このパラメータを構成できます。

      環境タグ

      DTS インスタンスを識別するために使用される環境タグ。ビジネス要件に基づいて環境タグを選択できます。この例では、環境タグは選択されていません。

      ETL の設定

      抽出、変換、書き出し (ETL) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効値:

      監視アラート

      データ移行タスクのアラートを構成するかどうかを指定します。タスクが失敗した場合、または移行レイテンシが指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。有効値:

      • [いいえ]: アラートを構成しません。

      • [はい]: アラートを構成します。この場合、アラートしきい値と アラート通知設定 も構成する必要があります。詳細については、「監視とアラート」トピックの DTS タスクを作成するときに監視とアラートを構成する セクションをご参照ください。

  6. タスク設定を保存し、事前チェックを実行します。

    • DTS タスクを構成するために関連 API 操作を呼び出すときに指定するパラメータを表示するには、次:タスク設定の保存と事前チェック にポインタを合わせ、OpenAPI パラメーターのプレビュー をクリックします。

    • パラメータを表示する必要がない場合、またはすでに表示している場合は、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • データ移行タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後でのみ、データ移行タスクを開始できます。

    • タスクが事前チェックに合格しない場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。その後、事前チェックを再実行します。

    • 事前チェック中に項目のアラートがトリガーされた場合:

      • アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして、問題をトラブルシューティングします。その後、事前チェックを再実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。[詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。次に、[再チェック] をクリックして、事前チェックを再実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

  7. インスタンスを購入します。

    1. [成功率][100%] になるまで待ちます。次に、[次へ: インスタンスの購入] をクリックします。

    2. [インスタンスの購入] ページで、データ移行インスタンスの [インスタンスクラス] パラメータを構成します。次の表にパラメータを示します。

      セクション

      パラメータ

      説明

      新しいインスタンスクラス

      [リソースグループ]

      データ移行インスタンスが属するリソースグループ。デフォルト値: [デフォルトリソースグループ]。詳細については、「リソース管理とは」をご参照ください。

      インスタンスクラス

      DTS は、移行速度が異なるインスタンスクラスを提供します。ビジネスシナリオに基づいてインスタンスクラスを選択できます。詳細については、「データ移行インスタンスのインスタンスクラス」をご参照ください。

    3. チェックボックスをオンにして、[Data Transmission Service (従量課金制) サービス規約] を読んで同意します。

    4. [購入して開始] をクリックします。表示されるメッセージで、[OK]をクリックします。

      [データ移行] ページでタスクの進捗状況を確認できます。

      説明
      • データ移行タスクを増分データの移行に使用できない場合、タスクは自動的に停止します。[完了][ステータス] セクションに表示されます。

      • データ移行タスクを増分データの移行に使用できる場合、タスクは自動的に停止しません。増分データ移行タスクは停止または完了しません。[実行中][ステータス] セクションに表示されます。

マッピング情報

  1. 選択中のオブジェクト セクションで、宛先トピック名 (コレクションレベル) にポインタを合わせます。

  2. 宛先トピック名の後に表示される 編集 をクリックします。

  3. 表示される テーブルの編集 ダイアログボックスで、マッピング情報を構成します。

    パラメータ

    説明

    テーブル名

    ソースコレクションの移行先の宛先トピックの名前。デフォルトでは、移行先データベース セクションの ソースデータベースとターゲットデータベースの設定 ステップで トピック に選択したトピックが使用されます。

    重要
    • 入力するトピック名は、宛先 Kafka インスタンスに存在する必要があります。そうでない場合、データ移行タスクは失敗します。

    • テーブル名 を変更すると、データは指定したトピックに書き込まれます。

    フィルタリング条件

    詳細については、「フィルター条件を構成する」をご参照ください。

    パーティション数

    宛先トピックにデータが書き込まれるときのパーティション数。

  4. [OK] をクリックします。

データ配信シナリオ

シナリオ 1: oplog を使用して増分データを移行する

メインインスタンス構成

移行方法Oplog に設定され、移行タイプ には 増分データ移行 が含まれます。

データ配信例

ソースデータベースの増分変更のタイプ

ソースデータベースの増分変更ステートメント

宛先トピックが受信したデータ

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

データの表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb", // ソースデータベース名
	"es": 1741847972000, // イベントのタイムスタンプ (ミリ秒)
	"gtid": null,
	"id": 174184797200000****, // イベントのシーケンス番号
	"isDdl": false, // DDL 操作かどうか
	"mysqlType": null,
	"old": null, // 更新前のデータ (UPDATE 操作の場合)
	"pkNames": ["_id"], // プライマリキーの名前
	"serverId": null,
	"sql": null, // SQL ステートメント (MySQL の場合)
	"sqlType": null,
	"table": "kafka_test", // コレクション名
	"ts": 1741847973438, // DTS がイベントを受信したタイムスタンプ (ミリ秒)
	"type": "INSERT" // 操作タイプ
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

データの表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848051984,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

データの表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147734,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

データの表示 (クリックして展開)

{
	"data": [{
		"$unset": {
			"salary": true
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208186,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

データの表示 (クリックして展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848289798,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

データの表示 (クリックして展開)

{
	"data": null, // DDL 操作のデータは null
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 1741847893000000005,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null, // DDL 操作のプライマリキー名は null
	"serverId": null,
	"sql": { // DDL 操作の SQL ステートメント
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847893760,
	"type": "DDL"
}

シナリオ 2: ChangeStream を使用して増分データを移行する (更新されたフィールドのみを移行する)

メインインスタンス構成

移行方法ChangeStream に設定され、移行タイプ には 増分データ移行 が含まれ、[更新後に完全なドキュメントを取得する]× に設定されています。

データ配信例

ソースデータベースの増分変更のタイプ

ソースデータベースの増分変更ステートメント

宛先トピックが受信したデータ

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

データの表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973803,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

データの表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052912,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

データの表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848148056,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

データの表示 (クリックして展開)

{
	"data": [{
		"$unset": {
			"salary": 1
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848209142,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

データの表示 (クリックして展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290254,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

データの表示 (クリックして展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894679,
	"type": "DDL"
}

シナリオ 3: ChangeStream を使用して増分データを移行する (更新操作後にドキュメントの完全なデータを移行する)

メインインスタンス構成

移行方法ChangeStream に設定され、移行タイプ には 増分データ移行 が含まれ、[更新後に完全なドキュメントを取得する] に設定されています。

データ配信例

ソースデータベースの増分変更のタイプ

ソースデータベースの増分変更ステートメント

宛先トピックが受信したデータ

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

データの表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973128,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

データの表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20 // 更新後の値
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{ // 更新前のプライマリキーの値
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sqlnull,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052219,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

データの表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"salary": 100.0, // 追加されたフィールド
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147327,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

データの表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208401,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

データの表示 (クリックして展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290499,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

データの表示 (クリックして展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894045,
	"type": "DDL"
}

特別なケース

注記

更新イベントの fullDocument フィールドが 欠落している 場合、データ配信結果は oplog を使用して増分データを移行する 場合と同じです。

ソースデータベースの基本データ

ソースデータベースの増分変更ステートメント

宛先トピックが受信したデータ

use admin // admin データベースを使用する
db.runCommand({ enablesharding:"dts_test" }) // dts_test データベースをシャーディング用に有効にする
use dts_test // dts_test データベースを使用する
sh.shardCollection("dts_test.cstest",{"name":"hashed"}) // cstest コレクションをシャーディングする
db.cstest.insert({"_id":1,"name":"a"}) // データを挿入する
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}}) // データを更新する

データの表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"name": "b" // 更新後の値のみ
		}
	}],
	"database": "dts_test",
	"es": 1740720994000,
	"gtid": null,
	"id": 174072099400000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{ // 更新前のプライマリキーの値と更新されたフィールドの値
		"name": "a",
		"_id": 1.0
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "cstest",
	"ts": 1740721007099,
	"type": "UPDATE"
}