すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ApsaraDB for MongoDB レプリカセットインスタンスから Function Compute 関数へのデータ同期

最終更新日:Mar 28, 2026

Data Transmission Service (DTS) を使用して、ApsaraDB for MongoDB レプリカセットインスタンスの増分変更を、Function Compute 関数に直接ストリーミングします。各 INSERT、UPDATE、DELETE、DDL イベントは Canal JSON 形式で配信されるため、関数コードでデータを処理・変換・送信先へ転送できます。別途コンシューマー層を構築する必要はありません。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

課金

増分データ同期は有料機能です。詳細については、「課金概要」をご参照ください。

同期タイプ

タスク構成料金

増分データ同期

課金対象です。詳細については、「課金概要」をご参照ください。

制限事項

ソースデータベース

制約事項詳細
帯域幅ソースデータベースをホストするサーバーには、十分なアウトバウンド帯域幅が必要です。そうでない場合、同期速度に影響が出ます。
主キーまたは一意キー同期対象のコレクションには、主キー制約 (PRIMARY KEY constraint) または一意制約 (UNIQUE constraint) が設定されている必要があります。また、すべてのフィールド値が一意である必要があります。この条件を満たさないと、ターゲット関数で重複レコードが受信される可能性があります。
単一レコードサイズレコードあたり最大 16 MB です。これより大きいレコードでは DTS がエラーを報告します。抽出・変換・書き出し (ETL) 機能を使用して、大規模なフィールドを除外してください。
コレクション数タスクあたり最大 1,000 コレクションです。1,000 を超えるコレクションを同期する場合は、複数のタスクを構成するか、データベースレベルでの同期を実行してください。
非対応ソースAzure Cosmos DB for MongoDB クラスターおよび Amazon DocumentDB エラスティッククラスターはサポートされていません。
oplog / change streamsソースデータベースでは oplog を有効化し、少なくとも 7 日分のログデータを保持する必要があります。または、過去 7 日分をカバーするように change streams を有効化する必要があります。いずれの条件も満たされない場合、DTS が変更を検知できない可能性があり、データの不整合または損失を引き起こすことがあります。これは DTS のサービスレベルアグリーメント (SLA) の対象外です。
change streams に対応する MongoDB バージョンchange streams は MongoDB 4.0 以降を必要とします。change streams を使用した双方向同期はサポートされていません。
Amazon DocumentDB(非エラスティック)change streams を有効化し、移行方法ChangeStream に、アーキテクチャシャードクラスター に設定します。
SRV エンドポイントDTS は SRV エンドポイント経由で MongoDB に接続できません。

その他の制限事項

  • DTS は adminconfiglocal データベースを同期できません。

  • 完全データ同期はサポートされていません — 増分同期のみ対応しています。

  • クロスリージョン同期はサポートされていません。

  • オブジェクト名マッピング機能はサポートされていません。

  • データエラーを回避するため、1 つのターゲット関数に対して DTS タスクは 1 つだけ割り当ててください。

  • トランザクションコンテキストは保持されません。各トランザクションは個別のレコードに変換されます。

  • DTS タスクが失敗した場合、DTS テクニカルサポートが 8 時間以内に復旧を試みます。復旧中、タスクは再起動され、タスクパラメーター(データベースパラメーターではない)が変更される場合があります。

特殊ケース(自己管理 MongoDB)

  • タスク実行中にプライマリ/セカンダリ スイッチオーバーが発生すると、タスクが失敗します。

  • 同期遅延は、最新の同期済みレコードのタイムスタンプと現在のソースタイムスタンプとの差分で算出されます。ソースで長期間書き込みが行われていない場合、報告される遅延値は不正確になる可能性があります。遅延値を更新するには、ソースデータベースにレコードを書き込んでください。

データベース全体を同期する場合は、ハートビートテーブルを作成してください。DTS はハートビートテーブルを毎秒更新するため、遅延値の精度が保たれます。

サポートされる操作

DTS が同期する操作は、選択した 移行方法 によって異なります。

操作oplogchange streams
INSERT対応対応
UPDATE対応対応
DELETE対応対応
CREATE COLLECTION対応非対応
CREATE INDEX対応非対応
DROP DATABASE対応対応
DROP COLLECTION対応対応
DROP INDEX対応非対応
RENAME COLLECTION対応対応

