すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ApsaraDB for MongoDB から Message Queue for Apache Kafka へのデータ同期

最終更新日:Nov 09, 2025

Data Transmission Service (DTS) は、MongoDB から Kafka クラスターへのデータ同期をサポートしています。このトピックでは、ソースデータベースとしてレプリカセットアーキテクチャを使用する MongoDB インスタンスと、ターゲットデータベースとして Message Queue for Apache Kafka インスタンスを使用して同期操作を実行する手順について説明します。

前提条件

  • 宛先の Message Queue for Apache Kafka インスタンスを作成済みであること。

    説明

    サポートされているソースデータベースとターゲットデータベースのバージョンについては、「同期ソリューションの概要」をご参照ください。

  • 宛先の Message Queue for Apache Kafka インスタンスでデータを受信するための トピックを作成済みであること。

  • ソースデータベースが ApsaraDB for MongoDB シャードクラスターの場合、すべてのシャードノードのエンドポイントを申請する必要があります。シャードクラスターインスタンス内のシャードノードは、同じアカウントパスワードとエンドポイントを共有する必要があります。エンドポイントの申請方法の詳細については、「シャードのエンドポイントを申請する」をご参照ください。

考慮事項

タイプ

説明

ソースデータベースの制限

  • 帯域幅の要件: ソースデータベースがデプロイされているサーバーには、十分なアウトバウンド帯域幅が必要です。そうでない場合、データ同期速度が影響を受けます。

  • ターゲットデータベースのコレクションを変更する場合 (コレクションの名前マッピングの設定など)、1 つのデータ同期タスクで最大 1,000 個のコレクションを同期できます。1,000 個を超えるコレクションを同期するタスクを実行すると、リクエストエラーが発生します。この場合、複数のタスクを設定してコレクションを同期するか、データベース全体を同期するタスクを設定することをお勧めします。

  • ソースデータベースが ApsaraDB for MongoDB シャードクラスターインスタンスの場合、同期するコレクションの _id フィールドは一意である必要があります。そうでない場合、データの不整合が発生する可能性があります。

  • ソースデータベースが ApsaraDB for MongoDB シャードクラスターインスタンスの場合、インスタンス内の Mongos ノードの数は 10 を超えることはできません。また、ソースの ApsaraDB for MongoDB シャードクラスターインスタンスに孤立したドキュメントが含まれていないことを確認する必要があります。そうでない場合、データの不整合が発生し、タスクが失敗する可能性があります。詳細については、「MongoDB ドキュメント」および FAQ トピックの「シャードクラスターアーキテクチャでデプロイされた MongoDB データベースの孤立したドキュメントを削除するにはどうすればよいですか?」セクションをご参照ください。

  • スタンドアロンの ApsaraDB for MongoDB インスタンス、Azure Cosmos DB for MongoDB クラスター、または Amazon DocumentDB エラスティッククラスターをソースデータベースとして使用することはできません。

  • ソースデータベースでは Oplog が有効になっており、Oplog は少なくとも 7 日間保持されている必要があります。または、変更ストリームを有効にして、Data Transmission Service (DTS) が変更ストリームを介して過去 7 日以内にソースデータベースからデータの変更をサブスクライブできるようにする必要があります。そうでない場合、ソースデータベースからデータの変更を取得できないため、タスクが失敗する可能性があります。極端な場合、これによりデータの不整合やデータ損失が発生する可能性があります。これが原因で発生した問題は、DTS Service-level agreement (SLA) の対象外です。

    重要
    • Oplog を使用してソースデータベースからデータの変更を取得します。

    • MongoDB 4.0 以降のみが、変更ストリームを介したデータ変更の取得をサポートしています。変更ストリームを使用してデータ変更を取得する場合、双方向同期はサポートされていません。

    • ソースデータベースが Amazon DocumentDB (非エラスティッククラスター) の場合、手動で変更ストリームを有効にし、タスク設定中に 移行方法ChangeStream に、アーキテクチャシャードクラスター に設定する必要があります。

  • ソースデータベースで実行される操作の制限:

    • 完全データ同期中、データベースまたはコレクションのスキーマ、または ARRAY 型のデータを変更しないでください。そうでない場合、データ同期タスクが失敗するか、ソースデータベースとターゲットデータベース間でデータの不整合が発生します。

    • ソース MongoDB インスタンスがシャードクラスターアーキテクチャを使用している場合、同期タスク中に同期対象オブジェクトのデータ分布を変更するコマンド (shardCollection、reshardCollection、unshardCollection、moveCollection、movePrimary など) を実行しないでください。そうでない場合、データの不整合が発生する可能性があります。

  • ソースデータベースがシャードクラスターアーキテクチャを使用する MongoDB インスタンスであり、ソースデータベースのバランサーがデータを分散する場合、インスタンスで遅延が発生する可能性があります。

  • SRV アドレスを使用して MongoDB データベースに接続することはサポートされていません。

