全部产品
Search
文档中心

DataWorks:Sinkronkan data inkremental dari ApsaraDB RDS ke MaxCompute

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menyinkronkan data inkremental dari database ApsaraDB RDS ke MaxCompute. Anda dapat merujuk pada topik ini untuk menyinkronkan data inkremental dalam berbagai skenario.

Informasi latar belakang

Data yang akan disinkronkan diklasifikasikan menjadi dua jenis: data yang tidak berubah setelah dihasilkan dan data yang terus diperbarui setelah dihasilkan. Sebagai contoh, entri log tidak berubah setelah dihasilkan, sedangkan data pengguna mungkin terus diperbarui karena perubahan status pengguna.

Jika hasil operasi sebuah node yang dijalankan beberapa kali tetap sama, Anda dapat menjadwalkan ulang node tersebut untuk dijalankan kembali. Jika terjadi kesalahan pada node, Anda dapat menghapus data kotor. Prinsip ini disebut idempotensi. Setiap kali Anda menulis data, data tersebut ditulis ke tabel atau partisi terpisah atau menimpa data historis di tabel atau partisi yang ada.

Dalam topik ini, tanggal penjadwalan node sinkronisasi data diatur pada 14 November 2016, dan data historis disinkronkan ke partisi ds=20161113 pada hari itu. Dalam skenario sinkronisasi inkremental, penjadwalan otomatis dikonfigurasi untuk menyinkronkan data inkremental ke partisi ds=20161114 pada pagi hari tanggal 15 November 2016. Bidang optime menunjukkan waktu ketika catatan data dimodifikasi. Bidang ini digunakan untuk menentukan apakah catatan data adalah data inkremental.

Catatan penggunaan

  • Sinkronisasi inkremental tidak didukung untuk jenis sumber data tertentu, seperti sumber data HBase dan OTSStream. Anda dapat merujuk pada dokumentasi Plugin Reader untuk memeriksa apakah sinkronisasi inkremental didukung.

  • Parameter yang harus dikonfigurasikan bervariasi berdasarkan Plugin Reader yang digunakan untuk menyinkronkan data inkremental. Untuk informasi lebih lanjut, lihat Jenis Sumber Data yang Didukung, Plugin Reader, dan Plugin Writer.

  • Untuk informasi tentang cara mengonfigurasi node sinkronisasi batch untuk menyinkronkan data inkremental, lihat Konfigurasikan Node Sinkronisasi Batch untuk Menyinkronkan Data Inkremental.

  • Menyinkronkan data sumber ke tabel eksternal MaxCompute tidak didukung.

Sinkronkan data inkremental dari data yang tidak berubah setelah dihasilkan

Untuk data yang tidak berubah setelah dihasilkan, Anda dapat mempartisi tabel berdasarkan pola pembuatan data. Dalam sebagian besar kasus, Anda dapat mempartisi tabel berdasarkan tanggal, seperti satu partisi per hari.

  1. Eksekusi pernyataan berikut untuk menyiapkan data di database ApsaraDB RDS:

    drop table if exists oplog;
    create table if not exists oplog(
    optime DATETIME,
    uname varchar(50),
    action varchar(50),
    status varchar(10)
     );
    Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    Entri data di atas digunakan sebagai data historis. Anda harus terlebih dahulu menyinkronkan semua data historis ke partisi ds=20161113.

  2. Di panel Scheduled Workflow halaman DataStudio, perluas alur kerja yang telah dibuat, klik kanan Table di bawah MaxCompute, lalu pilih Create Table.

  3. Di kotak dialog Create Table, atur parameter Name menjadi ods_oplog dan klik Create.

  4. Pada tab konfigurasi tabel ods_oplog, klik DDL. Di kotak dialog DDL, masukkan pernyataan berikut untuk membuat tabel MaxCompute:

    -- Buat tabel MaxCompute dan partisi tabel berdasarkan hari.
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. Konfigurasikan node sinkronisasi data untuk menyinkronkan data historis. Untuk informasi lebih lanjut, lihat Konfigurasikan Tugas Sinkronisasi Batch Menggunakan Antarmuka Tanpa Kode.

    Setelah pengujian pada node sinkronisasi data selesai, klik Properties di panel navigasi sisi kanan tab konfigurasi node. Pada tab Properti, pilih Skip Execution untuk parameter Recurrence dan komit atau deploy ulang node untuk mencegah node dijadwalkan secara otomatis.

  6. Eksekusi pernyataan berikut untuk memasukkan data ke tabel sumber di database ApsaraDB RDS sebagai data inkremental:

    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. Konfigurasikan node sinkronisasi data untuk menyinkronkan data inkremental.

    Saat mengonfigurasi source untuk node, masukkan date_format(optime,'%Y%m%d')=${bdp.system.bizdate} di bidang Filter. Saat mengonfigurasi destination untuk node, masukkan ${bdp.system.bizdate} di bidang Partition Key Column.

    Catatan

    Setelah Anda menentukan kondisi filter, Anda dapat menanyakan data yang dimasukkan ke tabel sumber pada 14 November 2016 dan menyinkronkan data ke partisi data inkremental tabel tujuan pada pagi hari 15 November 2016.

  8. Lihat hasil sinkronisasi inkremental.

    Klik tab Properties di panel navigasi sisi kanan tab konfigurasi node. Pada tab Properti, atur parameter Siklus Penjadwalan ke Hari. Setelah Anda commit atau deploy node sinkronisasi data yang digunakan untuk menyinkronkan data inkremental, node tersebut akan dijadwalkan secara otomatis untuk dijalankan pada hari berikutnya. Setelah node berhasil dijalankan, Anda dapat melihat data di tabel MaxCompute tujuan.

