全部产品
Search
文档中心

DataWorks:Sinkronisasi offline tabel tunggal dari Kafka ke MaxCompute

更新时间:Nov 28, 2025

Topik ini menjelaskan cara mengonfigurasi jadwal berulang untuk menyinkronkan data inkremental dari Kafka ke MaxCompute. Contoh ini mencakup sinkronisasi data per menit, per jam, atau per hari ke tabel partisi harian atau per jam di MaxCompute.

Perhatian

  • Versi Kafka Anda harus 0.10.2 atau lebih baru dan 2.2.x atau lebih lama. Kafka harus mengaktifkan timestamp pada record, dan record tersebut harus berisi timestamp bisnis yang valid.

  • Setelah sinkronisasi data inkremental dimulai, record dengan timestamp yang lebih awal dari atau sama dengan waktu mulai mungkin tetap ditulis ke topik Kafka. Record tersebut berpotensi tidak terbaca. Jika penulisan data ke topik Kafka tertunda atau timestamp tidak berurutan, waspadai risiko kehilangan data pada tugas sinkronisasi offline.

  • Untuk kebijakan akhir sinkronisasi di sisi Kafka, Anda hanya dapat memilih No new data for 1 minute jika kondisi berikut terpenuhi. Jika tidak, kehilangan data dapat terjadi.

    • Beberapa atau semua partisi dalam topik Kafka tidak menerima data baru dalam waktu lama, misalnya lebih dari 10 menit.

    • Setelah setiap instans berulang dimulai, tidak ada record dengan timestamp lebih awal dari parameter waktu akhir yang ditulis ke topik Kafka.

Prasyarat

Batasan

Sinkronisasi data sumber ke tabel eksternal MaxCompute tidak didukung.

Prosedur

Catatan

Topik ini menggunakan antarmuka DataStudio yang baru untuk menunjukkan cara mengonfigurasi tugas sinkronisasi offline.

Langkah 1: Buat node dan konfigurasikan tugas

Untuk langkah umum membuat dan mengonfigurasi node di Antarmuka tanpa kode, lihat panduan Codeless UI configuration. Topik ini tidak mengulangi langkah-langkah tersebut.

Langkah 2: Konfigurasikan sumber data dan tujuan

Konfigurasikan sumber data (Kafka)

Topik ini menjelaskan cara melakukan sinkronisasi offline tabel tunggal dari Kafka ke MaxCompute. Sumber datanya adalah Kafka. Poin konfigurasi utamanya sebagai berikut.

Catatan

Untuk deskripsi umum item konfigurasi sumber data Kafka, lihat dokumen Kafka Reader. Berikut adalah konfigurasi referensi untuk tutorial ini.

Configuration item

Key configuration

Topic

Pilih topik Kafka yang akan disinkronkan. Jika Anda menggunakan ruang kerja DataWorks mode standar, topik dengan nama yang sama harus ada di kluster Kafka untuk lingkungan pengembangan dan produksi. Untuk Topic, pilih topik ini.

Catatan

Jika:

  • Jika topik tidak ada di lingkungan pengembangan, Anda tidak dapat menemukan topik tersebut di daftar drop-down Topic saat mengonfigurasi node sinkronisasi offline.

  • Jika topik tidak ada di lingkungan produksi, jadwal berulang untuk tugas akan gagal setelah tugas dikirim dan dipublikasikan. Hal ini karena tugas tidak dapat menemukan tabel yang akan disinkronkan.

Consumer Group ID

Masukkan ID unik untuk kluster Kafka sesuai kebutuhan bisnis Anda. Ini membantu statistik dan pemantauan di sisi kluster Kafka.

Read Start Offset, Start Time, Read End Offset, End Time

Atur kedua Read Start Offset dan Read End Offset ke Specific Time. Atur Start Time dan End Time ke parameter penjadwalan ${startTime} dan ${endTime}, masing-masing.

Parameter ini menentukan posisi awal dan akhir sinkronisasi data. Konfigurasi ini berarti sinkronisasi dimulai dari data ${startTime} dan berakhir pada data ${endTime}. Parameter ${startTime} dan ${endTime} diganti dengan nilai berdasarkan ekspresi parameter penjadwalan saat tugas sinkronisasi dijalankan.

Time Zone

Anda dapat mengosongkannya atau memilih zona waktu server default wilayah tempat DataWorks berada.

Catatan

Jika Anda telah menghubungi dukungan teknis Alibaba Cloud untuk mengubah zona waktu penjadwalan, Anda dapat memilih zona waktu yang telah dimodifikasi di sini.