その他の制限

  • コレクションレベルの同期のみがサポートされています。

  • admin、config、および local データベースからのデータ同期はサポートされていません。

  • 同期する単一のデータが 10 MB を超える場合、タスクは失敗します。

  • ソースデータベースがシャードクラスター MongoDB インスタンスの場合:

    • 完全データ同期中、各サブタスクが増分同期フェーズに入るまで、ソース MongoDB データベースのバランサーを無効にする必要があります。そうでない場合、データの不整合が発生する可能性があります。バランサーの操作の詳細については、「MongoDB バランサーの管理」をご参照ください。

    • 増分データ同期方法が Oplog の場合、DTS はソースデータベースの異なるシャードからターゲット Kafka へのデータの書き込み順序を保証できません。

  • トランザクション情報は保持されません。トランザクションがターゲットデータベースに同期されると、トランザクションは単一のレコードに変換されます。

  • DTS タスク中に宛先 Kafka インスタンスでブローカーノードが増減した場合、DTS タスクを再起動する必要があります。

  • DTS がソースインスタンスと宛先インスタンスに接続できることを確認してください。たとえば、データベースインスタンスのセキュリティ設定、および自己管理 Kafka インスタンスの server.properties ファイル内の listeners および advertised.listeners パラメーターが DTS からのアクセスを制限していないことを確認してください。

  • 完全データ同期タスク中、DTS はソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。したがって、DTS インスタンスを開始する前にソースデータベースとターゲットデータベースのパフォーマンスを評価し、オフピーク時にデータを同期することをお勧めします。たとえば、ソースデータベースとターゲットデータベースの CPU 負荷が 30% 未満の場合などです。

  • DTS は、7 日未満実行されている失敗したインスタンスを再開しようとします。ビジネスを宛先インスタンスに切り替える前に、同期インスタンスを終了またはリリースしてください。これにより、インスタンスが自動的に再開され、宛先データベースのデータが上書きされるのを防ぎます。

  • DTS は、ターゲットデータベースの最後に同期されたデータのタイムスタンプとソースデータベースの現在のタイムスタンプに基づいて、増分データ同期の遅延を計算します。ソースデータベースで長期間更新操作が実行されない場合、同期遅延が不正確になる可能性があります。データ同期タスクの遅延が過度に高い場合は、ソースデータベースで更新操作を実行して遅延を更新できます。

  • インスタンスが失敗した場合、DTS ヘルプデスクは 8 時間以内にインスタンスの回復を試みます。回復プロセス中、インスタンスの再起動やパラメーターの調整などの操作が実行される場合があります。

    説明

    パラメーターが調整されるとき、DTS インスタンスのパラメーターのみが変更されます。データベースのパラメーターは変更されません。変更される可能性のあるパラメーターには、「インスタンスパラメーターの変更」で説明されているものが含まれますが、これらに限定されません。

課金

同期タイプ

タスク設定料金

完全データ同期

無料です。

増分データ同期

課金されます。詳細については、「課金の概要」をご参照ください。

同期タイプ

同期タイプ

説明

完全同期

ソースの ApsaraDB for MongoDB の同期オブジェクトのすべての既存データをターゲットの Kafka インスタンスに同期します。

説明

DATABASE と COLLECTION の完全同期がサポートされています。

増分同期

完全同期に加えて、ソースの ApsaraDB for MongoDB からターゲットの Kafka インスタンスに増分更新を同期できます。

Oplog の使用

