全部产品
Search
文档中心

数据传输服务 DTS:云数据库MongoDB版(分片集群架构)同步至函数计算FC

更新时间:Mar 29, 2024

数据传输服务DTS(Data Transmission Service)支持将云数据库MongoDB版(分片集群架构)实例的增量数据同步至函数计算FC的指定函数。您可以编写函数代码,结合同步至函数中的数据,对数据进行二次加工。

前提条件

  • 已创建源云数据库MongoDB版(分片集群架构)实例,创建方式,请参见创建分片集群实例

  • 已创建请求处理程序类型处理事件请求的目标服务和函数资源。创建方式,请参见快速创建函数

注意事项

类型

说明

源库限制

  • 带宽要求:源库所属的服务器需具备足够出口带宽,否则将影响数据同步速率。

  • 待同步的集合需具备主键或唯一约束,且字段具有唯一性,否则可能会导致目标数据库中出现重复数据。

  • 若源库待同步的数据中,单条数据不能超过16 MB,否则DTS任务会因无法将数据写入到函数计算FC而产生报错。若您需要同步部分字段,可以使用ETL功能过滤大字段的数据。

  • 如同步对象为集合级别,则单次同步任务仅支持同步至多1000张集合。当超出数量限制,任务提交后会显示请求报错,此时建议您拆分待同步的集合,分批配置多个任务,或者配置整库的同步任务。

  • 需开启Oplog日志。

    说明

    DTS要求源数据库的Oplog日志至少保留7天以上,否则DTS可能因无法获取Oplog日志而导致任务失败,极端情况下甚至可能会导致数据不一致或丢失。由于您所设置的Oplog日志保存时间低于DTS要求的时间进而导致的问题,不在DTS的SLA保障范围内。

  • 在DTS同步期间,不支持MongoDB分片集群进行分片的扩缩容,否则会导致DTS任务失败。

  • 源MongoDB分片集群实例的Mongos节点的数量不能超过10个。

  • 若源实例为分片集群架构的自建MongoDB,则接入方式仅支持专线/VPN网关/智能网关云企业网CEN

其他限制

  • 不支持同步admin和local库中的数据。

  • 不支持全量同步任务。

  • 不支持跨地域的同步任务。

  • 不支持映射功能。

  • 在任务开始后,待同步的数据在使用INSERT命令时必须包含分片键,使用UPDATE命令时不支持更改分片键。

  • 请避免多个DTS任务同步到同一个目标函数,建议不同任务使用不同的函数进行数据隔离,否则可能会造成目标端数据混乱。

  • 不保留事务信息,即源库中的事务同步到目标库时会转变为单条的记录。

费用说明

同步类型

链路配置费用

增量数据同步

收费,详情请参见计费概述

支持同步的操作

同步类型

说明

增量同步

支持同步的增量更新如下:

  • CREATE COLLECTION、INDEX

  • DROP DATABASE、COLLECTION、INDEX

  • RENAME COLLECTION

  • 在集合中插入、更新、删除文档的操作。

数据库账号的权限要求

数据库

所需权限

创建及授权方式

云数据库MongoDB版

待同步库、admin库和local库的read权限。

MongoDB数据库账号权限管理

操作步骤

  1. 进入同步任务的列表页面。

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,单击集成与开发(DTS)

    3. 在左侧导航栏,选择数据传输(DTS) > 数据同步

    说明
  2. 同步任务右侧,选择同步实例所属地域。

    说明

    新版DTS同步任务列表页面,需要在页面左上角选择同步实例所属地域。

  3. 单击创建任务,配置源库及目标库信息。

    类别

    配置

    说明

    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息

    选择已有的DMS数据库实例(可选,如未创建可忽略此处选择,直接在下方配置数据库信息即可)

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要输入下方的数据库信息。

    数据库类型

    选择MongoDB

    接入方式

    选择云实例

    实例地区

    选择源云数据库MongoDB版实例所属地域。

    是否跨阿里云账号

    本示例为同一阿里云账号间的同步,选择不跨账号

    架构类型

    选择分片集群架构

    实例ID

    选择源云数据库MongoDB版实例ID。

    鉴权数据库名称

    填入源云数据库MongoDB版实例数据库账号所属的数据库名称,若未修改过则为默认的admin

    数据库账号

    填入源云数据库MongoDB版实例的数据库账号,权限要求请参见数据库账号的权限要求

    数据库密码

    填入该数据库账号对应的密码。

    shard账号

    填入源云数据库MongoDB版的数据库Shard账号。

    shard密码

    填入源云数据库MongoDB版数据库Shard账号的密码。

    目标库信息

    选择已有的DMS数据库实例(可选,如未创建可忽略此处选择,直接在下方配置数据库信息即可)

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要输入下方的数据库信息。

    数据库类型

    选择函数计算 FC

    接入方式

    选择云实例

    实例地区

    默认与源库实例地区一致,且不支持修改。

    服务

    选择目标函数计算FC的服务名称。

    函数

    选择目标函数计算FC用于接收数据的函数。

    服务版本和别名

    请根据实际情况进行选择。

    • 默认版本服务版本固定为LATEST

    • 指定版本:您还需选择服务版本

    • 指定别名:您还需选择服务别名

    说明

    目标函数计算FC的专有名词,请参见基本概念

  4. 配置完成后,单击页面下方的测试连接以进行下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  5. 配置任务对象及高级配置。

    配置

    说明

    同步类型

    仅支持增量同步,且默认已选中。

    数据格式

    同步到FC函数中的数据存储格式,当前仅支持Canal Json

    说明

    Canal Json的参数说明和示例,请参见Canal Json说明

    源库对象

    源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

    说明

    同步对象的选择粒度为库或集合。

    已选择对象

    请在已选择对象框中确认待同步的数据。

    说明

    若您需要移除已选择的对象,可以在已选择对象框中勾选目标对象,并单击zuoyi进行移除。

  6. 单击下一步高级配置,进行高级配置。

    配置

    说明

    选择调度该任务的专属集群

    DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群

    设置告警

    是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

    源库、目标库无法连接后的重试时间

    在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

    说明
    • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

    • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

    源库、目标库出现其他问题后的重试时间

    在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。

    重要

    源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。

    是否限制增量同步速率

    您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。

    环境标签

    您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。

    配置ETL功能

    选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL

  7. 保存任务并进行预检查。

    • 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数

    • 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  8. 预检查通过率显示为100%时,单击下一步购买

  9. 购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。

    类别

    参数

    说明

    信息配置

    计费方式

    • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。

    • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。

    资源组配置

    实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

    链路规格

    DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明

    订购时长

    在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。

    说明

    该选项仅在付费类型为预付费时出现。

  10. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》

  11. 单击购买并启动,同步任务正式开始,您可在数据同步界面查看具体任务进度。

  12. 预检查通过率显示为100%时,单击下一步购买

