すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ApsaraDB for MongoDB のシャードクラスターインスタンスから Function Compute の関数へのデータ同期

最終更新日:Mar 29, 2026

Data Transmission Service (DTS) は、ApsaraDB for MongoDB のシャードクラスターから増分変更をキャプチャし、各変更イベントを Canal JSON フォーマットで Function Compute (FC) の関数に配信します。関数コードを記述して、イベントを処理、変換、またはダウンストリームに転送します。

関数が受信する内容

各呼び出しは、Records 配列を持つオブジェクトを渡します。配列内のすべての要素は、1 つの変更イベントを表します。

フィールドタイプ説明
isDdlブール値True は DDL 操作用です。 False は DML 操作用です。
type文字列DML: INSERTUPDATE、または DELETE。DDL: DDL
database文字列MongoDB データベース名
table文字列コレクション名
pkNames文字列プライマリキー名。MongoDB の場合は常に _id です
esLong13 桁の UNIX タイムスタンプ (ミリ秒) — ソースで操作が発生した時刻
tsLong13 桁の UNIX タイムスタンプ (ミリ秒) — DTS が宛先への書き込みを開始した時刻
dataオブジェクト配列1 つの要素を持つ配列。要素には doc キーがあり、その値はドキュメントを表す JSON 文字列です。レコードを読み取るには、値を逆シリアル化します
oldオブジェクト配列data と同じフォーマットです。typeUPDATE の場合にのみ存在し、更新前のドキュメントの状態が含まれます
idInt操作のシリアル番号

関数は、次の 2 種類の操作を受け取ります。

  • DDL — スキーマの変更: CreateIndexCreateCollectionDropIndexDropCollection

  • DML — データの変更: INSERTUPDATEDELETE

ペイロードの例

コレクションの作成 (DDL)

