全部产品
Search
文档中心

DataWorks:Sumber data Kafka

更新时间:Nov 10, 2025

Sumber data Kafka menyediakan saluran dua arah untuk membaca dan menulis data ke Kafka. Topik ini menjelaskan kemampuan sinkronisasi data yang disediakan oleh DataWorks untuk Kafka.

Versi yang didukung

DataWorks mendukung Kafka Alibaba Cloud serta versi Kafka yang dikelola sendiri mulai dari 0.10.2 hingga 3.6.x.

Catatan

Sinkronisasi data tidak didukung untuk versi Kafka sebelum 0.10.2 karena versi tersebut tidak mendukung pengambilan offset partisi dan struktur datanya mungkin tidak kompatibel dengan timestamp.

Baca real-time

  • Jika Anda menggunakan kelompok sumber daya Serverless berlangganan, perkirakan spesifikasi yang dibutuhkan terlebih dahulu untuk mencegah kegagalan tugas akibat sumber daya yang tidak mencukupi.

    Perkirakan 1 CU per topik. Selain itu, perkirakan sumber daya berdasarkan lalu lintas sebagai berikut:

    • Untuk data Kafka yang tidak dikompresi, alokasikan 1 CU per 10 MB/detik lalu lintas.

    • Untuk data Kafka yang dikompresi, alokasikan 2 CU per 10 MB/detik lalu lintas.

    • Untuk data Kafka yang dikompresi dan memerlukan penguraian JSON, alokasikan 3 CU per 10 MB/detik lalu lintas.

  • Jika Anda menggunakan kelompok sumber daya Serverless berlangganan atau versi lama grup sumber daya eksklusif untuk Integrasi Data:

    • Untuk beban kerja dengan toleransi tinggi terhadap failover, pastikan penggunaan slot kluster tidak melebihi 80%.

    • Untuk beban kerja dengan toleransi rendah terhadap failover, pastikan penggunaan slot kluster tidak melebihi 70%.

Catatan

Penggunaan sumber daya aktual bergantung pada faktor seperti konten dan format data. Sesuaikan alokasi sumber daya berdasarkan kondisi waktu proses setelah estimasi awal.

Batasan

Sumber data Kafka mendukung kelompok sumber daya Serverless (disarankan) dan versi lama grup sumber daya eksklusif untuk Integrasi Data.

Baca offline dari satu tabel

Jika kedua parameter parameter.groupId dan parameter.kafkaConfig.group.id dikonfigurasi, parameter.groupId memiliki prioritas lebih tinggi daripada group.id dalam parameter kafkaConfig.

Tulis real-time ke satu tabel

Deduplikasi data tidak didukung untuk operasi tulis. Jika suatu tugas dimulai ulang setelah reset offset atau failover, data duplikat mungkin ditulis.

Tulis real-time untuk seluruh database

  • Tugas sinkronisasi data real-time mendukung kelompok sumber daya Serverless (disarankan) dan versi lama grup sumber daya eksklusif untuk Integrasi Data.

  • Jika tabel sumber memiliki kunci utama, nilai kunci utama digunakan sebagai kunci catatan Kafka. Hal ini memastikan bahwa perubahan pada kunci utama yang sama ditulis ke partisi Kafka yang sama secara berurutan.

  • Jika tabel sumber tidak memiliki kunci utama, Anda memiliki dua opsi. Jika memilih opsi untuk menyinkronkan tabel tanpa kunci utama, kunci catatan Kafka akan kosong. Untuk memastikan perubahan tabel ditulis ke Kafka secara berurutan, topik Kafka tujuan harus hanya memiliki satu partisi. Jika memilih kunci utama kustom, kombinasi satu atau beberapa bidang non-kunci utama digunakan sebagai kunci catatan Kafka.

  • Jika kluster Kafka mengembalikan pengecualian dan Anda perlu memastikan bahwa perubahan pada kunci utama yang sama ditulis ke partisi Kafka yang sama secara berurutan, tambahkan konfigurasi berikut ke parameter tambahan sumber data Kafka:

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    Penting

    Konfigurasi ini secara signifikan menurunkan kinerja replikasi. Anda harus menyeimbangkan antara kinerja dan kebutuhan akan pengurutan serta keandalan yang ketat.

  • Untuk informasi lebih lanjut tentang format pesan lengkap yang ditulis ke Kafka dalam sinkronisasi real-time, termasuk format pesan heartbeat dan format pesan yang sesuai dengan perubahan data sumber, lihat Lampiran: Format pesan.

Tipe bidang yang didukung

Kafka menyediakan penyimpanan data tidak terstruktur. Catatan Kafka biasanya mencakup kunci, nilai, offset, timestamp, header, dan partisi. Saat DataWorks membaca atau menulis data ke Kafka, pemrosesan data mengikuti kebijakan berikut.

Baca data offline

Saat DataWorks membaca data dari Kafka, data tersebut dapat diurai dalam format JSON. Tabel berikut menjelaskan cara setiap modul data diproses.

Modul data catatan Kafka

Tipe data yang diproses

key