Key Type, Value Type, Encoding

Pilih berdasarkan record aktual di topik Kafka.

Synchronization Completion Policy

Untuk kebijakan akhir sinkronisasi, pilih No new data for 1 minute jika kondisi terpenuhi. Jika tidak, pilih Stop at the specified end position.

  • Beberapa atau semua partisi dalam topik Kafka tidak menerima data baru dalam waktu lama, misalnya lebih dari 10 menit.

  • Setelah setiap instans berulang dimulai, tidak ada record dengan timestamp lebih awal dari parameter waktu akhir yang ditulis ke topik Kafka.

Advanced Configuration

Pertahankan pengaturan default.

Konfigurasikan tujuan data (MaxCompute)

Topik ini menjelaskan cara melakukan sinkronisasi offline tabel tunggal dari Kafka ke MaxCompute. Tujuan datanya adalah sebuah tabel. Poin konfigurasi utamanya sebagai berikut.

Catatan

Untuk parameter yang tidak dijelaskan dalam tabel berikut, Anda dapat mempertahankan pengaturan default.

Configuration item

Key configuration

Data Source

Ini menampilkan sumber data MaxCompute yang dipilih pada langkah sebelumnya. Jika Anda menggunakan ruang kerja DataWorks mode standar, nama proyek pengembangan dan produksi akan ditampilkan.

Table

Pilih tabel MaxCompute yang akan menjadi tujuan sinkronisasi. Jika Anda menggunakan ruang kerja DataWorks mode standar, pastikan tabel MaxCompute dengan nama dan skema yang sama ada di lingkungan pengembangan dan produksi.

Anda juga dapat mengklik Generate Target Table Schema. Sistem secara otomatis membuat tabel untuk menerima data. Anda dapat menyesuaikan pernyataan pembuatan tabel secara manual.

Catatan

Jika kondisi berikut berlaku:

  • Jika tabel MaxCompute tujuan tidak ada di lingkungan pengembangan, Anda tidak dapat menemukan tabel tersebut di daftar drop-down saat mengonfigurasi tabel tujuan untuk node sinkronisasi offline.

  • Jika tabel MaxCompute tujuan tidak ada di lingkungan produksi, tugas sinkronisasi akan gagal saat dijalankan sesuai jadwal setelah dikirim dan dipublikasikan. Hal ini karena tugas tidak dapat menemukan tabel tujuan.

  • Jika skema tabel di lingkungan pengembangan dan produksi tidak konsisten, pemetaan kolom saat dijalankan sesuai jadwal mungkin berbeda dari pemetaan yang dikonfigurasi di node sinkronisasi offline. Hal ini dapat menyebabkan penulisan data yang salah.

Partition

Jika tabel merupakan tabel partisi, Anda dapat memasukkan nilai untuk kolom kunci partisi.

  • Nilainya dapat berupa bidang statis, seperti ds=20220101.

  • Nilainya dapat berupa parameter sistem penjadwalan, seperti ds=${partition}. Parameter sistem penjadwalan secara otomatis diganti saat tugas dijalankan.

Langkah 3: Konfigurasikan pemetaan bidang

