This topic describes how to synchronize incremental data from a Relational Database Service (RDS) database to MaxCompute. You can refer to this topic to synchronize incremental data in different scenarios.

Background information

Data to be synchronized is divided into historical data and incremental data. Historical data, such as log data, remains unchanged after it is generated and written to the destination. Incremental data, such as the status of employees in a staff table, continues to change after it is generated.

A sync node is idempotent if running it multiple times produces the same result. Idempotence allows you to safely schedule a node to rerun, and to clear dirty data by rerunning the node after an error occurs. To keep writes idempotent, write each batch of data to a separate table or partition, or overwrite the historical data in an existing table or partition.
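
For example, an INSERT OVERWRITE statement in MaxCompute is idempotent: no matter how many times you run it, the destination partition contains the same result because the partition is overwritten rather than appended to. The table names in the following sketch are hypothetical and used only for illustration.

    -- Rerunning this statement yields the same partition content each time,
    -- because the partition is overwritten rather than appended to.
    insert overwrite table target_table partition (ds='20161114')
    select * from staging_table;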

In this topic, the scheduled date of a sync node is set to November 14, 2016, and historical data is synchronized to the ds=20161113 partition on the same day. In the incremental synchronization scenario, automatic scheduling is configured to synchronize the incremental data to the ds=20161114 partition in the early morning of November 15. The optime field indicates the time when a data entry was modified and is used to determine whether the entry is incremental data.

Usage notes

  • Incremental synchronization is not supported for some data sources, such as HBase and OTSStream. You can refer to the topics that introduce the Reader plug-ins of the related data sources to check whether incremental synchronization is supported.
  • You may need to set different parameters if you use different Reader plug-ins to synchronize incremental data. For more information, see Supported data sources, readers, and writers. The following table provides examples of the required parameters and supported syntax.
    Reader plug-in | Parameter required for incremental synchronization | Supported syntax
    MySQL | where. Note: If you configure a sync node by using the codeless UI, you must set the Filter parameter. | Use the syntax of the database. Note: You can set scheduling parameters to read the data that is generated during a specified period of time every day.
    MongoDB | query. Note: If you configure a sync node by using the codeless UI, you must set the Search Condition parameter. | Use the syntax of the database. Note: You can set scheduling parameters to read the data that is generated during a specified period of time every day.
    OSS | Object | Specify the object path. Note: You can set scheduling parameters to read data from specified objects every day.
    ... | ... | ...
  • The scheduling parameters are automatically set based on the data timestamp of the sync node. This way, incremental data generated each day is synchronized. For more information about scheduling parameters, see Configure scheduling parameters.

    In the following example, the daily incremental data of a MySQL database is written to the corresponding partition of a MaxCompute table.

    [Figure: Example of incremental synchronization]
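
    For reference, the where parameter of MySQL Reader can be set to a condition such as date_format(optime,'%Y%m%d')=${bdp.system.bizdate}. At run time, DataWorks replaces ${bdp.system.bizdate} with the data timestamp of the instance. For example, the instance that runs in the early morning of November 15 has the data timestamp 20161114, so the reader effectively issues a query similar to the following sketch (oplog is the sample source table that is used later in this topic):

    -- Effective query for the run whose data timestamp is 20161114.
    select optime, uname, action, status
    from oplog
    where date_format(optime,'%Y%m%d') = '20161114';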

Create a workflow

  1. Log on to the DataWorks console.
  2. In the left-side navigation pane, click Workspaces.
  3. After you select the region in which the workspace that you want to manage resides, find the workspace and click Data Analytics in the Actions column.
  4. In the Scheduled Workflow pane, right-click Business Flow and select Create Workflow.
  5. In the Create Workflow dialog box, set the Workflow Name and Description parameters.
    Note The workflow name must be 1 to 128 characters in length.
  6. Click Create.

Synchronize unchanged historical data in incremental mode

Historical data does not change after it is generated. Therefore, you can partition a table based on the pattern in which data is generated. Typically, you can partition a table by date, such as one partition per day.

  1. Execute the following statements in the RDS database to prepare data:
    drop table if exists oplog;
    create table if not exists oplog(
        optime DATETIME,
        uname varchar(50),
        action varchar(50),
        status varchar(10)
    );
    insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    The preceding two data entries are used as historical data. You must synchronize all the historical data to the ds=20161113 partition first.

  2. In the Scheduled Workflow pane, expand the created workflow, right-click Table under MaxCompute, and then select Create Table.
  3. In the Create Table dialog box, set the Table Name parameter to ods_oplog and click Create.
  4. On the configuration tab of the ods_oplog table on the right side, click DDL Statement. In the DDL Statement dialog box, enter the following statement to create a MaxCompute table:
    -- Create a MaxCompute table and partition the table by day. 
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. Configure a sync node to synchronize historical data. For more information, see Create a sync node.
    After you test the sync node, click Properties in the right-side navigation pane. In the Properties panel, select Skip Execution and commit or deploy the node again to prevent the node from being automatically scheduled.
  6. Execute the following statements to insert data into the source RDS table as incremental data:
    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. Configure a sync node to synchronize incremental data.
    In the Source section, enter date_format(optime,'%Y%m%d')=${bdp.system.bizdate} in the Filter field. In the Target section, enter ${bdp.system.bizdate} in the Partition Key Column field.
    Note By setting a data filter, you can query the data that is inserted into the source table on November 14 and synchronize the data to the incremental data partition of the destination table in the early morning of November 15.
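    To preview the rows that this filter matches, you can execute a query similar to the following sketch in the RDS database. Assuming that the incremental rows were inserted with CURRENT_DATE on November 14, as in this scenario, only those three rows are returned and the two historical rows are excluded.
    select * from oplog where date_format(optime,'%Y%m%d') = '20161114';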
  8. View the incremental synchronization result.
    Click the Properties tab in the right-side navigation pane. In the Properties panel, set the Scheduling Cycle parameter to Day. After you commit or deploy the incremental sync node, the node is automatically scheduled to run from the next day. After the node is run, you can view data in the destination MaxCompute table.
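    For example, you can execute the following verification sketch in MaxCompute to view the partition that the run of November 15 populated:
    select * from ods_oplog where ds='20161114';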

Synchronize dynamically updated data in incremental mode

Because data warehouses are time-variant, we recommend that you synchronize full data every day for tables that are subject to change, such as staff and order tables. In other words, a full snapshot of the data is stored every day so that you can retrieve both historical and current data.

In actual scenarios, you may need to synchronize only incremental data every day. MaxCompute does not support modifying data by using the UPDATE statement. Therefore, incremental changes must be handled by other means. The following sections describe how to synchronize dynamically updated data in full mode and in incremental mode.
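
Because each daily full snapshot is stored in its own partition, you can retrieve the data as of any synchronized day by specifying the partition. The following sketch queries the snapshot of November 13 from the ods_user_full table that is created in the next procedure:

    -- Retrieve the full snapshot as of November 13.
    select * from ods_user_full where ds='20161113';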

  1. Execute the following statements to prepare data:
    drop table if exists user;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender varchar(1),
        optime DATETIME
    );
    -- Insert historical data.
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    -- Apply incremental changes.
    update user set deptno=101, optime=CURRENT_TIME where uid = 2; -- Change a null value to a non-null value.
    update user set deptno=104, optime=CURRENT_TIME where uid = 3; -- Change a non-null value to another non-null value.
    update user set deptno=null, optime=CURRENT_TIME where uid = 4; -- Change a non-null value to null.
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
  2. Synchronize data.
    • Synchronize full data every day.
      1. Execute the following statement to create a MaxCompute table. For more information, see Create a MaxCompute table.
        -- Synchronize all data.
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME 
        ) partitioned by (ds string);
      2. Configure a sync node to synchronize all data.
        Note Set the Scheduling Cycle parameter to Day because daily full data synchronization is required.
      3. Run the sync node and view data in the destination MaxCompute table after the synchronization is complete.

        Full data is synchronized every day, so no separate incremental synchronization is required. After the node is automatically scheduled to run on the next day, you can view the data in the destination table.
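
        For example, you can execute the following verification sketch to confirm that one full snapshot accumulates per partition each day:
        select ds, count(*) as cnt from ods_user_full group by ds;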

    • Synchronize incremental data every day.

      We recommend that you use this sync mode only in scenarios in which the DELETE statement is not allowed and you cannot identify deleted data by executing SQL statements. In most enterprise systems, data is deleted logically: the UPDATE statement is used to mark data as deleted instead of the DELETE statement. In scenarios in which logical deletion is not used, this sync mode may cause data inconsistency. Another drawback is that you must merge the incremental data and the historical data after the synchronization.

      Prepare data

      Create two tables, one for writing all the latest data and the other for writing incremental data.
      -- Create a result table.
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      -- Create an incremental data table.
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      1. Configure a sync node to write all data to the result table.
        Note You need only to run the node once. After you run the node, click the Properties tab in the right-side navigation pane. In the Properties panel, select Skip Execution for the Recurrence parameter.
      2. Configure a sync node to write incremental data to the incremental data table. To filter the data, set the where parameter to date_format(optime,'%Y%m%d')=${bdp.system.bizdate}.
      3. Execute the following statement to merge data:
        insert overwrite table dw_user_inc 
        select 
        -- List all columns explicitly. If a row exists in the incremental data table (b.uid is not null), the row has changed, so take the values from the incremental data table. Otherwise, keep the values from the result table. 
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from 
        dw_user_inc a 
        full outer join ods_user_inc b
        on a.uid  = b.uid ;

        View the merge result. The data entry that was deleted from the source table (uid = 5) is not removed from the result table, because incremental synchronization cannot capture DELETE operations.
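
        For example, the following verification sketch still returns the row with uid = 5 from the result table, even though the row was deleted from the source table, because the DELETE operation is not captured:
        select * from dw_user_inc where uid = 5;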

    Daily incremental synchronization has the advantage of transferring only a small amount of data. However, it may cause data inconsistency, and it requires an extra computation to merge the incremental data with the historical data.

    Unless you have a specific requirement for incremental mode, synchronize dynamically updated data in full mode every day. In addition, you can set a lifecycle for the historical data so that it is automatically deleted after a specified retention period.
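
    For example, the following MaxCompute statement sets a lifecycle of 30 days on the full-data table. Partitions that have not been modified for 30 days are then reclaimed automatically. The value 30 is an arbitrary example; choose a retention period that fits your requirements.

    alter table ods_user_full set lifecycle 30;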