Bergantung pada item konfigurasi keyType dalam tugas sinkronisasi data. Untuk informasi lebih lanjut tentang parameter keyType, lihat deskripsi parameter lengkap di lampiran.

value

Bergantung pada item konfigurasi valueType dalam tugas sinkronisasi data. Untuk informasi lebih lanjut tentang parameter valueType, lihat deskripsi parameter lengkap di lampiran.

offset

Long

timestamp

Long

headers

String

partition

Long

Tulis data offline

Saat DataWorks menulis data ke Kafka, format penulisan yang didukung adalah JSON atau teks. Kebijakan pemrosesan data bervariasi tergantung pada solusi sinkronisasi yang digunakan. Tabel berikut menjelaskan detailnya.

Penting
  • Jika data ditulis dalam format teks, nama bidang tidak disertakan. Nilai bidang dipisahkan oleh pemisah.

  • Jika data ditulis ke Kafka dalam tugas sinkronisasi real-time, format JSON bawaan digunakan. Data yang ditulis mencakup semua informasi, seperti pesan perubahan database, waktu bisnis, dan informasi DDL. Untuk informasi lebih lanjut tentang format data, lihat Lampiran: Format pesan.

Jenis tugas sinkronisasi

Format nilai yang ditulis ke Kafka

Tipe bidang sumber

Metode pemrosesan untuk operasi tulis

Sinkronisasi offline

Node sinkronisasi offline di DataStudio

json

String

String yang dikodekan UTF-8

Boolean

Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8

Waktu/Tanggal

String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss

Numerik

String numerik yang dikodekan UTF-8

Aliran byte

Aliran byte diperlakukan sebagai string yang dikodekan UTF-8 dan dikonversi menjadi string.

teks

String

String yang dikodekan UTF-8

Boolean

Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8

Waktu/Tanggal

String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss

Numerik

String numerik yang dikodekan UTF-8

Aliran byte

Aliran byte diperlakukan sebagai string yang dikodekan UTF-8 dan dikonversi menjadi string.

Sinkronisasi real-time: ETL real-time ke Kafka

Node sinkronisasi real-time di DataStudio

json

String

String yang dikodekan UTF-8

Boolean

Tipe Boolean JSON

Waktu/Tanggal

  • Untuk nilai waktu dengan presisi kurang dari milidetik: Dikonversi menjadi bilangan bulat JSON 13 digit yang merepresentasikan timestamp dalam milidetik.

  • Untuk nilai waktu dengan presisi mikrodetik atau nanodetik: Dikonversi menjadi bilangan titik mengambang JSON yang mencakup bilangan bulat 13 digit untuk timestamp milidetik dan desimal 6 digit untuk timestamp nanodetik.

Numerik

Tipe numerik JSON

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

teks

String

String yang dikodekan UTF-8

Boolean

Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8

Waktu/Tanggal

String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss

Numerik

String numerik yang dikodekan UTF-8

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

Sinkronisasi real-time: Sinkronisasi real-time seluruh database ke Kafka

Hanya sinkronisasi data inkremental

Format JSON bawaan

String

String yang dikodekan UTF-8

Boolean

Tipe Boolean JSON

Waktu/Tanggal

Timestamp milidetik 13 digit

Numerik

Nilai numerik JSON

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

Solusi sinkronisasi: Sinkronisasi real-time sekali klik ke Kafka

Sinkronisasi offline penuh + sinkronisasi real-time inkremental

Format JSON bawaan

String

String yang dikodekan UTF-8

Boolean

Tipe Boolean JSON

Waktu/Tanggal

Timestamp milidetik 13 digit

Numerik

Nilai numerik JSON

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

Tambahkan sumber data

Sebelum mengembangkan tugas sinkronisasi di DataWorks, Anda harus menambahkan sumber data yang diperlukan ke DataWorks sesuai petunjuk di Manajemen sumber data. Anda dapat melihat infotips parameter di konsol DataWorks untuk memahami arti parameter saat menambahkan sumber data.

Kembangkan tugas sinkronisasi data

Untuk informasi mengenai titik masuk dan prosedur konfigurasi tugas sinkronisasi, lihat panduan konfigurasi berikut.

Konfigurasikan tugas sinkronisasi offline untuk satu tabel

Konfigurasikan tugas sinkronisasi real-time untuk satu tabel atau seluruh database

Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan tugas sinkronisasi real-time di DataStudio.

Konfigurasikan tugas sinkronisasi real-time penuh dan inkremental untuk satu tabel atau seluruh database

Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan tugas sinkronisasi di Integrasi Data.

Konfigurasi autentikasi

SSL

Jika Anda mengatur Special Authentication Method untuk sumber data Kafka ke SSL atau SASL_SSL, autentikasi SSL diaktifkan untuk kluster Kafka. Anda harus mengunggah file sertifikat truststore klien dan memasukkan frasa sandi truststore.

  • Jika kluster Kafka adalah instans Kafka Alibaba Cloud, lihat Petunjuk peningkatan algoritma sertifikat SSL untuk mengunduh file sertifikat truststore yang sesuai. Frasa sandi truststore adalah KafkaOnsClient.

  • Jika kluster Kafka adalah instans EMR, lihat Gunakan enkripsi SSL untuk koneksi Kafka untuk mengunduh file sertifikat truststore yang sesuai dan mendapatkan frasa sandi truststore.

  • Untuk kluster yang dikelola sendiri, Anda harus mengunggah sertifikat truststore yang sesuai dan memasukkan frasa sandi truststore yang benar.

