全部产品
Search
文档中心

DataWorks:Sumber data Kafka

更新时间:Jan 10, 2026

Sumber data Kafka menyediakan saluran dua arah untuk membaca dan menulis data ke Kafka. Topik ini menjelaskan kemampuan sinkronisasi data yang disediakan oleh DataWorks untuk Kafka.

Versi yang didukung

DataWorks mendukung Alibaba Cloud Kafka dan versi Kafka self-managed mulai dari 0.10.2 hingga 3.6.x.

Catatan

Sinkronisasi data tidak didukung untuk versi Kafka sebelum 0.10.2 karena versi tersebut tidak mendukung pengambilan offset partisi dan struktur datanya mungkin tidak kompatibel dengan timestamp.

Baca real-time

  • Jika Anda menggunakan Serverless resource group berlangganan, perkirakan spesifikasi yang diperlukan terlebih dahulu untuk mencegah kegagalan tugas akibat sumber daya tidak mencukupi.

    Perkirakan 1 CU per topik. Selain itu, perkirakan sumber daya berdasarkan trafik sebagai berikut:

    • Untuk data Kafka yang tidak dikompresi, perkirakan 1 CU untuk setiap 10 MB/s trafik.

    • Untuk data Kafka yang dikompresi, perkirakan 2 CU untuk setiap 10 MB/s trafik.

    • Untuk data Kafka yang dikompresi dan memerlukan parsing JSON, perkirakan 3 CU untuk setiap 10 MB/s trafik.

  • Saat menggunakan Serverless resource group berlangganan atau versi lama Grup sumber daya eksklusif untuk Integrasi Data:

    • Jika workload Anda memiliki toleransi tinggi terhadap failover, penggunaan slot kluster tidak boleh melebihi 80%.

    • Jika workload Anda memiliki toleransi rendah terhadap failover, penggunaan slot kluster tidak boleh melebihi 70%.

Catatan

Penggunaan sumber daya aktual bergantung pada faktor-faktor seperti konten dan format data. Anda dapat menyesuaikan alokasi sumber daya berdasarkan kondisi waktu proses aktual setelah estimasi awal.

Batasan

Sumber data Kafka mendukung Serverless resource groups (direkomendasikan) dan versi lama Grup sumber daya eksklusif untuk Integrasi Data.

Baca offline dari satu tabel

Jika kedua parameter parameter.groupId dan parameter.kafkaConfig.group.id dikonfigurasi, parameter.groupId memiliki prioritas lebih tinggi daripada group.id dalam parameter kafkaConfig.

Tulis real-time ke satu tabel

Deduplikasi data tidak didukung untuk operasi tulis. Jika tugas dijalankan ulang setelah reset offset atau failover, data duplikat mungkin ditulis.

Tulis real-time untuk seluruh database

  • Tugas sinkronisasi data real-time mendukung Serverless resource groups (direkomendasikan) dan versi lama Grup sumber daya eksklusif untuk Integrasi Data.

  • Jika tabel sumber memiliki primary key, nilai primary key digunakan sebagai kunci untuk record Kafka. Hal ini memastikan bahwa perubahan pada primary key yang sama ditulis ke partisi Kafka yang sama secara berurutan.

  • Jika tabel sumber tidak memiliki primary key, Anda memiliki dua opsi. Jika Anda memilih opsi untuk menyinkronkan tabel tanpa primary key, kunci untuk record Kafka kosong. Untuk memastikan perubahan tabel ditulis ke Kafka secara berurutan, topik Kafka tujuan harus hanya memiliki satu partisi. Jika Anda memilih primary key kustom, kombinasi satu atau beberapa bidang non-primary key digunakan sebagai kunci untuk record Kafka.

  • Jika kluster Kafka mengembalikan exception dan Anda perlu memastikan bahwa perubahan pada primary key yang sama ditulis ke partisi Kafka yang sama secara berurutan, tambahkan konfigurasi berikut ke parameter ekstensi sumber data Kafka.

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    Penting

    Konfigurasi ini secara signifikan menurunkan performa replikasi. Anda harus menyeimbangkan performa dengan kebutuhan akan pengurutan dan keandalan yang ketat.

  • Untuk informasi lebih lanjut tentang format keseluruhan pesan yang ditulis ke Kafka dalam sinkronisasi real-time, format pesan heartbeat, dan format pesan yang sesuai dengan perubahan data sumber, lihat Apendiks: Format pesan.

Tipe bidang yang didukung

Kafka menyediakan penyimpanan data tidak terstruktur. Record Kafka biasanya mencakup bidang data berikut: key, value, offset, timestamp, headers, dan partition. Saat DataWorks membaca atau menulis data ke Kafka, pemrosesan dilakukan berdasarkan kebijakan berikut.

Baca data

Saat DataWorks membaca data dari Kafka, data tersebut dapat diurai dalam format JSON. Tabel berikut menjelaskan cara setiap modul data diproses.

Modul data record Kafka

Tipe data yang diproses

key

Bergantung pada item konfigurasi keyType dalam tugas sinkronisasi data. Untuk informasi lebih lanjut tentang parameter keyType, lihat deskripsi parameter lengkap di apendiks.