oplog を使用する場合

DTS タスクは、タスク開始後に作成されたデータベースからの増分データを同期しません。DTS が同期する増分データは、以下の操作によって生成されます。

  • CREATE COLLECTION および CREATE INDEX

  • データベース、コレクション、およびインデックス の DROP

  • RENAME COLLECTION

  • コレクション内のドキュメントに対する挿入、更新、削除操作

    説明

    ドキュメントの増分データを同期する場合、$set コマンドを使用した更新操作のみが対応します。

change streams を使用する場合

DTS が同期する増分データは、以下の操作によって生成されます。

  • DROP DATABASE および DROP COLLECTION

  • RENAME COLLECTION

  • コレクション内のドキュメントに対する挿入、更新、削除操作

    説明

    ドキュメントの増分データを同期する場合、$set コマンドを使用した更新操作のみが対応します。

ファイルの増分データを同期する場合、$set コマンドのみが対応します。タスク開始後に作成されたデータベースの増分データは同期されません(oplog モードに適用)。

必要なデータベースアカウント権限

データベース必要な権限参考情報
ソース MongoDB インスタンスソース、admin、および local データベースの読み取りMongoDB データベースユーザーの権限管理

同期タスクの作成

ステップ 1:データ同期ページを開く

以下のいずれかのコンソールを使用します。

DTS コンソール

  1. DTS コンソール にログインします。DTS コンソール

  2. 左側ナビゲーションウィンドウで、データ同期 をクリックします。

  3. 画面左上隅で、同期タスクを実行するリージョンを選択します。

DMS コンソール

具体的なナビゲーション手順は、DMSコンソールモードとレイアウトによって異なります。「シンプルモード」および「DMSコンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
  1. DMS コンソール にログインします。DMS コンソール

  2. 上部ナビゲーションバーで、Data + AI にポインターを合わせ、DTS (DTS) > データ同期 を選択します。

  3. データ同期タスク の右側にあるドロップダウンリストから、リージョンを選択します。

ステップ 2:ソースおよびターゲットデータベースの構成

タスクの作成 をクリックし、以下の表に記載されているパラメーターを構成します。

一般

パラメーター説明
タスク名DTS タスクの名前です。DTS がデフォルト名を生成します。タスクを識別しやすいように、意味のある名前を指定してください。名前の重複は許可されます。

ソースデータベース

パラメーター説明
既存の接続を選択ドロップダウンリストから登録済みのデータベースインスタンスを選択するか、手動で接続を構成します。
データベースタイプMongoDB を選択します。
アクセス方法Alibaba Cloud インスタンス を選択します。
インスタンスリージョンソース MongoDB インスタンスが配置されているリージョンです。
Alibaba Cloud アカウント間でのデータ複製同一アカウント内での同期の場合は、いいえ を選択します。
アーキテクチャレプリカセット を選択します。
移行方法DTS がソースから増分データを読み取る方法。「[Oplog]」(推奨):ソースで oplog 機能が有効になっている場合に利用可能です。Oplog は高速なログプルにより低遅延を実現します。「[ChangeStream]」:チェンジストリームが有効になっている場合に利用可能です。詳細については、「Change Streams」をご参照ください。[シャードクラスタ][アーキテクチャ] で選択した場合、[シャードアカウント] パラメーターおよび [シャードパスワード] パラメーターは不要です。
インスタンス IDソース MongoDB インスタンスの ID です。
認証データベースアカウントおよびパスワードを格納するデータベースです。デフォルト値:admin
データベースアカウント必要な読み取り権限を持つアカウントです。
データベースパスワードデータベースアカウントのパスワードです。
暗号化接続暗号化: 暗号化なしSSL 暗号化、または Mongo Atlas SSL。利用可能なオプションは、アクセス方法 および アーキテクチャ の値によって異なります。アーキテクチャシャードクラスター で、移行方法Oplog の場合、SSL 暗号化は利用できません。Alibaba Cloud インスタンスアクセスを使用しない自己管理 MongoDB レプリカセットの場合、SSL 暗号化 を選択した際に CA 証明書をアップロードできます。