増分同期は、タスク開始後に作成されたデータベースをサポートしません。次の増分更新がサポートされています:

  • CREATE COLLECTION, INDEX

  • DROP DATABASE, COLLECTION, INDEX

  • RENAME COLLECTION

  • コレクション内のドキュメントを挿入、更新、削除する操作。

ChangeStream の使用

次の増分更新がサポートされています:

  • DROP DATABASE, COLLECTION

  • RENAME COLLECTION

  • コレクション内のドキュメントを挿入、更新、削除する操作。

データベースアカウントの権限

データベース

必要な権限

アカウントの作成と権限付与

ソース ApsaraDB for MongoDB

同期するデータベース、admin データベース、および local データベースに対する読み取り権限。

アカウント管理

手順

  1. 次のいずれかの方法でデータ同期ページに移動し、データ同期インスタンスが存在するリージョンを選択します。

    DTS コンソール

    1. DTS コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、データ同期 をクリックします。

    3. ページの左上隅で、データ同期タスクが存在するリージョンを選択します。

    DMS コンソール

    説明

    実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。

    1. DMS コンソールにログインします。

    2. 上部のナビゲーションバーで、[Data + AI] にポインターを合わせ、[DTS (DTS)] > [データ同期] を選択します。

    3. データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。

  2. タスクの作成 をクリックして、タスク設定ページに移動します。

  3. ソースデータベースとターゲットデータベースを設定します。次の表にパラメーターを示します。

    カテゴリ

    設定

    説明

    なし

    タスク名

    DTS タスクの名前。DTS は自動的にタスク名を生成します。タスクを簡単に識別できるわかりやすい名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。

    移行元データベース

    既存の接続情報の選択

    • DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続の管理」をご参照ください。

      説明

      DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。

    • DTS へのインスタンスの登録に失敗した場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    MongoDB を選択します。

    アクセス方法

    Alibaba Cloud インスタンスを選択します。

    インスタンスのリージョン

    ソース ApsaraDB for MongoDB インスタンスのリージョンを選択します。

    Alibaba Cloud アカウント間でデータを複製

    この例では、現在の Alibaba Cloud アカウントのデータベースが使用されます。×を選択します。

    アーキテクチャ

    この例では レプリカセットを選択します。

    説明

    ソース ApsaraDB for MongoDB インスタンスが シャードクラスターの場合は、ShardアカウントShardパスワードも入力する必要があります。

    移行方法

    ソースデータベースから増分データを同期するために使用されるメソッド。ビジネス要件に基づいてメソッドを選択します。有効な値:

    • Oplog (推奨):

      このオプションは、ソースデータベースで oplog 機能が有効になっている場合に使用できます。

      説明

      デフォルトでは、oplog 機能は自己管理 MongoDB データベースと ApsaraDB for MongoDB インスタンスの両方で有効になっています。この機能により、ログのプル速度が速いため、低遅延で増分データを同期できます。したがって、[移行方法] パラメーターには Oplog を選択することをお勧めします。

    • ChangeStream:

      このオプションは、ソースデータベースで変更ストリームが有効になっている場合に使用できます。詳細については、「Change Streams」をご参照ください。

      説明
      • ソースデータベースが非エラスティック Amazon DocumentDB クラスターの場合、[移行方法] パラメーターは ChangeStream にのみ設定できます。

      • アーキテクチャ パラメーターに シャードクラスター を選択した場合、Shardアカウント および Shardパスワード パラメーターを設定する必要はありません。

    インスタンス ID

    ソース ApsaraDB for MongoDB のインスタンス ID を選択します。

    認証データベース

    ソース ApsaraDB for MongoDB インスタンスのデータベースアカウントのデータベース名を入力します。変更していない場合、デフォルトは admin です。

    データベースアカウント

    ソース ApsaraDB for MongoDB のデータベースアカウントを入力します。権限要件については、「データベースアカウントに必要な権限」をご参照ください。

    データベースのパスワード

    データベースへのアクセスに使用するパスワード。

    暗号化

    ソースデータベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、非暗号化SSL 暗号化、または Mongo Atlas SSL を選択できます。暗号化 パラメーターで使用できるオプションは、アクセス方法 および アーキテクチャ パラメーターに選択された値によって決まります。DTS コンソールに表示されるオプションが優先されます。

    説明
    • [アーキテクチャ] パラメーターが [シャードクラスター] に設定され、ApsaraDB for MongoDB データベースの [移行方法] パラメーターが Oplog に設定されている場合、[暗号化] パラメーター SSL 暗号化 は使用できません。

    • ソースデータベースが レプリカセット アーキテクチャを使用する自己管理 MongoDB データベースであり、アクセス方法 パラメーターが Alibaba Cloud インスタンス に設定されておらず、[暗号化] パラメーターが SSL 暗号化 に設定されている場合、認証局 (CA) 証明書をアップロードしてソースデータベースへの接続を検証できます。

    移行先データベース

    既存の接続情報の選択

    • DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続の管理」をご参照ください。

      説明

      DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。

    • DTS へのインスタンスの登録に失敗した場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    Kafka を選択します。

    アクセス方法

    Alibaba Cloud インスタンスを選択します。

    インスタンスのリージョン

    宛先 Kafka インスタンスが存在するリージョンを選択します。

    Kafka インスタンス ID

    宛先 Kafka インスタンスの ID を選択します。

    暗号化

    ビジネスおよびセキュリティ要件に基づいて、非暗号化または SCRAM-SHA-256 を選択します。

    トピック

    ドロップダウンリストから、データを受信するために使用するトピックを選択します。

    Kafka スキーマレジストリの使用

    Kafka スキーマレジストリは、Avro スキーマを保存および取得するための RESTful インターフェイスを提供するメタデータサービスレイヤーです。

    • ×: Kafka スキーマレジストリを使用しません。

    • : Kafka スキーマレジストリを使用します。Avro スキーマ用に Kafka スキーマレジストリに登録されている URL または IP アドレスを入力する必要があります。

  4. ページの下部にある 接続をテストして続行 をクリックします。

    説明
    • DTS サーバーからのアクセスを許可するために、DTS サーバーの CIDR ブロックがソースデータベースとターゲットデータベースのセキュリティ設定に自動または手動で追加できることを確認してください。詳細については、「DTS サーバーの IP アドレスをホワイトリストに追加する」をご参照ください。

    • ソースデータベースまたはターゲットデータベースが自己管理データベースであり、その アクセス方法Alibaba Cloud インスタンス に設定されていない場合、DTS サーバーの CIDR ブロック ダイアログボックスで 接続テスト をクリックします。

  5. 同期するオブジェクトを設定します。

    1. オブジェクト設定 ステップで、同期するオブジェクトを設定します。

      設定

      説明

      同期タイプ

      デフォルトでは、[増分データ同期] が選択されています。[完全データ同期] のみを選択できます。[スキーマ同期] は選択できません。事前チェックが完了すると、DTS は選択したオブジェクトの既存データをソースデータベースからターゲットデータベースに同期します。既存データは、後続の増分同期の基礎となります。

      競合するテーブルの処理モード

      • エラーの事前チェックと報告: ターゲットデータベースにソースデータベースのコレクションと同じ名前のコレクションが含まれているかどうかを確認します。ソースデータベースとターゲットデータベースに同じコレクション名のコレクションが含まれていない場合、事前チェックは合格します。それ以外の場合、事前チェック中にエラーが返され、データ同期インスタンスを開始できません。

        説明

        ソースデータベースとターゲットデータベースに同じ名前のコレクションがあり、ターゲットデータベースのコレクションを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるコレクションの名前を変更できます。詳細については、「同期するオブジェクトの名前を変更する」をご参照ください。

      • エラーを無視して続行: ソースデータベースとターゲットデータベースの同じコレクション名の事前チェックをスキップします。

        警告

        エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスに潜在的なリスクが生じる可能性があります。

        • ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じプライマリキー値または一意キー値を持つ場合、DTS はそのデータレコードをターゲットデータベースに同期しません。ターゲットデータベースの既存のデータレコードが保持されます。

        • データの初期化に失敗したり、特定の列のみが同期されたり、データ同期インスタンスが失敗したりする可能性があります。

      Kafka のデータ形式

      Canal JSON のみがサポートされています。

      説明

      Kafka が受信するデータは、3 つのシナリオに分類できます。

      Kafka 圧縮形式

      Kafka 圧縮データの圧縮形式。ビジネス要件に基づいて圧縮形式を選択します。有効な値:

      • LZ4 (デフォルト): 低圧縮率、高圧縮速度。

      • GZIP: 高圧縮率、低圧縮速度。

        説明

        GZIP 圧縮は大量の CPU リソースを消費します。

      • Snappy: 中程度の圧縮率、中程度の圧縮速度。

      Kafka パーティションへのデータ転送ポリシー

      要件に基づいてポリシーを選択します。

      メッセージ肯定応答メカニズム

      要件に基づいてメッセージ確認メカニズムを選択します。

      DDL 情報を格納するトピック

      移行先インスタンスでのオブジェクト名の大文字化

      宛先インスタンスのデータベース名とコレクション名の大文字/小文字。デフォルトでは、[DTS デフォルトポリシー] が選択されています。他のオプションを選択して、オブジェクト名の大文字/小文字がソースまたはターゲットデータベースのオブジェクト名のデフォルトの大文字/小文字と一致するようにすることができます。詳細については、「宛先インスタンスのオブジェクト名の大文字/小文字を指定する」をご参照ください。

      ソースオブジェクト

      ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択し、向右 アイコンをクリックしてオブジェクトを 選択中のオブジェクト セクションに追加します。

      説明

      コレクションレベルで同期するオブジェクトを選択できます。

      選択中のオブジェクト

      この例では、追加の設定は必要ありません。

      マッピング機能を使用して、ソースデータベースのコレクションの宛先 Kafka インスタンスのマッピング情報を設定できます。

    2. 次へ:詳細設定 をクリックして詳細設定を行います。

      設定

      説明

      タスクのスケジュールに使用する専用クラスターの選択

      専用クラスターを指定しない場合、DTS はデフォルトでタスクを共有クラスターにスケジュールします。データ同期インスタンスの安定性を向上させたい場合は、専用クラスターを購入してください。詳細については、「DTS 専用クラスターとは」をご参照ください。

      失敗した接続の再試行時間

      失敗した接続のリトライ時間範囲。データ同期タスクの開始後にソースまたはターゲットデータベースへの接続に失敗した場合、DTS は時間範囲内にすぐに接続をリトライします。有効な値: 10~1440。単位: 分。デフォルト値: 720。このパラメーターを 30 より大きい値に設定することをお勧めします。DTS が指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTS はデータ同期タスクを再開します。それ以外の場合、データ同期タスクは失敗します。

      説明
      • 同じソースまたはターゲットデータベースを持つ複数のデータ同期タスクに異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。

      • DTS が接続をリトライすると、DTS インスタンスに対して課金されます。ビジネス要件に基づいてリトライ時間範囲を指定することをお勧めします。ソースインスタンスと宛先インスタンスがリリースされた後、できるだけ早く DTS インスタンスをリリースすることもできます。

      移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。

      その他の問題のリトライ時間範囲。たとえば、データ同期タスクの開始後に DDL または DML 操作が失敗した場合、DTS は時間範囲内にすぐに操作をリトライします。有効な値: 1~1440。単位: 分。デフォルト値: 10。このパラメーターを 10 より大きい値に設定することをお勧めします。失敗した操作が指定された時間範囲内に正常に実行されると、DTS はデータ同期タスクを再開します。それ以外の場合、データ同期タスクは失敗します。

      重要

      移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値より小さくする必要があります。

      更新操作後にドキュメント全体を取得するかどうか

      増分データ同期中に、更新操作に対応するドキュメントの完全なデータを宛先に同期するかどうかを指定します。

      説明

      この設定項目は、移行方法ChangeStream に設定されている場合にのみ使用できます。

      • : 更新されたフィールドを含むドキュメントの完全なデータを同期します。

        重要
        • この機能は MongoDB のネイティブ機能に基づいており、ソースデータベースの負荷を増加させる可能性があります。これにより、増分データ収集の速度が低下し、同期インスタンスで遅延が発生する可能性があります。

        • DTS が完全なデータを取得できない場合、更新されたフィールドのデータのみが同期されます。

      • ×: 更新されたフィールドのデータのみを同期します。

      完全同期レートを制限するかどうか

      完全データ同期中、DTS はソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。完全データ同期タスクの 1 秒あたりのソースデータベースのクエリ率 QPS1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメーターを設定して、ターゲットデータベースサーバーの負荷を軽減できます。

      説明

      このパラメーターは、同期タイプ パラメーターに 完全データ同期 が選択されている場合にのみ設定できます。

      同期するデータのうち、同一テーブル内のプライマリキー_id のデータ型が一意かどうか

      同期するデータの単一コレクション内のプライマリキー _id のデータ型は 1 つのみ。

      説明

      このパラメーターは、同期タイプ パラメーターに 完全データ同期 が選択されている場合にのみ表示されます。

      • : 完全データ同期中、DTS はソースデータベースから同期するデータのプライマリキーのデータ型をスキャンしません。

      • ×: 完全データ同期中、DTS はソースデータベースから同期するデータのプライマリキーのデータ型をスキャンします。

      増分同期率を制限するかどうか

      増分データ同期のスロットリングを有効にするかどうかを指定します。ビジネス要件に基づいて、増分データ同期のスロットリングを有効にできます。スロットリングを設定するには、1 秒あたりの増分同期の行数 RPS および 1 秒あたりの増分同期データ量 (MB) BPS パラメーターを設定する必要があります。これにより、ターゲットデータベースサーバーの負荷が軽減されます。

      環境タグ

      要件に基づいてインスタンスを識別するための環境タグを選択できます。この例では、タグを選択する必要はありません。

      ETL の設定

      抽出・変換・書き出し (ETL) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効な値:

      監視アラート

      データ同期インスタンスのアラートを設定するかどうかを指定します。タスクが失敗した場合、または同期遅延が指定されたしきい値を超えた場合、アラート連絡先は通知を受け取ります。有効な値:

      • いいえ: アラートを有効にしません。

      • はい: アラートを設定します。この場合、アラートのしきい値と アラート通知設定も設定する必要があります。詳細については、「モニタリングとアラートの設定」トピックの「DTS タスク作成時のモニタリングとアラートの設定」セクションをご参照ください。

  6. タスク設定を保存し、事前チェックを実行します。

    • 関連する API 操作を呼び出して DTS タスクを設定する際に指定するパラメーターを表示するには、次:タスク設定の保存と事前チェック にポインターを合わせ、OpenAPI パラメーターのプレビュー をクリックします。

    • パラメーターを表示する必要がない場合、または表示済みの場合、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • データ同期タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後にのみ、データ同期タスクを開始できます。

    • データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。次に、事前チェックを再実行します。

    • 事前チェック中に項目のアラートがトリガーされた場合:

      • アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして問題をトラブルシューティングします。次に、事前チェックを再度実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。[詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。次に、[再度事前チェック] をクリックして事前チェックを再度実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスに潜在的なリスクが生じる可能性があります。

  7. インスタンスを購入します。

    1. [成功率][100%] になるまで待ちます。次に、[次へ: インスタンスの購入] をクリックします。

    2. [購入] ページで、データ同期タスクの課金方法とインスタンスクラスのパラメーターを設定します。次の表にパラメーターを示します。

      セクション

      パラメーター

      説明

      新しいインスタンスクラス

      課金方法

      • サブスクリプション: データ同期インスタンスを作成するときにサブスクリプションの料金を支払います。サブスクリプション課金方法は、長期使用の場合、従量課金方法よりも費用対効果が高くなります。

      • 従量課金: 従量課金インスタンスは時間単位で課金されます。従量課金方法は短期使用に適しています。従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。

      リソースグループ設定

      データ同期インスタンスが属するリソースグループ。デフォルト値: デフォルトのリソースグループ。詳細については、「Resource Management とは」をご参照ください。

      インスタンスクラス

      DTS は、同期速度が異なるインスタンスクラスを提供します。ビジネス要件に基づいてインスタンスクラスを選択できます。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。

      サブスクリプション期間

      サブスクリプション課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。サブスクリプション期間は、1~9 か月、1 年、2 年、3 年、または 5 年です。

      説明

      このパラメーターは、サブスクリプション 課金方法を選択した場合にのみ使用できます。

    3. [Data Transmission Service (従量課金) サービス規約] を読み、選択します。

    4. [購入して開始] をクリックします。表示されるダイアログボックスで、OK をクリックします。

      タスクリストでタスクの進行状況を表示できます。

マッピング情報

  1. 選択中のオブジェクト エリアで、コレクションレベルの宛先 Topic 名にマウスポインターを合わせます。

  2. 宛先 Topic 名の横にある 編集 をクリックします。

  3. 表示される テーブルの編集 ダイアログボックスで、マッピング情報を設定します。

    設定

    説明

    対象トピックの名前

    ソースコレクションが同期される宛先 Topic の名前。デフォルトでは、ソースデータベースとターゲットデータベースの設定 ステップの 移行先データベース セクションで選択された トピック です。

    重要
    • 入力するトピック名は、宛先 Kafka インスタンスに存在する必要があります。そうでない場合、データ同期は失敗します。

    • 対象トピックの名前 を変更すると、データは入力した Topic に書き込まれます。

    フィルタリング条件

    詳細については、「フィルター条件の設定」をご参照ください。

    パーティション数

    宛先トピックにデータを書き込むためのパーティションの数。

  4. [OK] をクリックします。

データ配信シナリオ

シナリオ 1: Oplog を使用した増分データの同期

主なインスタンス設定

移行方法 で、Oplog を選択します。

データ配信の例

ソース増分変更タイプ

ソース増分変更ステートメント

宛先トピックが受信したデータ

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

データを表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973438,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

データを表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848051984,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

データを表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147734,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

データを表示 (クリックして展開)

{
	"data": [{
		"$unset": {
			"salary": true
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208186,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

データを表示 (クリックして展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848289798,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

データを表示 (クリックして展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 1741847893000000005,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847893760,
	"type": "DDL"
}

シナリオ 2: ChangeStream を使用した増分データの同期 (更新されたフィールドのデータの同期)

主なインスタンス設定

移行方法 で、ChangeStream を選択します。[更新操作後に完全なドキュメントを取得] で、× を選択します。

データ配信の例

ソース増分変更タイプ

ソース増分変更ステートメント

宛先トピックが受信したデータ

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

データを表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973803,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

データを表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052912,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

データを表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848148056,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

データを表示 (クリックして展開)

{
	"data": [{
		"$unset": {
			"salary": 1
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848209142,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

データを表示 (クリックして展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290254,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

データを表示 (クリックして展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894679,
	"type": "DDL"
}

シナリオ 3: ChangeStream を使用した増分データの同期 (更新されたフィールドに対応するドキュメントの完全なデータの同期)

主なインスタンス設定

移行方法ChangeStream に設定し、[更新操作後に完全なドキュメントを取得] に設定します。

データ配信の例

ソース増分変更タイプ

ソース増分変更ステートメント

宛先トピックが受信したデータ

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

データを表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973128,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

データを表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052219,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

データを表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"salary": 100.0,
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147327,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

データを表示 (クリックして展開)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208401,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

データを表示 (クリックして展開)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290499,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

データを表示 (クリックして展開)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894045,
	"type": "DDL"
}

特殊なケース

注意事項

更新イベントの fullDocument フィールドが欠落している場合、データ配信結果はOplog を使用して増分データを同期する場合と同じになります。

ソースベースデータ

ソース増分変更ステートメント

宛先トピックが受信したデータ

use admin
db.runCommand({ enablesharding:"dts_test" }) 
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

データを表示 (クリックして展開)

{
	"data": [{
		"$set": {
			"name": "b"
		}
	}],
	"database": "dts_test",
	"es": 1740720994000,
	"gtid": null,
	"id": 174072099400000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"name": "a",
		"_id": 1.0
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "cstest",
	"ts": 1740721007099,
	"type": "UPDATE"
}

よくある質問

  • [Kafka データ圧縮形式] を変更できますか?

    はい。詳細については、「[同期するオブジェクトの変更]」をご参照ください。

  • [メッセージ確認メカニズム] を変更できますか?

    はい。詳細については、「[同期するオブジェクトの変更]」をご参照ください。