value

Bergantung pada item konfigurasi valueType dalam tugas sinkronisasi data. Untuk informasi lebih lanjut tentang parameter valueType, lihat deskripsi parameter lengkap di apendiks.

offset

Long

timestamp

Long

headers

String

partition

Long

Tulis data

Saat DataWorks menulis data ke Kafka, data tersebut dapat ditulis dalam format JSON atau teks. Kebijakan pemrosesan data bervariasi berdasarkan jenis tugas sinkronisasi data. Tabel berikut menjelaskan detailnya.

Penting
  • Saat data ditulis dalam format teks, nama bidang tidak disertakan. Nilai bidang dipisahkan oleh pemisah.

  • Saat data ditulis ke Kafka dalam tugas sinkronisasi real-time, digunakan format JSON bawaan. Data yang ditulis mencakup semua informasi, seperti pesan perubahan database, waktu bisnis, dan informasi DDL. Untuk informasi lebih lanjut tentang format data, lihat Apendiks: Format pesan.

Jenis tugas sinkronisasi

Format value yang ditulis ke Kafka

Tipe bidang sumber

Metode pemrosesan untuk operasi tulis

Sinkronisasi offline

Node sinkronisasi offline di DataStudio

json

String

String yang dikodekan UTF-8

Boolean

Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8

Waktu/Tanggal

String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss

Numerik

String numerik yang dikodekan UTF-8

Aliran byte

Aliran byte dianggap sebagai string yang dikodekan UTF-8 dan dikonversi menjadi string.

text

String

String yang dikodekan UTF-8

Boolean

Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8

Waktu/Tanggal

String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss

Numerik

String numerik yang dikodekan UTF-8

Aliran byte

Aliran byte dianggap sebagai string yang dikodekan UTF-8 dan dikonversi menjadi string.

Sinkronisasi real-time: ETL real-time ke Kafka

Node sinkronisasi real-time di DataStudio

json

String

String yang dikodekan UTF-8

Boolean

Tipe Boolean JSON

Waktu/Tanggal

  • Untuk nilai waktu dengan presisi kurang dari milidetik: Dikonversi menjadi bilangan bulat JSON 13 digit yang merepresentasikan timestamp dalam milidetik.

  • Untuk nilai waktu dengan presisi mikrodetik atau nanodetik: Dikonversi menjadi bilangan titik mengambang JSON yang mencakup bilangan bulat 13 digit untuk timestamp milidetik dan desimal 6 digit untuk timestamp nanodetik.

Numerik

Tipe numerik JSON

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

text

String

String yang dikodekan UTF-8

Boolean

Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8

Waktu/Tanggal

String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss

Numerik

String numerik yang dikodekan UTF-8

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

Sinkronisasi real-time: Sinkronisasi real-time seluruh database ke Kafka

Hanya sinkronisasi data inkremental

Format JSON bawaan

String

String yang dikodekan UTF-8

Boolean

Tipe Boolean JSON

Waktu/Tanggal

Timestamp milidetik 13 digit

Numerik

Nilai numerik JSON

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

Solusi sinkronisasi: Sinkronisasi real-time sekali klik ke Kafka

Sinkronisasi offline penuh + sinkronisasi real-time inkremental

Format JSON bawaan

String

String yang dikodekan UTF-8

Boolean

Tipe Boolean JSON

Waktu/Tanggal

Timestamp milidetik 13 digit

Numerik

Nilai numerik JSON

Aliran byte

Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8.

Tambahkan sumber data

Sebelum mengembangkan tugas sinkronisasi di DataWorks, Anda harus menambahkan sumber data yang diperlukan ke DataWorks dengan mengikuti petunjuk di Manajemen Sumber Data. Anda dapat melihat deskripsi parameter di Konsol DataWorks untuk memahami arti parameter saat menambahkan sumber data.

Kembangkan tugas sinkronisasi data

Untuk informasi tentang titik masuk dan prosedur konfigurasi tugas sinkronisasi, lihat panduan konfigurasi berikut.

Konfigurasikan tugas sinkronisasi offline untuk satu tabel

Konfigurasikan tugas sinkronisasi real-time untuk satu tabel atau seluruh database

Untuk informasi lebih lanjut tentang prosedur, lihat Konfigurasikan tugas sinkronisasi real-time di DataStudio, Konfigurasikan tugas sinkronisasi real-time di Integrasi Data, dan Konfigurasikan tugas sinkronisasi untuk seluruh database.

Konfigurasi autentikasi

SSL

Jika Anda mengatur Special Authentication Method untuk sumber data Kafka ke SSL atau SASL_SSL, autentikasi SSL diaktifkan untuk kluster Kafka. Anda harus mengunggah file sertifikat truststore klien dan memasukkan passphrase truststore.

  • Jika kluster Kafka adalah instans Alibaba Cloud Kafka, lihat Petunjuk peningkatan algoritma sertifikat SSL untuk mengunduh file sertifikat truststore yang benar. Passphrase truststore adalah KafkaOnsClient.

  • Jika kluster Kafka adalah instans EMR, lihat Gunakan enkripsi SSL untuk koneksi Kafka untuk mengunduh file sertifikat truststore yang benar dan mendapatkan passphrase truststore.

  • Untuk kluster self-managed, Anda harus mengunggah sertifikat truststore yang benar dan memasukkan passphrase truststore yang sesuai.

