全部产品
Search
文档中心

数据传输服务 DTS:PolarDB-X 2.0同步至MaxCompute

更新时间:Mar 28, 2024

本文介绍如何使用数据传输服务DTS(Data Transmission Service),将PolarDB-X 2.0同步至MaxCompute。

前提条件

注意事项

说明

DTS不会将源数据库中的外键同步到目标数据库,因此源数据库的级联、删除等操作不会同步到目标数据库。

类型

说明

源库限制

  • 待同步的表需具备主键或唯一约束,且字段具有唯一性,否则可能会导致目标数据库中出现重复数据。

  • 若同步对象为表级别,且需进行编辑(如表列名映射),单次同步任务的表数量超过5000时,建议您拆分待同步的表,分批配置多个任务,或者配置整库的同步任务,否则任务提交后可能会显示请求报错。

  • Binlog日志:

    • 在PolarDB-X 2.0控制台参数设置界面开启binlog,开启方法请参见参数设置。并且binlog_row_image为full。否则预检查阶段提示报错,且无法成功启动数据同步任务。

    • 如为增量同步任务,DTS要求源数据库的本地Binlog日志保存24小时以上,如为全量同步和增量同步任务,DTS要求源数据库的本地Binlog日志至少保留7天以上(您可在全量同步完成后将Binlog保存时间设置为24小时以上),否则DTS可能因无法获取Binlog而导致任务失败,极端情况下可能会导致数据不一致或丢失。由于您所设置的Binlog日志保存时间低于DTS要求的时间进而导致的问题,不在DTS的SLA保障范围内。

  • 若源端PolarDB-X 2.0待同步的表名中含大写字母,则仅支持库表结构同步。

  • PolarDB分布式版需兼容MySQL 5.7版本。

其他限制

  • 仅支持表级别的数据同步。

  • 执行数据同步前需评估源库和目标库的性能,同时建议业务低峰期执行数据同步。否则全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。

  • 全量初始化会并发执行INSERT操作,导致目标数据库的表产生碎片,因此全量初始化完成后目标实例的表空间比源实例的表空间大。

  • 请勿对源库的同步对象使用pt-online-schema-change等类似工具执行在线DDL变更,否则会导致同步失败。

  • 若在DTS同步数据期间,有除DTS同步任务外的数据写入目标库,则会导致源库与目标库数据不一致。例如,使用DMS执行在线DDL变更,可能引起目标库数据丢失。

  • 由于MaxCompute不支持主键约束,当DTS在同步数据时因网络等原因触发重传,可能会导致MaxCompute中出现重复记录。

其他注意事项

DTS会在源库定时更新`dts_health_check`.`ha_health_check`表以推进binlog位点。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

支持同步的SQL操作

操作类型

SQL操作语句

DML

INSERT、UPDATE、DELETE

DDL

ALTER TABLE、ADD COLUMN

