全部产品
Search
文档中心

AnalyticDB:Sinkronisasi data Kafka menggunakan fitur sinkronisasi data APS (Direkomendasikan)

更新时间:Dec 06, 2025

AnalyticDB for MySQL menyediakan fitur sinkronisasi data AnalyticDB Pipeline Service (APS). Anda dapat menggunakan fitur ini untuk membuat tautan data Kafka guna mengingesti data dari Kafka secara real time mulai dari offset tertentu. Fitur ini mendukung output data near real-time, pengarsipan data historis lengkap, dan analitik elastis. Topik ini menjelaskan cara menambahkan sumber data Kafka, membuat dan memulai tautan data Kafka, lalu menganalisis serta mengelola sumber data tersebut.

Prasyarat

Catatan

  • Hanya data Kafka dalam format JSON yang dapat disinkronkan.

  • Data dalam topik Kafka secara otomatis dihapus setelah periode tertentu. Jika tugas sinkronisasi data gagal dan data topik telah kedaluwarsa, data yang dihapus tidak dapat dipulihkan saat Anda memulai ulang tugas tersebut. Hal ini dapat menyebabkan kehilangan data. Untuk mencegahnya, tingkatkan siklus hidup data topik tersebut. Jika tugas sinkronisasi gagal, segera hubungi dukungan teknis.

  • Jika data sampel Kafka berukuran lebih dari 8 KB, API Kafka akan memotong data tersebut. Hal ini menyebabkan sistem gagal mengurai data sampel menjadi format JSON, sehingga informasi pemetaan bidang tidak dapat dihasilkan secara otomatis.

  • Perubahan pada skema tabel Kafka sumber tidak secara otomatis memicu perubahan DDL di AnalyticDB for MySQL.

  • Setelah data diingesti, operasi Commit harus dieksekusi agar data yang ditulis menjadi terlihat. Untuk mencegah interval operasi Commit yang terlalu pendek memengaruhi stabilitas pekerjaan serta performa baca-tulis, fitur sinkronisasi data AnalyticDB for MySQL memiliki interval operasi Commit default selama 5 menit. Oleh karena itu, saat pertama kali membuat dan memulai pekerjaan sinkronisasi data, Anda harus menunggu setidaknya 5 menit untuk melihat batch pertama data yang ditulis.

Penagihan

Penggunaan fitur sinkronisasi data AnalyticDB for MySQL dikenakan biaya sebagai berikut.

Prosedur

Buat sumber data

Catatan

Jika Anda telah menambahkan sumber data Kafka, lewati langkah ini dan lanjutkan ke Buat tautan data.

  1. Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.

  2. Di panel navigasi sebelah kiri, pilih Data Ingestion > Data Sources.

  3. Di pojok kiri atas, klik Create Data Source.

  4. Pada halaman Create Data Source, konfigurasikan parameter-parameter berikut. Tabel berikut menjelaskan parameter tersebut.

    Parameter

    Deskripsi

    Data Source Type

    Pilih Kafka.

    Data Source Name

    Sistem secara otomatis menghasilkan nama berdasarkan jenis sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.

    Data Source Description

    Masukkan deskripsi untuk sumber data, misalnya skenario danau data terpadu atau batasan bisnis.

    Deployment Mode

    Hanya Alibaba Cloud Instance yang didukung.

    Kafka Instance

    ID instans Kafka.

    Masuk ke Konsol ApsaraMQ for Kafka dan lihat ID instans pada halaman Instances.

    Kafka Topic

    Nama topik yang dibuat di Kafka.

    Masuk ke Konsol ApsaraMQ for Kafka dan lihat nama topik pada halaman Topics dari instans tujuan.

    Message Data Format

    Format data pesan Kafka. Hanya JSON yang didukung.

  5. Setelah mengonfigurasi parameter, klik Create.