File sertifikat keystore, passphrase keystore, dan passphrase SSL hanya diperlukan jika autentikasi SSL dua arah diaktifkan untuk kluster Kafka. Server kluster Kafka menggunakan informasi ini untuk mengautentikasi identitas klien. Autentikasi SSL dua arah diaktifkan ketika ssl.client.auth=required dikonfigurasi dalam file server.properties kluster Kafka. Untuk informasi lebih lanjut, lihat Gunakan enkripsi SSL untuk koneksi Kafka.

GSSAPI

Jika Anda mengatur Sasl Mechanism ke GSSAPI saat mengonfigurasi sumber data Kafka, Anda harus mengunggah tiga file autentikasi: file konfigurasi JAAS, file konfigurasi Kerberos, dan file Keytab. Anda juga harus mengonfigurasi pengaturan DNS/HOST untuk grup sumber daya eksklusif. Bagian berikut menjelaskan file-file ini serta pengaturan DNS dan HOST yang diperlukan.

Catatan

Untuk Serverless resource group, Anda harus mengonfigurasi informasi alamat host menggunakan resolusi DNS internal. Untuk informasi lebih lanjut, lihat Resolusi DNS internal (PrivateZone).

  • File konfigurasi JAAS

    File JAAS harus dimulai dengan KafkaClient, diikuti oleh semua item konfigurasi yang diapit sepasang tanda kurung kurawal {}:

    • Baris pertama di dalam tanda kurung kurawal mendefinisikan kelas komponen logon yang digunakan. Untuk mekanisme autentikasi SASL yang berbeda, kelas komponen logon bersifat tetap. Setiap item konfigurasi berikutnya ditulis dalam format key=value.

    • Semua item konfigurasi kecuali yang terakhir tidak boleh diakhiri dengan titik koma.

    • Item konfigurasi terakhir harus diakhiri dengan titik koma. Titik koma juga harus ditambahkan setelah tanda kurung kurawal penutup }.

    Jika persyaratan format tidak dipenuhi, file konfigurasi JAAS tidak dapat diurai. Kode berikut menunjukkan format file konfigurasi JAAS yang khas. Ganti placeholder xxx dengan informasi aktual Anda.

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    Item konfigurasi

    Deskripsi

    Modul logon

    Anda harus mengonfigurasi com.sun.security.auth.module.Krb5LoginModule.

    useKeyTab

    Harus diatur ke true.

    keyTab

    Anda dapat menentukan path apa pun. Saat tugas sinkronisasi dijalankan, file keytab yang diunggah selama konfigurasi sumber data secara otomatis diunduh ke path lokal. Path file lokal ini kemudian digunakan untuk mengisi item konfigurasi keytab.

    storeKey

    Menentukan apakah klien menyimpan kunci. Anda dapat mengatur ini ke true atau false. Ini tidak memengaruhi sinkronisasi data.

    serviceName

    Sesuai dengan item konfigurasi sasl.kerberos.service.name dalam file konfigurasi server.properties server Kafka. Konfigurasikan item ini sesuai kebutuhan.

    principal

    Principal Kerberos yang digunakan oleh klien Kafka. Konfigurasikan item ini sesuai kebutuhan dan pastikan file keytab yang diunggah berisi kunci untuk principal ini.

  • File konfigurasi Kerberos

    File konfigurasi Kerberos harus berisi dua modul: [libdefaults] dan [realms].

    • Modul [libdefaults] menentukan parameter autentikasi Kerberos. Setiap item konfigurasi dalam modul ditulis dalam format key=value.

    • Modul [realms] menentukan alamat KDC. Modul ini dapat berisi beberapa submodul realm. Setiap submodul realm dimulai dengan nama realm=.

    Ini diikuti oleh serangkaian item konfigurasi yang diapit tanda kurung kurawal. Setiap item konfigurasi juga ditulis dalam format key=value. Kode berikut menunjukkan format file konfigurasi Kerberos yang khas. Ganti placeholder xxx dengan informasi aktual Anda.

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    Item konfigurasi

    Deskripsi

    [libdefaults].default_realm

    Realm default yang digunakan saat mengakses node kluster Kafka. Biasanya sama dengan realm principal klien yang ditentukan dalam file konfigurasi JAAS.

    Parameter [libdefaults] lainnya

    Modul [libdefaults] dapat menentukan parameter autentikasi Kerberos lainnya, seperti ticket_lifetime. Konfigurasikan sesuai kebutuhan.

    [realms].realm name

    Harus sama dengan realm principal klien yang ditentukan dalam file konfigurasi JAAS dan [libdefaults].default_realm. Jika realm principal klien dalam file konfigurasi JAAS berbeda dari [libdefaults].default_realm, Anda perlu menyertakan dua submodul realms. Submodul ini harus sesuai dengan realm principal klien dalam file konfigurasi JAAS dan [libdefaults].default_realm, masing-masing.

    [realms].realm name.kdc

    Menentukan alamat dan port KDC dalam format ip:port. Misalnya, kdc=10.0.0.1:88. Jika port dihilangkan, port default 88 digunakan. Misalnya, kdc=10.0.0.1.

  • File Keytab

    File Keytab harus berisi kunci untuk principal yang ditentukan dalam file konfigurasi JAAS dan harus dapat diverifikasi oleh KDC. Misalnya, jika ada file bernama client.keytab di direktori kerja saat ini, Anda dapat menjalankan perintah berikut untuk memverifikasi apakah file Keytab berisi kunci untuk principal yang ditentukan.

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)
  • Konfigurasi DNS dan HOST untuk grup sumber daya eksklusif

    Dalam kluster Kafka dengan autentikasi Kerberos diaktifkan, hostname node dalam kluster digunakan sebagai bagian dari principal yang terdaftar untuk node tersebut di Key Distribution Center (KDC). Saat klien mengakses node dalam kluster Kafka, klien tersebut menginferensi principal node berdasarkan pengaturan DNS dan HOST lokal untuk mendapatkan kredensial akses untuk node tersebut dari KDC. Saat Anda menggunakan grup sumber daya eksklusif untuk mengakses kluster Kafka dengan autentikasi Kerberos diaktifkan, Anda harus mengonfigurasi pengaturan DNS dan HOST dengan benar agar kredensial akses untuk node kluster dapat diperoleh dari KDC:

    • Pengaturan DNS

      Jika instans PrivateZone digunakan untuk resolusi nama domain node kluster Kafka di VPC tempat grup sumber daya eksklusif dilampirkan, Anda dapat menambahkan rute kustom untuk alamat IP 100.100.2.136 dan 100.100.2.138 ke lampiran VPC grup sumber daya eksklusif di Konsol DataWorks. Hal ini memastikan bahwa pengaturan resolusi nama domain PrivateZone untuk node kluster Kafka berlaku untuk grup sumber daya eksklusif.

    • Pengaturan HOST

      Jika instans PrivateZone tidak digunakan untuk resolusi nama domain node kluster Kafka di VPC tempat grup sumber daya eksklusif dilampirkan, Anda harus menambahkan pemetaan alamat IP ke nama domain untuk setiap node kluster Kafka ke konfigurasi Host di pengaturan jaringan untuk grup sumber daya eksklusif di Konsol DataWorks.