Setelah Anda memilih sumber data dan tujuan, Anda harus menentukan pemetaan kolom antara reader dan writer. Anda dapat memilih Map Fields with Same Name, Map Fields in Same Line, Clear Mappings, or Manually Edit Mapping.

  • Terdapat enam bidang default di sisi Kafka.

    Field Name

    Description

    __key__

    Kunci record Kafka.

    __value__

    Nilai record Kafka.

    __partition__

    Nomor partisi tempat record Kafka berada. Nomor partisi dimulai dari 0.

    __headers__

    Header record Kafka.

    __offset__

    Offset record Kafka dalam partisinya. Offset dimulai dari 0.

    __timestamp__

    Timestamp milidetik integer 13 digit dari record Kafka.

  • Anda dapat mengonfigurasi penguraian JSON kustom untuk bidang di sisi Kafka. Gunakan sintaks . (untuk mengambil subfield) dan [] (untuk mengambil elemen array) untuk mendapatkan konten dari bidang nilai record Kafka berformat JSON.

    Penting

    Jika nama bidang JSON mengandung karakter ".", Anda tidak dapat mengambil nilai bidang tersebut dengan mendefinisikan bidang tersebut. Hal ini karena menyebabkan ambiguitas dalam sintaks definisi bidang.

    Berikut adalah contoh nilai data record berformat JSON di Kafka.

    {
          "a": {
          "a1": "hello"
          },
          "b": "world",
          "c":[
                "xxxxxxx",
                "yyyyyyy"
                ],
          "d":[
                {
                      "AA":"this",
                      "BB":"is_data"
                },
                {
                      "AA":"that",
                      "BB":"is_also_data"
                }
            ],
         "a.b": "unreachable"
    }
    • Untuk menyinkronkan data a1, "hello", Anda dapat menambahkan bidang a.a1 di sisi Kafka.

    • Untuk menyinkronkan data b, "world", Anda dapat menambahkan bidang b di sisi Kafka.

    • Untuk menyinkronkan data c, "yyyyyyy", Anda dapat menambahkan bidang c[1] di sisi Kafka.

    • Untuk menyinkronkan data AA, "this", Anda dapat menambahkan bidang d[0].AA di sisi Kafka.

    • Jika bidang di sisi Kafka didefinisikan sebagai a.b, Anda tidak dapat menyinkronkan data "unreachable".

  • Bidang tabel sumber atau tujuan dapat dikecualikan dari pemetaan. Instans sinkronisasi tidak membaca bidang sumber yang tidak dipetakan. NULL ditulis ke bidang tujuan yang tidak dipetakan.

  • Anda tidak dapat memetakan satu bidang sumber ke beberapa bidang tujuan. Bidang tujuan juga tidak dapat dipetakan dari beberapa bidang sumber.

Langkah 4: Konfigurasikan parameter lanjutan

Klik Advanced Configuration di sisi kanan tugas untuk mengatur parameter seperti Maximum Expected Concurrency dan Policy for Dirty Data Records. Untuk tutorial ini, atur Policy for Dirty Data Records ke Ignore Dirty Data Records dan biarkan parameter lain pada nilai default-nya. Untuk informasi selengkapnya, lihat Codeless UI Configuration.

Langkah 5: Konfigurasikan dan jalankan pengujian

  1. Di sisi kanan halaman pengeditan node sinkronisasi offline, klik Debugging Configurations. Atur Script Parameters dan Resource Group untuk pengujian. Lalu, klik Run di bilah alat atas untuk menguji apakah tautan sinkronisasi berjalan sukses.

  2. Anda dapat mengklik ikon image di panel navigasi kiri lalu klik ikon image di sebelah kanan Personal Directory untuk membuat file dengan ekstensi .sql. Anda kemudian dapat menjalankan kueri SQL berikut untuk memeriksa apakah data di tabel tujuan sesuai harapan.

    Catatan
    SELECT * FROM <MaxCompute_destination_table_name> WHERE pt=<specified_partition> LIMIT 20;

Langkah 6: Konfigurasikan penjadwalan dan publikasikan tugas

Klik Scheduling di sisi kanan tugas sinkronisasi offline. Atur parameter konfigurasi penjadwalan untuk eksekusi berulang. Lalu, klik Publish di bilah alat atas untuk membuka panel penerbitan. Ikuti petunjuk di layar untuk publish tugas.

Saat Anda mengonfigurasi sumber data dan tujuan, Anda menggunakan tiga parameter penjadwalan: ${startTime}, ${endTime}, dan ${partition}. Dalam konfigurasi penjadwalan, Anda harus menentukan kebijakan penggantian untuk parameter-parameter ini berdasarkan kebutuhan sinkronisasi Anda. Berikut adalah contoh konfigurasi untuk beberapa skenario khas.

Typical scenario

Recommended configuration

Scenario description

Tugas sinkronisasi dijadwalkan setiap 5 menit

  • startTime=$[yyyymmddhh24mi-8/24/60]00

  • endTime=$[yyyymmddhh24mi-3/24/60]00

  • partition=$[yyyymmddhh24mi-8/24/60]

Jika tugas sinkronisasi dijadwalkan mulai pukul 10:00 pada 2022-11-22:

  • Tugas tersebut menyinkronkan record dari topik Kafka dengan timestamp mulai pukul 09:52 pada 2022-11-22 (inklusif) hingga pukul 09:57 pada 2022-11-22 (eksklusif).

  • Data Kafka yang disinkronkan ditulis ke partisi 202211220952 di MaxCompute.

  • endTime diatur tiga menit lebih awal dari waktu penjadwalan instans ($[yyyymmddhh24mi]). Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai, sehingga mencegah kehilangan data.

