すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ApsaraDB for MongoDB から Message Queue for Apache Kafka へのデータ同期

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) を使用すると、ApsaraDB for MongoDB の変更データをストリーミング方式で Message Queue for Apache Kafka インスタンスに送信できます。以下の手順では、MongoDB レプリカセットをソース、Kafka インスタンスをターゲットとして同期タスクを作成する方法を説明します。

本手順の対象範囲:

  • サポートされるアーキテクチャ:レプリカセットおよびシャードクラスター

  • 増分同期方法:Oplog(推奨)および Change Streams

  • データ形式:Canal JSON 形式で Kafka トピックに配信

  • 同期範囲:コレクション単位での完全同期および増分同期

  • 課金:完全データ同期は無料、増分データ同期は課金対象

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • Message Queue for Apache Kafka インスタンス

  • ターゲット Kafka インスタンス内に、データ受信用に作成済みの トピック

  • (シャードクラスターのみ) すべてのシャードノードにエンドポイントを適用し、同じアカウントパスワードとエンドポイントを共有する必要があります。 詳細については、「シャードのエンドポイントを申請する」をご参照ください。

サポートされているソースおよびターゲットデータベースのバージョンについては、「同期ソリューションの概要」をご参照ください。

制限事項

同期タスクを作成する前に、以下の制限事項を確認してください。

ソースデータベースの制限

  • ソースサーバーには十分なアウトバウンド帯域幅が必要です。帯域幅が不足していると、同期速度が低下します。

  • コレクションの名前マッピングを設定する場合、1 つのタスクで同期できるコレクション数は最大 1,000 個です。この上限を超えるとリクエストエラーが発生します。1,000 個を超えるコレクションを同期する場合は、複数のタスクを作成するか、名前マッピングを適用せずにデータベース全体を同期してください。

  • シャードクラスターをソースとする場合:各同期対象コレクションの _id フィールドは一意である必要があります。一意でない場合、データの不整合が発生する可能性があります。

  • シャードクラスターをソースとする場合:mongos ノードの数は 10 個を超えてはならず、インスタンス内に孤立ドキュメントが存在してはなりません。「よくある質問」トピックをご参照いただき、孤立ドキュメントの削除方法をご確認ください。

  • スタンドアロンの ApsaraDB for MongoDB インスタンス、Azure Cosmos DB for MongoDB クラスター、Amazon DocumentDB エラスティッククラスターは、ソースとしてサポートされていません。

  • DTS は SRV エンドポイント経由で MongoDB に接続できません。

  • oplog は有効化され、ログデータを少なくとも 7 日間保持する必要があります。または、Change Streams を有効化し、DTS が過去 7 日間のデータ変更をサブスクライブできるようにする必要があります。いずれの条件も満たさない場合、DTS がソースの変更を取得できず、サービスレベルアグリーメント(SLA)の範囲外でデータ損失または不整合が発生する可能性があります。

    重要

    - ソースデータベースの変更を記録するには、oplog を使用することを推奨します(推奨)。 - Change Streams を使用するには、MongoDB 4.0 以降が必要です。Change Streams を使用した双方向同期はサポートされていません。 - 非エラスティックの Amazon DocumentDB クラスターでは、Change Streams を有効化し、移行方法ChangeStream に、アーキテクチャシャードクラスター に設定します。

  • 完全データ同期中は、データベースまたはコレクションのスキーマを変更したり、ARRAY 型のデータを変更したりしないでください。

  • シャードクラスターをソースとする場合、同期中に以下のコマンドを実行しないでください:shardCollectionreshardCollectionunshardCollectionmoveCollectionmovePrimary。これらのコマンドはデータの不整合を引き起こす可能性があります。

  • ソースデータベースがシャードクラスター構成の MongoDB インスタンスであり、その balancer がデータのバランス調整を行っている場合、インスタンスに遅延が発生する可能性があります。