PLAIN

Saat mengonfigurasi sumber data Kafka, jika Anda mengatur Sasl Mechanism ke PLAIN, file JAAS harus dimulai dengan KafkaClient, diikuti oleh semua item konfigurasi yang diapit sepasang tanda kurung kurawal {}.

  • Baris pertama di dalam tanda kurung kurawal mendefinisikan kelas komponen logon yang digunakan. Untuk mekanisme autentikasi SASL yang berbeda, kelas komponen logon bersifat tetap. Setiap item konfigurasi berikutnya ditulis dalam format key=value.

  • Semua item konfigurasi kecuali yang terakhir tidak boleh diakhiri dengan titik koma.

  • Item konfigurasi terakhir harus diakhiri dengan titik koma. Titik koma juga harus ditambahkan setelah tanda kurung kurawal penutup "}".

Jika persyaratan format tidak dipenuhi, file konfigurasi JAAS tidak dapat diurai. Kode berikut menunjukkan format file konfigurasi JAAS yang khas. Ganti placeholder xxx dengan informasi aktual Anda.

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

Item konfigurasi

Deskripsi

Modul logon

Anda perlu mengonfigurasi org.apache.kafka.common.security.plain.PlainLoginModule.

username

Username. Konfigurasikan item ini sesuai kebutuhan.

password

Password. Konfigurasikan item ini sesuai kebutuhan.

FAQ

Apendiks: Demo skrip dan deskripsi parameter

Konfigurasikan tugas sinkronisasi batch menggunakan editor kode

Jika Anda ingin mengonfigurasi tugas sinkronisasi batch menggunakan editor kode, Anda harus mengonfigurasi parameter terkait dalam skrip sesuai dengan persyaratan format skrip terpadu. Untuk informasi lebih lanjut, lihat Konfigurasikan tugas di editor kode. Informasi berikut menjelaskan parameter yang harus Anda konfigurasi untuk sumber data saat mengonfigurasi tugas sinkronisasi batch menggunakan editor kode.

Demo skrip Reader

Kode berikut menunjukkan konfigurasi JSON untuk membaca data dari Kafka.

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//Ketika throttle adalah false, parameter mbps tidak berlaku, yang berarti laju tidak dibatasi. Ketika throttle adalah true, laju dibatasi.
            "concurrent": 1,//Jumlah thread konkuren.
            "mbps":"12"//Laju transmisi maksimum. 1 mbps sama dengan 1 MB/s.
        }
    }
}

Parameter skrip Reader

Parameter

Deskripsi

Wajib

datasource

Nama sumber data. Editor kode mendukung penambahan sumber data. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan.

Ya

server

Alamat server broker Kafka dalam format ip:port.

Anda hanya dapat mengonfigurasi satu server, tetapi Anda harus memastikan bahwa DataWorks dapat terhubung ke alamat IP semua broker di kluster Kafka.

