Topik ini menjelaskan cara mengonfigurasi jadwal berulang untuk menyinkronkan data inkremental dari Kafka ke MaxCompute. Contoh ini mencakup sinkronisasi data per menit, per jam, atau per hari ke tabel partisi harian atau per jam di MaxCompute.
Perhatian
Versi Kafka Anda harus 0.10.2 atau lebih baru dan 2.2.x atau lebih lama. Kafka harus mengaktifkan timestamp pada record, dan record tersebut harus berisi timestamp bisnis yang valid.
Setelah sinkronisasi data inkremental dimulai, record dengan timestamp yang lebih awal dari atau sama dengan waktu mulai mungkin tetap ditulis ke topik Kafka. Record tersebut berpotensi tidak terbaca. Jika penulisan data ke topik Kafka tertunda atau timestamp tidak berurutan, waspadai risiko kehilangan data pada tugas sinkronisasi offline.
Untuk kebijakan akhir sinkronisasi di sisi Kafka, Anda hanya dapat memilih No new data for 1 minute jika kondisi berikut terpenuhi. Jika tidak, kehilangan data dapat terjadi.
Beberapa atau semua partisi dalam topik Kafka tidak menerima data baru dalam waktu lama, misalnya lebih dari 10 menit.
Setelah setiap instans berulang dimulai, tidak ada record dengan timestamp lebih awal dari parameter waktu akhir yang ditulis ke topik Kafka.
Prasyarat
Buat sumber data Kafka dan MaxCompute. Untuk informasi selengkapnya, lihat Data Source Configuration.
Buat koneksi jaringan antara resource group dan sumber data. Untuk informasi selengkapnya, lihat Overview of network connection solutions.
Batasan
Sinkronisasi data sumber ke tabel eksternal MaxCompute tidak didukung.
Prosedur
Topik ini menggunakan antarmuka DataStudio yang baru untuk menunjukkan cara mengonfigurasi tugas sinkronisasi offline.
Langkah 1: Buat node dan konfigurasikan tugas
Untuk langkah umum membuat dan mengonfigurasi node di Antarmuka tanpa kode, lihat panduan Codeless UI configuration. Topik ini tidak mengulangi langkah-langkah tersebut.
Langkah 2: Konfigurasikan sumber data dan tujuan
Konfigurasikan sumber data (Kafka)
Topik ini menjelaskan cara melakukan sinkronisasi offline tabel tunggal dari Kafka ke MaxCompute. Sumber datanya adalah Kafka. Poin konfigurasi utamanya sebagai berikut.
Untuk deskripsi umum item konfigurasi sumber data Kafka, lihat dokumen Kafka Reader. Berikut adalah konfigurasi referensi untuk tutorial ini.
Configuration item | Key configuration |
Topic | Pilih topik Kafka yang akan disinkronkan. Jika Anda menggunakan ruang kerja DataWorks mode standar, topik dengan nama yang sama harus ada di kluster Kafka untuk lingkungan pengembangan dan produksi. Untuk Topic, pilih topik ini. Catatan Jika:
|
Consumer Group ID | Masukkan ID unik untuk kluster Kafka sesuai kebutuhan bisnis Anda. Ini membantu statistik dan pemantauan di sisi kluster Kafka. |
Read Start Offset, Start Time, Read End Offset, End Time | Atur kedua Read Start Offset dan Read End Offset ke Specific Time. Atur Start Time dan End Time ke parameter penjadwalan Parameter ini menentukan posisi awal dan akhir sinkronisasi data. Konfigurasi ini berarti sinkronisasi dimulai dari data |
Time Zone | Anda dapat mengosongkannya atau memilih zona waktu server default wilayah tempat DataWorks berada. Catatan Jika Anda telah menghubungi dukungan teknis Alibaba Cloud untuk mengubah zona waktu penjadwalan, Anda dapat memilih zona waktu yang telah dimodifikasi di sini. |
Key Type, Value Type, Encoding | Pilih berdasarkan record aktual di topik Kafka. |
Synchronization Completion Policy | Untuk kebijakan akhir sinkronisasi, pilih No new data for 1 minute jika kondisi terpenuhi. Jika tidak, pilih Stop at the specified end position.
|
Advanced Configuration | Pertahankan pengaturan default. |
Konfigurasikan tujuan data (MaxCompute)
Topik ini menjelaskan cara melakukan sinkronisasi offline tabel tunggal dari Kafka ke MaxCompute. Tujuan datanya adalah sebuah tabel. Poin konfigurasi utamanya sebagai berikut.
Untuk parameter yang tidak dijelaskan dalam tabel berikut, Anda dapat mempertahankan pengaturan default.
Configuration item | Key configuration |
Data Source | Ini menampilkan sumber data MaxCompute yang dipilih pada langkah sebelumnya. Jika Anda menggunakan ruang kerja DataWorks mode standar, nama proyek pengembangan dan produksi akan ditampilkan. |
Table | Pilih tabel MaxCompute yang akan menjadi tujuan sinkronisasi. Jika Anda menggunakan ruang kerja DataWorks mode standar, pastikan tabel MaxCompute dengan nama dan skema yang sama ada di lingkungan pengembangan dan produksi. Anda juga dapat mengklik Generate Target Table Schema. Sistem secara otomatis membuat tabel untuk menerima data. Anda dapat menyesuaikan pernyataan pembuatan tabel secara manual. Catatan Jika kondisi berikut berlaku:
|
Partition | Jika tabel merupakan tabel partisi, Anda dapat memasukkan nilai untuk kolom kunci partisi.
|
Langkah 3: Konfigurasikan pemetaan bidang
Setelah Anda memilih sumber data dan tujuan, Anda harus menentukan pemetaan kolom antara reader dan writer. Anda dapat memilih Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, or Manually Edit Mapping.
Terdapat enam bidang default di sisi Kafka.
Field Name
Description
__key__
Kunci record Kafka.
__value__
Nilai record Kafka.
__partition__
Nomor partisi tempat record Kafka berada. Nomor partisi dimulai dari 0.
__headers__
Header record Kafka.
__offset__
Offset record Kafka dalam partisinya. Offset dimulai dari 0.
__timestamp__
Timestamp milidetik integer 13 digit dari record Kafka.
Anda dapat mengonfigurasi penguraian JSON kustom untuk bidang di sisi Kafka. Gunakan sintaks
. (untuk mengambil subfield)dan[] (untuk mengambil elemen array)untuk mendapatkan konten dari bidang nilai record Kafka berformat JSON.PentingJika nama bidang JSON mengandung karakter ".", Anda tidak dapat mengambil nilai bidang tersebut dengan mendefinisikan bidang tersebut. Hal ini karena menyebabkan ambiguitas dalam sintaks definisi bidang.
Berikut adalah contoh nilai data record berformat JSON di Kafka.
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ], "a.b": "unreachable" }Untuk menyinkronkan data a1,
"hello", Anda dapat menambahkan bidanga.a1di sisi Kafka.Untuk menyinkronkan data b,
"world", Anda dapat menambahkan bidangbdi sisi Kafka.Untuk menyinkronkan data c,
"yyyyyyy", Anda dapat menambahkan bidangc[1]di sisi Kafka.Untuk menyinkronkan data AA,
"this", Anda dapat menambahkan bidangd[0].AAdi sisi Kafka.Jika bidang di sisi Kafka didefinisikan sebagai
a.b, Anda tidak dapat menyinkronkan data"unreachable".
Bidang tabel sumber atau tujuan dapat dikecualikan dari pemetaan. Instans sinkronisasi tidak membaca bidang sumber yang tidak dipetakan. NULL ditulis ke bidang tujuan yang tidak dipetakan.
Anda tidak dapat memetakan satu bidang sumber ke beberapa bidang tujuan. Bidang tujuan juga tidak dapat dipetakan dari beberapa bidang sumber.
Langkah 4: Konfigurasikan parameter lanjutan
Klik Advanced Configuration di sisi kanan tugas untuk mengatur parameter seperti Maximum Expected Concurrency dan Policy for Dirty Data Records. Untuk tutorial ini, atur Policy for Dirty Data Records ke Ignore Dirty Data Records dan biarkan parameter lain pada nilai default-nya. Untuk informasi selengkapnya, lihat Codeless UI Configuration.
Langkah 5: Konfigurasikan dan jalankan pengujian
Di sisi kanan halaman pengeditan node sinkronisasi offline, klik Debugging Configurations. Atur Script Parameters dan Resource Group untuk pengujian. Lalu, klik Run di bilah alat atas untuk menguji apakah tautan sinkronisasi berjalan sukses.
Anda dapat mengklik ikon
di panel navigasi kiri lalu klik ikon
di sebelah kanan Personal Directory untuk membuat file dengan ekstensi .sql. Anda kemudian dapat menjalankan kueri SQL berikut untuk memeriksa apakah data di tabel tujuan sesuai harapan.CatatanUntuk mengkueri data dengan cara ini, Anda harus attach proyek MaxCompute tujuan ke DataWorks sebagai resource komputasi.
Di halaman pengeditan file
.sql, klik Debugging Configurations di sisi kanan. Tentukan Type sumber data, Computing Resource, dan Resource Group. Lalu, klik Run di bilah alat atas.
SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;
Langkah 6: Konfigurasikan penjadwalan dan publikasikan tugas
Klik Scheduling di sisi kanan tugas sinkronisasi offline. Atur parameter konfigurasi penjadwalan untuk eksekusi berulang. Lalu, klik Publish di bilah alat atas untuk membuka panel penerbitan. Ikuti petunjuk di layar untuk publish tugas.
Saat Anda mengonfigurasi sumber data dan tujuan, Anda menggunakan tiga parameter penjadwalan: ${startTime}, ${endTime}, dan ${partition}. Dalam konfigurasi penjadwalan, Anda harus menentukan kebijakan penggantian untuk parameter-parameter ini berdasarkan kebutuhan sinkronisasi Anda. Berikut adalah contoh konfigurasi untuk beberapa skenario khas.
Typical scenario | Recommended configuration | Scenario description |
Tugas sinkronisasi dijadwalkan setiap 5 menit |
| Jika tugas sinkronisasi dijadwalkan mulai pukul 10:00 pada 2022-11-22:
|
Tugas sinkronisasi dijadwalkan setiap jam |
Catatan
| Jika tugas sinkronisasi dijadwalkan mulai pukul 10:05 pada 2022-11-22:
|
Tugas sinkronisasi dijadwalkan setiap hari |
| Jika tugas sinkronisasi dijadwalkan mulai pukul 00:05 pada 2022-11-22:
|
Tugas sinkronisasi dijadwalkan setiap minggu |
| Jika tugas sinkronisasi dijadwalkan mulai pukul 00:05 pada 2022-11-22:
|
Tugas sinkronisasi dijadwalkan setiap bulan |
| Jika tugas sinkronisasi dijadwalkan mulai pukul 00:05 pada 2022-11-22:
|
Atur siklus penjadwalan berdasarkan interval yang diinginkan.
Typical scenario | Recommended configuration | Scenario description |
Tugas sinkronisasi dijadwalkan setiap 5 menit |
| None |
Tugas sinkronisasi dijadwalkan setiap jam |
| Atur waktu mulai sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai. |
Tugas sinkronisasi dijadwalkan setiap hari |
| Atur waktu terjadwal sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai. |
Tugas sinkronisasi dijadwalkan setiap minggu |
| Atur waktu terjadwal sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai. |
Tugas sinkronisasi dijadwalkan setiap bulan |
| Atur waktu terjadwal sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai. |
Jika record dengan timestamp yang lebih awal dari atau sama dengan waktu mulai ditulis ke topik Kafka setelah instans dimulai, record tersebut berpotensi tidak terbaca. Jika penulisan data ke topik Kafka tertunda atau timestamp tidak berurutan, waspadai risiko kehilangan data pada tugas sinkronisasi offline.