This topic describes how to synchronize incremental data from a Relational Database Service (RDS) database to MaxCompute. You can refer to this topic to synchronize incremental data in different scenarios.

Background information

Data to be synchronized falls into two categories, based on whether it changes after it is written: unchanged historical data, such as log data, and dynamically updated data, such as the staff status in a staff table.

If a node produces the same result no matter how many times it is run, you can safely schedule it to rerun, and you can easily clear dirty data if an error occurs. This principle is called idempotence. To follow it, write each batch of data to a separate table or partition, or have it overwrite the historical data in an existing table or partition.
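For example, in MaxCompute you can make a write idempotent by using INSERT OVERWRITE into a fixed partition (a minimal sketch; ods_oplog and its schema follow the examples later in this topic, and ods_oplog_tmp is a hypothetical staging table used only for illustration):

    -- Rerunning this statement for the same date overwrites the ds=20161113
    -- partition instead of appending duplicate rows.
    insert overwrite table ods_oplog partition (ds='20161113')
    select optime, uname, action, status from ods_oplog_tmp;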

In this topic, the running date of the sync node is November 14, 2016, and historical data is synchronized to the ds=20161113 partition on that day. In the incremental synchronization scenario, automatic scheduling synchronizes incremental data to the ds=20161114 partition in the early morning of November 15. The optime field records the data modification time and is used to determine whether a row is incremental data.

Create a workflow

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the target workspace and click Data Analytics in the Actions column.
  2. In the Data Analytics section, right-click Business Flow and select Create Workflow.
  3. In the Create Workflow dialog box that appears, set Workflow Name and Description.
  4. Click Create.

Synchronize unchanged historical data in incremental mode

Historical data does not change after it is generated. Therefore, you can easily partition a table based on the pattern in which the data is generated, typically by date, with one partition per day.

  1. Execute the following statements in the RDS database to prepare data:
    drop table if exists oplog;
    create table if not exists oplog(
        optime DATETIME,
        uname varchar(50),
        action varchar(50),
        status varchar(10)
    );
    insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    The preceding two data entries are used as historical data. You must synchronize all the historical data to the ds=20161113 partition first.

  2. In the Data Analytics section, expand the created workflow, right-click Table under MaxCompute, and then select Create Table.
  3. In the Create Table dialog box that appears, set Table Name to ods_oplog and click Commit.
  4. On the editing tab of the ods_oplog table on the right side, click DDL Statement. In the DDL Statement dialog box that appears, enter the following statement for creating a MaxCompute table:
    -- Create a MaxCompute table and partition the table by day.
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. Configure a sync node to synchronize historical data. For more information, see Create a batch synchronization node.
    After you test the sync node, click the Properties tab on the right side of the node editing tab. In the Properties dialog box, select Skip Execution and commit or publish the node again to prevent the node from being automatically scheduled to run.
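    To verify the test run, you can query the historical data partition in MaxCompute (a sketch; the expected rows follow the sample data in step 1):
    select * from ods_oplog where ds='20161113';
    -- Expected: the two historical entries for LiLei and HanMM.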
  6. Execute the following statements to insert data into the RDS source table as incremental data:
    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. Configure a sync node to synchronize incremental data.
    In the Source section, set Filter to date_format(optime,'%Y%m%d')=${bdp.system.bizdate}. In the Target section, enter ${bdp.system.bizdate} in the Partition Key Column field.
    Note By setting the data filter, the node queries the data inserted into the source table on November 14 and synchronizes it to the incremental data partition of the destination table in the early morning of November 15.
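    Conceptually, the filter is appended to the WHERE clause of the query that the sync node issues against the source table. The run in the early morning of November 15 behaves like the following sketch, with ${bdp.system.bizdate} replaced by 20161114:
    select optime, uname, action, status
    from oplog
    where date_format(optime,'%Y%m%d') = '20161114';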
  8. View the incremental synchronization result.
    Click the Properties tab on the right side. In the Properties dialog box, set Instance Recurrence to Day. After you commit or publish the incremental sync node, the node is automatically scheduled to run from the next day. After the node is run, you can view data in the destination MaxCompute table.
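    For example, the following queries show the two partitions side by side (a sketch based on the sample data):
    select * from ods_oplog where ds='20161113'; -- Historical data: LiLei and HanMM.
    select * from ods_oplog where ds='20161114'; -- Incremental data: Jim, Kate, and Lily.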

Synchronize dynamically updated data in incremental mode

Given the time-variant characteristic of data warehouses, we recommend that you synchronize full data every day from tables that are subject to change, such as staff and order tables. In other words, store a full snapshot every day so that you can easily retrieve both historical and current data.

In practice, synchronizing only incremental data every day is necessary only in special circumstances. MaxCompute does not support modifying data with the UPDATE statement, so incremental changes must be handled in other ways. The following sections describe how to synchronize data in full mode and in incremental mode.

  1. Execute the following statements to prepare data:
    drop table if exists user;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender varchar(1),
        optime DATETIME
    );
    -- Insert historical data.
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    -- Insert incremental data.
    update user set deptno=101,optime=CURRENT_TIMESTAMP where uid = 2; -- Change null to non-null.
    update user set deptno=104,optime=CURRENT_TIMESTAMP where uid = 3; -- Change non-null to non-null.
    update user set deptno=null,optime=CURRENT_TIMESTAMP where uid = 4; -- Change non-null to null.
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIMESTAMP);
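    After these statements are executed, the user table contains the following rows (derived from the preceding DML; note that uid 5 has been physically deleted):
    select * from user order by uid;
    -- uid 1: LiLei, deptno 100, optime 2016-11-13 (unchanged)
    -- uid 2: HanMM, deptno 101, optime = current time (null changed to non-null)
    -- uid 3: Jim, deptno 104, optime = current time (updated)
    -- uid 4: Kate, deptno null, optime = current time (non-null changed to null)
    -- uid 6: Lucy, deptno 105, optime = current time (newly inserted)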
  2. Synchronize data.
    • Synchronize full data daily.
      1. Execute the following statement to create a MaxCompute table. For more information about how to create a MaxCompute table, see Create a table.
        -- Create a MaxCompute table for daily full synchronization.
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME
        ) partitioned by (ds string);
      2. Configure a sync node to synchronize all data.
        Note Set Instance Recurrence to Day because daily full synchronization is required.
      3. Run the sync node and view data in the destination MaxCompute table after the synchronization is completed.

        When full synchronization is performed on a daily basis, no incremental synchronization is performed. You can view the data results in the table after the node is automatically scheduled to run on the next day.
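        Because each day's full data lands in its own partition, you can retrieve any historical snapshot directly (a sketch; the partition values assume the dates used in this topic):
        select * from ods_user_full where ds='20161113'; -- Snapshot before the changes.
        select * from ods_user_full where ds='20161114'; -- Snapshot after the changes.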

    • Synchronize incremental data daily.

      We recommend this sync mode only for scenarios where the DELETE statement is not used, because deleted rows cannot be queried in the source database and therefore cannot be captured by incremental synchronization. In most enterprise systems, records are deleted logically: an UPDATE statement marks a row as deleted instead of physically removing it with a DELETE statement. If rows are physically deleted, this sync mode may cause data inconsistency. Another drawback is that you must merge the new data with the historical data after each synchronization.

      Prepare data

      Create two tables, one for writing all the latest data and the other for writing incremental data.
      -- Create a result table.
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      -- Create an incremental data table.
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      1. Configure a sync node to write all data to the result table.
        Note You need to run the node only once. After running the node, click the Properties tab on the right side. In the Properties dialog box, select Skip Execution.
      2. Configure a sync node to write incremental data to the incremental data table. Set the data filter to date_format(optime,'%Y%m%d')=${bdp.system.bizdate}.
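        Conceptually, the run with bizdate 20161114 issues a query like the following sketch against the source table. It captures the rows whose optime was set to the current time (uid 2, 3, 4, and 6), but the row removed with the DELETE statement (uid 5) leaves no trace:
        select uid, uname, deptno, gender, optime
        from user
        where date_format(optime,'%Y%m%d') = '20161114';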
      3. Execute the following statement to merge data:
        insert overwrite table dw_user_inc
        select
        -- List all columns. If a row exists in the incremental table (b.uid is not null),
        -- the row changed that day, so take its values from the incremental table;
        -- otherwise, keep the values from the result table.
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from
        dw_user_inc a
        full outer join ods_user_inc b
        on a.uid = b.uid;

        View the merge result. The deleted row (uid = 5) is still present in the result table because the DELETE in the source database was not captured.
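        A quick check on the merged table illustrates this (a sketch based on the sample data):
        select * from dw_user_inc order by uid;
        -- uid 5 (Lily) still appears with its old values because the DELETE in the
        -- source database was never captured by the incremental synchronization.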

    Daily incremental synchronization has the advantage of transferring only a small amount of data. However, it may cause data inconsistency and requires extra computing to merge the new data with the historical data.

    Unless incremental mode is necessary, synchronize dynamically updated data in full mode every day. In addition, you can set a lifecycle for the table so that historical data is automatically deleted after being retained for a specified period, as shown in the example below.
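    For example, the following statement sets a lifecycle of 30 days on the full-data table (30 is an example value), so that MaxCompute automatically reclaims partitions that have not been modified within that period:
    alter table ods_user_full set lifecycle 30;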