Ya

topic

Topik Kafka. Topik adalah agregasi feed pesan yang diproses Kafka.

Ya

column

Data Kafka yang akan dibaca. Kolom konstan, kolom data, dan kolom atribut didukung.

  • Kolom konstan: Kolom yang diapit tanda kutip tunggal, seperti ["'abc'", "'123'"].

  • Kolom data

    • Jika data Anda dalam format JSON, Anda dapat mendapatkan properti objek JSON, seperti ["event_id"].

    • Jika data Anda dalam format JSON, Anda dapat mendapatkan sub-properti bersarang objek JSON, seperti ["tag.desc"].

  • Kolom atribut

    • __key__: kunci pesan.

    • __value__: konten lengkap pesan.

    • __partition__: partisi tempat pesan saat ini berada.

    • __headers__: header pesan saat ini.

    • __offset__: offset pesan saat ini.

    • __timestamp__: timestamp pesan saat ini.

    Kode berikut memberikan contoh lengkap.

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]

Ya

keyType

Tipe kunci Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Tidak

valueType

Tipe nilai Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Tidak

beginDateTime

Offset awal untuk konsumsi data. Ini adalah batas kiri rentang waktu (inklusif). Ini adalah string waktu dalam format yyyymmddhhmmss. Anda dapat menggunakannya dengan Scheduling Parameters. Untuk informasi lebih lanjut, lihat Format yang didukung untuk parameter penjadwalan.

Catatan

Fitur ini didukung di Kafka 0.10.2 dan yang lebih baru.

Anda harus menentukan salah satu parameter ini atau beginOffset.

Catatan

beginDateTime dan endDateTime digunakan bersama.

endDateTime

Offset akhir untuk konsumsi data. Ini adalah batas kanan rentang waktu (eksklusif). Ini adalah string waktu dalam format yyyymmddhhmmss. Anda dapat menggunakannya dengan Scheduling Parameters. Untuk informasi lebih lanjut, lihat Format yang didukung untuk parameter penjadwalan.

Catatan

Fitur ini didukung di Kafka 0.10.2 dan yang lebih baru.

Anda harus menentukan salah satu parameter ini atau endOffset.

Catatan

endDateTime dan beginDateTime digunakan bersama.

beginOffset

Offset awal untuk konsumsi data. Anda dapat mengonfigurasinya dalam bentuk berikut:

  • Bilangan, seperti 15553274, yang menunjukkan offset awal untuk konsumsi.

  • seekToBeginning: menunjukkan bahwa data dikonsumsi dari offset paling awal.

  • seekToLast: menunjukkan bahwa data dibaca dari offset yang disimpan untuk ID grup yang ditentukan oleh group.id dalam konfigurasi kafkaConfig. Perhatikan bahwa offset grup secara otomatis dikomit ke server Kafka oleh klien secara berkala. Oleh karena itu, jika tugas gagal dan dijalankan ulang, data mungkin diduplikasi atau hilang. Jika parameter skipExceedRecord diatur ke true, tugas mungkin membuang beberapa record terakhir yang telah dibaca. Offset grup untuk data yang dibuang ini telah dikomit ke server, sehingga data ini tidak dapat dibaca dalam eksekusi tugas berikutnya.

  • seekToEnd: menunjukkan bahwa data dikonsumsi dari offset terbaru. Ini akan membaca data kosong.

Anda harus menentukan salah satu parameter ini atau beginDateTime.

endOffset

Offset akhir untuk konsumsi data. Ini digunakan untuk mengontrol kapan tugas konsumsi data berhenti.

Anda harus menentukan salah satu parameter ini atau endDateTime.

skipExceedRecord

Kafka menggunakan public ConsumerRecords<K, V> poll(final Duration timeout) untuk mengonsumsi data. Satu panggilan poll tunggal mungkin mengambil data yang berada di luar rentang endOffset atau endDateTime. skipExceedRecord mengontrol apakah data berlebih ditulis ke tujuan. Karena konsumsi data menggunakan komit offset otomatis, kami merekomendasikan hal berikut:

  • Untuk versi Kafka sebelum 0.10.2: Atur skipExceedRecord ke false.

  • Untuk Kafka 0.10.2 dan yang lebih baru: Atur skipExceedRecord ke true.

Tidak. Nilai default adalah false.

partition

Topik Kafka memiliki beberapa partisi (partition). Secara default, tugas sinkronisasi data membaca data dari rentang offset yang mencakup semua partisi dalam topik. Anda juga dapat menentukan partition untuk hanya membaca data dari rentang offset satu partisi tunggal.

Tidak. Tidak ada nilai default.

kafkaConfig

Saat membuat klien KafkaConsumer untuk konsumsi data, Anda dapat menentukan parameter ekstensi seperti bootstrap.servers, auto.commit.interval.ms, dan session.timeout.ms. Anda dapat menggunakan kafkaConfig untuk mengontrol perilaku konsumsi KafkaConsumer.

Tidak

encoding

Saat keyType atau valueType diatur ke STRING, encoding yang ditentukan oleh parameter ini digunakan untuk mengurai string.

Tidak. Nilai default adalah UTF-8.