操作步骤

  1. 进入同步任务的列表页面。

    1. 登录DMS数据管理服务

    2. 在顶部菜单栏中,单击集成与开发(DTS)

    3. 在左侧导航栏,选择数据传输(DTS) > 数据同步

    说明
  2. 同步任务右侧,选择同步实例所属地域。

    说明

    新版DTS同步任务列表页面,需要在页面左上角选择同步实例所属地域。

  3. 单击创建任务,配置源库及目标库信息。

    类别

    配置

    说明

    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息

    选择已有的DMS数据库实例

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要输入下方的数据库信息。

    数据库类型

    选择PolarDB-X 2.0

    接入方式

    选择云实例

    实例地区

    选择PolarDB-X 2.0实例所属地域。

    是否跨阿里云账号

    本示例为同一阿里云账号间的同步,选择不跨账号

    实例ID

    选择源PolarDB-X 2.0实例ID。

    数据库账号

    填入PolarDB-X 2.0的数据库账号,需具备REPLICATION SLAVE、REPLICATION CLIENT及待同步对象的SELECT权限。

    说明

    授权方式,请参见数据同步过程中的账号权限问题

    数据库密码

    填入该数据库账号对应的密码。

    目标库信息

    选择已有的DMS数据库实例

    您可以按实际需求,选择是否使用已有实例。

    • 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。

    • 如不使用已有实例,您需要输入下方的数据库信息。

    数据库类型

    选择MaxCompute

    接入方式

    选择云实例

    实例地区

    选择目标MaxCompute实例所属地域。

    Project

    填入MaxCompute实例的Project,您可以在MaxCompute工作空间列表页面中查询。

    主账号AccessKeyId

    填入用于标识主账号的AccessKey Id,获取方式请参见创建AccessKey

    主账号AccessKeySecret

    填入用于验证主账号的密钥AccessKey Secret,获取方式请参见创建AccessKey

  4. 配置完成后,单击页面下方的测试连接以进行下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  5. 单击确定,完成ODPS账号授权。

  6. 配置任务对象及高级配置。

    配置

    说明

    同步类型

    固定选中增量同步。默认情况下,您还需要同时选中库表结构同步全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。

    目标已存在表的处理模式

    • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

      说明

      如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射

    • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。

      警告

      选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

      • 表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:

        • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。

        • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。

      • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。

    附加列规则

    DTS在将数据同步到MaxCompute时,会在同步的目标表中添加一些附加列。如果附加列和目标表中已有的列出现名称冲突将会导致数据同步失败。您需要根据业务需求选择新规则旧规则

    警告

    在选择附加列规则前,您需要评估附加列和目标表中已有的列是否会出现名称冲突,否则可能会导致数据丢失或任务失败。关于附加列的规则和定义说明,请参见附加列名称和定义说明

    增量日志表分区定义

    根据业务需求,选择分区名称。关于分区的相关介绍请参见分区

    目标库对象名称大小写策略

    您可以配置目标实例中同步对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略

    源库对象

    源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

    说明

    同步对象选择的粒度为表。

    已选择对象

    • 如需更改单个同步对象在目标实例中的名称,请右击已选择对象中的同步对象,设置方式,请参见库表列名单个映射

    • 如需批量更改同步对象在目标实例中的名称,请单击已选择对象方框右上方的批量编辑,设置方式,请参见库表列名批量映射

    说明
    • 如需按库或表级别选择同步的SQL操作,请在已选择对象中右击待同步对象,并在弹出的对话框中选择所需同步的SQL操作。支持的操作请参见支持同步的SQL操作

    • 如需设置WHERE条件过滤数据,请在已选择对象中右击待同步的表,在弹出的对话框中设置过滤条件。设置方法请参见通过SQL条件过滤任务数据

  7. 单击下一步高级配置,进行高级配置。

    配置

    说明

    选择调度该任务的专属集群

    DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS同步任务。更多信息,请参见什么是DTS专属集群

    源库、目标库无法连接后的重试时间

    在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

    说明
    • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

    • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

    源库、目标库出现其他问题后的重试时间

    在同步任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,同步任务将自动恢复。否则,同步任务将会失败。

    重要

    源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。

    是否限制全量迁移速率

    在全量同步阶段,DTS将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。您可以根据实际情况,选择是否对全量同步任务进行限速设置(设置每秒查询源库的速率QPS每秒全量迁移的行数RPS每秒全量迁移的数据量(MB)BPS),以缓解目标库的压力。

    说明

    仅当同步类型选择了全量同步时才可以配置。

    是否限制增量同步速率

    您也可以根据实际情况,选择是否对增量同步任务进行限速设置(设置每秒增量同步的行数RPS每秒增量同步的数据量(MB)BPS),以缓解目标库的压力。

    环境标签

    您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。

    是否去除正反向任务的心跳表sql

    根据业务需求选择是否在DTS实例运行时,在源库中写入心跳SQL信息。

    • :不在源库中写入心跳SQL信息,DTS实例可能会显示有延迟。

    • :在源库中写入心跳SQL信息,可能会影响源库的物理备份和克隆等功能。

    配置ETL功能

    选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL

    监控报警

    是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

  8. 保存任务并进行预检查。

    • 若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数

    • 若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  9. 预检查通过率显示为100%时,单击下一步购买

  10. 购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。

    类别

    参数

    说明

    信息配置

    计费方式

    • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。

    • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。

    资源组配置

    实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

    链路规格

    DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明

    订购时长

    在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。

    说明

    该选项仅在付费类型为预付费时出现。

  11. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》

  12. 单击购买并启动,同步任务正式开始,您可在数据同步界面查看具体任务进度。