db.createCollection("testCollection")
{
  "Records": [{
    "data": [{"doc": "{\"create\": \"testCollection\", \"idIndex\": {\"v\": 2, \"key\": {\"_id\": 1}, \"name\": \"_id_\"}}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056437000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056437510
  }]
}

コレクションの削除 (DDL)

db.testCollection.drop()
{
  "Records": [{
    "data": [{"doc": "{\"drop\": \"testCollection\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056577000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056577789
  }]
}

インデックスの作成 (DDL)

db.testCollection.createIndex({name: 1})
{
  "Records": [{
    "data": [{"doc": "{\"createIndexes\": \"testCollection\", \"v\": 2, \"key\": {\"name\": 1}, \"name\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056670000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "testCollection",
    "ts": 1694056670719
  }]
}

インデックスの削除 (DDL)

db.testCollection.dropIndex({name: 1})
{
  "Records": [{
    "data": [{"doc": "{\"dropIndexes\": \"testCollection\", \"index\": \"name_1\"}"}],
    "pkNames": ["_id"],
    "type": "DDL",
    "es": 1694056817000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": true,
    "table": "$cmd",
    "ts": 1694056818035
  }]
}

ドキュメントの挿入 (DML)

// バッチ挿入
db.runCommand({insert: "user", documents: [{"name": "jack", "age": 20}, {"name": "lili", "age": 20}]})

// 単一挿入
db.user.insert({"name": "jack", "age": 20})
{
  "Records": [
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784427
    },
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784428
    }
  ]
}

ドキュメントの更新 (DML)

db.user.update({"name": "jack"}, {$set: {"age": 30}})
{
  "Records": [{
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "pkNames": ["_id"],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
    "es": 1694054989000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694054990555
  }]
}
UPDATE 操作では、DTS が増分データを同期する際に $set コマンドのみが同期的に実行されます。

ドキュメントの削除 (DML)

db.user.remove({"name": "jack"})
{
  "Records": [{
    "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "pkNames": ["_id"],
    "type": "DELETE",
    "es": 1694055452000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694055452852
  }]
}

制限事項

同期タスクを作成する前に、これらの制約を確認してください。

範囲の制約:

  • 増分データ同期のみがサポートされています。完全データ同期はサポートされていません。

  • リージョン間同期はサポートされていません。

  • DTS は adminconfig、または local データベースからデータを同期できません。

  • オブジェクトマッピングはサポートされていません。

  • トランザクション情報は保持されません。トランザクションは、宛先で個別のレコードに変換されます。

ソースデータベースの制約:

  • ソースは、10 個以下の Mongos ノードを持つシャードクラスターアーキテクチャを使用する必要があります。

  • ソースは、Azure Cosmos DB for MongoDB クラスターまたは Amazon DocumentDB エラスティッククラスターであってはなりません。

  • 同期するコレクションには、PRIMARY KEY 制約または UNIQUE 制約が必要であり、すべてのフィールドが一意である必要があります。そうでない場合、ターゲットデータベースに重複したデータレコードが含まれる可能性があります。

  • 単一のドキュメントは 16 MB を超えることはできません。このサイズを超えるドキュメントは、宛先関数に書き込むことができず、エラーをトリガーします。必要に応じて、抽出、変換、ロード (ETL) 機能を使用して大きなフィールドをフィルター処理します。

  • タスクごとに最大 1,000 個のコレクションを同期します。より多くのコレクションを同期するには、複数のタスクを作成するか、データベースレベルで同期します。

  • 同期タスクの実行中にソースインスタンスをスケーリングすることはできません。

  • DTS は SRV エンドポイントを介して MongoDB データベースに接続できません。

  • ソースデータベースのバランサーがアクティブな場合、タスクに遅延が発生する可能性があります。

  • ソースデータベースがシャードクラスターアーキテクチャを使用する自社運用 MongoDB データベースである場合、[アクセス方法] パラメーターを [Express Connect、VPN Gateway、または Smart Access Gateway] または [Cloud Enterprise Network (CEN)] に設定します。

制約の記述:

  • INSERT 操作の場合、挿入されるデータにはシャードキーが含まれている必要があります。

  • UPDATE 操作の場合、シャードキーは変更できません。

  • 宛先関数ごとに 1 つの DTS タスクのみを設定します。複数のタスクが同じ関数に書き込むと、データエラーが発生する可能性があります。

oplog と変更ストリームの要件:

  • oplog を有効にし、少なくとも 7 日間のログデータを保持するか、変更ストリームを有効にして少なくとも過去 7 日間の変更をカバーする必要があります。どちらの条件も満たされない場合、DTS はソースの変更をキャプチャできず、DTS サービスレベルアグリーメント (SLA) の対象外となるデータの不整合や損失を引き起こす可能性があります。

変更ストリームの制限 (該当する場合):

  • 変更ストリームには MongoDB 4.0 以降が必要です。

  • 変更ストリームを使用する場合、双方向同期はサポートされていません。

  • 非エラスティック Amazon DocumentDB クラスターの場合は、変更ストリームを使用します。[移行方法][ChangeStream] に、[アーキテクチャ][シャードクラスター] に設定します。

新しいデータベース:

  • DTS は、同期タスクの開始後に作成されたデータベースからの増分データを同期しません。

サポートされる操作

キャプチャされる操作は、移行方法によって異なります。

oplog の使用 (推奨):

  • CREATE COLLECTIONCREATE INDEX

  • DROP DATABASEDROP COLLECTIONDROP INDEX

  • RENAME COLLECTION

  • ドキュメントレベルの INSERT、UPDATE、DELETE

変更ストリームの使用:

  • DROP DATABASEDROP COLLECTION

  • RENAME COLLECTION

  • ドキュメントレベルの INSERT、UPDATE、DELETE

課金

増分データ同期は課金対象です。料金の詳細については、「課金概要」をご参照ください。

課金方法説明
サブスクリプション1〜9 か月、または 1、2、3、5 年分を前払いします。長期利用の場合、より費用対効果が高くなります
従量課金時間単位で課金されます。不要になったインスタンスをリリースすると、課金が停止します
DTS は、接続再試行期間中もインスタンスに対して課金します。

前提条件

開始する前に、以下を確認してください。

  • 実行中の ApsaraDB for MongoDB シャードクラスターインスタンス。詳細については、「シャードクラスターインスタンスの作成」をご参照ください

  • Function Compute サービスおよび関数で、ハンドラータイプ[イベントハンドラー] に設定されています。詳細については、「関数をすばやく作成する」をご参照ください。

  • ソース ApsaraDB for MongoDB インスタンス上のデータベースアカウントに、ソース、admin、および local データベースに対する読み取り権限があること。詳細については、「MongoDB データベースユーザーの権限管理」をご参照ください

同期タスクの作成

ステップ 1: データ同期ページの表示

いずれかのコンソールを使用して [データ同期] ページに移動します。

DTS コンソール

  1. DTS コンソールにログインします。DTS コンソール

  2. 左側のナビゲーションウィンドウで、[データ同期] をクリックします。

  3. 左上隅で、同期タスクを実行するリージョンを選択します。

DMS コンソール

正確なナビゲーションパスは、お客様の DMS コンソールのレイアウトによって異なります。「シンプルモード」および「DMS コンソールのレイアウトとスタイルのカスタマイズ」を参照してください。

  1. DMS コンソールにログインします。DMS コンソール

  2. 上部のナビゲーションバーで、[データ + AI] にポインターを合わせ、[DTS (DTS)] > [データ同期] を選択します。

  3. [データ同期タスク] の右側にあるドロップダウンから、ターゲットリージョンを選択します。

ステップ 2: ソースデータベースとターゲットデータベースの設定

[タスクの作成] をクリックし、次のパラメーターを設定します。

セクションパラメーター説明
該当なしタスク名DTS が自動生成する名前です。タスクを容易に識別できるよう、意味のある名前を指定してください。一意性は必須ではありません。
ソースデータベース既存の接続を選択登録済みのデータベースインスタンスを選択すると、以下のパラメーターが自動で入力されます。または、手動で設定することもできます。
データベースタイプMongoDB
アクセス方法Alibaba Cloud インスタンス
インスタンスリージョンソース MongoDB インスタンスのリージョン
Alibaba Cloud アカウント間でのデータ複製同一アカウント内での同期を行う場合は、いいえ を選択します。
アーキテクチャシャードクラスター
移行方法DTS による増分データの取得方法です。オプション:推奨の Oplog、または下記の「詳細」をご参照ください。ChangeStream
インスタンス IDソース MongoDB インスタンスの ID
認証用データベースアカウントの認証情報を格納するデータベースです。デフォルト値:admin
データベースアカウント必要な読み取り権限を持つソースデータベースのアカウント
データベースパスワードデータベースアカウントのパスワード
シャードアカウントソースインスタンス内のシャードにアクセスするためのアカウント
シャードパスワードシャードアカウントのパスワード
暗号化接続時の暗号化モードです。オプション: 非暗号化SSL 暗号化Mongo Atlas SSL。利用可能なオプションは、アクセス方法 および アーキテクチャ の選択内容によって異なります。アーキテクチャシャードクラスター かつ 移行方法Oplog の場合、SSL 暗号化
送信先データベース既存の接続を選択登録済みの Function Compute インスタンスを選択すると、以下のパラメーターが自動で入力されます。または、手動で設定することもできます。
データベースタイプFunction Compute
アクセス方法Alibaba Cloud インスタンス
インスタンスリージョンソースリージョンと一致します。変更できません。
サービス送信先関数を含む Function Compute サービス
関数同期されたデータを受信する関数
サービスバージョンおよびエイリアスサービスのバージョンまたはエイリアスです。オプション: デフォルトバージョン(固定値:LATEST)、指定したバージョンサービスバージョン の指定が必要)、指定したエイリアスサービスエイリアス用語

移行方法の選択:

方法使用する状況
Oplog (推奨)ソースで oplog が有効になっている場合 (自社運用 MongoDB と ApsaraDB for MongoDB の両方でデフォルト)。ログのプルが高速なため、同期遅延が短縮されます
ChangeStreamoplog が無効になっている場合、または非エラスティック Amazon DocumentDB クラスターを使用している場合 (変更ストリームが必要)。MongoDB 4.0 以降が必要です。双方向同期はサポートされていません
[アーキテクチャ][シャードクラスター] で、[移行方法][ChangeStream] の場合、[シャードアカウント][シャードパスワード] パラメーターは不要です。

ステップ 3: 接続テスト

[接続テストと次へ] をクリックします。

    DTS サーバーの CIDR ブロックを、ソースおよび送信先のセキュリティグループまたは許可リストに追加することを確認してください。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。ソースまたは送信先でセルフマネージドアクセス方法を使用している場合は、まず「[接続テスト]」を「[DTS サーバーの CIDR ブロック]」ダイアログでクリックします。

    ステップ 4: 同期するオブジェクトの選択

    [オブジェクトの設定] ステップで、以下を設定します。

    パラメーター説明
    同期タイプ[増分データ同期] に固定されています。変更できません
    データ形式[Canal Json] に固定されています。フィールドの説明については、「Kafka クラスター Topic のデータ形式」の Canal Json セクションをご参照ください
    ソースオブジェクト同期するデータベースまたはコレクションを選択し、向右 をクリックして [選択したオブジェクト]
    選択したオブジェクト選択したオブジェクトを確認します。zuoyi をクリックしてオブジェクトを削除します。オブジェクトを右クリックして、同期の粒度 (データベースまたはコレクションレベル) を設定します

    ステップ 5: 詳細設定

    [次へ: 詳細設定] をクリックし、以下を設定します。

    パラメーター説明
    タスクスケジューリング用の専用クラスターデフォルトでは、DTS はタスクを共有クラスターにスケジュールします。より高い安定性を得るために、専用クラスターをご購入ください。詳しくは、「DTS 専用クラスターとは
    接続失敗時の再試行時間ソースまたは宛先が到達不能な場合に DTS が再試行する時間。範囲: 10〜1440 分。デフォルト: 720。少なくとも 30 分に設定してください。複数のタスクが同じソースまたは宛先を共有する場合、最も短い再試行時間が優先されます
    その他の問題に対する再試行時間DTS が失敗した DDL または DML 操作を再試行する時間。範囲: 1〜1440 分。デフォルト: 10。少なくとも 10 分に設定してください。[接続失敗時の再試行時間]
    更新後にドキュメント全体を取得ChangeStream のみ。[はい]: 更新後に完全なドキュメントを送信します。[いいえ]: 変更されたフィールドのみを送信します
    増分データ同期のスループット制限を有効化送信先のロードを軽減するために、同期スループットを制限します。[増分データ同期の RPS][増分同期のデータ同期速度 (MB/s)]
    環境タグDTS インスタンスの環境を識別するためのタグ。オプション
    ETL の設定ETL 機能を有効化して、転送中のデータを変換します。詳細については、「ETL とは?」および「データ移行またはデータ同期タスクで ETL を設定する
    モニタリングとアラートタスクが失敗した場合、または同期遅延がしきい値を超えた場合にアラートを送信します。詳細については、「DTS タスクの作成時にモニタリングとアラートを設定する

    ステップ 6: 事前チェックの実行

    [次へ: タスク設定の保存と事前チェック] をクリックします。

    このタスク設定の API パラメーターをプレビューするには、[次へ: タスク設定の保存と事前チェック] にポインターを合わせ、[OpenAPI パラメーターのプレビュー] をクリックします。

    DTS は、同期タスクを開始する前に事前チェックを実行します。タスクは、事前チェックに合格した後にのみ開始されます。

    • 項目が失敗した場合: 失敗した項目の横にある [詳細の表示] をクリックし、報告された問題を修正してから、[再度事前チェック] をクリックします。

    • アラートがトリガーされた場合:

      • アラートを無視できない場合: [詳細の表示] をクリックし、問題を修正して、事前チェックを再実行します。

      • アラートを無視できる場合: [アラート詳細の確認] をクリックし、ダイアログで [無視] をクリックし、[OK] で確認してから、[再度事前チェック] をクリックします。アラートを無視すると、データの不整合が発生する可能性があります。

    ステップ 7: インスタンスの購入

    1. [成功率][100%] に達するまで待ってから、[次へ: インスタンスの購入] をクリックします。

    2. [購入] ページで、次の項目を設定します。

    セクションパラメーター説明
    新規インスタンスクラス課金方法サブスクリプション または 従量課金
    リソースグループ設定インスタンスのリソースグループ。デフォルト: デフォルトリソースグループResource Management とは
    インスタンスクラス同期速度を決定します。詳細については、「データ同期インスタンスのインスタンスクラス
    サブスクリプション期間サブスクリプション 課金でのみ利用可能です。オプション: 1〜9 か月、または 1、2、3、5 年
    1. [Data Transmission Service (従量課金) 利用規約] を読んで選択します。

    2. [購入して開始] をクリックし、確認ダイアログで [OK] をクリックします。

    タスクリストでタスクの進捗状況を追跡します。

    次のステップ

    • Function Compute のイベントハンドラーを記述して、Canal JSON ペイロードを処理します。isDdl フィールドを使用して DDL と DML のロジックを分岐させ、各 data 要素の doc 文字列を逆シリアル化してドキュメントフィールドにアクセスします。

    • 関数への大規模フィールドのドキュメントの負荷を軽減するには、データが関数に到達する前に ETL フィルターを設定します。詳細については、「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。

    • 同期の健全性をモニターするには、[高度な設定] でアラートの設定を行うか、タスク作成後にアラートの設定を行います。詳細については、「DTS タスクを作成するときにモニタリングとアラートの設定を行う」をご参照ください。

    • タスクが失敗した場合、DTS テクニカルサポートは 8 時間以内にそのタスクを復元しようと試みます。復元中に、タスクが再起動され、タスクパラメーター(データベースパラメーターではない)が調整される場合があります。変更可能なパラメーターのリストについては、「インスタンスパラメーターの変更」をご参照ください。