DataWorks Data Integration menyediakan tugas sinkronisasi real-time tabel tunggal untuk memungkinkan replikasi dan transfer data berlatensi rendah serta throughput tinggi antar berbagai sumber data. Fitur ini menggunakan Real-time Compute Engine canggih untuk menangkap perubahan data real-time (insert, update, dan delete) dari sumber dan menerapkannya dengan cepat ke tujuan. Topik ini menggunakan contoh sinkronisasi data dari Kafka ke MaxCompute untuk menunjukkan cara mengonfigurasi tugas sinkronisasi real-time tabel tunggal.
Prasyarat
Persiapan sumber data
Anda telah membuat sumber data dan tujuan data. Untuk informasi selengkapnya, lihat Manajemen Sumber Data.
Pastikan sumber data mendukung sinkronisasi real-time. Untuk informasi selengkapnya, lihat Sumber data yang didukung dan solusi sinkronisasi.
Beberapa sumber data memerlukan logging yang diaktifkan. Metodenya bervariasi tergantung pada sumber datanya. Untuk detailnya, lihat panduan konfigurasi masing-masing sumber di Daftar sumber data.
Resource Group: Anda telah membeli dan mengonfigurasi Serverless Resource Group.
Konektivitas Jaringan: Anda telah menetapkan konektivitas jaringan antara resource group dan sumber data.
Langkah 1: Buat tugas sinkronisasi
Masuk ke Konsol DataWorks. Di bilah navigasi atas, pilih wilayah yang diinginkan. Di panel navigasi kiri, pilih . Pada halaman yang muncul, pilih ruang kerja yang diinginkan dari daftar drop-down lalu klik Go to Data Integration.
Di panel navigasi kiri, klik Sync Tasks. Pada halaman yang muncul, klik Create Sync Task dan konfigurasikan tugas tersebut. Topik ini menggunakan contoh sinkronisasi real-time data dari Kafka ke MaxCompute:
Source:
Kafka.Destination:
MaxCompute.Synchronization Type:
Single-table Real-time.Synchronization Steps:
Schema Migration: Secara otomatis membuat objek database (seperti tabel, field, dan tipe data) di tujuan yang sesuai dengan sumber. Langkah ini tidak mencakup data.
Incremental Synchronization (Opsional): Setelah sinkronisasi penuh selesai, langkah ini terus-menerus menangkap perubahan data (insert, update, dan delete) dari sumber dan menyinkronkannya ke tujuan.
Jika sumbernya adalah Hologres, Full Synchronization juga didukung. Tugas tersebut pertama-tama menyinkronkan semua data yang ada ke tabel tujuan, lalu secara otomatis memulai Incremental Synchronization.
Untuk informasi lebih lanjut tentang sumber data yang didukung dan solusi sinkronisasi, lihat Sumber data yang didukung dan solusi sinkronisasi.
Langkah 2: Konfigurasikan sumber data dan resource waktu proses
Untuk Source Data Source, pilih sumber data
KafkaAnda. Untuk Destination Data Source, pilih sumber dataMaxComputeAnda.Di bagian Runtime Resources, pilih Resource Group untuk tugas sinkronisasi dan alokasikan CUs untuk tugas tersebut. Anda dapat mengatur CUs secara terpisah untuk Full Synchronization dan Incremental Synchronization guna mengontrol resource secara tepat dan mencegah pemborosan. Jika tugas sinkronisasi Anda mengalami error out-of-memory (OOM) karena resource tidak mencukupi, tingkatkan alokasi CU untuk tugas tersebut.
Pastikan kedua sumber data (sumber dan tujuan) lolos pemeriksaan Network Connectivity.
Langkah 3: Konfigurasikan solusi sinkronisasi
1. Konfigurasikan sumber
Di tab Configuration, pilih topik Kafka yang ingin Anda sinkronkan.
Gunakan pengaturan default atau ubah sesuai kebutuhan. Untuk informasi lebih lanjut tentang parameter tersebut, lihat dokumentasi resmi Kafka.
Di pojok kanan atas, klik Data Sampling.
Di kotak dialog yang muncul, tentukan Start time dan Sampled Data Records, lalu klik Start Collection. Ini akan mengambil sampel data dari topik Kafka yang ditentukan. Anda dapat melihat pratinjau data dalam topik tersebut, yang menjadi masukan untuk pratinjau data dan konfigurasi visual di node pemrosesan data berikutnya.
Di tab Configure Output Field, pilih field yang ingin Anda sinkronkan.
Kafka menyediakan enam field default.
Field name
Description
__key__
The key of the Kafka record.
__value__
The value of the Kafka record.
__partition__
The partition number where the Kafka record is stored. Partition numbers are integers that start from 0.
__headers__
The headers of the Kafka record.
__offset__
The offset of the Kafka record in its partition. Offsets are integers that start from 0.
__timestamp__
The 13-digit integer millisecond timestamp of the Kafka record.
Anda juga dapat melakukan transformasi field lebih lanjut di langkah pemrosesan data berikutnya.
2. Proses data
Aktifkan Data Processing. Tersedia lima metode pemrosesan: Data Masking, String Replace, Data Filtering, JSON Parsing, dan Edit and assign fields. Susun metode-metode ini sesuai urutan eksekusi yang Anda inginkan.

