Tutorial ini menjelaskan cara menyiapkan tugas sinkronisasi offline berulang yang membaca catatan berbasis jendela waktu dari topik Kafka dan menuliskannya ke tabel partisi MaxCompute. Tugas tersebut dapat dijadwalkan untuk dijalankan per menit, jam, hari, minggu, atau bulan.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Sumber data Kafka dan MaxCompute telah dikonfigurasi. Lihat Konfigurasi sumber data.
-
Konektivitas jaringan antara kelompok sumber daya dan kedua sumber data tersebut. Lihat Ikhtisar solusi koneksi jaringan.
Batasan
-
Sinkronisasi data sumber ke tabel eksternal MaxCompute tidak didukung.
-
Versi Kafka harus 0.10.2 atau lebih baru, dan 2.2.x atau lebih lama.
-
Kafka harus mengaktifkan timestamp pada catatan, dan setiap catatan harus menyertakan timestamp bisnis yang valid.
Risiko kehilangan data
Catatan dengan timestamp lebih awal dari atau sama dengan waktu mulai mungkin tiba di topik Kafka setelah instans berulang dimulai. Catatan tersebut mungkin tidak terbaca. Jika penulisan ke topik Kafka tertunda atau timestamp tidak berurutan, kehilangan data dapat terjadi pada tugas sinkronisasi offline.
Konfigurasikan tugas sinkronisasi
Tutorial ini menggunakan antarmuka DataStudio yang baru.
Langkah 1: Buat node
Ikuti panduan Konfigurasi UI tanpa kode untuk membuat dan mengonfigurasi node sinkronisasi offline. Tutorial ini berfokus pada detail konfigurasi khusus untuk sinkronisasi dari Kafka ke MaxCompute.
Langkah 2: Konfigurasikan sumber data dan tujuan
Konfigurasikan sumber data (Kafka)
Tutorial ini mendemonstrasikan sinkronisasi offline satu topik dari Kafka ke MaxCompute. Untuk referensi lengkap semua opsi konfigurasi Kafka Reader, lihat dokumen Kafka Reader.
Tabel berikut mencakup item konfigurasi utama untuk tutorial ini.
| Item konfigurasi | Konfigurasi utama |
|---|---|
| Topic | Pilih topik Kafka yang akan disinkronkan. Di ruang kerja DataWorks mode standar, topik dengan nama yang sama harus ada di kluster Kafka untuk lingkungan pengembangan maupun produksi. Catatan
Jika topik tidak ada di lingkungan pengembangan, topik tersebut tidak akan muncul dalam daftar drop-down Topic. Jika tidak ada di lingkungan produksi, jadwal berulang akan gagal setelah tugas dipublikasikan karena tugas tidak dapat menemukan topik yang akan disinkronkan. |
| Consumer Group ID | Masukkan ID unik untuk kluster Kafka. ID ini digunakan untuk statistik dan pemantauan di sisi Kafka. |
| Read Start Offset dan Start Time | Atur Read Start Offset ke Specific Time, lalu atur Start Time ke ${startTime}. Ini menentukan titik awal sinkronisasi—catatan mulai dari ${startTime} akan disertakan. |
| Read End Offset dan End Time | Atur Read End Offset ke Specific Time, lalu atur End Time ke ${endTime}. Catatan hingga (namun tidak termasuk) ${endTime} akan disinkronkan. Kedua parameter ${startTime} dan ${endTime} akan diganti dengan timestamp aktual saat waktu proses berdasarkan ekspresi parameter penjadwalan yang Anda konfigurasi. |
| Time Zone | Biarkan kosong untuk menggunakan zona waktu server default untuk wilayah DataWorks. Jika Anda telah mengubah zona waktu penjadwalan melalui dukungan Alibaba Cloud, pilih zona waktu tersebut di sini. |
| Key Type, Value Type, Encoding | Pilih berdasarkan catatan aktual di topik Kafka. |
| Synchronization Completion Policy | Mengontrol kapan tugas sinkronisasi berhenti membaca. Pilih berdasarkan pola lalu lintas Kafka Anda—lihat perbandingan di bawah. |
| Advanced Configuration | Pertahankan nilai default. |
Memilih kebijakan penyelesaian sinkronisasi
Kedua opsi berperilaku berbeda dan cocok untuk skenario berbeda:
| No new data for 1 minute | Stop at the specified end position | |
|---|---|---|
| Cara kerja | Tugas berhenti ketika tidak ada catatan baru yang tiba di semua partisi selama 1 menit berturut-turut. | Tugas berhenti segera setelah mencapai offset yang sesuai dengan ${endTime}. |
| Gunakan saat | Semua kondisi berikut terpenuhi: (1) beberapa atau semua partisi rutin diam untuk periode panjang, misalnya lebih dari 10 menit, dan (2) tidak ada catatan dengan timestamp lebih awal dari ${endTime} yang akan ditulis ke topik setelah setiap instans berulang dimulai. |
Anda tidak dapat menjamin (1) atau (2) di atas. Ini adalah pilihan default yang lebih aman. |
| Risiko jika salah digunakan | Kehilangan data jika catatan yang datang terlambat masih ditulis saat terjadi jeda 1 menit. | Tidak ada—tugas berhenti secara andal pada posisi akhir yang dikonfigurasi. |
Konfigurasikan tujuan data (MaxCompute)
| Item konfigurasi | Konfigurasi utama |
|---|---|
| Data Source | Menampilkan sumber data MaxCompute yang dipilih pada langkah sebelumnya. Di ruang kerja mode standar, nama proyek pengembangan dan produksi keduanya ditampilkan. |
| Table | Pilih tabel MaxCompute tujuan. Di ruang kerja mode standar, tabel dengan nama dan skema yang sama harus ada di kedua lingkungan. Atau, klik Generate Target Table Schema agar sistem membuat tabel secara otomatis, lalu sesuaikan pernyataan CREATE TABLE sesuai kebutuhan. Catatan
Jika tabel tidak ada di lingkungan pengembangan, tabel tersebut tidak akan muncul dalam daftar drop-down. Jika tidak ada di lingkungan produksi, tugas akan gagal saat waktu proses. Jika skema berbeda antarlingkungan, pemetaan kolom dapat menghasilkan penulisan data yang salah. |
| Partition | Untuk tabel partisi, masukkan nilai kunci partisi. Gunakan nilai statis seperti ds=20220101, atau parameter penjadwalan seperti ds=${partition} yang akan diganti secara otomatis saat waktu proses. |
Langkah 3: Konfigurasikan pemetaan bidang
Setelah memilih sumber dan tujuan, petakan kolom antara pembaca Kafka dan penulis MaxCompute. Gunakan Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, atau Manually Edit Mapping sesuai kebutuhan.
Bidang Kafka default
Kafka menyediakan enam bidang bawaan yang dapat Anda petakan langsung ke kolom MaxCompute.
| Nama bidang | Deskripsi |
|---|---|
__key__ |
Kunci catatan Kafka. |
__value__ |
Nilai catatan Kafka. |
__partition__ |
Nomor partisi tempat catatan berada. Dimulai dari 0. |
__headers__ |
Header catatan Kafka. |
__offset__ |
Offset catatan dalam partisinya. Dimulai dari 0. |
__timestamp__ |
Timestamp catatan sebagai bilangan bulat milidetik 13 digit. |
Penguraian bidang JSON
Untuk nilai Kafka berformat JSON, tambahkan definisi bidang kustom menggunakan . untuk mengakses subfield dan [] untuk mengakses elemen array.
Diberikan contoh nilai catatan berikut:
{
"a": {
"a1": "hello"
},
"b": "world",
"c": [
"xxxxxxx",
"yyyyyyy"
],
"d": [
{ "AA": "this", "BB": "is_data" },
{ "AA": "that", "BB": "is_also_data" }
],
"a.b": "unreachable"
}
| Definisi bidang | Nilai yang diambil | Catatan |
|---|---|---|
a.a1 |
"hello" |
Akses subfield dengan . |
b |
"world" |
Bidang tingkat atas |
c[1] |
"yyyyyyy" |
Akses elemen array dengan [] |
d[0].AA |
"this" |
Akses gabungan |
a.b |
*(tidak dapat diambil)* | Nama bidang yang mengandung . tidak dapat diurai—. diinterpretasikan sebagai pemisah subfield, sehingga menimbulkan ambiguitas. |
Aturan pemetaan
-
Bidang sumber yang tidak dipetakan tidak akan dibaca oleh instans sinkronisasi.
-
NULL ditulis ke bidang tujuan yang tidak dipetakan.
-
Satu bidang sumber tidak dapat dipetakan ke beberapa bidang tujuan.
-
Satu bidang tujuan tidak dapat dipetakan dari beberapa bidang sumber.
Langkah 4: Konfigurasikan parameter lanjutan
Klik Advanced Configuration di sisi kanan tugas. Untuk tutorial ini, atur Policy for Dirty Data Records ke Ignore Dirty Data Records dan biarkan semua parameter lain tetap pada nilai default-nya. Untuk detail parameter, lihat Konfigurasi UI tanpa kode.
Langkah 5: Uji sinkronisasi
-
Klik Run Configuration di sisi kanan halaman pengeditan. Atur Resource Group dan Script Parameters untuk uji coba, lalu klik Run di bilah alat atas.
-
Setelah uji coba selesai, verifikasi data di tabel tujuan. Di panel navigasi kiri, buat file dengan ekstensi
.sqldan jalankan kueri berikut:Catatan-
Untuk mengkueri data dengan cara ini, sambungkan proyek MaxCompute tujuan ke DataWorks sebagai resource komputasi.
-
Di halaman pengeditan file
.sql, klik Run Configuration di sisi kanan, tentukan Computing Resource dan Resource Group, lalu klik Run.
SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20; -
Langkah 6: Konfigurasikan penjadwalan dan publikasikan
Klik Scheduling di sisi kanan tugas untuk mengonfigurasi pengaturan penjadwalan untuk eksekusi berulang, lalu klik Publish untuk menerbitkan tugas.
Tiga parameter penjadwalan yang digunakan dalam tutorial ini—${startTime}, ${endTime}, dan ${partition}—saling terkait: nilai yang disuntikkan ke ${startTime} dan ${endTime} mengontrol jendela waktu Kafka untuk setiap instans, sedangkan ${partition} menentukan partisi MaxCompute mana yang menerima data. Konfigurasikan ketiganya secara bersamaan berdasarkan siklus penjadwalan Anda.
Contoh berikut mencakup pola penjadwalan paling umum.
Ekspresi parameter penjadwalan
| Siklus penjadwalan | startTime expression |
endTime expression |
partition expression |
|---|---|---|---|
| Setiap 5 menit | $[yyyymmddhh24mi-8/24/60]00 |
$[yyyymmddhh24mi-3/24/60]00 |
$[yyyymmddhh24mi-8/24/60] |
| Setiap jam | $[yyyymmddhh24-1/24]0000 |
$[yyyymmddhh24]0000 |
$[yyyymmddhh24] |
| Setiap 2 jam | $[yyyymmddhh24-2/24]0000 |
$[yyyymmddhh24]0000 |
$[yyyymmddhh24] |
| Setiap 3 jam | $[yyyymmddhh24-3/24]0000 |
$[yyyymmddhh24]0000 |
$[yyyymmddhh24] |
| Setiap hari | $[yyyymmdd-1]000000 |
$[yyyymmdd]000000 |
$[yyyymmdd-1] |
| Setiap minggu | $[yyyymmdd-7]000000 |
$[yyyymmdd]000000 |
$[yyyymmdd-1] |
| Setiap bulan | $[add_months(yyyymmdd,-1)]000000 |
$[yyyymmdd]000000 |
$[yyyymmdd-1] |
Cara kerja ekspresi (contoh per jam)
Untuk tugas yang dijadwalkan berjalan pada pukul 10:05 tanggal 2022-11-22:
-
startTimemenjadi20221122090000— membaca catatan mulai pukul 09:00 (inklusif) -
endTimemenjadi20221122100000— berhenti pada pukul 10:00 (eksklusif) -
partitionmenjadi2022112210— menulis ke partisi tersebut di MaxCompute
Contoh 5 menit
Untuk tugas yang dijadwalkan berjalan pada pukul 10:00 tanggal 2022-11-22:
-
startTimemenjadi20221122095200— membaca catatan mulai pukul 09:52 (inklusif) -
endTimemenjadi20221122095700— berhenti pada pukul 09:57 (eksklusif) -
partitionmenjadi202211220952
endTime diatur 3 menit sebelum waktu eksekusi instans. Buffer ini memastikan semua catatan untuk jendela waktu tersebut telah ditulis ke Kafka sebelum instans mulai membaca.
Pengaturan siklus penjadwalan
| Siklus penjadwalan | Siklus | Waktu mulai | Interval | Waktu akhir | Hari/tanggal |
|---|---|---|---|---|---|
| Setiap 5 menit | Menit | 00:00 | 5 menit | 23:59 | — |
| Setiap jam | Jam | 00:15 | 1 jam | 23:59 | — |
| Setiap hari | Hari | 00:15 | — | — | — |
| Setiap minggu | Minggu | 00:15 | — | — | Senin |
| Setiap bulan | Bulan | 00:15 | — | — | Tanggal 1 setiap bulan |
Untuk jadwal per jam, harian, mingguan, dan bulanan, atur waktu mulai sedikit setelah tengah malam (misalnya, 00:15, bukan 00:00). Hal ini memberi Kafka waktu untuk menyelesaikan penulisan catatan sebelum instans sinkronisasi dimulai, sehingga mengurangi risiko kehilangan data.
Jika catatan dengan timestamp lebih awal dari atau sama dengan ${startTime} ditulis ke topik Kafka setelah instans berulang telah dimulai, catatan tersebut mungkin tidak terbaca. Penulisan yang tertunda atau timestamp yang tidak berurutan meningkatkan risiko kehilangan data.
Langkah berikutnya
-
Tinjau referensi Kafka Reader untuk daftar lengkap opsi konfigurasi.
-
Lihat Konfigurasi penjadwalan untuk opsi penjadwalan lanjutan seperti dependensi dan kebijakan menjalankan ulang.