その他の制限

  • コレクション単位の同期のみがサポートされています。

  • adminconfiglocal データベースは同期できません。

  • 1 件のレコードが 10 MB を超えると、タスクは失敗します。

  • トランザクションは保持されません。DTS は各トランザクションをターゲット側で個別のレコードに変換します。

  • DTS タスク実行中に、ターゲット Kafka インスタンスのブローカーノードが追加または削除された場合、DTS タスクを再起動してください。

  • DTS がソースおよびターゲットインスタンスに接続できることを確認してください。たとえば、データベースインスタンスのセキュリティ設定や、自己管理型 Kafka インスタンスの server.properties ファイル内の listeners および advertised.listeners パラメーターが、DTS からのアクセスを制限していないことを確認してください。

  • CPU 使用率がソースおよびターゲットデータベースともに 30 % 未満となるオフピーク時間帯に同期を実行してください。

  • DTS は、実行期間が 7 日間 未満の失敗したインスタンスを自動的に再試行します。トラフィックをターゲットに切り替える前に、同期インスタンスを停止またはリリースして、自動再開によるターゲットデータの上書きを防止してください。

  • DTS は、ターゲットデータベース内の最新同期データのタイムスタンプとソースデータベース内の現在時刻に基づいて、増分データ同期の同期遅延を計算します。ソースデータベースで長期間更新操作が行われない場合、同期遅延の値が不正確になる可能性があります。同期遅延が過度に高くなった場合は、ソースデータベースで更新操作を実行して遅延値を更新してください。

  • DTS タスクの実行に失敗した場合、DTS テクニカルサポートが 8 時間 以内にタスクの復旧を試みます。復旧中はタスクが再起動されることがあり、タスクのパラメーターが変更される場合があります。変更されるのは DTS タスクのパラメーターのみであり、データベースのパラメーターは変更されません。変更される可能性のあるパラメーターには、「インスタンスパラメーターの変更」セクションのパラメーターが含まれますが、これに限定されません。

  • 増分同期方法として Oplog を選択したシャードクラスターをソースとする場合、DTS はシャード間の書き込み順序をターゲット Kafka トピックに対して保証しません。

  • シャードクラスターのソースの場合、完全データ同期中に MongoDB バランサーを無効化します。完全同期が完了し、増分同期が開始された後のみ、再度有効化してください。詳細については、「ApsaraDB for MongoDB バランサーの管理」をご参照ください。

課金

同期タイプ料金
完全データ同期無料
増分データ同期課金概要

同期タイプおよびサポートされる操作

完全同期 では、選択した MongoDB コレクションからすべての既存データをターゲット Kafka トピックにコピーします。DTS は DATABASE および COLLECTION オブジェクトをサポートしています。

増分同期 では、完全同期完了後に変更イベントを継続的に配信します。サポートされる操作は、増分同期方法によって異なります。

操作OplogChange Streams
INSERTサポートサポート
UPDATEサポートサポート
DELETEサポートサポート
CREATE COLLECTION / INDEXサポート非サポート
DROP DATABASE / COLLECTION / INDEXサポートDROP DATABASE と DROP COLLECTION のみ
RENAME COLLECTIONサポートサポート
増分同期では、タスク開始後に作成されたデータベースは対象となりません。

必要なデータベースアカウント権限

データベース必要な権限
ソース ApsaraDB for MongoDB同期対象データベース、admin データベース、および local データベースに対する読み取り権限

アカウント作成手順については、「アカウント管理」をご参照ください。

同期タスクの作成

ステップ 1:データ同期ページを開く

DTS コンソールまたは DMS コンソールのいずれかを使用します。

DTS コンソール

  1. DTS コンソール にログインします。DTS コンソール

  2. 左側ナビゲーションウィンドウで、データ同期 をクリックします。

  3. 画面左上隅で、同期タスクが配置されるリージョンを選択します。

DMS コンソール

手順はDMSコンソールのモードによって異なる場合があります。「シンプルモード」と「ビジネス要件に基づくDMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
  1. DMS コンソール にログインします。DMS コンソール

  2. 上部ナビゲーションバーで、Data + AI の上にポインターを合わせ、DTS (DTS) > データ同期 を選択します。

  3. データ同期タスク の右側にあるドロップダウンリストから、リージョンを選択します。

ステップ 2:ソースおよびターゲットデータベースの構成

タスクの作成 をクリックし、以下の表に示すパラメーターを構成します。

一般

パラメーター説明
タスク名DTS タスクの名前です。DTS が自動的に名前を生成します。タスクを識別しやすいように、意味のある名前を指定してください。名前は一意である必要はありません。

ソースデータベース