Setelah mengonfigurasi langkah pemrosesan data, Anda dapat mengklik Preview Data Output di pojok kanan atas:
Tabel di bawah data masukan menampilkan hasil dari langkah Data Sampling sebelumnya. Klik Re-obtain Output of Ancestor Node untuk merefresh hasilnya.
Jika tidak ada output dari langkah hulu, Anda dapat menggunakan Manually Construct Data untuk mensimulasikan output hulu.
Klik Preview untuk melihat data keluaran dari langkah hulu setelah diproses oleh komponen pemrosesan data.

Pratinjau output data dan fitur pemrosesan data bergantung pada hasil Data Sampling dari sumber Kafka. Sebelum mengonfigurasi pemrosesan data, lakukan terlebih dahulu pengambilan sampel data di halaman konfigurasi sumber Kafka.
3. Konfigurasikan tujuan
Di bagian Destination, pilih resource group Tunnel. Secara default, Public Transfer Resource dipilih, yang menggunakan kuota gratis MaxCompute.
Pilih apakah akan menulis data ke tabel baru atau tabel yang sudah ada.
Jika Anda memilih membuat tabel baru, pilih Create dari daftar drop-down. Secara default, tabel dengan skema yang sama dengan sumber data akan dibuat. Anda dapat mengubah nama dan skema tabel tujuan secara manual.
Jika Anda memilih menggunakan tabel yang sudah ada, pilih tabel tujuan dari daftar drop-down.
(Opsional) Edit Table Schema.
Klik ikon edit di samping nama tabel untuk mengedit Table Schema. Anda dapat mengklik Re-generate Table Schema Based on Output Column of Ancestor Node untuk secara otomatis menghasilkan Table Schema berdasarkan kolom output dari node hulu. Anda dapat memilih satu kolom dalam skema yang dihasilkan secara otomatis sebagai primary key.
4. Konfigurasikan pemetaan field
Setelah memilih sumber dan tujuan, Anda harus menentukan pemetaan antara field sumber dan kolom tujuan. Tugas tersebut menulis data dari field sumber ke kolom tujuan yang sesuai berdasarkan pemetaan field yang dikonfigurasi.
Sistem secara otomatis menghasilkan pemetaan antara field hulu dan kolom tabel tujuan berdasarkan prinsip Same Name Mapping. Anda dapat menyesuaikan pemetaan tersebut sesuai kebutuhan. Anda dapat memetakan satu field hulu ke beberapa kolom tabel tujuan, tetapi tidak dapat memetakan beberapa field hulu ke satu kolom tabel tujuan. Jika suatu field hulu tidak dipetakan ke kolom tabel tujuan, datanya tidak akan ditulis ke tabel tujuan.
Anda dapat mengonfigurasi JSON Parsing khusus untuk field Kafka. Gunakan komponen pemrosesan data untuk mengambil konten field value guna konfigurasi field yang lebih detail.