ターゲットデータベース

パラメーター説明
既存の接続を選択ドロップダウンリストから登録済みのインスタンスを選択するか、手動で構成します。
データベースタイプFunction Compute を選択します。
アクセス方法Alibaba Cloud インスタンス を選択します。
インスタンスリージョンデフォルトでソースと同じリージョンになります。変更できません。
サービスターゲット関数を含む Function Compute サービスです。
関数同期データを受信するターゲット関数です。
サービスバージョンおよびエイリアスデータを受信する関数のバージョンを決定します。デフォルトバージョン を選択すると、サービスバージョンが LATEST に固定されます。指定バージョン を選択した場合は、サービスバージョン パラメーターを構成する必要があります。指定エイリアス を選択した場合は、サービスエイリアス パラメーターを構成する必要があります。Function Compute の用語については、「用語」をご参照ください。

ステップ 3:接続性のテスト

ページ下部の 接続性のテストと続行 をクリックします。

DTS サーバーの CIDR ブロックを、ソースおよびターゲットデータベースのセキュリティ設定に追加する必要があります。詳細については、「DTS サーバーの CIDR ブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。ソースまたはターゲットが、[Alibaba Cloud Instance] アクセスを使用しない自己管理データベースである場合、[接続テスト][DTS サーバーの CIDR ブロック] ダイアログボックスでクリックします。

ステップ 4:同期対象オブジェクトの構成

[オブジェクトの設定] ステップで、以下のパラメーターを設定します。

パラメーター説明
同期タイプ固定値:増分データ同期
データ形式固定値:Canal Json。フィールドの説明については、「Kafka クラスターのデータ形式」をご参照ください。
ソースオブジェクトデータベースまたはコレクションを選択し、右向き矢印アイコンをクリックして 選択済みオブジェクト に移動します。
選択済みオブジェクト同期対象のオブジェクトを確認します。オブジェクトを削除するには、該当オブジェクトを選択し、左向き矢印アイコンをクリックします。データベースレベルまたはコレクションレベルの同期を構成するには、選択済みオブジェクト 内で右クリックします。

次へ:高度な設定 をクリックします。

ステップ 5:高度な設定の構成

パラメーター説明
タスクスケジューリング専用クラスターデフォルトでは、DTS はタスクを共有クラスターにスケジュールします。安定性を向上させるには、専用クラスターを購入してください。詳細については、「DTS 専用クラスターとは」をご参照ください。
接続失敗時の再試行時間ソースまたはターゲットデータベースに到達不能な場合の DTS の再試行時間です。有効範囲:10~1,440 分。デフォルト値:720 分。30 分を超える値を設定してください。複数のタスクが同一データベースを共有する場合、最も短い再試行時間が適用されます。再試行期間中は、DTS インスタンスに対して課金されます。
その他の問題発生時の再試行時間DDL または DML 操作が失敗した場合の DTS の再試行時間です。有効範囲:1~1,440 分。デフォルト値:10 分。10 分を超える値を設定してください。この値は 接続失敗時の再試行時間 よりも短くする必要があります。
更新後の完全ドキュメントの取得移行方法ChangeStream の場合にのみ利用可能です。はい:更新後に完全なドキュメントを同期します。いいえ:変更されたフィールドのみを同期します。
増分データ同期のスロットリングを有効化スループットを制限してターゲットの負荷を軽減します。増分データ同期の RPS および 増分同期のデータ同期速度 (MB/s) を構成します。
環境タグDTS インスタンスを識別するための任意のタグです。
ETL の構成送信先に到達する前にデータをフィルターまたは変換するには、ETL 機能を有効化します。詳細については、「ETL とは」および「データ移行またはデータ同期タスクで ETL を設定する」をご参照ください。
モニタリングとアラートタスク失敗または遅延がしきい値を超えた場合にアラートを設定します。詳細については、「DTS タスク作成時のモニタリングとアラートの設定」をご参照ください。

ステップ 6:事前チェックの実行

次へ:タスク設定の保存と事前チェック をクリックします。

このタスク構成の OpenAPI パラメーターをプレビューするには、次へ:タスク設定の保存と事前チェック にポインターを合わせ、OpenAPI パラメーターのプレビュー をクリックします。