Buat tautan data

  1. Di panel navigasi sebelah kiri, klik Simple Log Service/Kafka Data Synchronization.

  2. Di pojok kiri atas, klik Create Synchronization Job.

  3. Pada halaman Create Synchronization Job, konfigurasikan bagian Source and Destination Settings, Destination Database and Table Settings, dan Synchronization Settings.

    • Tabel berikut menjelaskan parameter untuk Source and Destination Settings.

      Parameter

      Deskripsi

      Job Name

      Nama tautan data. Sistem secara otomatis menghasilkan nama berdasarkan jenis sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.

      Data Source

      Pilih sumber data Kafka yang sudah ada atau buat yang baru.

      Destination Type

      Nilai yang valid:

      • Data Lake - User OSS.

      • Data Lake - AnalyticDB Lake Storage (Direkomendasikan).

        Penting

        Jika Anda mengatur Destination type ke Data Lake - AnalyticDB Lake Storage, Anda harus mengaktifkan fitur penyimpanan danau.

      ADB Lake Storage

      Nama penyimpanan danau tempat data danau AnalyticDB for MySQL berada.

      Pilih penyimpanan danau tujuan dari daftar drop-down. Jika belum ada penyimpanan danau yang dibuat, klik Automatically Created dalam daftar drop-down untuk membuatnya secara otomatis.

      Penting

      Parameter ini wajib diisi ketika Anda mengatur Destination Type ke Data Lake - AnalyticDB Lake Storage.

      OSS Path

      Jalur penyimpanan di OSS untuk data danau AnalyticDB for MySQL.

      Penting
      • Parameter ini wajib diisi ketika Anda mengatur Destination Type ke Data Lake - User OSS.

      • Bucket yang ditampilkan adalah semua bucket di wilayah yang sama dengan kluster AnalyticDB for MySQL. Anda dapat memilih salah satunya. Rencanakan jalur penyimpanan dengan cermat. Anda tidak dapat mengubahnya setelah pembuatan.

      • Pilih folder kosong. Jalur OSS tidak boleh memiliki hubungan awalan (prefix) dengan jalur OSS tugas lainnya untuk mencegah data tertimpa. Misalnya, jika jalur OSS untuk dua tugas sinkronisasi data adalah oss://testBucketName/test/sls1/ dan oss://testBucketName/test/, keduanya memiliki hubungan awalan, yang akan menyebabkan data tertimpa selama sinkronisasi data.

      Storage Format

      Format penyimpanan data. Nilai yang valid:

      • PAIMON.

        Penting

        Format ini hanya didukung ketika Destination Type diatur ke Data Lake - User OSS.

      • ICEBERG.

    • Tabel berikut menjelaskan parameter untuk Destination Database and Table Settings.

      Parameter

      Deskripsi

      Database Name

      Nama database tujuan di AnalyticDB for MySQL. Jika database dengan nama yang sama belum ada, database baru akan dibuat. Jika database dengan nama yang sama sudah ada, data akan disinkronkan ke database yang sudah ada. Untuk informasi selengkapnya tentang konvensi penamaan database, lihat Batasan.

      Penting

      Pada bagian Source and Destination Settings, jika Anda mengatur Storage Format ke PAIMON, database yang sudah ada harus memenuhi kondisi berikut. Jika tidak, tugas sinkronisasi data akan gagal.

      • Harus merupakan database eksternal. Pernyataan untuk membuat database harus berupa CREATE EXTERNAL DATABASE<database_name>.

      • Parameter `DBPROPERTIES` dalam pernyataan `CREATE DATABASE` harus mencakup properti catalog, dan nilai catalog harus paimon.

      • Parameter `DBPROPERTIES` dalam pernyataan `CREATE DATABASE` harus mencakup properti adb.paimon.warehouse. Contoh: adb.paimon.warehouse=oss://testBucketName/aps/data.

      • Parameter `DBPROPERTIES` dalam pernyataan `CREATE DATABASE` harus mencakup properti LOCATION, dan Anda harus menambahkan .db setelah nama database. Jika tidak, kueri XIHE akan gagal. Contoh: LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/.

        Direktori bucket dalam jalur OSS yang dikonfigurasi untuk LOCATION harus sudah ada. Jika tidak, pembuatan database akan gagal.

      Table Name

      Nama tabel tujuan di AnalyticDB for MySQL. Jika tabel dengan nama yang sama belum ada di database, tabel baru akan dibuat. Jika tabel dengan nama yang sama sudah ada, sinkronisasi data akan gagal. Untuk informasi selengkapnya tentang konvensi penamaan tabel, lihat Batasan.

      Sample Data

      Data terbaru secara otomatis diambil dari topik Kafka dan digunakan sebagai data sampel.

      Catatan

      Data dalam topik Kafka harus dalam format JSON. Jika format data lainnya ada, akan terjadi error selama sinkronisasi data.

      Parsed JSON Layers

      Atur jumlah level bersarang yang akan diurai dalam data JSON. Nilai yang valid:

      • 0: Tidak ada penguraian.

      • 1 (Default): Uraikan satu level.

      • 2: Uraikan dua level.

      • 3: Uraikan tiga level.

      • 4: Uraikan empat level.

      Untuk informasi selengkapnya tentang kebijakan penguraian bersarang JSON, lihat Contoh level penguraian JSON dan inferensi skema.

      Schema Field Mapping

      Menampilkan informasi skema data sampel setelah penguraian JSON. Anda dapat menyesuaikan nama dan tipe bidang tujuan, atau menambah/menghapus bidang sesuai kebutuhan.

      Partition Key Settings

      Atur kunci partisi untuk tabel tujuan. Kami merekomendasikan mengonfigurasi partisi berdasarkan waktu log atau logika bisnis untuk memastikan performa ingesti dan kueri data. Jika Anda tidak mengatur kunci partisi, tabel tujuan tidak akan memiliki partisi secara default.

      Anda dapat memformat kunci partisi tujuan menggunakan format waktu atau dengan menentukan bidang partisi.

      • Untuk mempartisi berdasarkan tanggal dan waktu, pilih bidang bertipe tanggal-waktu sebagai nama bidang partisi. Untuk metode penanganan format, pilih Time Formatting, lalu pilih format bidang sumber dan format partisi tujuan. AnalyticDB for MySQL mengidentifikasi nilai bidang partisi berdasarkan format bidang sumber dan mengonversinya ke format partisi tujuan untuk partisi. Misalnya, jika bidang sumber adalah gmt_created dengan nilai 1711358834, format bidang sumber adalah timestamp presisi tingkat detik, dan format partisi tujuan adalah yyyyMMdd, data akan dipartisi berdasarkan 20240325.

      • Untuk mempartisi berdasarkan nilai bidang, pilih Specify Partition Field sebagai metode penanganan format.

    • Tabel berikut menjelaskan parameter untuk Synchronization Settings.

      Parameter

      Deskripsi

      Starting Consumer Offset for Incremental Synchronization

      Saat tugas sinkronisasi dimulai, sistem mulai mengonsumsi data Kafka dari titik waktu yang dipilih. Nilai yang valid:

      • Earliest offset (begin_cursor): Secara otomatis mengonsumsi data dari titik waktu paling awal dalam data Kafka.

      • Latest offset (end_cursor): Secara otomatis mengonsumsi data dari titik waktu paling akhir dalam data Kafka.

      • Custom offset: Anda dapat memilih titik waktu apa pun. Sistem akan mulai mengonsumsi dari data pertama di Kafka yang berada pada atau setelah waktu ini.

      Job Resource Group

      Tentukan kelompok sumber daya pekerjaan tempat tugas akan dijalankan.

      ACUs for Incremental Synchronization

      Tentukan jumlah ACU untuk kelompok sumber daya pekerjaan. Jumlah minimum ACU adalah 2, dan maksimum adalah sumber daya komputasi maksimum yang tersedia di kelompok sumber daya pekerjaan. Kami merekomendasikan menentukan jumlah ACU yang lebih tinggi untuk meningkatkan performa ingesti data dan stabilitas tugas.

      Catatan

      Saat Anda membuat tugas sinkronisasi data, tugas tersebut menggunakan sumber daya elastis dari kelompok sumber daya pekerjaan. Tugas sinkronisasi data menempati sumber daya dalam jangka waktu lama, sehingga sistem mengurangi sumber daya yang digunakan oleh tugas tersebut dari kelompok sumber daya. Misalnya, jika kelompok sumber daya pekerjaan memiliki maksimum 48 ACU dan Anda telah membuat tugas sinkronisasi yang menggunakan 8 ACU, jumlah maksimum ACU yang dapat Anda pilih untuk tugas sinkronisasi lain dalam kelompok sumber daya ini adalah 40.

      Advanced Settings

      Konfigurasi lanjutan memungkinkan Anda menyesuaikan tugas sinkronisasi. Untuk melakukan konfigurasi khusus, hubungi dukungan teknis.

  4. Setelah mengonfigurasi parameter, klik Submit.