Tugas sinkronisasi dijadwalkan setiap jam

  • startTime=$[yyyymmddhh24-1/24]0000

  • endTime=$[yyyymmddhh24]0000

  • partition=$[yyyymmddhh24]

Catatan
  • Jika tugas sinkronisasi dijadwalkan setiap 2 jam, atur startTime=$[yyyymmddhh24-2/24]0000. Parameter penjadwalan lain tetap tidak berubah.

  • Jika tugas sinkronisasi dijadwalkan setiap 3 jam, atur startTime=$[yyyymmddhh24-3/24]0000. Parameter penjadwalan lain tetap tidak berubah.

  • Logika yang sama berlaku untuk siklus penjadwalan per jam lainnya.

Jika tugas sinkronisasi dijadwalkan mulai pukul 10:05 pada 2022-11-22:

  • Tugas tersebut menyinkronkan record dari topik Kafka dengan timestamp mulai pukul 09:00 pada 2022-11-22 (inklusif) hingga pukul 10:00 pada 2022-11-22 (eksklusif).

  • Data Kafka yang disinkronkan ditulis ke partisi 2022112210 di MaxCompute.

Tugas sinkronisasi dijadwalkan setiap hari

  • startTime=$[yyyymmdd-1]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

Jika tugas sinkronisasi dijadwalkan mulai pukul 00:05 pada 2022-11-22:

  • Tugas tersebut menyinkronkan record dari topik Kafka dengan timestamp mulai pukul 00:00 pada 2022-11-21 (inklusif) hingga pukul 00:00 pada 2022-11-22 (eksklusif).

  • Data Kafka yang disinkronkan ditulis ke partisi 20221121 di MaxCompute.

Tugas sinkronisasi dijadwalkan setiap minggu

  • startTime=$[yyyymmdd-7]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

Jika tugas sinkronisasi dijadwalkan mulai pukul 00:05 pada 2022-11-22:

  • Tugas tersebut menyinkronkan record dari topik Kafka dengan timestamp mulai pukul 00:00 pada 2022-11-15 (inklusif) hingga pukul 00:00 pada 2022-11-22 (eksklusif).

  • Data Kafka yang disinkronkan ditulis ke partisi 20221121 di MaxCompute.

Tugas sinkronisasi dijadwalkan setiap bulan

  • startTime=$[add_months(yyyymmdd,-1)]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

Jika tugas sinkronisasi dijadwalkan mulai pukul 00:05 pada 2022-11-22:

  • Tugas tersebut menyinkronkan record dari topik Kafka dengan timestamp mulai pukul 00:00 pada 2022-10-22 (inklusif) hingga pukul 00:00 pada 2022-11-22 (eksklusif).

  • Data Kafka yang disinkronkan ditulis ke partisi 20221121 di MaxCompute.

Atur siklus penjadwalan berdasarkan interval yang diinginkan.

Typical scenario

Recommended configuration

Scenario description

Tugas sinkronisasi dijadwalkan setiap 5 menit

  • Scheduling Cycle: Minute

  • Start Time: 00:00

  • Interval: 5 minutes

  • End Time: 23:59

None

Tugas sinkronisasi dijadwalkan setiap jam

  • Scheduling Cycle: Hour

  • Start Time: 00:15

  • Interval: 1 hour

  • End Time: 23:59

Atur waktu mulai sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai.

Tugas sinkronisasi dijadwalkan setiap hari

  • Scheduling Cycle: Day

  • Scheduled Time: 00:15

Atur waktu terjadwal sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai.

Tugas sinkronisasi dijadwalkan setiap minggu

  • Scheduling Cycle: Week

  • Specified Time: Monday

  • Scheduled Time: 00:15

Atur waktu terjadwal sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai.

Tugas sinkronisasi dijadwalkan setiap bulan

  • Scheduling Cycle: Month

  • Specified Time: 1st of each month

  • Scheduled Time: 00:15

Atur waktu terjadwal sedikit lebih lambat dari 00:00, misalnya 00:15. Praktik ini memastikan bahwa semua data untuk rentang waktu tersebut telah ditulis ke topik Kafka sebelum instans tugas sinkronisasi dimulai.

Penting

Jika record dengan timestamp yang lebih awal dari atau sama dengan waktu mulai ditulis ke topik Kafka setelah instans dimulai, record tersebut berpotensi tidak terbaca. Jika penulisan data ke topik Kafka tertunda atau timestamp tidak berurutan, waspadai risiko kehilangan data pada tugas sinkronisasi offline.