パラメーター説明
既存の接続を選択ソースインスタンスが DTS に登録済みの場合、ドロップダウンリストから選択してください。DTS が残りのフィールドを自動的に入力します。それ以外の場合は、以下のフィールドを構成してください。DMS コンソールでは、DMS データベースインスタンスの選択 リストから選択します。
データベースタイプMongoDB を選択します。
アクセス方法Alibaba Cloud インスタンス を選択します。
インスタンス リージョンソース ApsaraDB for MongoDB インスタンスのリージョンを選択します。
Alibaba Cloud アカウント間でのデータ複製ソースデータベースが現在の Alibaba Cloud アカウントに属している場合は、いいえ を選択します。
アーキテクチャ本例では レプリカセット を選択します。ソースが シャードクラスター の場合、シャードアカウント および シャードパスワード も入力してください。
移行方法増分データの同期方法です。Oplog は、oplog 機能が有効化されている場合に推奨されます。ChangeStream は、ソースで Change Streams が有効化されている場合に利用可能です。詳細については、「Change Streams」をご参照ください。注: 非エラスティックの Amazon DocumentDB クラスターでは、ChangeStream のみがサポートされます。アーキテクチャシャードクラスター に、移行方法ChangeStream に設定した場合、シャードアカウント および シャードパスワード フィールドは不要です。
インスタンス IDソース ApsaraDB for MongoDB インスタンスのインスタンス ID を選択します。
認証データベースアカウント認証情報が格納されているデータベースです。デフォルトは admin です。
データベースアカウントソースデータベースのアカウントです。「必要なデータベースアカウント権限」をご参照ください。
データベースパスワードデータベースアカウントのパスワードです。
暗号化接続暗号化タイプを指定します:暗号化なしSSL 暗号化、または Mongo Atlas SSL。利用可能なオプションは、アクセス方法 および アーキテクチャ の値によって異なります。注: アーキテクチャシャードクラスター で、移行方法Oplog の場合、SSL 暗号化は利用できません。自己管理型 MongoDB で レプリカセット アーキテクチャを採用し、SSL 暗号化 を選択した場合、CA 証明書をアップロードして接続を検証してください。

ターゲットデータベース

パラメーター説明
既存の接続を選択ターゲット Kafka インスタンスが DTS に登録済みの場合、ドロップダウンリストから選択してください。それ以外の場合は、以下のフィールドを構成してください。
データベースタイプKafka を選択します。
アクセス方法Alibaba Cloud インスタンス を選択します。
インスタンスリージョンターゲット Kafka インスタンスのリージョンを選択します。
Kafka インスタンス IDターゲット Kafka インスタンス ID を選択します。
暗号化セキュリティ要件に応じて、暗号化なし または SCRAM-SHA-256 を選択します。
トピック同期データを受信するトピックを選択します。
Kafka Schema Registry の使用Kafka Schema Registry は、Avro スキーマを保存および取得するための RESTful メタデータサービスです。いいえ を選択するとスキップされ、はいアラート通知設定 を選択すると、Schema Registry の URL または IP アドレスを指定してください。

ステップ 3:接続性のテスト

ページ下部の 接続性のテストと続行 をクリックします。

許可されている場合、DTS は、ソースおよびターゲットデータベースのセキュリティ設定に、DTS サーバーの CIDR ブロックを自動的に追加します。手動設定については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。
Alibaba Cloud インスタンス をアクセス方法として使用しない自己管理型データベースの場合、接続性のテストDTS サーバーの CIDR ブロック ダイアログボックス内でクリックしてください。

ステップ 4:同期対象オブジェクトの構成

オブジェクトの構成 ステップで、以下のパラメーターを設定します。

パラメーター説明
同期タイプ増分データ同期 がデフォルトで選択されています。必要に応じて、完全データ同期 も選択できます。スキーマ同期 は利用できません。完全同期が有効化されている場合、DTS はまずすべての既存データをコピーし、その後で増分同期を開始します。
競合テーブルの処理モード事前チェックおよびエラー報告:ターゲットにソースと同じ名前のコレクションが存在する場合、事前チェックが失敗します。名前競合を解決するには、「オブジェクト名マッピング」をご利用ください。エラーを無視して続行:名前競合チェックをスキップします。ターゲットのレコードとソースのレコードでプライマリキーまたは一意キーが一致する場合、ターゲットのレコードが保持されます。
警告

このオプションはデータの不整合を引き起こす可能性があります。