Mulai tugas sinkronisasi data

  1. Pada halaman Simple Log Service/Kafka Data Synchronization, temukan tugas sinkronisasi data yang telah Anda buat dan klik Start di kolom Actions.

  2. Di pojok kiri atas, klik Search. Tugas berhasil dimulai ketika statusnya berubah menjadi Running.

Analitik data

Setelah data disinkronkan, Anda dapat menggunakan fitur pengembangan Spark Jar untuk menganalisis data di AnalyticDB for MySQL. Untuk informasi selengkapnya tentang pengembangan Spark, lihat Editor pengembangan Spark dan Pengembangan aplikasi Spark offline.

  1. Di panel navigasi sebelah kiri, pilih Job Development > Spark JAR Development.

  2. Masukkan pernyataan contoh dalam templat default dan klik Run Now.

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. (Opsional) Pada tab Applications, klik Logs di kolom Actions untuk melihat log eksekusi pekerjaan Spark SQL.

Kelola sumber data

Di panel navigasi sebelah kiri, pilih Data Ingestion > Data Sources. Anda dapat melakukan operasi berikut di kolom Actions.

Operasi

Deskripsi

Create Job

Langsung menuju halaman pembuatan tugas sinkronisasi data atau migrasi data untuk sumber data ini.

View

Lihat konfigurasi detail sumber data.