Sinkronkan data inkremental dari data yang terus diperbarui setelah dihasilkan

Gudang data memiliki karakteristik variabel waktu. Oleh karena itu, kami sarankan Anda menyinkronkan semua data di tabel tempat perubahan dihasilkan, seperti tabel pengguna dan pesanan, setiap hari. Dengan cara ini, data lengkap disimpan setiap hari, dan Anda dapat mengambil data historis dan saat ini.

Dalam beberapa skenario khusus, Anda mungkin perlu menyinkronkan hanya data inkremental setiap hari. MaxCompute tidak mengizinkan Anda mengubah data dengan menggunakan pernyataan UPDATE. Oleh karena itu, Anda harus menggunakan metode lain untuk menyinkronkan data inkremental. Bagian ini menjelaskan cara menyinkronkan data lengkap setiap hari dan cara menyinkronkan data lengkap sekali lalu data inkremental setiap hari.

  1. Eksekusi pernyataan berikut untuk menyiapkan data:

    drop table if exists user ;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender VARCHAR(1),
        optime DATETIME
        );
    -- Masukkan data historis.
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    -- Masukkan data inkremental.
    update user set deptno=101,optime=CURRENT_TIME  where uid = 2; -- Ubah nilai null menjadi nilai non-null.
    update user set deptno=104,optime=CURRENT_TIME  where uid = 3; -- Ubah nilai non-null menjadi nilai non-null lainnya.
    update user set deptno=null,optime=CURRENT_TIME  where uid = 4; -- Ubah nilai non-null menjadi nilai null.
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
  2. Sinkronkan data.

    • Sinkronkan data lengkap setiap hari.

      1. Eksekusi pernyataan berikut untuk membuat tabel MaxCompute. Untuk informasi lebih lanjut, lihat Buat Tabel MaxCompute.

        -- Sinkronkan data lengkap.
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME 
        ) partitioned by (ds string);
      2. Konfigurasikan node sinkronisasi data untuk menyinkronkan data lengkap.

        Catatan

        Atur parameter Siklus Penjadwalan ke Hari karena data lengkap harus disinkronkan setiap hari.

      3. Jalankan node sinkronisasi data dan lihat data di tabel MaxCompute tujuan setelah sinkronisasi selesai.

        Saat sinkronisasi penuh dilakukan setiap hari, data inkremental juga disinkronkan setiap hari. Anda dapat melihat hasil data di tabel setelah node dijadwalkan secara otomatis untuk dijalankan pada hari berikutnya.

    • Sinkronkan data inkremental setiap hari.

      Kami sarankan Anda tidak menggunakan mode sinkronisasi ini kecuali dalam skenario di mana pernyataan DELETE tidak didukung dan Anda gagal mengeksekusi pernyataan SQL terkait untuk melihat data yang dihapus. Dalam sebagian besar kasus, data dapat dihapus menggunakan pernyataan UPDATE daripada pernyataan DELETE. Dalam skenario di mana metode ini tidak dapat diterapkan, mode sinkronisasi data ini dapat menyebabkan ketidaksesuaian data. Selain itu, jika Anda menggunakan mode sinkronisasi data ini, Anda harus menggabungkan data baru dan historis setelah sinkronisasi data.

      Siapkan data

      Buat dua tabel. Satu tabel digunakan untuk menyimpan semua data terbaru, dan tabel lainnya digunakan untuk menyimpan data inkremental.

      -- Buat tabel hasil.
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      -- Buat tabel data inkremental.
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      )
      1. Konfigurasikan node sinkronisasi data untuk menulis data lengkap ke tabel hasil.

        Catatan

        Anda hanya perlu menjalankan node sekali. Setelah Anda menjalankan node, klik tab Properties di panel navigasi sisi kanan tab konfigurasi node, dan pilih Skip Execution untuk parameter Recurrence di tab Properti.

      2. Konfigurasikan node sinkronisasi data untuk menulis data inkremental ke tabel data inkremental. Untuk menyaring data, tentukan kondisi filter date_format(optime,'%Y%m%d')=${bdp.system.bizdate}.

      3. Eksekusi pernyataan berikut untuk menggabungkan data:

        insert overwrite table dw_user_inc 
        select 
        -- Semua operasi SELECT terdaftar. Jika tabel data inkremental berisi data, data di sumber berubah. Dalam hal ini, data di tabel data inkremental adalah data terbaru.
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from 
        dw_user_inc a 
        full outer join ods_user_inc b
        on a.uid  = b.uid ;

        Lihat hasil penggabungan. Hasilnya menunjukkan bahwa entri data yang dihapus tidak disinkronkan.

    Sinkronisasi inkremental harian memberikan manfaat bahwa hanya sejumlah kecil data yang perlu disinkronkan setiap hari. Namun, ketidaksesuaian data mungkin terjadi, yang memerlukan beban komputasi tambahan untuk menggabungkan data.

    Kami sarankan Anda menyinkronkan data yang terus berubah dalam mode lengkap setiap hari. Selain itu, Anda dapat menentukan masa hidup untuk data historis. Dengan cara ini, data historis dapat dihapus secara otomatis setelah disimpan selama periode waktu tertentu.