File sertifikat keystore, frasa sandi keystore, dan frasa sandi SSL hanya diperlukan jika autentikasi SSL dua arah diaktifkan untuk kluster Kafka. Server kluster Kafka menggunakan informasi ini untuk mengautentikasi identitas klien. Autentikasi SSL dua arah diaktifkan ketika ssl.client.auth=required diatur dalam file server.properties kluster Kafka. Untuk informasi lebih lanjut, lihat Gunakan enkripsi SSL untuk koneksi Kafka.

GSSAPI

Jika Anda mengatur Sasl Mechanism ke GSSAPI saat mengonfigurasi sumber data Kafka, Anda harus mengunggah tiga file autentikasi: file konfigurasi JAAS, file konfigurasi Kerberos, dan file Keytab. Anda juga harus mengonfigurasi pengaturan DNS/HOST untuk grup sumber daya eksklusif. Bagian berikut menjelaskan file-file tersebut serta pengaturan DNS dan HOST yang diperlukan.

Catatan

Untuk grup sumber daya Serverless, Anda harus mengonfigurasi informasi alamat host menggunakan resolusi DNS internal. Untuk informasi lebih lanjut, lihat Resolusi DNS internal (PrivateZone).

  • File konfigurasi JAAS

    File JAAS harus dimulai dengan KafkaClient, diikuti oleh semua item konfigurasi yang diapit sepasang tanda kurung kurawal {}:

    • Baris pertama di dalam tanda kurung kurawal mendefinisikan kelas komponen logon yang digunakan. Untuk mekanisme autentikasi SASL yang berbeda, kelas komponen logon bersifat tetap. Setiap item konfigurasi berikutnya ditulis dalam format key=value.

    • Semua item konfigurasi kecuali yang terakhir tidak boleh diakhiri dengan titik koma.

    • Item konfigurasi terakhir harus diakhiri dengan titik koma. Titik koma juga harus ditambahkan setelah tanda kurung kurawal penutup "}".

    Jika persyaratan format tidak dipenuhi, file konfigurasi JAAS tidak dapat diurai. Kode berikut menunjukkan format file konfigurasi JAAS khas. Ganti placeholder xxx dengan informasi aktual Anda.

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    Item konfigurasi

    Deskripsi

    Modul logon

    Harus diatur ke com.sun.security.auth.module.Krb5LoginModule.

    useKeyTab

    Harus diatur ke true.

    keyTab

    Anda dapat menentukan path apa pun. Saat tugas sinkronisasi berjalan, file keytab yang diunggah selama konfigurasi sumber data secara otomatis diunduh ke path lokal. Path file lokal ini kemudian digunakan untuk mengisi item konfigurasi keyTab.

    storeKey

    Menentukan apakah klien menyimpan kunci. Anda dapat mengatur ini ke true atau false. Ini tidak memengaruhi sinkronisasi data.

    serviceName

    Sesuai dengan item konfigurasi sasl.kerberos.service.name dalam file konfigurasi server.properties server Kafka. Konfigurasikan item ini sesuai kebutuhan.

    principal

    Principal Kerberos yang digunakan oleh klien Kafka. Konfigurasikan item ini sesuai kebutuhan dan pastikan bahwa file keytab yang diunggah berisi kunci untuk principal ini.

  • File konfigurasi Kerberos

    File konfigurasi Kerberos harus berisi dua modul: [libdefaults] dan [realms].

    • Modul [libdefaults] menentukan parameter autentikasi Kerberos. Setiap item konfigurasi dalam modul ditulis dalam format key=value.

    • Modul [realms] menentukan alamat KDC. Modul ini dapat berisi beberapa submodul realm. Setiap submodul realm dimulai dengan nama_realm=.

    Ini diikuti oleh serangkaian item konfigurasi yang diapit tanda kurung kurawal. Setiap item konfigurasi juga ditulis dalam format key=value. Kode berikut menunjukkan format file konfigurasi Kerberos khas. Ganti placeholder xxx dengan informasi aktual Anda.

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    Item konfigurasi

    Deskripsi

    [libdefaults].default_realm

    Realm default yang digunakan saat mengakses node kluster Kafka. Biasanya sama dengan realm principal klien yang ditentukan dalam file konfigurasi JAAS.

    Parameter [libdefaults] lainnya

    Modul [libdefaults] dapat menentukan parameter autentikasi Kerberos lainnya, seperti ticket_lifetime. Konfigurasikan sesuai kebutuhan.

    [realms].nama_realm

    Harus sama dengan realm principal klien yang ditentukan dalam file konfigurasi JAAS dan [libdefaults].default_realm. Jika realm principal klien dalam file konfigurasi JAAS berbeda dari [libdefaults].default_realm, Anda perlu menyertakan dua submodul realms. Submodul ini harus sesuai dengan realm principal klien dalam file konfigurasi JAAS dan [libdefaults].default_realm, masing-masing.

    [realms].nama_realm.kdc

    Menentukan alamat dan port KDC dalam format ip:port. Misalnya, kdc=10.0.0.1:88. Jika port dihilangkan, port default 88 digunakan. Misalnya, kdc=10.0.0.1.

  • File Keytab

    File Keytab harus berisi kunci untuk principal yang ditentukan dalam file konfigurasi JAAS dan harus dapat diverifikasi oleh KDC. Misalnya, jika ada file bernama client.keytab di direktori kerja saat ini, Anda dapat menjalankan perintah berikut untuk memverifikasi apakah file Keytab berisi kunci untuk principal yang ditentukan.

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)
  • Konfigurasi DNS dan HOST untuk grup sumber daya eksklusif

    Dalam kluster Kafka dengan autentikasi Kerberos diaktifkan, hostname node dalam kluster digunakan sebagai bagian dari principal yang terdaftar untuk node tersebut di KDC (Key Distribution Center). Saat klien mengakses node dalam kluster Kafka, klien tersebut menginferensi principal node berdasarkan pengaturan DNS dan HOST lokal untuk mendapatkan kredensial akses dari KDC. Jika Anda menggunakan grup sumber daya eksklusif untuk mengakses kluster Kafka dengan autentikasi Kerberos diaktifkan, Anda harus mengonfigurasi pengaturan DNS dan HOST dengan benar agar kredensial akses untuk node kluster dapat diperoleh dari KDC:

    • Pengaturan DNS

      Jika instans PrivateZone digunakan untuk resolusi nama domain node kluster Kafka di VPC tempat grup sumber daya eksklusif dilampirkan, Anda dapat menambahkan rute kustom untuk alamat IP 100.100.2.136 dan 100.100.2.138 ke lampiran VPC grup sumber daya eksklusif di konsol DataWorks. Hal ini memastikan bahwa pengaturan resolusi nama domain PrivateZone untuk node kluster Kafka berlaku bagi grup sumber daya eksklusif.

    • Pengaturan HOST

      Jika instans PrivateZone tidak digunakan untuk resolusi nama domain node kluster Kafka di VPC tempat grup sumber daya eksklusif dilampirkan, Anda harus menambahkan pemetaan alamat IP ke nama domain untuk setiap node kluster Kafka ke konfigurasi Host di pengaturan jaringan untuk grup sumber daya eksklusif di konsol DataWorks.

