全部产品
Search
文档中心

数据传输服务 DTS:云数据库MongoDB版(副本集架构)同步至函数计算FC

更新时间:Feb 10, 2025

数据传输服务DTS(Data Transmission Service)支持将云数据库MongoDB版(副本集架构)实例的增量数据同步至函数计算FC的指定函数。您可以编写函数代码,结合同步至函数中的数据,对数据进行二次加工。

前提条件

  • 已创建源云数据库MongoDB版(副本集架构)实例,创建方式,请参见创建副本集实例

  • 已创建请求处理程序类型处理事件请求的目标服务和函数资源。创建方式,请参见快速创建函数

注意事项

类型

说明

源库限制

  • 带宽要求:源库所属的服务器需具备足够的出口带宽,否则将影响数据同步速率。

  • 待同步的集合需具备主键或唯一约束,且字段具有唯一性,否则可能会导致目标数据库中出现重复数据。

  • 源库待同步的单条数据不能超过16 MB,否则DTS任务会因无法将数据写入到函数计算FC而产生报错。若您需要同步部分字段,可以使用ETL功能过滤大字段的数据。

  • 如同步对象为集合级别,则单次同步任务仅支持同步至多1000张集合。当超出数量限制,任务提交后会显示请求报错,此时建议您拆分待同步的集合,分批配置多个任务,或者配置整库的同步任务。

  • 源库不支持Azure Cosmos DB for MongoDB和弹性集群的Amazon DocumentDB。

  • 源库需开启Oplog日志,并确保Oplog日志至少保留7天以上;或者开启变更流(Change Streams),并确保DTS能够通过Change Streams订阅到源库最近7天内的数据变更。否则可能会因无法获取源库的数据变更而导致任务失败,极端情况下甚至可能会导致数据不一致或丢失。由此导致的问题,不在DTS的SLA保障范围内。

    重要
    • 建议通过Oplog日志获取源库的数据变更。

    • 仅4.0及以上版本的MongoDB支持通过Change Streams获取数据变更,使用Change Streams获取源库的数据变更不支持双向同步。

    • 源库为Amazon DocumentDB(非弹性集群)时,需要手动开启Change Streams,并在配置任务时将迁移方式选择为ChangeStream,将架构类型选择为分片集群架构

其他限制

  • 不支持同步admin和local库中的数据。

  • 不支持全量同步任务。

  • 不支持跨地域的同步任务。

  • 不支持映射功能。

  • 请避免多个DTS任务同步到同一个目标函数,建议不同任务使用不同的函数进行数据隔离,否则可能会造成目标端数据混乱。

  • 不保留事务信息,即源库中的事务同步到目标库时会转变为单条的记录。

  • 若实例运行失败,DTS技术支持人员将在8小时内尝试恢复该实例。在恢复失败实例的过程中,可能会对该实例进行重启、调整参数等操作。

    说明

    在调整参数时,仅会修改实例的参数,不会对数据库中的参数进行修改。可能修改的参数,包括但不限于修改实例参数中的参数。

特殊情况

当源库为自建MongoDB时:

  • 在同步时,如果源库进行主备切换,将会导致同步任务失败。

  • 由于DTS的延迟时间是根据同步到目标库最后一条数据的时间戳和当前时间戳对比得出,源库长时间未执行更新操作可能导致延迟信息不准确。如果任务显示的延迟时间过大,您可以在源库执行一个更新操作来更新延迟信息。

说明

如果同步对象选择为整库,您还可以创建心跳表,心跳表每秒定期更新或者写入数据。

费用说明

同步类型

链路配置费用

增量数据同步

收费,详情请参见计费概述

支持同步的操作

同步类型

说明

增量同步

使用Oplog

增量同步不支持在任务开始运行后新建的数据库,支持同步的增量更新如下:

  • CREATE COLLECTION、INDEX

  • DROP DATABASE、COLLECTION、INDEX

  • RENAME COLLECTION

  • 在集合中插入、更新、删除文档的操作。

    说明

    在同步增量更新文档的数据时,仅支持同步使用$set命令更新的操作。

使用ChangeStream

支持同步的增量更新如下:

  • DROP DATABASE、COLLECTION

  • RENAME COLLECTION

  • 在集合中插入、更新、删除文档的操作。

    说明

    在同步增量更新文档的数据时,仅支持同步使用$set命令更新的操作。

数据库账号的权限要求