waitTIme

Waktu maksimum, dalam detik, yang ditunggu objek konsumen untuk menarik data dari Kafka dalam satu percobaan.

Tidak. Nilai default adalah 60.

stopWhenPollEmpty

Nilai yang valid adalah true dan false. Jika parameter ini diatur ke true dan konsumen menarik data kosong dari Kafka (biasanya karena semua data dalam topik telah dibaca, atau karena masalah ketersediaan jaringan atau kluster Kafka), tugas berhenti segera. Jika tidak, tugas mencoba lagi hingga data berhasil dibaca.

Tidak. Nilai default adalah true.

stopWhenReachEndOffset

Parameter ini hanya berlaku ketika stopWhenPollEmpty adalah true. Nilai yang valid adalah true dan false.

  • Jika parameter ini diatur ke true dan konsumen menarik data kosong dari Kafka, sistem memeriksa apakah data terbaru dalam partisi topik Kafka telah dibaca. Jika data terbaru dari semua partisi telah dibaca, tugas berhenti segera. Jika tidak, sistem terus mencoba menarik data dari topik Kafka.

  • Jika parameter ini diatur ke false dan konsumen menarik data kosong dari Kafka, sistem tidak melakukan pemeriksaan dan langsung menghentikan tugas.

Tidak. Nilai default adalah false.

Catatan

Ini untuk kompatibilitas dengan logika historis. Versi Kafka sebelum V0.10.2 tidak dapat memeriksa apakah data terbaru dari semua partisi topik Kafka telah dibaca. Namun, beberapa tugas editor kode mungkin membaca data dari versi Kafka sebelum V0.10.2.

Tabel berikut menjelaskan parameter kafkaConfig.

Parameter

Deskripsi

fetch.min.bytes

Menentukan jumlah minimum byte pesan yang dapat diperoleh konsumen dari broker. Data dikembalikan ke konsumen hanya ketika ada cukup data.

fetch.max.wait.ms

Waktu maksimum menunggu broker mengembalikan data. Nilai default adalah 500 milidetik. Data dikembalikan berdasarkan kondisi mana yang terpenuhi lebih dulu: fetch.min.bytes atau fetch.max.wait.ms.

max.partition.fetch.bytes

Menentukan jumlah maksimum byte yang dapat dikembalikan broker ke konsumen dari setiap partition. Nilai default adalah 1 MB.

session.timeout.ms

Menentukan waktu yang dapat dilewati konsumen terputus dari server sebelum berhenti menerima layanan. Nilai default adalah 30 detik.

auto.offset.reset

Tindakan yang diambil konsumen saat membaca tanpa offset atau offset tidak valid (karena konsumen tidak aktif dalam waktu lama dan record dengan offset tersebut telah kedaluwarsa dan dihapus). Nilai default adalah none, yang berarti offset tidak direset secara otomatis. Anda dapat mengubahnya menjadi earliest, yang berarti konsumen membaca record partition dari offset paling awal.

max.poll.records

Jumlah pesan yang dapat dikembalikan oleh satu panggilan metode poll.

key.deserializer

Metode deserialisasi untuk kunci pesan, seperti org.apache.kafka.common.serialization.StringDeserializer.

value.deserializer

Metode deserialisasi untuk nilai data, seperti org.apache.kafka.common.serialization.StringDeserializer.

ssl.truststore.location

Path sertifikat root SSL.

ssl.truststore.password

Password untuk penyimpanan sertifikat root. Jika Anda menggunakan Alibaba Cloud Kafka, atur ini ke KafkaOnsClient.

security.protocol

Protokol akses. Saat ini, hanya protokol SASL_SSL yang didukung.

sasl.mechanism

Metode autentikasi SASL. Jika Anda menggunakan Alibaba Cloud Kafka, gunakan PLAIN.

java.security.auth.login.config

Path file autentikasi SASL.

Demo skrip Writer

Kode berikut menunjukkan konfigurasi JSON untuk menulis data ke Kafka.

{
  "type":"job",
  "version":"2.0",//Nomor versi.
  "steps":[
    {
      "stepType":"stream",
      "parameter":{},
      "name":"Reader",
      "category":"reader"
    },
    {
      "stepType":"Kafka",//Nama plugin.
      "parameter":{
          "server": "ip:9092", //Alamat server Kafka.
          "keyIndex": 0, //Kolom yang akan digunakan sebagai kunci. Harus mengikuti konvensi penamaan camel case, dengan k huruf kecil.
          "valueIndex": 1, //Kolom yang akan digunakan sebagai nilai. Saat ini, Anda hanya dapat memilih satu kolom dari data sumber atau membiarkan parameter ini kosong. Jika kosong, semua data sumber digunakan.
          //Misalnya, untuk menggunakan kolom ke-2, ke-3, dan ke-4 dari tabel ODPS sebagai kafkaValue, buat tabel ODPS baru, bersihkan dan integrasikan data dari tabel ODPS asli ke tabel baru, lalu gunakan tabel baru untuk sinkronisasi.
          "keyType": "Integer", //Tipe kunci Kafka.
          "valueType": "Short", //Tipe nilai Kafka.
          "topic": "t08", //Topik Kafka.
          "batchSize": 1024 //Jumlah data yang ditulis ke Kafka sekaligus, dalam byte.
        },
      "name":"Writer",
      "category":"writer"
    }
  ],
  "setting":{
      "errorLimit":{
      "record":"0"//Jumlah record error.
    },
    "speed":{
        "throttle":true,//Ketika throttle adalah false, parameter mbps tidak berlaku, yang berarti laju tidak dibatasi. Ketika throttle adalah true, laju dibatasi.
        "concurrent":1, //Jumlah job konkuren.
        "mbps":"12"//Laju transmisi maksimum. 1 mbps sama dengan 1 MB/s.
    }
   },
    "order":{
    "hops":[
        {
            "from":"Reader",
            "to":"Writer"
        }
      ]
    }
}