PLAIN

Saat mengonfigurasi sumber data Kafka, jika Anda mengatur Sasl Mechanism ke PLAIN, file JAAS harus dimulai dengan KafkaClient, diikuti oleh semua item konfigurasi yang diapit sepasang tanda kurung kurawal {}.

  • Baris pertama di dalam tanda kurung kurawal mendefinisikan kelas komponen logon yang digunakan. Untuk mekanisme autentikasi SASL yang berbeda, kelas komponen logon bersifat tetap. Setiap item konfigurasi berikutnya ditulis dalam format key=value.

  • Semua item konfigurasi kecuali yang terakhir tidak boleh diakhiri dengan titik koma.

  • Item konfigurasi terakhir harus diakhiri dengan titik koma. Titik koma juga harus ditambahkan setelah tanda kurung kurawal penutup "}".

Jika persyaratan format tidak dipenuhi, file konfigurasi JAAS tidak dapat diurai. Kode berikut menunjukkan format file konfigurasi JAAS khas. Ganti placeholder xxx dengan informasi aktual Anda.

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

Item konfigurasi

Deskripsi

Modul logon

Harus diatur ke org.apache.kafka.common.security.plain.PlainLoginModul

username

Nama pengguna. Konfigurasikan item ini sesuai kebutuhan.

password

Kata sandi. Konfigurasikan item ini sesuai kebutuhan.

FAQ

Lampiran: Demo skrip dan deskripsi parameter

Konfigurasikan tugas sinkronisasi batch menggunakan editor kode

Jika Anda ingin mengonfigurasi tugas sinkronisasi batch menggunakan editor kode, Anda harus mengonfigurasi parameter terkait dalam skrip sesuai dengan persyaratan format skrip terpadu. Untuk informasi lebih lanjut, lihat Konfigurasikan tugas di editor kode. Informasi berikut menjelaskan parameter yang harus Anda konfigurasi untuk sumber data saat mengonfigurasi tugas sinkronisasi batch menggunakan editor kode.

Demo skrip Reader

Kode berikut menunjukkan konfigurasi JSON untuk membaca data dari Kafka.

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//Ketika throttle adalah false, parameter mbps tidak berlaku, yang berarti laju tidak dibatasi. Ketika throttle adalah true, laju dibatasi.
            "concurrent": 1,//Jumlah thread konkuren.
            "mbps":"12"//Laju transmisi maksimum. 1 mbps sama dengan 1 MB/detik.
        }
    }
}

Parameter skrip Reader

Parameter

Deskripsi

Wajib

datasource

Nama sumber data. Editor kode mendukung penambahan sumber data. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan.

Ya

server

Alamat server broker Kafka dalam format ip:port.