Kafka 内のデータ形式サポートされるのは Canal JSON のみです。
Kafka データ圧縮形式Kafka に書き込まれるデータの圧縮形式です。オプション: LZ4(デフォルト — 圧縮率が低く、高速)、GZIP(圧縮率が高く、低速、CPU 使用率が高い)、Snappy(バランス型)。
Kafka パーティションへのデータ配信ポリシー要件に基づいてパーティションルーティングポリシーを選択します。
メッセージ確認応答メカニズム要件に応じて、「メッセージ確認応答メカニズム」を選択してください。
DDL 情報を格納するトピックDDL イベントを格納するトピックを選択します。空欄のままにした場合、DDL イベントはデータイベントと同じトピックに書き込まれます。
宛先インスタンスにおけるオブジェクト名の大文字小文字の区別宛先におけるデータベースおよびコレクション名の大文字小文字の区別を制御します。デフォルトは DTS デフォルトポリシー宛先インスタンスにおけるオブジェクト名の大文字小文字の指定 です。「」をご参照ください。
ソースオブジェクトソースオブジェクト セクションからオブジェクトを選択し、矢印アイコンをクリックして 選択済みオブジェクト に移動します。コレクション単位の選択のみがサポートされています。

ステップ 5:高度な設定の構成

次へ:高度な設定 をクリックし、以下のパラメーターを構成します。

パラメーター説明
タスクスケジューリング専用クラスターデフォルトでは、DTS はタスクを共有クラスターでスケジュールします。より高い安定性を得るために、専用クラスター を購入してください。
接続失敗時の再試行時間ソースまたはターゲットに到達できない場合の DTS の再試行時間です。範囲:10~1440 分。デフォルト:720 分。このパラメーターは 30 を超える値に設定することを推奨します。同一データベースを共有する複数のタスクで異なる再試行時間を指定した場合、最も短い値が有効になります。再試行時間は課金対象です。
その他の問題発生時の再試行時間DDL または DML 操作が失敗した場合の DTS の再試行時間です。範囲:1~1440 分。デフォルト:10 分。最低でも 10 分以上に設定してください。接続失敗時の再試行時間 よりも短く設定する必要があります。
更新後のドキュメント全体を取得移行方法ChangeStream の場合にのみ利用可能です。はい:各更新に対してドキュメント全体を同期しますが、これによりソース負荷が増加し、遅延が発生する可能性があります。DTS がドキュメント全体を取得できない場合、変更されたフィールドのみが送信されます。いいえ:変更されたフィールドのみを同期します。
完全データ同期のスロットリング有効化有効化した場合、ソースデータベースへの QPS完全データ移行の RPS、および 完全移行のデータ移行速度(MB/s) を構成して、ターゲットへの負荷を軽減します。完全データ同期 が選択されている場合にのみ利用可能です。
同期対象データのテーブルにおいて、プライマリキー _id は 1 つのデータ型のみ完全データ同期 が選択されている場合にのみ利用可能です。はい:完全同期中に DTS は _id フィールドのデータ型のスキャンをスキップします。いいえ:DTS は _id フィールドのデータ型をスキャンします。
増分データ同期のスロットリング有効化有効化した場合、増分データ同期の RPS および 増分同期のデータ同期速度(MB/s) を構成して、ターゲットへの負荷を軽減します。
環境タグインスタンスを識別するための任意のタグです。
ETL の構成ETL (抽出・変換・書き出し) ロジックを適用するには、有効化します。コードエディタにデータ処理文を入力します。詳細については、「ETL をデータ移行またはデータ同期タスクで設定する」をご参照ください。
モニタリングとアラート有効にすると、アラートのしきい値と通知設定を指定します。タスクが失敗した場合、または同期遅延がしきい値を超えた場合、DTS はアラート連絡先に通知します。詳細については、「モニタリングとアラートの設定」をご参照ください。

ステップ 6:事前チェックの実行

次へ:タスク設定の保存と事前チェック をクリックします。

この構成の API パラメーターをプレビューするには、ボタンにカーソルを合わせ、OpenAPI パラメーターのプレビュー をクリックしてください。
事前チェックが成功するまで、タスクを開始できません。
事前チェックが失敗した場合、各失敗項目の横にある 詳細の表示 をクリックし、問題を解決した後、再チェック をクリックしてください。
項目に対してアラートが発行された場合:進行する前に、無視できないアラートを修正してください。無視可能なアラートの場合は、アラート詳細の確認 をクリックし、ダイアログで 無視 を選択して確認し、再チェック をクリックしてください。アラートを無視すると、データの不整合が発生する可能性があります。