Parameter skrip Writer

Parameter

Deskripsi

Wajib

datasource

Nama sumber data. Editor kode mendukung penambahan sumber data. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan.

Ya

server

Alamat server Kafka dalam format ip:port.

Ya

topic

Topik Kafka. Ini adalah kategori untuk feed pesan berbeda yang diproses Kafka.

Setiap pesan yang dipublikasikan ke kluster Kafka memiliki kategori, yang disebut topik. Topik adalah kumpulan sekelompok pesan.

Ya

valueIndex

Kolom di writer Kafka yang digunakan sebagai nilai. Jika tidak ditentukan, semua kolom digabungkan secara default untuk membentuk nilai. Pemisah ditentukan oleh fieldDelimiter.

Tidak

writeMode

Saat valueIndex tidak dikonfigurasi, parameter ini menentukan format penggabungan semua kolom record sumber untuk membentuk nilai record Kafka. Nilai yang valid adalah text dan JSON. Nilai default adalah text.

  • Jika diatur ke text, semua kolom digabungkan menggunakan pemisah yang ditentukan oleh fieldDelimiter.

  • Jika diatur ke JSON, semua kolom digabungkan menjadi string JSON berdasarkan nama bidang yang ditentukan oleh parameter column.

Misalnya, jika record sumber memiliki tiga kolom dengan nilai a, b, dan c, dan writeMode diatur ke text serta fieldDelimiter diatur ke #, nilai record Kafka yang ditulis adalah string a#b#c. Jika writeMode diatur ke JSON dan column diatur ke [{"name":"col1"},{"name":"col2"},{"name":"col3"}], nilai record Kafka yang ditulis adalah string {"col1":"a","col2":"b","col3":"c"}.

Jika valueIndex dikonfigurasi, parameter ini tidak berlaku.

Tidak

column

Bidang di tabel tujuan tempat data akan ditulis, dipisahkan koma. Contoh: "column": ["id", "name", "age"].

Saat valueIndex tidak dikonfigurasi dan writeMode diatur ke JSON, parameter ini mendefinisikan nama bidang dalam struktur JSON untuk nilai kolom record sumber. Contoh, "column": [{"name":"id","type":"JSON_NUMBER"}, {"name":"name","type":"JSON_STRING"}, {"name":"age","type":"JSON_NUMBER"}].

  • Jika jumlah kolom dalam record sumber lebih besar daripada jumlah nama bidang yang dikonfigurasi dalam column, data dipotong saat penulisan. Contoh:

    Jika record sumber memiliki tiga kolom dengan nilai a, b, dan c, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"}], nilai record Kafka yang ditulis adalah string {"col1":"a","col2":"b"}.

  • Jika jumlah kolom dalam record sumber lebih kecil daripada jumlah nama bidang yang dikonfigurasi dalam column, nama bidang tambahan dalam konfigurasi column diisi dengan null atau string yang ditentukan oleh nullValueFormat. Contoh:

    Jika record sumber memiliki dua kolom dengan nilai a dan b, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"},{"name":"col3","type":"JSON_STRING"}], nilai record Kafka yang ditulis adalah string {"col1":"a","col2":"b","col3":null}. Jika valueIndex dikonfigurasi, atau jika writeMode diatur ke text, parameter ini tidak berlaku.

  • Jika tipe bidang JSON tidak dikonfigurasi, tipe bidang default adalah JSON_STRING.

  • Untuk nilai yang valid dari tipe bidang JSON, lihat Apendiks: Tipe bidang JSON.

Jika valueIndex dikonfigurasi, atau jika writeMode diatur ke text, parameter ini tidak berlaku.

Wajib saat valueIndex tidak dikonfigurasi dan writeMode diatur ke JSON.

partition

Menentukan nomor partisi dalam topik Kafka tempat data ditulis. Ini harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0.

Tidak

keyIndex

Kolom di writer Kafka yang digunakan sebagai kunci.

Nilai parameter keyIndex harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0. Jika tidak, tugas akan gagal.

Tidak

keyIndexes

Array nomor ordinal kolom dalam record sumber yang digunakan sebagai kunci untuk record Kafka.

Nomor ordinal kolom dimulai dari 0. Misalnya, [0,1,2] akan menggabungkan nilai semua nomor kolom yang dikonfigurasi dengan koma untuk membentuk kunci record Kafka. Jika tidak ditentukan, kunci record Kafka adalah null, dan data ditulis ke partisi topik secara round-robin. Anda hanya dapat menentukan salah satu parameter ini atau keyIndex.