Anda hanya dapat mengonfigurasi satu server, tetapi Anda harus memastikan bahwa DataWorks dapat terhubung ke alamat IP semua broker di kluster Kafka.

Ya

topic

Topik Kafka. Topik adalah agregasi feed pesan yang diproses Kafka.

Ya

column

Data Kafka yang akan dibaca. Kolom konstan, kolom data, dan kolom atribut didukung.

  • Kolom konstan: Kolom yang diapit tanda kutip tunggal, seperti ["'abc'", "'123'"].

  • Kolom data

    • Jika data Anda dalam format JSON, Anda dapat mendapatkan properti objek JSON, seperti ["event_id"].

    • Jika data Anda dalam format JSON, Anda dapat mendapatkan sub-properti bersarang objek JSON, seperti ["tag.desc"].

  • Kolom atribut

    • __key__: kunci pesan.

    • __value__: konten lengkap pesan.

    • __partition__: partisi tempat pesan saat ini berada.

    • __headers__: header pesan saat ini.

    • __offset__: offset pesan saat ini.

    • __timestamp__: timestamp pesan saat ini.

    Kode berikut memberikan contoh lengkap.

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]

Ya

keyType

Tipe kunci Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Ya

valueType

Tipe nilai Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Ya

beginDateTime

Offset awal untuk konsumsi data. Ini adalah batas kiri rentang waktu (inklusif). Ini adalah string waktu dalam format yyyymmddhhmmss. Anda dapat menggunakannya dengan Scheduling Parameters. Untuk informasi lebih lanjut, lihat Format yang didukung untuk parameter penjadwalan.

Catatan

Fitur ini didukung di Kafka 0.10.2 dan yang lebih baru.

Anda harus menentukan salah satu parameter ini atau beginOffset.

Catatan

beginDateTime dan endDateTime digunakan bersama.

endDateTime

Offset akhir untuk konsumsi data. Ini adalah batas kanan rentang waktu (eksklusif). Ini adalah string waktu dalam format yyyymmddhhmmss. Anda dapat menggunakannya dengan Scheduling Parameters. Untuk informasi lebih lanjut, lihat Format yang didukung untuk parameter penjadwalan.

Catatan

Fitur ini didukung di Kafka 0.10.2 dan yang lebih baru.

Anda harus menentukan salah satu parameter ini atau endOffset.

Catatan

endDateTime dan beginDateTime digunakan bersama.

beginOffset

Offset awal untuk konsumsi data. Anda dapat mengonfigurasinya dalam bentuk berikut:

  • Bilangan, seperti 15553274, yang menunjukkan offset awal untuk konsumsi.

  • seekToBeginning: menunjukkan bahwa data dikonsumsi dari offset paling awal.

  • seekToLast: menunjukkan bahwa data dibaca dari offset yang disimpan untuk ID grup yang ditentukan oleh group.id dalam konfigurasi kafkaConfig. Perhatikan bahwa offset grup secara otomatis dikomit ke server Kafka oleh klien pada interval reguler. Oleh karena itu, jika tugas gagal dan dijalankan ulang, data mungkin diduplikasi atau hilang. Jika parameter skipExceedRecord diatur ke true, tugas mungkin membuang beberapa catatan terakhir yang dibaca. Offset grup untuk data yang dibuang ini telah dikomit ke server, sehingga data ini tidak dapat dibaca dalam eksekusi tugas berikutnya.

  • seekToEnd: menunjukkan bahwa data dikonsumsi dari offset terbaru. Ini akan membaca data kosong.

Anda harus menentukan salah satu parameter ini atau beginDateTime.

endOffset

Offset akhir untuk konsumsi data. Ini digunakan untuk mengontrol kapan tugas konsumsi data berhenti.

Anda harus menentukan salah satu parameter ini atau endDateTime.

skipExceedRecord

Kafka menggunakan public ConsumerRecords<K, V> poll(final Duration timeout) untuk mengonsumsi data. Satu panggilan poll tunggal mungkin mengambil data yang berada di luar rentang endOffset atau endDateTime. skipExceedRecord mengontrol apakah data berlebih ditulis ke tujuan. Karena konsumsi data menggunakan komitmen offset otomatis, kami merekomendasikan hal berikut:

  • Untuk versi Kafka sebelum 0.10.2: Atur skipExceedRecord ke false.

  • Untuk Kafka 0.10.2 dan yang lebih baru: Atur skipExceedRecord ke true.

Tidak. Nilai default adalah false.

partition

Topik Kafka memiliki beberapa partisi (partition). Secara default, tugas sinkronisasi data membaca data dari rentang offset yang mencakup semua partisi dalam topik. Anda juga dapat menentukan partition untuk membaca data hanya dari rentang offset satu partisi saja.

Tidak. Tidak ada nilai default.

kafkaConfig

Saat membuat klien KafkaConsumer untuk konsumsi data, Anda dapat menentukan parameter tambahan seperti bootstrap.servers, auto.commit.interval.ms, dan session.timeout.ms. Anda dapat menggunakan kafkaConfig untuk mengontrol perilaku konsumsi KafkaConsumer.

Tidak

encoding

