This topic describes how to synchronize incremental data from an ApsaraDB RDS database to MaxCompute. You can refer to this topic to synchronize incremental data in different scenarios.

Background information

Data to be synchronized is classified into data that does not change after generation and data that is constantly updated after generation. For example, log entries do not change after they are generated, whereas user data may be constantly updated as user status changes.

If a node produces the same result no matter how many times it is run, the node is idempotent. You can safely schedule an idempotent node to rerun, and if an error occurs, you can delete the dirty data and rerun the node. To achieve idempotence, each time you write data, write the data to a separate table or partition, or overwrite the historical data in an existing table or partition.
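For example, in MaxCompute, an INSERT OVERWRITE statement that writes to a date partition is idempotent: no matter how many times the statement is rerun, the partition contains the same result. The following statement is a minimal sketch; the staging table ods_oplog_staging is illustrative.

    -- Rerunning this statement leaves the ds=20161114 partition in the same state,
    -- because insert overwrite replaces the previous content of the partition.
    insert overwrite table ods_oplog partition (ds='20161114')
    select optime, uname, action, status
    from ods_oplog_staging;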

In this topic, the scheduled date of a data synchronization node is set to November 14, 2016, and historical data is synchronized to the ds=20161113 partition on this day. In the incremental synchronization scenario, automatic scheduling is configured to synchronize incremental data to the ds=20161114 partition in the early morning of November 15, 2016. The optime field indicates the time when a data record is modified. This field is used to determine whether a data record is incremental data.

Usage notes

  • Incremental synchronization is not supported for specific types of data sources, such as HBase and OTSStream data sources. You can refer to the topics for the Reader plug-ins of the related data sources to check whether incremental synchronization is supported.
  • The parameters that you must configure vary based on the Reader plug-ins that you use to synchronize incremental data. For more information, see Supported data source types, readers, and writers.
  • For information about how to configure a batch synchronization node to synchronize incremental data, see Synchronize incremental data.

Synchronize the incremental data of data that does not change after generation

For data that does not change after it is generated, you can partition a table based on the pattern in which data is generated. In most cases, you can partition a table by date, such as one partition per day.

  1. Execute the following statements to prepare data in the ApsaraDB RDS database:
    drop table if exists oplog;
    create table if not exists oplog(
        optime datetime,
        uname varchar(50),
        action varchar(50),
        status varchar(10)
    );
    insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    The preceding data entries are used as historical data. You must first synchronize all historical data to the ds=20161113 partition.

  2. In the Scheduled Workflow pane of the DataStudio page, expand the created workflow, right-click Table under MaxCompute, and then select Create Table.
  3. In the Create Table dialog box, set the Name parameter to ods_oplog and click Create.
  4. On the configuration tab of the ods_oplog table, click DDL. In the DDL dialog box, enter the following statement to create a MaxCompute table:
    -- Create a MaxCompute table and partition the table by day. 
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. Configure a data synchronization node to synchronize historical data. For more information, see Configure a batch synchronization node by using the codeless UI.
    After the test on the data synchronization node is complete, click Properties in the right-side navigation pane of the configuration tab of the node. On the Properties tab, select Skip Execution for the Recurrence parameter and commit or deploy the node again to prevent the node from being automatically scheduled.
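    After the historical data is synchronized, you can run a query similar to the following statement to check the data in the destination partition:
    -- The two historical records should appear in the ds=20161113 partition.
    select * from ods_oplog where ds = '20161113';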
  6. Execute the following statements to insert data into the source table in the ApsaraDB RDS database as incremental data:
    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. Configure a data synchronization node to synchronize incremental data.
    When you configure the source for the node, enter date_format(optime,'%Y%m%d')=${bdp.system.bizdate} in the Filter field. When you configure the destination for the node, enter ${bdp.system.bizdate} in the Partition Key Column field.
    Note After you specify the filter condition, you can query the data that is inserted into the source table on November 14, 2016 and synchronize the data to the incremental data partition of the destination table in the early morning of November 15, 2016.
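    For example, when the node runs in the early morning of November 15, 2016, the ${bdp.system.bizdate} variable resolves to 20161114, and the filter is equivalent to the following condition on the source table:
    -- Only the rows that were modified on November 14, 2016 are read.
    date_format(optime,'%Y%m%d') = '20161114'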
  8. View the incremental synchronization result.
    Click the Properties tab in the right-side navigation pane of the configuration tab of the node. On the Properties tab, set the Scheduling Cycle parameter to Day. After you commit or deploy the data synchronization node that is used to synchronize incremental data, the node is automatically scheduled to run on the next day. After the node is successfully run, you can view the data in the destination MaxCompute table.
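    For example, you can run a query similar to the following statement to view the synchronized incremental data:
    -- The three records that were inserted on November 14, 2016 should appear in this partition.
    select * from ods_oplog where ds = '20161114';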

Synchronize the incremental data of data that is constantly updated after generation

Data warehouses are time-variant. Therefore, we recommend that you synchronize the full data of tables whose records change after generation, such as user and order tables, on a daily basis. This way, a full snapshot is stored every day, and you can retrieve both historical and current data.

In some special scenarios, you may need to synchronize only incremental data on a daily basis. MaxCompute does not allow you to change data by using the UPDATE statement. Therefore, you must use other methods to synchronize incremental data. This section describes how to synchronize full data on a daily basis and how to synchronize full data at a time and then incremental data on a daily basis.

  1. Execute the following statements to prepare data:
    drop table if exists user;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender varchar(1),
        optime datetime
    );
    -- Insert historical data.
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    -- Insert incremental data.
    update user set deptno=101,optime=CURRENT_TIMESTAMP where uid = 2; -- Change a null value to a non-null value.
    update user set deptno=104,optime=CURRENT_TIMESTAMP where uid = 3; -- Change a non-null value to another non-null value.
    update user set deptno=null,optime=CURRENT_TIMESTAMP where uid = 4; -- Change a non-null value to a null value.
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIMESTAMP);
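    You can run a query similar to the following statement in the ApsaraDB RDS database to preview the rows that the incremental filter captures. The date literal is illustrative and assumes that the updates were made on November 14, 2016.
    -- Rows whose optime falls on the business date count as incremental data.
    -- The row that was deleted (uid = 5) cannot be captured this way.
    select * from user where date_format(optime,'%Y%m%d') = '20161114';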
  2. Synchronize data.
    • Synchronize full data on a daily basis.
      1. Execute the following statement to create a MaxCompute table. For more information, see Create a MaxCompute table.
        -- Synchronize full data.
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME 
        ) partitioned by (ds string);
      2. Configure a data synchronization node to synchronize full data.
        Note Set the Scheduling Cycle parameter to Day because full data must be synchronized on a daily basis.
      3. Run the data synchronization node and view the data in the destination MaxCompute table after the synchronization is complete.

        Because full data is synchronized every day, the incremental changes of each day are captured in that day's snapshot. You can view the data results in the table after the node is automatically scheduled to run on the next day.
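        Because each partition stores a full snapshot, you can query the snapshot of a specific day by partition, as shown in the following example:
        -- The full snapshot that was synchronized in the early morning of November 15, 2016.
        select * from ods_user_full where ds = '20161114';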

    • Synchronize incremental data on a daily basis.

      We recommend that you use this synchronization mode only in scenarios in which rows cannot be physically deleted from the source table, for example, when deletion is implemented by executing the UPDATE statement to mark rows as deleted instead of executing the DELETE statement. If rows are physically deleted from the source table, this mode cannot capture the deletions and may cause data inconsistency. In addition, if you use this mode, you must merge the new data and the historical data after each synchronization.

      Prepare data

      Create two tables. One table is used to store all the latest data, and the other table is used to store incremental data.
      -- Create a result table.
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      -- Create an incremental data table.
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      1. Configure a data synchronization node to write full data to the result table.
        Note You need to run the node only once. After you run the node, click the Properties tab in the right-side navigation pane of the configuration tab of the node, and select Skip Execution for the Recurrence parameter on the Properties tab.
      2. Configure a data synchronization node to write incremental data to the incremental data table. To filter the data, specify the filter condition date_format(optime,'%Y%m%d')=${bdp.system.bizdate}.
      3. Execute the following statements to merge data:
        insert overwrite table dw_user_inc
        select
        -- For each column, if the incremental data table contains a row with the same uid, the source record changed,
        -- and the value from the incremental data table is the latest. Otherwise, the historical value is kept.
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from
        dw_user_inc a
        full outer join ods_user_inc b
        on a.uid = b.uid;

        View the merge result. The result shows that the data entry that was deleted from the source table (uid = 5) still exists in the result table, because the deletion cannot be captured by incremental synchronization.
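        You can run a query similar to the following statement to check the merged data:
        -- The updated rows (uid 2, 3, and 4) and the new row (uid 6) contain the latest values,
        -- but the row that was deleted from the source table (uid 5) still exists.
        select * from dw_user_inc order by uid limit 100;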

    Daily incremental synchronization has the benefit that only a small amount of data needs to be synchronized every day. However, this mode may cause data inconsistency and requires an extra computing workload to merge the new data with the historical data.

    We recommend that you synchronize constantly changing data in full mode on a daily basis. In addition, you can specify a lifecycle for the historical data. This way, the historical data can be automatically deleted after it is retained for a specific period of time.
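For example, the following statement specifies a lifecycle of 30 days for the table that stores daily full data. Partitions whose data has not been modified within that period are automatically recycled. The retention period is illustrative; choose a value based on your audit and backtracking requirements.

    -- Each partition is automatically recycled 30 days after its data was last modified.
    alter table ods_user_full set lifecycle 30;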