(Opsional) Konfigurasikan partisi.
Automatic Time-based Partitioning mempartisi data berdasarkan field business time (dalam contoh ini,
_timestamp). Partisi tingkat pertama berdasarkan tahun, tingkat kedua berdasarkan bulan, dan seterusnya.Dynamic Partitioning by Field Content memetakan field dari tabel sumber ke field partisi di tabel MaxCompute tujuan. Hal ini memastikan bahwa baris yang berisi nilai field tertentu ditulis ke partisi yang sesuai di tabel MaxCompute.
Langkah 4: Konfigurasikan pengaturan lanjutan
Tugas sinkronisasi menyediakan parameter lanjutan untuk kontrol detail halus. Dalam kebanyakan kasus, Anda tidak perlu mengubah nilai default. Jika diperlukan, Anda dapat:
Klik Advanced Parameters di pojok kanan atas untuk membuka halaman konfigurasi Advanced Parameters.
CatatanParameter lanjutan berada di tab di sisi kanan halaman konfigurasi tugas.
Anda dapat mengatur parameter secara terpisah untuk reader dan writer tugas sinkronisasi. Untuk menyesuaikan Runtime Configuration, nonaktifkan Auto-configure Runtime Settings.
Ubah nilai parameter seperti yang dijelaskan dalam tooltip. Untuk deskripsi parameter, lihat penjelasan di samping nama parameter. Untuk rekomendasi konfigurasi beberapa parameter, lihat Parameter lanjutan untuk sinkronisasi real-time.
Ubah parameter ini hanya jika Anda benar-benar memahami tujuan dan dampak potensialnya. Pengaturan yang salah dapat menyebabkan error tak terduga atau masalah kualitas data.
Langkah 5: Jalankan uji coba
Setelah mengonfigurasi tugas, Anda dapat mengklik Perform Simulated Running di pojok kiri bawah untuk men-debug tugas tersebut. Uji Coba mensimulasikan alur tugas lengkap pada sejumlah kecil data sampel dan menampilkan pratinjau hasil sebagaimana akan ditulis ke tabel tujuan. Jika terdapat kesalahan konfigurasi, exception, atau Dirty Data selama Uji Coba, sistem memberikan umpan balik secara real-time. Hal ini membantu Anda memverifikasi dengan cepat bahwa tugas dikonfigurasi dengan benar dan menghasilkan output yang diharapkan.
Di kotak dialog yang muncul, atur parameter sampling: Start time dan Sampled Data Records.
Klik Start Collection untuk mendapatkan data sampel.
Klik Preview Result untuk mensimulasikan eksekusi tugas dan melihat hasil output.
Output Uji Coba hanya untuk pratinjau dan tidak ditulis ke sumber data tujuan. Hal ini tidak memengaruhi data produksi Anda.
Langkah 6: Publikasikan dan jalankan tugas
Setelah menyelesaikan semua konfigurasi, klik Complete Configuration di bagian bawah halaman.
Tugas Data Integration harus dipublikasikan ke lingkungan produksi agar dapat dijalankan. Setelah membuat atau mengedit tugas, klik Deploy untuk menerapkan perubahan Anda. Saat mempublikasikan, Anda dapat memilih an option to start the task immediately. Jika Anda tidak memilih opsi ini, Anda harus membuka halaman setelah publikasi dan menjalankan tugas secara manual.
Di Tasks, klik Name/ID tugas untuk melihat proses eksekusi secara detail.
Langkah 7: Konfigurasikan aturan notifikasi
Setelah tugas dipublikasikan dan berjalan, Anda dapat mengonfigurasi Alert Rules untuk segera diberi tahu jika terjadi exception. Hal ini membantu menjaga stabilitas lingkungan produksi dan memastikan data Anda selalu mutakhir. Di halaman Sync Tasks, klik di kolom Actions untuk tugas yang dituju.
1. Tambahkan notifikasi

(1) Klik Create Rule untuk mengonfigurasi Alert Rule.
Anda dapat mengatur Alert Reason untuk memantau metrik seperti Business delay, failover, Task status, DDL Notification, dan Task Resource Utilization. Anda dapat mengatur level notifikasi CRITICAL atau WARNING berdasarkan ambang batas yang ditentukan.
Setelah mengatur metode notifikasi, Anda dapat menggunakan Configure Advanced Parameters untuk mengontrol interval pengiriman pesan notifikasi. Hal ini mencegah pengiriman terlalu banyak pesan sekaligus yang dapat menyebabkan pemborosan dan penumpukan pesan.
Jika Anda memilih Business delay, Task status, atau Task Resource Utilization sebagai pemicu notifikasi, Anda juga dapat mengaktifkan notifikasi pemulihan untuk memberi tahu penerima saat tugas kembali normal.
(2) Mengelola Aturan Peringatan.
Untuk Alert Rules yang sudah ada, Anda dapat menggunakan sakelar notifikasi untuk mengaktifkan atau menonaktifkannya. Anda juga dapat memberi tahu kontak berbeda berdasarkan level notifikasi.
2. Lihat notifikasi
Klik untuk tugas tersebut guna melihat riwayat event notifikasi.
Langkah selanjutnya
Setelah tugas dimulai, Anda dapat mengklik nama tugas untuk melihat detail eksekusinya dan melakukan Operasi dan Pemeliharaan (O&M) serta Tuning Tugas.
FAQ
Untuk masalah umum terkait tugas sinkronisasi real-time, lihat FAQ sinkronisasi real-time.
Contoh lainnya
Sinkronisasi real-time tabel tunggal dari Kafka ke ApsaraDB for OceanBase
Ingesti real-time tabel tunggal dari LogHub (SLS) ke Data Lake Formation
Sinkronisasi real-time tabel tunggal dari Hologres ke Doris
Sinkronisasi real-time tabel tunggal dari Hologres ke Hologres
Sinkronisasi real-time tabel tunggal dari Kafka ke Hologres
Sinkronisasi real-time tabel tunggal dari LogHub (SLS) ke Hologres
Sinkronisasi real-time tabel tunggal dari Kafka ke Hologres
Sinkronisasi real-time tabel tunggal dari Hologres ke Kafka
Sinkronisasi real-time tabel tunggal dari LogHub (SLS) ke MaxCompute
Sinkronisasi real-time tabel tunggal dari Kafka ke data lake OSS
Sinkronisasi real-time tabel tunggal dari Kafka ke StarRocks
Sinkronisasi real-time tabel tunggal dari Oracle ke Tablestore