Tidak

fieldDelimiter

Saat writeMode diatur ke text dan valueIndex tidak dikonfigurasi, semua kolom record sumber digabungkan menggunakan pemisah kolom yang ditentukan oleh parameter ini untuk membentuk nilai record Kafka. Anda dapat mengonfigurasi satu karakter atau beberapa karakter sebagai pemisah. Anda dapat mengonfigurasi karakter Unicode dalam format \u0001. Karakter escape seperti \t dan \n didukung. Nilai default adalah \t.

Jika writeMode tidak diatur ke text atau jika valueIndex dikonfigurasi, parameter ini tidak berlaku.

Tidak

keyType

Tipe kunci Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Ya

valueType

Tipe nilai Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT.

Ya

nullKeyFormat

Jika nilai kolom sumber yang ditentukan oleh keyIndex atau keyIndexes adalah null, nilai tersebut diganti dengan string yang ditentukan oleh parameter ini. Jika tidak dikonfigurasi, tidak ada penggantian yang dilakukan.

Tidak

nullValueFormat

Jika nilai kolom sumber adalah null, nilai tersebut diganti dengan string yang ditentukan oleh parameter ini saat merakit nilai record Kafka. Jika tidak dikonfigurasi, tidak ada penggantian yang dilakukan.

Tidak

acks

Konfigurasi acks saat menginisialisasi produsen Kafka. Ini menentukan metode acknowledgment untuk penulisan yang berhasil. Secara default, parameter acks diatur ke all. Nilai yang valid untuk acks adalah:

  • 0: Tidak ada acknowledgment untuk penulisan yang berhasil.

  • 1: Acknowledgment untuk penulisan yang berhasil ke replica primer.

  • all: Acknowledgment untuk penulisan yang berhasil ke semua replica.

Tidak

Apendiks: Definisi format pesan untuk penulisan ke Kafka

Setelah Anda mengonfigurasi dan menjalankan tugas sinkronisasi real-time, data yang dibaca dari database sumber ditulis ke topik Kafka dalam format JSON. Pertama, semua data yang ada dalam tabel sumber yang ditentukan ditulis ke topik Kafka yang sesuai. Kemudian, tugas memulai sinkronisasi real-time untuk terus menulis data inkremental ke topik tersebut. Informasi perubahan DDL inkremental dari tabel sumber juga ditulis ke topik Kafka dalam format JSON. Anda dapat memperoleh status dan informasi perubahan pesan yang ditulis ke Kafka. Untuk informasi lebih lanjut, lihat Apendiks: Format pesan.

Catatan

Dalam struktur JSON data yang ditulis ke Kafka oleh tugas sinkronisasi offline, bidang payload.sequenceId, payload.timestamp.eventTime, dan payload.timestamp.checkpointTime diatur ke -1.

Apendiks: Tipe bidang JSON

Saat writeMode diatur ke JSON, Anda dapat menggunakan bidang type dalam parameter column untuk menentukan tipe data JSON. Selama operasi tulis, sistem mencoba mengonversi nilai kolom dalam record sumber ke tipe yang ditentukan. Jika konversi ini gagal, data kotor dihasilkan.

Nilai yang Valid

Deskripsi

JSON_STRING

Mengonversi nilai kolom record sumber menjadi string dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah bilangan bulat 123 dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_STRING"}], nilai record Kafka yang ditulis adalah string {"col1":"123"}.

JSON_NUMBER

Mengonversi nilai kolom record sumber menjadi angka dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah string 1.23 dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_NUMBER"}], nilai record Kafka yang ditulis adalah string {"col1":1.23}.

JSON_BOOL

Mengonversi nilai kolom record sumber menjadi nilai Boolean dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah string true dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_BOOL"}, nilai record Kafka yang ditulis adalah string {"col1":true}

JSON_ARRAY

Mengonversi nilai kolom record sumber menjadi array JSON dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah string [1,2,3] dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_ARRAY"}], nilai record Kafka yang ditulis adalah string {"col1":[1,2,3]}.

JSON_MAP

Mengonversi nilai kolom record sumber menjadi objek JSON dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah string {"k1":"v1"} dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_MAP"}], nilai record Kafka yang ditulis adalah string {"col1":{"k1":"v1"}}.

JSON_BASE64

Mengonversi konten byte biner nilai kolom record sumber menjadi string yang dikodekan BASE64 dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah array byte dengan panjang 2, direpresentasikan dalam heksadesimal sebagai 0x01 0x02, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_BASE64"}], nilai record Kafka yang ditulis adalah string {"col1":"AQI="}.

JSON_HEX

Mengonversi konten byte biner nilai kolom record sumber menjadi string numerik heksadesimal dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom record sumber adalah array byte dengan panjang 2, direpresentasikan dalam heksadesimal sebagai 0x01 0x02, dan column dikonfigurasi sebagai [{"name":"col1","type":"JSON_HEX"}], nilai record Kafka yang ditulis adalah string {"col1":"0102"}.