ステップ 7:インスタンスの購入および開始

  1. 成功率100 % に達するのを待ってから、次へ:インスタンスの購入 をクリックします。

  2. 購入 ページで、以下のパラメーターを構成します。

    パラメーター説明
    課金方法サブスクリプション:前払い方式。長期利用に適しています。従量課金:時間単位で課金されます。短期利用に適しています。不要になったらインスタンスをリリースして、課金を停止してください。
    リソースグループ設定同期インスタンスのリソースグループです。デフォルト: デフォルトリソースグループ。「Resource Management とは?
    インスタンスクラス同期速度の階層です。「データ同期インスタンスのインスタンスクラス」をご参照ください。
    サブスクリプション期間サブスクリプション 課金に利用可能です。オプション:1~9 か月、1 年、2 年、3 年、または 5 年。
  3. Data Transmission Service(従量課金)サービス利用規約 をお読みになり、同意してください。

  4. 購入して開始 をクリックし、確認ダイアログで OK をクリックします。

タスクがタスク一覧に表示されます。そこで進捗状況を監視してください。

コレクションからトピックへのマッピングの構成

デフォルトでは、すべてのコレクションがターゲットデータベース構成で選択したトピックに書き込まれます。特定のコレクションを別のトピックにルーティングするには、以下の手順を実行します。

  1. 選択済みオブジェクト エリアで、コレクションレベルのターゲットトピック名の上にポインターを合わせます。

  2. トピック名の横にある 編集 をクリックします。

  3. テーブルの編集 ダイアログで、以下の設定を構成します。

    パラメーター説明
    ターゲットトピック名このコレクションのターゲットトピックです。トピックは Kafka インスタンス内に存在する必要があります。変更した場合、データは新しいトピックに書き込まれます。デフォルト:ターゲットデータベース構成時に選択したトピックです。
    フィルター条件オプションの行フィルターです。「フィルター条件の設定」をご参照ください。
    パーティション数トピックにデータを書き込む際のパーティション数です。
  4. OK をクリックします。

データ配信の例

MongoDB の各増分変更は Canal JSON メッセージとしてシリアル化され、構成済みの Kafka トピックに配信されます。メッセージ構造は、増分同期方法および更新設定によって異なります。

シナリオの選択

目的構成
低遅延同期および完全な DDL サポート移行方法Oplog に設定(シナリオ 1)
部分的なドキュメント更新を伴う Change Streams移行方法ChangeStream に、更新後のドキュメント全体を取得いいえ に設定(シナリオ 2)
各更新でドキュメント全体を含む Change Streams移行方法ChangeStream に、更新後のドキュメント全体を取得はい に設定(シナリオ 3)

Canal JSON メッセージのフィールド

すべてのシナリオで、同じトップレベルの Canal JSON エンベロープが使用されます。以下のフィールドがすべてのメッセージに含まれます。

フィールド説明
databasestringソースデータベース名
tablestringソースコレクション名
typestring操作タイプ:INSERTUPDATEDELETE、または DDL
isDdlbooleantrue は DDL イベント(コレクションの削除、名前変更)用、false は DML イベント用
esnumberソースデータベース内のイベントタイムスタンプ(UNIX ミリ秒)
tsnumberDTS がイベントを処理したタイムスタンプ(UNIX ミリ秒)
idnumberDTS 内部イベント ID
pkNamesarrayプライマリキーのフィールド名(通常は ["_id"]
dataarray操作後のドキュメントデータ。部分更新(Oplog または full document 取得なしの ChangeStream)の場合、MongoDB 更新演算子($set$unset)を使用して変更されたフィールドのみが含まれます。null は DDL イベント用です。
oldarray更新前のドキュメント状態:通常 _id フィールドのみが含まれます。null は INSERT および DDL イベント用です。
sqlobject または nullDDL 文の詳細(isDdl: true の場合)。null は DML イベント用です。
gtidnullMongoDB には該当せず(MySQL 互換性のために予約)。
mysqlTypenullMongoDB には該当しません。
serverIdnullMongoDB には該当しません。
sqlTypenullMongoDB には該当しません。

シナリオ 1:Oplog

移行方法Oplog に設定します。

ソースの変更タイプソース文ターゲットトピックで受信されるデータ
insertdb.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})以下に例を示します
update $setdb.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})以下に例を示します
update $set new fielddb.kafka_test.update({"cid":"a"},{$set:{"salary":100}})以下に例を示します
update $unset (remove field)db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})以下に例を示します
deletedb.kafka_test.deleteOne({"cid":"a"})以下に例を示します
ddl dropdb.kafka_test.drop()以下に例を示します