Saat keyType atau valueType diatur ke STRING, encoding yang ditentukan oleh parameter ini digunakan untuk mengurai string.

Tidak. Nilai default adalah UTF-8.

waitTIme

Waktu maksimum, dalam detik, yang ditunggu objek konsumen untuk menarik data dari Kafka dalam satu upaya.

Tidak. Nilai default adalah 60.

stopWhenPollEmpty

Nilai yang valid adalah true dan false. Jika parameter ini diatur ke true dan konsumen menarik data kosong dari Kafka (biasanya karena semua data di topik telah dibaca, atau karena masalah jaringan atau ketersediaan kluster Kafka), tugas berhenti segera. Jika tidak, tugas mencoba lagi hingga data berhasil dibaca.

Tidak. Nilai default adalah true.

stopWhenReachEndOffset

Parameter ini hanya berlaku ketika stopWhenPollEmpty adalah true. Nilai yang valid adalah true dan false.

  • Jika parameter ini diatur ke true dan konsumen menarik data kosong dari Kafka, sistem memeriksa apakah data terbaru di partisi topik Kafka telah dibaca. Jika data terbaru dari semua partisi telah dibaca, tugas berhenti segera. Jika tidak, sistem terus mencoba menarik data dari topik Kafka.

  • Jika parameter ini diatur ke false dan konsumen menarik data kosong dari Kafka, sistem tidak melakukan pemeriksaan dan langsung menghentikan tugas.

Tidak. Nilai default adalah false.

Catatan

Ini untuk kompatibilitas dengan logika historis. Versi Kafka sebelum V0.10.2 tidak dapat memeriksa apakah data terbaru dari semua partisi topik Kafka telah dibaca. Namun, beberapa tugas editor kode mungkin membaca data dari versi Kafka sebelum V0.10.2.

Tabel berikut menjelaskan parameter kafkaConfig.

Parameter

Deskripsi

fetch.min.bytes

Menentukan jumlah minimum byte pesan yang dapat diperoleh konsumen dari broker. Data dikembalikan ke konsumen hanya ketika tersedia data yang cukup.

fetch.max.wait.ms

Waktu maksimum menunggu broker mengembalikan data. Nilai default adalah 500 milidetik. Data dikembalikan berdasarkan kondisi mana yang terpenuhi lebih dulu: fetch.min.bytes atau fetch.max.wait.ms.

max.partition.fetch.bytes

Menentukan jumlah maksimum byte yang dapat dikembalikan broker ke konsumen dari setiap partition. Nilai default adalah 1 MB.

session.timeout.ms

Menentukan waktu maksimum konsumen dapat terputus dari server sebelum berhenti menerima layanan. Nilai default adalah 30 detik.

auto.offset.reset

Tindakan yang diambil konsumen saat membaca tanpa offset atau offset tidak valid (karena konsumen tidak aktif dalam waktu lama dan catatan dengan offset tersebut telah kedaluwarsa dan dihapus). Nilai default adalah none, yang berarti offset tidak diatur ulang secara otomatis. Anda dapat mengubahnya menjadi earliest, yang berarti konsumen membaca catatan partition dari offset paling awal.

max.poll.records

Jumlah pesan yang dapat dikembalikan oleh satu panggilan metode poll.

key.deserializer

Metode deserialisasi untuk kunci pesan, seperti org.apache.kafka.common.serialization.StringDeserializer.

value.deserializer

Metode deserialisasi untuk nilai data, seperti org.apache.kafka.common.serialization.StringDeserializer.

ssl.truststore.location

Path sertifikat root SSL.

ssl.truststore.password

Kata sandi untuk penyimpanan sertifikat root. Jika Anda menggunakan Kafka Alibaba Cloud, atur ini ke KafkaOnsClient.

security.protocol

Protokol akses. Saat ini, hanya protokol SASL_SSL yang didukung.

sasl.mechanism

Metode autentikasi SASL. Jika Anda menggunakan Kafka Alibaba Cloud, gunakan PLAIN.

java.security.auth.login.config

Path file autentikasi SASL.

Demo skrip Writer

Kode berikut menunjukkan konfigurasi JSON untuk menulis data ke Kafka.

{
  "type":"job",
  "version":"2.0",//Nomor versi.
  "steps":[
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"Kafka",//Nama plugin.
      "parameter":{
          "server": "ip:9092", //Alamat server Kafka.
          "keyIndex": 0, //Kolom yang akan digunakan sebagai kunci. Harus mengikuti konvensi penamaan camel case, dengan k dalam huruf kecil.
          "valueIndex": 1, //Kolom yang akan digunakan sebagai nilai. Saat ini, Anda hanya dapat memilih satu kolom dari data sumber atau membiarkan parameter ini kosong. Jika kosong, semua data sumber digunakan.
          //Misalnya, untuk menggunakan kolom ke-2, ke-3, dan ke-4 dari tabel ODPS sebagai kafkaValue, buat tabel ODPS baru, bersihkan dan integrasikan data dari tabel ODPS asli ke tabel baru, lalu gunakan tabel baru untuk sinkronisasi.
          "keyType": "Integer", //Tipe kunci Kafka.
          "valueType": "Short", //Tipe nilai Kafka.
          "topic": "t08", //Topik Kafka.
          "batchSize": 1024 //Jumlah data yang ditulis ke Kafka sekaligus, dalam byte.
        },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
      "errorLimit":{
      "record":"0"//Jumlah catatan kesalahan.
    },
    "speed":{
        "throttle":true,//Ketika throttle adalah false, parameter mbps tidak berlaku, yang berarti laju tidak dibatasi. Ketika throttle adalah true, laju dibatasi.
        "concurrent":1, //Jumlah tugas konkuren.
        "mbps":"12"//Laju transmisi maksimum. 1 mbps sama dengan 1 MB/detik.
    }
   },
    "order":{
    "hops":[
        {
            "from":"Reader",
            "to":"Writer"
        }
      ]
    }
}