数据库

所需权限

创建及授权方式

云数据库MongoDB版

待同步库、admin库和local库的read权限。

MongoDB数据库账号权限管理

操作步骤

  1. 进入目标地域的同步任务列表页面(二选一)。

    通过DTS控制台进入

    1. 登录数据传输服务DTS控制台

    2. 在左侧导航栏,单击数据同步

    3. 在页面左上角,选择同步实例所属地域。

    通过DMS控制台进入

    说明

    实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台自定义DMS界面布局与样式

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,选择集成与开发 > 数据传输(DTS) > 数据同步

    3. 同步任务右侧,选择同步实例所属地域。

  2. 单击创建任务,进入任务配置页面。

  3. 可选:在页面右上角,单击试用新版配置页

    说明
    • 若您已进入新版配置页(页面右上角的按钮为返回旧版配置页),则无需执行此操作。

    • 新版配置页和旧版配置页部分参数有差异,建议使用新版配置页。

  4. 配置源库及目标库信息。

    类别

    配置

    说明

    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息

    选择已有连接信息

    您可以按实际需求,选择是否使用已有数据库实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要配置下方的数据库信息。

    说明
    • 您可以在数据连接管理页面或新版配置页面,将数据库录入DTS。更多信息,请参见数据连接管理

    • DMS控制台的配置项为选择DMS数据库实例,您可以单击新增DMS数据库实例或在控制台首页将数据库录入DMS。更多信息,请参见云数据库录入他云/自建数据库录入

    数据库类型

    选择MongoDB

    接入方式

    选择云实例

    实例地区

    选择源云数据库MongoDB版实例所属地域。

    是否跨阿里云账号

    本示例为同一阿里云账号间的同步,选择不跨账号

    架构类型

    选择副本集架构

    迁移方式

    请根据实际情况,选择增量数据同步的方式。

    • Oplog(推荐):

      若源库已开启Oplog日志,则支持此选项。

      说明

      本地自建MongoDB和云数据库MongoDB版默认已开启Oplog日志,且使用此方式同步增量数据时增量同步任务的延迟较小(拉取日志的速度较快),因此推荐选择Oplog

    • ChangeStream

      若源库已开启变更流(Change Streams),则支持此选项。

      说明
      • 源库为Amazon DocumentDB(非弹性集群)时,仅支持选择ChangeStream

      • 源库架构类型选择为分片集群架构,无需填写Shard账号Shard密码

    实例ID

    选择源云数据库MongoDB版实例ID。

    鉴权数据库名称

    填入源云数据库MongoDB版实例数据库账号所属的数据库名称,若未修改过则为默认的admin

    数据库账号

    填入源云数据库MongoDB版实例的数据库账号,权限要求请参见数据库账号的权限要求

    数据库密码

    填入该数据库账号对应的密码。

    连接方式

    DTS支持非加密连接SSL安全连接Mongo Atlas SSL三种连接方式。连接方式的选项与接入方式架构类型有关,请以控制台为准。

    说明
    • 架构类型分片集群架构,且迁移方式Oplog的MongoDB数据库,不支持SSL安全连接

    • 若源库为自建(接入方式不为云实例副本集架构的MongoDB数据库,并且选择了SSL安全连接,DTS还支持上传CA证书对连接进行校验。

    目标库信息

    选择已有连接信息

    您可以按实际需求,选择是否使用已有数据库实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要配置下方的数据库信息。

    说明
    • 您可以在数据连接管理页面或新版配置页面,将数据库录入DTS。更多信息,请参见数据连接管理

    • DMS控制台的配置项为选择DMS数据库实例,您可以单击新增DMS数据库实例或在控制台首页将数据库录入DMS。更多信息,请参见云数据库录入他云/自建数据库录入

    数据库类型

    选择函数计算 FC

    接入方式

    选择云实例

    实例地区

    默认与源库实例地区一致,且不支持修改。

    服务

    选择目标函数计算FC的服务名称。

    函数

    选择目标函数计算FC用于接收数据的函数。

    服务版本和别名

    请根据实际情况进行选择。

    • 默认版本服务版本固定为LATEST

    • 指定版本:您还需选择服务版本

    • 指定别名:您还需选择服务别名

    说明

    目标函数计算FC的专有名词,请参见基本概念

  5. 配置完成后,在页面下方单击测试连接以进行下一步

    说明
    • 请确保DTS服务的IP地址段能够被自动或手动添加至源库和目标库的安全设置中,以允许DTS服务器的访问。更多信息,请参见添加DTS服务器的IP地址段

    • 若源库或目标库为自建数据库(接入方式不是云实例),则还需要在弹出的DTS服务器访问授权对话框单击测试连接

  6. 配置任务对象。

    1. 对象配置页面,配置待同步的对象。

      配置

      说明

      同步类型

      仅支持增量同步,且默认已选中。

      数据格式

      同步到FC函数中的数据存储格式,当前仅支持Canal Json

      说明

      Canal Json的参数说明和示例,请参见Canal Json说明

      源库对象

      源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

      说明

      同步对象的选择粒度为库或集合。

      已选择对象

      请在已选择对象框中确认待同步的数据。

      说明
      • 如需移除已选择的对象,可以在已选择对象框中勾选目标对象,并单击zuoyi进行移除。

      • 如需按库或集合级别筛选需要同步的增量更新操作,请在已选择对象框中右键单击待同步的对象,并在弹出的对话框中进行选择。

    2. 单击下一步高级配置,进行高级参数配置。

      配置

      说明

      选择调度该任务的专属集群

      DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群

      源库、目标库无法连接后的重试时间

      在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

      说明
      • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

      • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

      源库、目标库出现其他问题后的重试时间

      在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。

      重要

      源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。

      是否限制增量同步速率

      您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。

      环境标签

      您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。

      配置ETL功能

      选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL

      监控告警

      是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

  7. 保存任务并进行预检查。

    • 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数

    • 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  8. 购买实例。

    1. 预检查通过率显示为100%时,单击下一步购买

    2. 购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。

      类别

      参数

      说明

      信息配置

      计费方式

      • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。

      • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。

      资源组配置

      实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

      链路规格

      DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明

      订购时长

      在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。

      说明

      该选项仅在付费类型为预付费时出现。

    3. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》

    4. 单击购买并启动,并在弹出的确认对话框,单击确定

      您可在数据同步界面查看具体任务进度。

后续步骤

目标服务接收的数据格式

FC接收到的数据类型为Object,源端增量数据以数组的方式存储在Records字段中,数组中的每一个元素为一条Object类型的数据记录。其中Object字段和含义如下表所示。

说明

FC接收到的数据有DML和DDL两种:

  • DDL:记录更改数据库的结构信息,如CreateIndex、CreateCollection、DropIndex、DropCollection等。

  • DML:记录管理数据库中的数据信息,如INSERT、UPDATE、DELETE。

字段

类型

说明

isDdl

Boolean

是否为DDL操作。

  • True:是。

  • False:否。

type

String

SQL操作的类型。

  • DML操作:DELETEUPDATEINSERT

  • DDL操作:固定为DDL

database

String

MongoDB的数据库名。

table

String

MongoDB的集合名。

pkNames

String

MongoDB的主键名,固定为_id

es

Long

操作在源库的执行时间,13位Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可用搜索引擎获取。

ts

Long

操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可用搜索引擎获取。

data

Object Array

Array中仅包含一个元素,类型为Object,其中Key为doc,value的类型为Json String。

说明

将value反序列化即可得到数据记录。

old

Object Array

存储更新前的数据,格式同 data 字段。

重要

仅当typeUPDATE时存在,格式与data字段相同。

id

Int

操作的序列号。

DDL操作数据格式示例

创建集合

SQL语句

db.createCollection("testCollection")

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056437000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056437510
	}]
}

删除集合

SQL语句

db.testCollection.drop()

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"drop": "testCollection"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056577000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056577789
	}]
}

创建索引

SQL语句

db.testCollection.createIndex({name:1})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056670000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056670719
	}]
}

删除索引

SQL语句

db.testCollection.dropIndex({name:1})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056817000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': '$cmd',
		'ts': 1694056818035
	}]
}

DML操作数据格式示例

插入数据

SQL语句

// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})

// 逐条插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784427
	}, {
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784428
	}]
}

更新数据

SQL语句

db.user.update({"name":"jack"},{$set:{"age":30}}) 

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"$set": {"age": 30}}'
		}],
		'pkNames': ['_id'],
		'old': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'type': 'UPDATE',
		'es': 1694054989000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054990555
	}]
}

删除数据

SQL语句

db.user.remove({"name":"jack"})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DELETE',
		'es': 1694055452000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694055452852
	}]
}