すべてのプロダクト
Search
ドキュメントセンター

DataWorks:ApsaraDB RDS から MaxCompute への増分データの同期

最終更新日:Jul 05, 2025

このトピックでは、ApsaraDB RDS データベースから MaxCompute に増分データを同期する方法について説明します。このトピックを参照して、さまざまなシナリオで増分データを同期できます。

背景情報

同期されるデータは、生成後に変更されないデータと、生成後に常に更新されるデータに分類されます。たとえば、ログエントリは生成後に変更されませんが、ユーザーデータはユーザーの状態の変化により常に更新される可能性があります。

複数回実行されるノードの実行結果が同じままである場合は、ノードを再実行するようにスケジュールできます。ノードでエラーが発生した場合は、ダーティデータを削除できます。この原則はべき等性と呼ばれます。データを書き込むたびに、データは別のテーブルまたはパーティションに書き込まれるか、既存のテーブルまたはパーティションの履歴データを上書きします。

このトピックでは、データ同期ノードのスケジュール日が 2016 年 11 月 14 日に設定されており、履歴データはこの日に ds=20161113 パーティションに同期されます。増分同期シナリオでは、自動スケジューリングは、2016 年 11 月 15 日の早朝に増分データを ds=20161114 パーティションに同期するように構成されています。optime フィールドは、データレコードが変更された時刻を示します。このフィールドは、データレコードが増分データであるかどうかを判断するために使用されます。

使用上の注意

生成後に変更されないデータの増分データを同期する

生成後に変更されないデータの場合、データが生成されるパターンに基づいてテーブルをパーティション化できます。ほとんどの場合、1 日 1 パーティションなど、日付ごとにテーブルをパーティション化できます。

  1. 次のステートメントを実行して、ApsaraDB RDS データベースにデータを準備します。

    drop table if exists oplog;
    create table if not exists oplog(
    optime DATETIME,
    uname varchar(50),
    action varchar(50),
    status varchar(10)
     );
    Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    上記のデータエントリは履歴データとして使用されます。最初にすべての履歴データを ds=20161113 パーティションに同期する必要があります。

  2. DataStudio ページの [スケジュール済みワークフロー] ペインで、作成したワークフローを展開し、MaxCompute の下の [テーブル] を右クリックして、[テーブルの作成] を選択します。

  3. [テーブルの作成] ダイアログボックスで、[名前] パラメーターを ods_oplog に設定し、[作成] をクリックします。

  4. ods_oplog テーブルの構成タブで、[DDL] をクリックします。 DDL ダイアログボックスで、次のステートメントを入力して MaxCompute テーブルを作成します。

    -- MaxCompute テーブルを作成し、日ごとにテーブルをパーティション化します。
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. 履歴データを同期するようにデータ同期ノードを構成します。詳細については、「コードレス UI を使用してバッチ同期タスクを構成する」をご参照ください。

    データ同期ノードのテストが完了したら、ノードの構成タブの右側ナビゲーションペインにある [プロパティ] をクリックします。[プロパティ] タブで、[繰り返し] パラメーターの [実行をスキップ] を選択し、ノードが自動的にスケジュールされないように、ノードを再度コミットまたはデプロイします。

  6. 次のステートメントを実行して、ApsaraDB RDS データベースのソーステーブルに増分データとしてデータを挿入します。

    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. 増分データを同期するようにデータ同期ノードを構成します。

    ノードの [ソース] を構成するときに、[フィルター] フィールドに date_format(optime,'%Y%m%d')=${bdp.system.bizdate} と入力します。ノードの [宛先] を構成するときに、[パーティションキー列] フィールドに ${bdp.system.bizdate} と入力します。

    説明

    フィルター条件を指定すると、2016 年 11 月 14 日にソーステーブルに挿入されたデータをクエリし、2016 年 11 月 15 日の早朝に宛先テーブルの増分データパーティションにデータを同期できます。

  8. 増分同期の結果を表示します。

    ノードの構成タブの右側ナビゲーションペインにある [プロパティ] タブをクリックします。[プロパティ] タブで、[スケジューリングサイクル] パラメーターを [日] に設定します。増分データの同期に使用するデータ同期ノードをコミットまたはデプロイすると、ノードは翌日実行するように自動的にスケジュールされます。ノードが正常に実行された後、宛先 MaxCompute テーブルのデータを表示できます。