Parameter skrip Writer

Parameter

Deskripsi

Wajib

datasource

Nama sumber data. Editor kode mendukung penambahan sumber data. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan.

Ya

server

Alamat server Kafka dalam format ip:port.

Ya

topic

Topik Kafka. Ini adalah kategori untuk feed pesan berbeda yang diproses Kafka.

Setiap pesan yang dipublikasikan ke kluster Kafka memiliki kategori, yang disebut topik. Topik adalah kumpulan sekelompok pesan.

Ya

valueIndex

Kolom di writer Kafka yang digunakan sebagai nilai. Jika tidak ditentukan, semua kolom digabungkan untuk membentuk nilai secara default. Pemisah ditentukan oleh fieldDelimiter.

Tidak

writeMode

Saat valueIndex tidak dikonfigurasi, parameter ini menentukan format penggabungan semua kolom catatan sumber untuk membentuk nilai catatan Kafka. Nilai yang valid adalah text dan JSON. Nilai default adalah text.

  • Jika diatur ke text, semua kolom digabungkan menggunakan pemisah yang ditentukan oleh fieldDelimiter.

  • Jika diatur ke JSON, semua kolom digabungkan menjadi string JSON berdasarkan nama bidang yang ditentukan oleh parameter column.

Misalnya, jika catatan sumber memiliki tiga kolom dengan nilai a, b, dan c, dan writeMode diatur ke text serta fieldDelimiter diatur ke #, nilai catatan Kafka yang ditulis adalah string a#b#c. Jika writeMode diatur ke JSON dan column diatur ke [{"name":"col1"},{"name":"col2"},{"name":"col3"}], nilai catatan Kafka yang ditulis adalah string {"col1":"a","col2":"b","col3":"c"}.

Jika valueIndex dikonfigurasi, parameter ini tidak berlaku.

Tidak

column

Bidang di tabel tujuan tempat data akan ditulis, dipisahkan koma. Contoh: "column": ["id", "name", "age"].

Saat valueIndex tidak dikonfigurasi dan writeMode diatur ke JSON, parameter ini mendefinisikan nama bidang dalam struktur JSON untuk nilai kolom catatan sumber. Contoh, "column": [{"name":"id","type":"JSON_NUMBER"}, {"name":"name","type":"JSON_STRING"}, {"name":"age","type":"JSON_NUMBER"}].

  • Jika jumlah kolom dalam catatan sumber lebih besar daripada jumlah nama bidang yang dikonfigurasi dalam column, data dipotong saat penulisan. Contoh:

    Jika catatan sumber memiliki tiga kolom dengan nilai a, b, dan c, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"}], nilai catatan Kafka yang ditulis adalah string {"col1":"a","col2":"b"}.

  • Jika jumlah kolom dalam catatan sumber lebih sedikit daripada jumlah nama bidang yang dikonfigurasi dalam column, nama bidang tambahan dalam konfigurasi column diisi dengan null atau string yang ditentukan oleh nullValueFormat. Contoh:

    Jika catatan sumber memiliki dua kolom dengan nilai a dan b, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"},{"name":"col3","type":"JSON_STRING"}], nilai catatan Kafka yang ditulis adalah string {"col1":"a","col2":"b","col3":null}. Jika valueIndex dikonfigurasi, atau jika writeMode diatur ke text, parameter ini tidak berlaku.

  • Jika tipe bidang JSON tidak dikonfigurasi, tipe bidang default adalah JSON_STRING.

  • Untuk nilai yang valid dari tipe bidang JSON, lihat Lampiran: Tipe bidang JSON.

Jika valueIndex dikonfigurasi, atau jika writeMode diatur ke text, parameter ini tidak berlaku.

Wajib saat valueIndex tidak dikonfigurasi dan writeMode diatur ke JSON.

partition

Menentukan nomor partisi dalam topik Kafka tempat data ditulis. Ini harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0.

Tidak

keyIndex

Kolom di writer Kafka yang digunakan sebagai kunci.

Nilai parameter keyIndex harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0. Jika tidak, tugas akan gagal.

Tidak

keyIndexes

Array nomor urut kolom dalam catatan sumber yang digunakan sebagai kunci untuk catatan Kafka.