DTS はタスク開始前に事前チェックを実行します。事前チェックが失敗した場合:

  • 各失敗項目について、詳細の表示 をクリックして問題を解決し、その後 再チェック をクリックします。

  • 無視可能なアラート項目については、アラート詳細の確認 > 無視 > OK > 再チェック の順にクリックします。アラートを無視すると、データの不整合が発生する可能性があります。

ステップ 7:インスタンスの購入

  1. 成功率100% になるまで待ち、その後 次へ:インスタンスの購入 をクリックします。

  2. 購入ページで、以下のパラメーターを構成します。

パラメーター説明
課金方法サブスクリプション:前払い方式。長期利用に適しています。従量課金:時間単位で課金されます。短期利用に適しています。不要になったらインスタンスを解放して、課金を回避してください。
リソースグループ設定インスタンスのリソースグループです。デフォルト値:デフォルトリソースグループResource Management とは
インスタンスクラス同期スループットクラス。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間課金方法サブスクリプション の場合に利用可能です。選択肢:1~9 か月、または 1、2、3、5 年です。
  1. Data Transmission Service (従量課金) サービス利用規約 に同意します。

  2. 購入して開始 をクリックし、ダイアログボックスで OK をクリックします。

タスクがタスク一覧に表示されます。そこから進捗状況を監視できます。

次のステップ

関数が受信するデータ形式

DTS はデータを Object として関数に配信します。増分レコードは Records フィールドに配列として格納されます。配列の各要素は、以下のフィールドを持つ Object です。

関数は、2 種類の操作を受信します。
DDL:スキーマ変更 — CreateIndex、CreateCollection、DropIndex、DropCollection。
DML:データ変更 — INSERT、UPDATE、DELETE。
フィールド説明
isDdlブール値True(DDL の場合)、False(DML の場合)。
type文字列DML: DELETEUPDATE、または INSERT。DDL: DDL
database文字列MongoDB のデータベース名です。
table文字列コレクション名です。
pkNames文字列プライマリキー名です。MongoDB の場合は常に _id です。
esLongソースデータベースで操作が実行されたときの UNIX タイムスタンプ(ミリ秒単位)です。
tsLongDTS がターゲットへの書き込みを開始したときの UNIX タイムスタンプ(ミリ秒単位)です。
dataObject 配列1 要素の配列です。要素には doc というキーと、JSON 文字列を値とするペアが含まれます。値をデシリアライズしてレコードを取得します。
oldObject 配列元のデータが格納される配列です。フィールドの形式は data フィールドと同じです。type フィールドの値が UPDATE の場合にのみ利用可能です。
idInt操作のシリアル番号です。

DDL の例

コレクションの作成

コレクションの削除

インデックスの作成

インデックスの削除

DML の例

データの挿入

SQL ステートメント:

// 複数のレコードを一度に挿入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})

// 1 件ずつレコードを挿入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

関数が受信するデータ:

{
  "Records": [
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"jack\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784427
    },
    {
      "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}, \"name\": \"lili\", \"age\": 20}"}],
      "pkNames": ["_id"],
      "type": "INSERT",
      "es": 1694054783000,
      "database": "MongoDBTest",
      "id": 0,
      "isDdl": false,
      "table": "user",
      "ts": 1694054784428
    }
  ]
}

データの更新

SQL ステートメント:

db.user.update({"name":"jack"},{$set:{"age":30}})

関数が受信するデータ:

{
  "Records": [{
    "data": [{"doc": "{\"$set\": {\"age\": 30}}"}],
    "pkNames": ["_id"],
    "old": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "type": "UPDATE",
    "es": 1694054989000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694054990555
  }]
}

データの削除

SQL ステートメント:

db.user.remove({"name":"jack"})

関数が受信するデータ:

{
  "Records": [{
    "data": [{"doc": "{\"_id\": {\"$oid\": \"64f9397f6e255f74d65a****\"}}"}],
    "pkNames": ["_id"],
    "type": "DELETE",
    "es": 1694055452000,
    "database": "MongoDBTest",
    "id": 0,
    "isDdl": false,
    "table": "user",
    "ts": 1694055452852
  }]
}