Edit

Edit properti sumber data, seperti nama dan deskripsinya.

Delete

Hapus sumber data saat ini.

Catatan

Jika terdapat tugas sinkronisasi data atau migrasi data untuk sumber data tersebut, Anda tidak dapat langsung menghapus sumber data. Anda harus terlebih dahulu menuju halaman Simple Log Service/Kafka Data Synchronization, temukan tugas sinkronisasi target, lalu klik Delete di kolom Actions untuk menghapus tugas sinkronisasi atau migrasi data tersebut.

Contoh level penguraian JSON dan inferensi skema

Level penguraian menentukan jumlah level bersarang yang akan diurai dalam data JSON. Misalnya, pengguna mengirim data JSON berikut ke Kafka.

{
  "name" : "zhangle",
  "age" : 18,
  "device" : {
    "os" : {
        "test":lag,
        "member":{
             "fa":zhangsan,
             "mo":limei
       }
     },
    "brand" : "none",
    "version" : "11.4.2"
  }
}

Bagian berikut menunjukkan hasil penguraian untuk level 0 hingga 4.

Penguraian Level 0

Data tidak diurai. Data JSON asli langsung dioutput.

JSON field

Value

Destination field name

__value__

{ "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }}

__value__

Penguraian Level 1

Level pertama data JSON diurai.

JSON field

Value

Destination field name

name

zhangle

name

age

18

age

device

{ "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }

device

Penguraian Level 2

Level kedua data JSON diurai. Jika suatu bidang tidak bersarang, bidang tersebut langsung dioutput. Misalnya, bidang name dan age langsung dioutput. Jika suatu bidang bersarang, sub-bidangnya dioutput. Misalnya, bidang device bersarang, sehingga sub-bidangnya device.os, device.brand, dan device.version dioutput.

Penting

Karena nama bidang tujuan tidak boleh mengandung titik (.), titik secara otomatis diganti dengan garis bawah (_).

JSON field

Value

Destination field name

name

zhangle

name

age

18

age

device.os

{ "test":lag,"member":{ "fa":zhangsan,"mo":limei }

device_os

device.brand

none

device_brand

device.version

11.4.2

device_version

Penguraian Level 3

JSON field

Value

Destination field name

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member

{ "fa":zhangsan,"mo":limei }

device_os_member

device.brand

none

device_brand

device.version

11.4.2

device_version

Penguraian Level 4

JSON field

Value

Destination field name

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member.fa

zhangsan

device_os_member_fa

device.os.member.mo

lime

device_os_member_mo

device.brand

none

device_brand

device.version

11.4.2

device_version