Nomor urut kolom dimulai dari 0. Misalnya, [0,1,2] akan menggabungkan nilai semua nomor kolom yang dikonfigurasi dengan koma untuk membentuk kunci catatan Kafka. Jika tidak ditentukan, kunci catatan Kafka adalah null, dan data ditulis ke partisi topik secara round-robin. Anda hanya dapat menentukan salah satu parameter ini atau keyIndex.

Tidak

fieldDelimiter

Saat writeMode diatur ke text dan valueIndex tidak dikonfigurasi, semua kolom catatan sumber digabungkan menggunakan pemisah kolom yang ditentukan oleh parameter ini untuk membentuk nilai catatan Kafka. Anda dapat mengonfigurasi satu karakter atau beberapa karakter sebagai pemisah. Anda dapat mengonfigurasi karakter Unicode dalam format \u0001. Karakter escape seperti \t dan \n didukung. Nilai default adalah \t.

Jika writeMode tidak diatur ke text atau jika valueIndex dikonfigurasi, parameter ini tidak berlaku.

Tidak

keyType

Tipe kunci Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Ya

valueType

Tipe nilai Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Ya

nullKeyFormat

Jika nilai kolom sumber yang ditentukan oleh keyIndex atau keyIndexes adalah null, nilai tersebut diganti dengan string yang ditentukan oleh parameter ini. Jika tidak dikonfigurasi, tidak ada penggantian yang dilakukan.

Tidak

nullValueFormat

Jika nilai kolom sumber adalah null, nilai tersebut diganti dengan string yang ditentukan oleh parameter ini saat merakit nilai catatan Kafka. Jika tidak dikonfigurasi, tidak ada penggantian yang dilakukan.

Tidak

acks

Konfigurasi acks saat menginisialisasi produsen Kafka. Ini menentukan metode pengakuan untuk penulisan yang berhasil. Secara default, parameter acks diatur ke all. Nilai yang valid untuk acks adalah:

  • 0: Tidak ada pengakuan untuk penulisan yang berhasil.

  • 1: Pengakuan untuk penulisan yang berhasil ke replika primer.

  • all: Pengakuan untuk penulisan yang berhasil ke semua replika.

Tidak

Lampiran: Definisi format pesan untuk penulisan ke Kafka

Setelah Anda mengonfigurasi dan menjalankan tugas sinkronisasi real-time, data yang dibaca dari database sumber ditulis ke topik Kafka dalam format JSON. Pertama, semua data yang ada di tabel sumber yang ditentukan ditulis ke topik Kafka yang sesuai. Kemudian, tugas memulai sinkronisasi real-time untuk terus menulis data inkremental ke topik tersebut. Informasi perubahan DDL inkremental dari tabel sumber juga ditulis ke topik Kafka dalam format JSON. Anda dapat memperoleh status dan informasi perubahan pesan yang ditulis ke Kafka. Untuk informasi lebih lanjut, lihat Lampiran: Format pesan.

Catatan

Dalam struktur JSON data yang ditulis ke Kafka oleh tugas sinkronisasi offline, bidang payload.sequenceId, payload.timestamp.eventTime dan payload.timestamp.checkpointTime diatur ke -1.

Lampiran: Tipe bidang JSON

Saat writeMode diatur ke JSON, Anda dapat menggunakan bidang type dalam parameter column untuk menentukan tipe data JSON. Selama operasi tulis, sistem mencoba mengonversi nilai kolom dalam catatan sumber ke tipe yang ditentukan. Jika konversi gagal, data kotor dihasilkan.

Nilai yang valid

Deskripsi

JSON_STRING

Mengonversi nilai kolom catatan sumber menjadi string dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah bilangan bulat 123 dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_STRING"}, nilai catatan Kafka yang ditulis adalah string {"col1":"123"}.

JSON_NUMBER

Mengonversi nilai kolom catatan sumber menjadi angka dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string 1.23 dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_NUMBER"}, nilai catatan Kafka yang ditulis adalah string {"col1":1.23}.

JSON_BOOL

Mengonversi nilai kolom catatan sumber menjadi nilai Boolean dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string true dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_BOOL"}, nilai catatan Kafka yang ditulis adalah string {"col1":true}

JSON_ARRAY

Mengonversi nilai kolom catatan sumber menjadi array JSON dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string [1,2,3] dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_ARRAY"}, nilai catatan Kafka yang ditulis adalah string {"col1":[1,2,3]}.

JSON_MAP

Mengonversi nilai kolom catatan sumber menjadi objek JSON dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string {"k1":"v1"} dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_MAP"}, nilai catatan Kafka yang ditulis adalah string {"col1":{"k1":"v1"}}.

JSON_BASE64

Mengonversi konten byte biner dari nilai kolom catatan sumber menjadi string yang dikodekan BASE64 dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah array byte dengan panjang 2, direpresentasikan dalam heksadesimal sebagai 0x01 0x02, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_BASE64"}, nilai catatan Kafka yang ditulis adalah string {"col1":"AQI="}.

JSON_HEX

Mengonversi konten byte biner dari nilai kolom catatan sumber menjadi string numerik heksadesimal dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah array byte dengan panjang 2, direpresentasikan dalam heksadesimal sebagai 0x01 0x02, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_HEX"}, nilai catatan Kafka yang ditulis adalah string {"col1":"0102"}.