增量日志表结构定义说明

说明

你需要在MaxCompute中执行set odps.sql.allow.fullscan=true;,设置项目空间属性,允许进行全表扫描。

DTS在将MySQL产生的增量数据同步至MaxCompute的增量日志表时,除了存储增量数据,还会存储一些元信息,示例如下。

增量日志表结构

说明

示例中的modifytime_yearmodifytime_monthmodifytime_daymodifytime_hourmodifytime_minute为分区字段,是在配置任务对象及高级配置步骤中指定的。

结构定义说明

字段

说明

record_id

增量日志的记录ID,为该日志唯一标识。

说明
  • ID的值唯一且递增。

  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录(分别记录更新前和更新后的值),且record_id的值相同。

operation_flag

操作类型,取值:

  • I:INSERT操作。

  • D:DELETE操作。

  • U:UPDATE操作。

utc_timestamp

操作时间戳,即binlog的时间戳(UTC 时间)。

before_flag

所有列的值是否为更新前的值,取值:Y或N。

after_flag

所有列的值是否为更新后的值,取值:Y或N。

关于before_flag和after_flag的补充说明

对于不同的操作类型,增量日志中的before_flagafter_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的记录值,即为更新后的值,所以before_flag取值为N,after_flag取值为Y,示例如下。

    INSERT操作示例

  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的record_id、operation_flag及utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以before_flag取值为Y,after_flag取值为N。第二条增量日志记录了更新后的值,所以before_flag取值为N,after_flag取值为Y,示例如下。

    UPDATE操作示例

  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以before_flag取值为Y,after_flag取值为N,示例如下。

    DELETE操作示例

全量数据合并示例

执行数据同步的操作后,DTS会在MaxCompute中分别创建该表的全量基线表和增量日志表。您可以通过MaxCompute的SQL命令,对这两个表执行合并操作,得到某个时间点的全量数据。

本案例以customer表为例(表结构如下),介绍操作流程。

customer表结构

  1. 根据源库中待同步表的结构,在MaxCompute中创建用于存储合并结果的表。

    例如,需要获取customer表在1565944878时间点的全量数据。为方便业务识别,创建如下数据表:

    CREATE TABLE `customer_1565944878` (
        `id` bigint NULL,
        `register_time` datetime NULL,
        `address` string);
    说明
  2. 在MaxCompute中执行如下SQL命令,合并全量基线表和增量日志表,获取该表在某一时间点的全量数据。

    set odps.sql.allow.fullscan=true;
    insert overwrite table <result_storage_table>
    select <col1>,
           <col2>,
           <colN>
      from(
    select row_number() over(partition by t.<primary_key_column>
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN>
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>,incr.<colN>
      from <table_log> incr
     where utc_timestamp< <timestamp>
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>,base.<colN>
      from <table_base> base) t) gt
    where row_number=1 
      and after_flag='Y'
    说明
    • <result_storage_table>:存储全量merge结果集的表名。

    • <col1>/<col2>/<colN>:同步表中的列名。

    • <primary_key_column>:同步表中的主键列名。

    • <table_log>:增量日志表名。

    • <table_base>:全量基线表名。

    • <timestamp>:需要获取全量数据的时间点。

    合并数据表,获取customer表在1565944878时间点的全量数据,示例如下:

    set odps.sql.allow.fullscan=true;
    insert overwrite table customer_1565944878
    select id,
           register_time,
           address
      from(
    select row_number() over(partition by t.id
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address
      from customer_log incr
     where utc_timestamp< 1565944878
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address
      from customer_base base) t) gt
     where gt.row_number= 1
       and gt.after_flag= 'Y';
  3. 上述命令执行完成后,可在customer_1565944878表中查询合并后的数据。

    查询merge后的数据