本文以同步业务RDS数据库的数据至MaxCompute为例,为您介绍如何对不同场景的数据进行增量同步。
背景信息
根据需要同步的数据在写入后是否发生变化,分为恒定的存量数据(通常是日志数据)和持续更新的数据(例如人员表中,人员的状态会发生变化)。
根据幂等性原则(一个任务多次运行的结果一致,则该任务支持重跑调度。如果该任务出现错误,脏数据较容易清理),每次导入数据都是导入至一张单独的表或分区中,或者覆盖历史记录。
本文定义任务测试时间是2016年11月14日,在14日进行增量同步,同步历史数据至分区ds=20161113中。增量同步的场景配置了自动调度,把增量数据在15日凌晨同步至分区ds=20161114中。数据中的时间字段optime用来表示该数据的修改时间,从而判断这条数据是否为增量数据。
使用说明
- 部分数据源暂无增量同步方案,例如HBase、OTSStream数据源等。具体数据源是否支持增量同步可以看具体的Reader插件文档。
- 每个插件实现增量同步的所配置的参数可能不同,具体参数配置可以参考对应的Reader插件文档,详情可参考:支持的数据源与读写插件。
- 增量同步配置相关说明详情请参见:配置增量数据离线同步任务。
对恒定的存量数据进行增量同步
由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区。较常见的是根据日期进行分区,例如每天1个分区。
- 在RDS数据库中,执行下述语句准备数据。
drop table if exists oplog;
create table if not exists oplog(
optime DATETIME,
uname varchar(50),
action varchar(50),
status varchar(10)
);
Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');
上述的两条数据作为历史数据,需要先进行一次全量数据同步,将历史数据同步至昨天的分区。
- 在数据开发页面,右键单击业务流程下的表,选择新建表。
- 在新建表对话框中,输入表名(ods_oplog),单击提交。
- 双击ods_oplog表,在右侧的编辑页面单击DDL模式,输入下述建表语句。
--创建好MaxCompute表,按天进行分区。
create table if not exists ods_oplog(
optime datetime,
uname string,
action string,
status string
) partitioned by (ds string);
- 配置同步历史数据的任务,详情请参见通过向导模式配置离线同步任务。
测试同步任务成功后,单击节点编辑页面右侧的调度配置,勾选暂停调度并重新提交或发布,避免任务自动调度执行。
- 执行下述语句,向RDS源头表中插入数据作为增量数据。
insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed');
insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
- 配置同步增量数据的任务。
在
数据来源中设置数据过滤为
date_format(optime,'%Y%m%d')=${bdp.system.bizdate}
,在
数据去向中设置分区信息为
${bdp.system.bizdate}
。
说明 通过配置数据过滤,在15日凌晨进行同步时,您可以查询14日源头表全天新增的数据,并同步至目标表的增量分区中。
- 查看同步结果。
单击节点编辑页面右侧的调度配置,设置任务的调度周期为天调度。提交或发布任务后,第2天任务将自动调度执行。执行成功后,即可查看MaxCompute目标表的数据。
对持续更新的数据进行增量同步
根据数据仓库反映历史变化的特点,建议每天对人员表、订单表等会发生变化的数据进行全量同步,即每天保存的都是全量数据,方便您获取历史数据和当前数据。
真实场景中因为某些特殊情况,需要每天只进行增量同步。但MaxCompute不支持Update语句修改数据,只能通过其它方式实现。下文将为您介绍两种同步策略(全量同步、增量同步)的具体操作。
- 执行下述语句准备数据。
drop table if exists user ;
create table if not exists user(
uid int,
uname varchar(50),
deptno int,
gender VARCHAR(1),
optime DATETIME
);
--历史数据
insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
--增量数据
update user set deptno=101,optime=CURRENT_TIME where uid = 2; --null改成非null
update user set deptno=104,optime=CURRENT_TIME where uid = 3; --非null改成非null
update user set deptno=null,optime=CURRENT_TIME where uid = 4; --非null改成null
delete from user where uid = 5;
insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
- 进行数据同步。
每天增量同步的优点是同步的增量数据量较小,但可能出现数据不一致的情况,并且需要通过额外的计算进行数据合并。
如果不是必要情况,对持续更新的数据进行每天全量同步即可。如果希望历史数据仅保留一定的时间,自动删除超出保留时间的数据,您可以设置Lifecycle。