データの表示(クリックして展開)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 18
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741847972000,
    "gtid": null,
    "id": 174184797200000****,
    "isDdl": false,
    "mysqlType": null,
    "old": null,
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741847973438,
    "type": "INSERT"
}

データの表示(クリックして展開)

{
    "data": [{
        "$set": {
            "person.age": 20
        }
    }],
    "database": "kafkadb",
    "es": 1741848051000,
    "gtid": null,
    "id": 174184805100000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848051984,
    "type": "UPDATE"
}

データの表示(クリックして展開)

{
    "data": [{
        "$set": {
            "salary": 100.0
        }
    }],
    "database": "kafkadb",
    "es": 1741848146000,
    "gtid": null,
    "id": 174184814600000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848147734,
    "type": "UPDATE"
}

データの表示(クリックして展開)

{
    "data": [{
        "$unset": {
            "salary": true
        }
    }],
    "database": "kafkadb",
    "es": 1741848207000,
    "gtid": null,
    "id": 174184820700000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848208186,
    "type": "UPDATE"
}

データの表示(クリックして展開)

{
    "data": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "database": "kafkadb",
    "es": 1741848289000,
    "gtid": null,
    "id": 174184828900000****,
    "isDdl": false,
    "mysqlType": null,
    "old": null,
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848289798,
    "type": "DELETE"
}

データの表示(クリックして展開)

{
    "data": null,
    "database": "kafkadb",
    "es": 1741847893000,
    "gtid": null,
    "id": 1741847893000000005,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "serverId": null,
    "sql": {
        "drop": "kafka_test"
    },
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741847893760,
    "type": "DDL"
}

シナリオ 2:ChangeStream — 更新されたフィールドのみ

移行方法ChangeStream に設定します。更新後のドキュメント全体を取得いいえ に設定します。

INSERT および DELETE メッセージはシナリオ 1 と同一です。UPDATE メッセージには、変更されたフィールドのみが含まれます。

データの表示(クリックして展開)

{
    "data": [{
        "$set": {
            "person.age": 20
        }
    }],
    "database": "kafkadb",
    "es": 1741848051000,
    "gtid": null,
    "id": 174184805100000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848052912,
    "type": "UPDATE"
}

データの表示(クリックして展開)

{
    "data": [{
        "$unset": {
            "salary": 1
        }
    }],
    "database": "kafkadb",
    "es": 1741848207000,
    "gtid": null,
    "id": 174184820700000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848209142,
    "type": "UPDATE"
}

シナリオ 3:ChangeStream — 更新時にドキュメント全体

移行方法ChangeStream に設定します。更新後のドキュメント全体を取得はい に設定します。

UPDATE イベントでは、変更されたフィールドではなく、変更後のドキュメント全体が配信されます。

データの表示(クリックして展開)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 20
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741848051000,
    "gtid": null,
    "id": 174184805100000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848052219,
    "type": "UPDATE"
}

データの表示(クリックして展開)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 20
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "salary": 100.0,
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741848146000,
    "gtid": null,
    "id": 174184814600000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848147327,
    "type": "UPDATE"
}

データの表示(クリックして展開)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 20
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741848207000,
    "gtid": null,
    "id": 174184820700000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848208401,
    "type": "UPDATE"
}

特殊ケース:ChangeStream における fullDocument の欠落

ChangeStream の UPDATE イベントの fullDocument フィールドが欠落している場合(たとえば、シャードコレクションでドキュメントがシャード間を移動した場合など)、配信されるメッセージは Oplog の動作($set または $unset 演算子を用いた部分更新)にフォールバックします。

例:fullDocument が欠落しているシャードコレクションの更新

ソースの基本データ:

use admin
db.runCommand({ enablesharding:"dts_test" })

ソースの増分変更:

use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

データの表示(クリックして展開)

{
    "data": [{
        "$set": {
            "name": "b"
        }
    }],
    "database": "dts_test",
    "es": 1740720994000,
    "gtid": null,
    "id": 174072099400000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "name": "a",
        "_id": 1.0
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "cstest",
    "ts": 1740721007099,
    "type": "UPDATE"
}

よくある質問

タスク作成後に Kafka データ圧縮形式を変更できますか?

はい。 「同期対象オブジェクトの変更」をご参照ください。

タスク作成後にメッセージ確認応答メカニズムを変更できますか?

はい。 「同期対象オブジェクトの変更」をご参照ください。

次のステップ