目标服务接收的数据格式

FC接收到的数据类型为Object,源端增量数据以数组的方式存储在Records字段中,数组中的每一个元素为一条Object类型的数据记录。其中Object字段和含义如下表所示。

说明

FC接收到的数据有DML和DDL两种:

  • DDL:记录更改数据库的结构信息,如CreateIndex、CreateCollection、DropIndex、DropCollection等。

  • DML:记录管理数据库中的数据信息,如INSERT、UPDATE、DELETE。

字段

类型

说明

isDdl

Boolean

是否为DDL操作。

  • True:是。

  • False:否。

type

String

SQL操作的类型。

  • DML操作:DELETEUPDATEINSERT

  • DDL操作:固定为DDL

database

String

MongoDB的数据库名。

table

String

MongoDB的集合名。

pkNames

String

MongoDB的主键名,固定为_id

es

Long

操作在源库的执行时间,13位Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可用搜索引擎获取。

ts

Long

操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可用搜索引擎获取。

data

Object Array

Array中仅包含一个元素,类型为Object,其中Key为doc,value的类型为Json String。

说明

将value反序列化即可得到数据记录。

id

Int

操作的序列号。

DDL操作数据格式示例

创建集合

SQL语句

db.createCollection("testCollection")

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"create": "testCollection", "idIndex": {"v": 2, "key": {"_id": 1}, "name": "_id_"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056437000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056437510
	}]
}

删除集合

SQL语句

db.testCollection.drop()

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"drop": "testCollection"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056577000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056577789
	}]
}

创建索引

SQL语句

db.testCollection.createIndex({name:1})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"createIndexes": "testCollection", "v": 2, "key": {"name": 1}, "name": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056670000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': 'testCollection',
		'ts': 1694056670719
	}]
}

删除索引

SQL语句

db.testCollection.dropIndex({name:1})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"dropIndexes": "testCollection", "index": "name_1"}'
		}],
		'pkNames': ['_id'],
		'type': 'DDL',
		'es': 1694056817000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': True,
		'table': '$cmd',
		'ts': 1694056818035
	}]
}

DML操作数据格式示例

插入数据

SQL语句

// 批量插入
db.runCommand({insert: "user", documents: [{"name":"jack","age":20},{"name":"lili","age":20}]})

// 逐条插入
db.user.insert({"name":"jack","age":20})
db.user.insert({"name":"lili","age":20})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "jack", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784427
	}, {
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}, "name": "lili", "age": 20}'
		}],
		'pkNames': ['_id'],
		'type': 'INSERT',
		'es': 1694054783000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054784428
	}]
}

更新数据

SQL语句

db.user.update({"name":"jack"},{$set:{"age":30}}) 

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"$set": {"age": 30}}'
		}],
		'pkNames': ['_id'],
		'old': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'type': 'UPDATE',
		'es': 1694054989000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694054990555
	}]
}

删除数据

SQL语句

db.user.remove({"name":"jack"})

FC接收到的数据

{
	'Records': [{
		'data': [{
			'doc': '{"_id": {"$oid": "64f9397f6e255f74d65a****"}}'
		}],
		'pkNames': ['_id'],
		'type': 'DELETE',
		'es': 1694055452000,
		'database': 'MongoDBTest',
		'id': 0,
		'isDdl': False,
		'table': 'user',
		'ts': 1694055452852
	}]
}