生成後に常に更新されるデータの増分データを同期する

データウェアハウスには時変特性があります。そのため、ユーザーテーブルや注文テーブルなど、変更が生成されるテーブルのすべてのデータを毎日同期することをお勧めします。このようにして、完全なデータが毎日保存され、履歴データと現在のデータを取得できます。

特別なシナリオでは、増分データのみを毎日同期する必要がある場合があります。 MaxCompute では、UPDATE ステートメントを使用してデータを変更することはできません。そのため、他の方法を使用して増分データを同期する必要があります。このセクションでは、完全なデータを毎日同期する方法と、一度に完全なデータを同期してから増分データを毎日同期する方法について説明します。

  1. 次のステートメントを実行して、データを準備します。

    drop table if exists user ;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender VARCHAR(1),
        optime DATETIME
        );
    -- 履歴データを挿入します。
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    -- 増分データを挿入します。
    update user set deptno=101,optime=CURRENT_TIME  where uid = 2; -- null 値を null 以外の値に変更します。
    update user set deptno=104,optime=CURRENT_TIME  where uid = 3; -- null 以外の値を他の null 以外の値に変更します。
    update user set deptno=null,optime=CURRENT_TIME  where uid = 4; -- null 以外の値を null 値に変更します。
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
  2. データを同期します。

    • 完全なデータを毎日同期します。

      1. 次のステートメントを実行して、MaxCompute テーブルを作成します。詳細については、「MaxCompute テーブルを作成する」をご参照ください。

        -- 完全なデータを同期します。
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME 
        ) partitioned by (ds string);
      2. 完全なデータを同期するようにデータ同期ノードを構成します。

        説明

        完全なデータは毎日同期する必要があるため、[スケジューリングサイクル] パラメーターを [日] に設定します。

      3. データ同期ノードを実行し、同期が完了したら、宛先 MaxCompute テーブルのデータを表示します。

        完全同期が毎日実行されると、増分データも毎日同期されます。ノードが翌日実行するように自動的にスケジュールされた後、テーブルのデータ結果を表示できます。

    • 増分データを毎日同期します。

      DELETE ステートメントがサポートされておらず、関連する SQL ステートメントを実行して削除されたデータを表示できないシナリオを除いて、この同期モードを使用しないことをお勧めします。ほとんどの場合、DELETE ステートメントの代わりに UPDATE ステートメントを使用してデータを削除できます。この方法が適用できないシナリオでは、このデータ同期モードによってデータの不整合が発生する可能性があります。さらに、このデータ同期モードを使用する場合は、データ同期の後に新しいデータと履歴データをマージする必要があります。

      データを準備する

      2 つのテーブルを作成します。1 つのテーブルはすべての最新データを格納するために使用され、もう 1 つのテーブルは増分データを格納するために使用されます。

      -- 結果テーブルを作成します。
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      -- 増分データテーブルを作成します。
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      )
      1. 結果テーブルに完全なデータを書き込むようにデータ同期ノードを構成します。

        説明

        ノードを実行する必要があるのは 1 回だけです。ノードを実行した後、ノードの構成タブの右側ナビゲーションペインにある [プロパティ] タブをクリックし、[プロパティ] タブの [繰り返し] パラメーターの [実行をスキップ] を選択します。

      2. 増分データテーブルに増分データを書き込むようにデータ同期ノードを構成します。データをフィルタリングするには、フィルター条件 date_format(optime,'%Y%m%d')=${bdp.system.bizdate} を指定します。

      3. 次のステートメントを実行してデータをマージします。

        insert overwrite table dw_user_inc 
        select 
        -- すべての SELECT 操作がリストされています。増分データテーブルにデータが含まれている場合、ソースのデータが変更されます。この場合、増分データテーブルのデータが最新データです。
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from 
        dw_user_inc a 
        full outer join ods_user_inc b
        on a.uid  = b.uid ;

        マージ結果を表示します。結果は、削除されたデータエントリが同期されていないことを示しています。

    毎日の増分同期には、毎日少量のデータのみを同期する必要があるという利点があります。ただし、データの不整合が発生する可能性があり、データをマージするために余分な計算ワークロードが必要になります。

    毎日完全モードで常に変化するデータを同期することをお勧めします。さらに、履歴データの ライフサイクル を指定できます。このようにして、履歴データは特定の期間保持された後に自動的に削除されます。