Sumber data Kafka menyediakan saluran dua arah untuk membaca dan menulis data ke Kafka. Topik ini menjelaskan kemampuan sinkronisasi data yang disediakan oleh DataWorks untuk Kafka.
Versi yang didukung
DataWorks mendukung Kafka Alibaba Cloud serta versi Kafka yang dikelola sendiri mulai dari 0.10.2 hingga 3.6.x.
Sinkronisasi data tidak didukung untuk versi Kafka sebelum 0.10.2 karena versi tersebut tidak mendukung pengambilan offset partisi dan struktur datanya mungkin tidak kompatibel dengan timestamp.
Baca real-time
Jika Anda menggunakan kelompok sumber daya Serverless berlangganan, perkirakan spesifikasi yang dibutuhkan terlebih dahulu untuk mencegah kegagalan tugas akibat sumber daya yang tidak mencukupi.
Perkirakan 1 CU per topik. Selain itu, perkirakan sumber daya berdasarkan lalu lintas sebagai berikut:
Untuk data Kafka yang tidak dikompresi, alokasikan 1 CU per 10 MB/detik lalu lintas.
Untuk data Kafka yang dikompresi, alokasikan 2 CU per 10 MB/detik lalu lintas.
Untuk data Kafka yang dikompresi dan memerlukan penguraian JSON, alokasikan 3 CU per 10 MB/detik lalu lintas.
Jika Anda menggunakan kelompok sumber daya Serverless berlangganan atau versi lama grup sumber daya eksklusif untuk Integrasi Data:
Untuk beban kerja dengan toleransi tinggi terhadap failover, pastikan penggunaan slot kluster tidak melebihi 80%.
Untuk beban kerja dengan toleransi rendah terhadap failover, pastikan penggunaan slot kluster tidak melebihi 70%.
Penggunaan sumber daya aktual bergantung pada faktor seperti konten dan format data. Sesuaikan alokasi sumber daya berdasarkan kondisi waktu proses setelah estimasi awal.
Batasan
Sumber data Kafka mendukung kelompok sumber daya Serverless (disarankan) dan versi lama grup sumber daya eksklusif untuk Integrasi Data.
Baca offline dari satu tabel
Jika kedua parameter parameter.groupId dan parameter.kafkaConfig.group.id dikonfigurasi, parameter.groupId memiliki prioritas lebih tinggi daripada group.id dalam parameter kafkaConfig.
Tulis real-time ke satu tabel
Deduplikasi data tidak didukung untuk operasi tulis. Jika suatu tugas dimulai ulang setelah reset offset atau failover, data duplikat mungkin ditulis.
Tulis real-time untuk seluruh database
Tugas sinkronisasi data real-time mendukung kelompok sumber daya Serverless (disarankan) dan versi lama grup sumber daya eksklusif untuk Integrasi Data.
Jika tabel sumber memiliki kunci utama, nilai kunci utama digunakan sebagai kunci catatan Kafka. Hal ini memastikan bahwa perubahan pada kunci utama yang sama ditulis ke partisi Kafka yang sama secara berurutan.
Jika tabel sumber tidak memiliki kunci utama, Anda memiliki dua opsi. Jika memilih opsi untuk menyinkronkan tabel tanpa kunci utama, kunci catatan Kafka akan kosong. Untuk memastikan perubahan tabel ditulis ke Kafka secara berurutan, topik Kafka tujuan harus hanya memiliki satu partisi. Jika memilih kunci utama kustom, kombinasi satu atau beberapa bidang non-kunci utama digunakan sebagai kunci catatan Kafka.
Jika kluster Kafka mengembalikan pengecualian dan Anda perlu memastikan bahwa perubahan pada kunci utama yang sama ditulis ke partisi Kafka yang sama secara berurutan, tambahkan konfigurasi berikut ke parameter tambahan sumber data Kafka:
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}PentingKonfigurasi ini secara signifikan menurunkan kinerja replikasi. Anda harus menyeimbangkan antara kinerja dan kebutuhan akan pengurutan serta keandalan yang ketat.
Untuk informasi lebih lanjut tentang format pesan lengkap yang ditulis ke Kafka dalam sinkronisasi real-time, termasuk format pesan heartbeat dan format pesan yang sesuai dengan perubahan data sumber, lihat Lampiran: Format pesan.
Tipe bidang yang didukung
Kafka menyediakan penyimpanan data tidak terstruktur. Catatan Kafka biasanya mencakup kunci, nilai, offset, timestamp, header, dan partisi. Saat DataWorks membaca atau menulis data ke Kafka, pemrosesan data mengikuti kebijakan berikut.
Baca data offline
Saat DataWorks membaca data dari Kafka, data tersebut dapat diurai dalam format JSON. Tabel berikut menjelaskan cara setiap modul data diproses.
Modul data catatan Kafka | Tipe data yang diproses |
key | Bergantung pada item konfigurasi keyType dalam tugas sinkronisasi data. Untuk informasi lebih lanjut tentang parameter keyType, lihat deskripsi parameter lengkap di lampiran. |
value | Bergantung pada item konfigurasi valueType dalam tugas sinkronisasi data. Untuk informasi lebih lanjut tentang parameter valueType, lihat deskripsi parameter lengkap di lampiran. |
offset | Long |
timestamp | Long |
headers | String |
partition | Long |
Tulis data offline
Saat DataWorks menulis data ke Kafka, format penulisan yang didukung adalah JSON atau teks. Kebijakan pemrosesan data bervariasi tergantung pada solusi sinkronisasi yang digunakan. Tabel berikut menjelaskan detailnya.
Jika data ditulis dalam format teks, nama bidang tidak disertakan. Nilai bidang dipisahkan oleh pemisah.
Jika data ditulis ke Kafka dalam tugas sinkronisasi real-time, format JSON bawaan digunakan. Data yang ditulis mencakup semua informasi, seperti pesan perubahan database, waktu bisnis, dan informasi DDL. Untuk informasi lebih lanjut tentang format data, lihat Lampiran: Format pesan.
Jenis tugas sinkronisasi | Format nilai yang ditulis ke Kafka | Tipe bidang sumber | Metode pemrosesan untuk operasi tulis |
Sinkronisasi offline Node sinkronisasi offline di DataStudio | json | String | String yang dikodekan UTF-8 |
Boolean | Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8 | ||
Waktu/Tanggal | String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss | ||
Numerik | String numerik yang dikodekan UTF-8 | ||
Aliran byte | Aliran byte diperlakukan sebagai string yang dikodekan UTF-8 dan dikonversi menjadi string. | ||
teks | String | String yang dikodekan UTF-8 | |
Boolean | Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8 | ||
Waktu/Tanggal | String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss | ||
Numerik | String numerik yang dikodekan UTF-8 | ||
Aliran byte | Aliran byte diperlakukan sebagai string yang dikodekan UTF-8 dan dikonversi menjadi string. | ||
Sinkronisasi real-time: ETL real-time ke Kafka Node sinkronisasi real-time di DataStudio | json | String | String yang dikodekan UTF-8 |
Boolean | Tipe Boolean JSON | ||
Waktu/Tanggal |
| ||
Numerik | Tipe numerik JSON | ||
Aliran byte | Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8. | ||
teks | String | String yang dikodekan UTF-8 | |
Boolean | Dikonversi menjadi string "true" atau "false" yang dikodekan UTF-8 | ||
Waktu/Tanggal | String yang dikodekan UTF-8 dalam format yyyy-MM-dd HH:mm:ss | ||
Numerik | String numerik yang dikodekan UTF-8 | ||
Aliran byte | Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8. | ||
Sinkronisasi real-time: Sinkronisasi real-time seluruh database ke Kafka Hanya sinkronisasi data inkremental | Format JSON bawaan | String | String yang dikodekan UTF-8 |
Boolean | Tipe Boolean JSON | ||
Waktu/Tanggal | Timestamp milidetik 13 digit | ||
Numerik | Nilai numerik JSON | ||
Aliran byte | Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8. | ||
Solusi sinkronisasi: Sinkronisasi real-time sekali klik ke Kafka Sinkronisasi offline penuh + sinkronisasi real-time inkremental | Format JSON bawaan | String | String yang dikodekan UTF-8 |
Boolean | Tipe Boolean JSON | ||
Waktu/Tanggal | Timestamp milidetik 13 digit | ||
Numerik | Nilai numerik JSON | ||
Aliran byte | Aliran byte dikodekan Base64 lalu dikonversi menjadi string yang dikodekan UTF-8. |
Tambahkan sumber data
Sebelum mengembangkan tugas sinkronisasi di DataWorks, Anda harus menambahkan sumber data yang diperlukan ke DataWorks sesuai petunjuk di Manajemen sumber data. Anda dapat melihat infotips parameter di konsol DataWorks untuk memahami arti parameter saat menambahkan sumber data.
Kembangkan tugas sinkronisasi data
Untuk informasi mengenai titik masuk dan prosedur konfigurasi tugas sinkronisasi, lihat panduan konfigurasi berikut.
Konfigurasikan tugas sinkronisasi offline untuk satu tabel
Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan tugas sinkronisasi offline di antarmuka tanpa kode dan Konfigurasikan tugas sinkronisasi offline di editor kode.
Untuk daftar lengkap parameter dan demo skrip untuk editor kode, lihat Lampiran: Demo skrip dan deskripsi parameter.
Konfigurasikan tugas sinkronisasi real-time untuk satu tabel atau seluruh database
Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan tugas sinkronisasi real-time di DataStudio.
Konfigurasikan tugas sinkronisasi real-time penuh dan inkremental untuk satu tabel atau seluruh database
Untuk informasi lebih lanjut tentang prosedurnya, lihat Konfigurasikan tugas sinkronisasi di Integrasi Data.
Konfigurasi autentikasi
SSL
Jika Anda mengatur Special Authentication Method untuk sumber data Kafka ke SSL atau SASL_SSL, autentikasi SSL diaktifkan untuk kluster Kafka. Anda harus mengunggah file sertifikat truststore klien dan memasukkan frasa sandi truststore.
Jika kluster Kafka adalah instans Kafka Alibaba Cloud, lihat Petunjuk peningkatan algoritma sertifikat SSL untuk mengunduh file sertifikat truststore yang sesuai. Frasa sandi truststore adalah KafkaOnsClient.
Jika kluster Kafka adalah instans EMR, lihat Gunakan enkripsi SSL untuk koneksi Kafka untuk mengunduh file sertifikat truststore yang sesuai dan mendapatkan frasa sandi truststore.
Untuk kluster yang dikelola sendiri, Anda harus mengunggah sertifikat truststore yang sesuai dan memasukkan frasa sandi truststore yang benar.
File sertifikat keystore, frasa sandi keystore, dan frasa sandi SSL hanya diperlukan jika autentikasi SSL dua arah diaktifkan untuk kluster Kafka. Server kluster Kafka menggunakan informasi ini untuk mengautentikasi identitas klien. Autentikasi SSL dua arah diaktifkan ketika ssl.client.auth=required diatur dalam file server.properties kluster Kafka. Untuk informasi lebih lanjut, lihat Gunakan enkripsi SSL untuk koneksi Kafka.
GSSAPI
Jika Anda mengatur Sasl Mechanism ke GSSAPI saat mengonfigurasi sumber data Kafka, Anda harus mengunggah tiga file autentikasi: file konfigurasi JAAS, file konfigurasi Kerberos, dan file Keytab. Anda juga harus mengonfigurasi pengaturan DNS/HOST untuk grup sumber daya eksklusif. Bagian berikut menjelaskan file-file tersebut serta pengaturan DNS dan HOST yang diperlukan.
Untuk grup sumber daya Serverless, Anda harus mengonfigurasi informasi alamat host menggunakan resolusi DNS internal. Untuk informasi lebih lanjut, lihat Resolusi DNS internal (PrivateZone).
File konfigurasi JAAS
File JAAS harus dimulai dengan KafkaClient, diikuti oleh semua item konfigurasi yang diapit sepasang tanda kurung kurawal {}:
Baris pertama di dalam tanda kurung kurawal mendefinisikan kelas komponen logon yang digunakan. Untuk mekanisme autentikasi SASL yang berbeda, kelas komponen logon bersifat tetap. Setiap item konfigurasi berikutnya ditulis dalam format key=value.
Semua item konfigurasi kecuali yang terakhir tidak boleh diakhiri dengan titik koma.
Item konfigurasi terakhir harus diakhiri dengan titik koma. Titik koma juga harus ditambahkan setelah tanda kurung kurawal penutup "}".
Jika persyaratan format tidak dipenuhi, file konfigurasi JAAS tidak dapat diurai. Kode berikut menunjukkan format file konfigurasi JAAS khas. Ganti placeholder xxx dengan informasi aktual Anda.
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="xxx" storeKey=true serviceName="kafka-server" principal="kafka-client@EXAMPLE.COM"; };Item konfigurasi
Deskripsi
Modul logon
Harus diatur ke com.sun.security.auth.module.Krb5LoginModule.
useKeyTab
Harus diatur ke true.
keyTab
Anda dapat menentukan path apa pun. Saat tugas sinkronisasi berjalan, file keytab yang diunggah selama konfigurasi sumber data secara otomatis diunduh ke path lokal. Path file lokal ini kemudian digunakan untuk mengisi item konfigurasi keyTab.
storeKey
Menentukan apakah klien menyimpan kunci. Anda dapat mengatur ini ke true atau false. Ini tidak memengaruhi sinkronisasi data.
serviceName
Sesuai dengan item konfigurasi sasl.kerberos.service.name dalam file konfigurasi server.properties server Kafka. Konfigurasikan item ini sesuai kebutuhan.
principal
Principal Kerberos yang digunakan oleh klien Kafka. Konfigurasikan item ini sesuai kebutuhan dan pastikan bahwa file keytab yang diunggah berisi kunci untuk principal ini.
File konfigurasi Kerberos
File konfigurasi Kerberos harus berisi dua modul: [libdefaults] dan [realms].
Modul [libdefaults] menentukan parameter autentikasi Kerberos. Setiap item konfigurasi dalam modul ditulis dalam format key=value.
Modul [realms] menentukan alamat KDC. Modul ini dapat berisi beberapa submodul realm. Setiap submodul realm dimulai dengan nama_realm=.
Ini diikuti oleh serangkaian item konfigurasi yang diapit tanda kurung kurawal. Setiap item konfigurasi juga ditulis dalam format key=value. Kode berikut menunjukkan format file konfigurasi Kerberos khas. Ganti placeholder xxx dengan informasi aktual Anda.
[libdefaults] default_realm = xxx [realms] xxx = { kdc = xxx }Item konfigurasi
Deskripsi
[libdefaults].default_realm
Realm default yang digunakan saat mengakses node kluster Kafka. Biasanya sama dengan realm principal klien yang ditentukan dalam file konfigurasi JAAS.
Parameter [libdefaults] lainnya
Modul [libdefaults] dapat menentukan parameter autentikasi Kerberos lainnya, seperti ticket_lifetime. Konfigurasikan sesuai kebutuhan.
[realms].nama_realm
Harus sama dengan realm principal klien yang ditentukan dalam file konfigurasi JAAS dan [libdefaults].default_realm. Jika realm principal klien dalam file konfigurasi JAAS berbeda dari [libdefaults].default_realm, Anda perlu menyertakan dua submodul realms. Submodul ini harus sesuai dengan realm principal klien dalam file konfigurasi JAAS dan [libdefaults].default_realm, masing-masing.
[realms].nama_realm.kdc
Menentukan alamat dan port KDC dalam format ip:port. Misalnya, kdc=10.0.0.1:88. Jika port dihilangkan, port default 88 digunakan. Misalnya, kdc=10.0.0.1.
File Keytab
File Keytab harus berisi kunci untuk principal yang ditentukan dalam file konfigurasi JAAS dan harus dapat diverifikasi oleh KDC. Misalnya, jika ada file bernama client.keytab di direktori kerja saat ini, Anda dapat menjalankan perintah berikut untuk memverifikasi apakah file Keytab berisi kunci untuk principal yang ditentukan.
klist -ket ./client.keytab Keytab name: FILE:client.keytab KVNO Timestamp Principal ---- ------------------- ------------------------------------------------------ 7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)Konfigurasi DNS dan HOST untuk grup sumber daya eksklusif
Dalam kluster Kafka dengan autentikasi Kerberos diaktifkan, hostname node dalam kluster digunakan sebagai bagian dari principal yang terdaftar untuk node tersebut di KDC (Key Distribution Center). Saat klien mengakses node dalam kluster Kafka, klien tersebut menginferensi principal node berdasarkan pengaturan DNS dan HOST lokal untuk mendapatkan kredensial akses dari KDC. Jika Anda menggunakan grup sumber daya eksklusif untuk mengakses kluster Kafka dengan autentikasi Kerberos diaktifkan, Anda harus mengonfigurasi pengaturan DNS dan HOST dengan benar agar kredensial akses untuk node kluster dapat diperoleh dari KDC:
Pengaturan DNS
Jika instans PrivateZone digunakan untuk resolusi nama domain node kluster Kafka di VPC tempat grup sumber daya eksklusif dilampirkan, Anda dapat menambahkan rute kustom untuk alamat IP 100.100.2.136 dan 100.100.2.138 ke lampiran VPC grup sumber daya eksklusif di konsol DataWorks. Hal ini memastikan bahwa pengaturan resolusi nama domain PrivateZone untuk node kluster Kafka berlaku bagi grup sumber daya eksklusif.

Pengaturan HOST
Jika instans PrivateZone tidak digunakan untuk resolusi nama domain node kluster Kafka di VPC tempat grup sumber daya eksklusif dilampirkan, Anda harus menambahkan pemetaan alamat IP ke nama domain untuk setiap node kluster Kafka ke konfigurasi Host di pengaturan jaringan untuk grup sumber daya eksklusif di konsol DataWorks.

PLAIN
Saat mengonfigurasi sumber data Kafka, jika Anda mengatur Sasl Mechanism ke PLAIN, file JAAS harus dimulai dengan KafkaClient, diikuti oleh semua item konfigurasi yang diapit sepasang tanda kurung kurawal {}.
Baris pertama di dalam tanda kurung kurawal mendefinisikan kelas komponen logon yang digunakan. Untuk mekanisme autentikasi SASL yang berbeda, kelas komponen logon bersifat tetap. Setiap item konfigurasi berikutnya ditulis dalam format key=value.
Semua item konfigurasi kecuali yang terakhir tidak boleh diakhiri dengan titik koma.
Item konfigurasi terakhir harus diakhiri dengan titik koma. Titik koma juga harus ditambahkan setelah tanda kurung kurawal penutup "}".
Jika persyaratan format tidak dipenuhi, file konfigurasi JAAS tidak dapat diurai. Kode berikut menunjukkan format file konfigurasi JAAS khas. Ganti placeholder xxx dengan informasi aktual Anda.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};Item konfigurasi | Deskripsi |
Modul logon | Harus diatur ke org.apache.kafka.common.security.plain.PlainLoginModul |
username | Nama pengguna. Konfigurasikan item ini sesuai kebutuhan. |
password | Kata sandi. Konfigurasikan item ini sesuai kebutuhan. |
FAQ
Lampiran: Demo skrip dan deskripsi parameter
Konfigurasikan tugas sinkronisasi batch menggunakan editor kode
Jika Anda ingin mengonfigurasi tugas sinkronisasi batch menggunakan editor kode, Anda harus mengonfigurasi parameter terkait dalam skrip sesuai dengan persyaratan format skrip terpadu. Untuk informasi lebih lanjut, lihat Konfigurasikan tugas di editor kode. Informasi berikut menjelaskan parameter yang harus Anda konfigurasi untuk sumber data saat mengonfigurasi tugas sinkronisasi batch menggunakan editor kode.
Demo skrip Reader
Kode berikut menunjukkan konfigurasi JSON untuk membaca data dari Kafka.
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,//Ketika throttle adalah false, parameter mbps tidak berlaku, yang berarti laju tidak dibatasi. Ketika throttle adalah true, laju dibatasi.
"concurrent": 1,//Jumlah thread konkuren.
"mbps":"12"//Laju transmisi maksimum. 1 mbps sama dengan 1 MB/detik.
}
}
}Parameter skrip Reader
Parameter | Deskripsi | Wajib |
datasource | Nama sumber data. Editor kode mendukung penambahan sumber data. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan. | Ya |
server | Alamat server broker Kafka dalam format ip:port. Anda hanya dapat mengonfigurasi satu server, tetapi Anda harus memastikan bahwa DataWorks dapat terhubung ke alamat IP semua broker di kluster Kafka. | Ya |
topic | Topik Kafka. Topik adalah agregasi feed pesan yang diproses Kafka. | Ya |
column | Data Kafka yang akan dibaca. Kolom konstan, kolom data, dan kolom atribut didukung.
| Ya |
keyType | Tipe kunci Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT. | Ya |
valueType | Tipe nilai Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT. | Ya |
beginDateTime | Offset awal untuk konsumsi data. Ini adalah batas kiri rentang waktu (inklusif). Ini adalah string waktu dalam format yyyymmddhhmmss. Anda dapat menggunakannya dengan Scheduling Parameters. Untuk informasi lebih lanjut, lihat Format yang didukung untuk parameter penjadwalan. Catatan Fitur ini didukung di Kafka 0.10.2 dan yang lebih baru. | Anda harus menentukan salah satu parameter ini atau beginOffset. Catatan beginDateTime dan endDateTime digunakan bersama. |
endDateTime | Offset akhir untuk konsumsi data. Ini adalah batas kanan rentang waktu (eksklusif). Ini adalah string waktu dalam format yyyymmddhhmmss. Anda dapat menggunakannya dengan Scheduling Parameters. Untuk informasi lebih lanjut, lihat Format yang didukung untuk parameter penjadwalan. Catatan Fitur ini didukung di Kafka 0.10.2 dan yang lebih baru. | Anda harus menentukan salah satu parameter ini atau endOffset. Catatan endDateTime dan beginDateTime digunakan bersama. |
beginOffset | Offset awal untuk konsumsi data. Anda dapat mengonfigurasinya dalam bentuk berikut:
| Anda harus menentukan salah satu parameter ini atau beginDateTime. |
endOffset | Offset akhir untuk konsumsi data. Ini digunakan untuk mengontrol kapan tugas konsumsi data berhenti. | Anda harus menentukan salah satu parameter ini atau endDateTime. |
skipExceedRecord | Kafka menggunakan
| Tidak. Nilai default adalah false. |
partition | Topik Kafka memiliki beberapa partisi (partition). Secara default, tugas sinkronisasi data membaca data dari rentang offset yang mencakup semua partisi dalam topik. Anda juga dapat menentukan partition untuk membaca data hanya dari rentang offset satu partisi saja. | Tidak. Tidak ada nilai default. |
kafkaConfig | Saat membuat klien KafkaConsumer untuk konsumsi data, Anda dapat menentukan parameter tambahan seperti bootstrap.servers, auto.commit.interval.ms, dan session.timeout.ms. Anda dapat menggunakan kafkaConfig untuk mengontrol perilaku konsumsi KafkaConsumer. | Tidak |
encoding | Saat keyType atau valueType diatur ke STRING, encoding yang ditentukan oleh parameter ini digunakan untuk mengurai string. | Tidak. Nilai default adalah UTF-8. |
waitTIme | Waktu maksimum, dalam detik, yang ditunggu objek konsumen untuk menarik data dari Kafka dalam satu upaya. | Tidak. Nilai default adalah 60. |
stopWhenPollEmpty | Nilai yang valid adalah true dan false. Jika parameter ini diatur ke true dan konsumen menarik data kosong dari Kafka (biasanya karena semua data di topik telah dibaca, atau karena masalah jaringan atau ketersediaan kluster Kafka), tugas berhenti segera. Jika tidak, tugas mencoba lagi hingga data berhasil dibaca. | Tidak. Nilai default adalah true. |
stopWhenReachEndOffset | Parameter ini hanya berlaku ketika stopWhenPollEmpty adalah true. Nilai yang valid adalah true dan false.
| Tidak. Nilai default adalah false. Catatan Ini untuk kompatibilitas dengan logika historis. Versi Kafka sebelum V0.10.2 tidak dapat memeriksa apakah data terbaru dari semua partisi topik Kafka telah dibaca. Namun, beberapa tugas editor kode mungkin membaca data dari versi Kafka sebelum V0.10.2. |
Tabel berikut menjelaskan parameter kafkaConfig.
Parameter | Deskripsi |
fetch.min.bytes | Menentukan jumlah minimum byte pesan yang dapat diperoleh konsumen dari broker. Data dikembalikan ke konsumen hanya ketika tersedia data yang cukup. |
fetch.max.wait.ms | Waktu maksimum menunggu broker mengembalikan data. Nilai default adalah 500 milidetik. Data dikembalikan berdasarkan kondisi mana yang terpenuhi lebih dulu: fetch.min.bytes atau fetch.max.wait.ms. |
max.partition.fetch.bytes | Menentukan jumlah maksimum byte yang dapat dikembalikan broker ke konsumen dari setiap partition. Nilai default adalah 1 MB. |
session.timeout.ms | Menentukan waktu maksimum konsumen dapat terputus dari server sebelum berhenti menerima layanan. Nilai default adalah 30 detik. |
auto.offset.reset | Tindakan yang diambil konsumen saat membaca tanpa offset atau offset tidak valid (karena konsumen tidak aktif dalam waktu lama dan catatan dengan offset tersebut telah kedaluwarsa dan dihapus). Nilai default adalah none, yang berarti offset tidak diatur ulang secara otomatis. Anda dapat mengubahnya menjadi earliest, yang berarti konsumen membaca catatan partition dari offset paling awal. |
max.poll.records | Jumlah pesan yang dapat dikembalikan oleh satu panggilan metode poll. |
key.deserializer | Metode deserialisasi untuk kunci pesan, seperti org.apache.kafka.common.serialization.StringDeserializer. |
value.deserializer | Metode deserialisasi untuk nilai data, seperti org.apache.kafka.common.serialization.StringDeserializer. |
ssl.truststore.location | Path sertifikat root SSL. |
ssl.truststore.password | Kata sandi untuk penyimpanan sertifikat root. Jika Anda menggunakan Kafka Alibaba Cloud, atur ini ke KafkaOnsClient. |
security.protocol | Protokol akses. Saat ini, hanya protokol SASL_SSL yang didukung. |
sasl.mechanism | Metode autentikasi SASL. Jika Anda menggunakan Kafka Alibaba Cloud, gunakan PLAIN. |
java.security.auth.login.config | Path file autentikasi SASL. |
Demo skrip Writer
Kode berikut menunjukkan konfigurasi JSON untuk menulis data ke Kafka.
{
"type":"job",
"version":"2.0",//Nomor versi.
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//Nama plugin.
"parameter":{
"server": "ip:9092", //Alamat server Kafka.
"keyIndex": 0, //Kolom yang akan digunakan sebagai kunci. Harus mengikuti konvensi penamaan camel case, dengan k dalam huruf kecil.
"valueIndex": 1, //Kolom yang akan digunakan sebagai nilai. Saat ini, Anda hanya dapat memilih satu kolom dari data sumber atau membiarkan parameter ini kosong. Jika kosong, semua data sumber digunakan.
//Misalnya, untuk menggunakan kolom ke-2, ke-3, dan ke-4 dari tabel ODPS sebagai kafkaValue, buat tabel ODPS baru, bersihkan dan integrasikan data dari tabel ODPS asli ke tabel baru, lalu gunakan tabel baru untuk sinkronisasi.
"keyType": "Integer", //Tipe kunci Kafka.
"valueType": "Short", //Tipe nilai Kafka.
"topic": "t08", //Topik Kafka.
"batchSize": 1024 //Jumlah data yang ditulis ke Kafka sekaligus, dalam byte.
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//Jumlah catatan kesalahan.
},
"speed":{
"throttle":true,//Ketika throttle adalah false, parameter mbps tidak berlaku, yang berarti laju tidak dibatasi. Ketika throttle adalah true, laju dibatasi.
"concurrent":1, //Jumlah tugas konkuren.
"mbps":"12"//Laju transmisi maksimum. 1 mbps sama dengan 1 MB/detik.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Parameter skrip Writer
Parameter | Deskripsi | Wajib |
datasource | Nama sumber data. Editor kode mendukung penambahan sumber data. Nilai parameter ini harus sama dengan nama sumber data yang ditambahkan. | Ya |
server | Alamat server Kafka dalam format ip:port. | Ya |
topic | Topik Kafka. Ini adalah kategori untuk feed pesan berbeda yang diproses Kafka. Setiap pesan yang dipublikasikan ke kluster Kafka memiliki kategori, yang disebut topik. Topik adalah kumpulan sekelompok pesan. | Ya |
valueIndex | Kolom di writer Kafka yang digunakan sebagai nilai. Jika tidak ditentukan, semua kolom digabungkan untuk membentuk nilai secara default. Pemisah ditentukan oleh fieldDelimiter. | Tidak |
writeMode | Saat valueIndex tidak dikonfigurasi, parameter ini menentukan format penggabungan semua kolom catatan sumber untuk membentuk nilai catatan Kafka. Nilai yang valid adalah text dan JSON. Nilai default adalah text.
Misalnya, jika catatan sumber memiliki tiga kolom dengan nilai a, b, dan c, dan writeMode diatur ke text serta fieldDelimiter diatur ke #, nilai catatan Kafka yang ditulis adalah string a#b#c. Jika writeMode diatur ke JSON dan column diatur ke [{"name":"col1"},{"name":"col2"},{"name":"col3"}], nilai catatan Kafka yang ditulis adalah string {"col1":"a","col2":"b","col3":"c"}. Jika valueIndex dikonfigurasi, parameter ini tidak berlaku. | Tidak |
column | Bidang di tabel tujuan tempat data akan ditulis, dipisahkan koma. Contoh: Saat valueIndex tidak dikonfigurasi dan writeMode diatur ke JSON, parameter ini mendefinisikan nama bidang dalam struktur JSON untuk nilai kolom catatan sumber. Contoh,
Jika valueIndex dikonfigurasi, atau jika writeMode diatur ke text, parameter ini tidak berlaku. | Wajib saat valueIndex tidak dikonfigurasi dan writeMode diatur ke JSON. |
partition | Menentukan nomor partisi dalam topik Kafka tempat data ditulis. Ini harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0. | Tidak |
keyIndex | Kolom di writer Kafka yang digunakan sebagai kunci. Nilai parameter keyIndex harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0. Jika tidak, tugas akan gagal. | Tidak |
keyIndexes | Array nomor urut kolom dalam catatan sumber yang digunakan sebagai kunci untuk catatan Kafka. Nomor urut kolom dimulai dari 0. Misalnya, [0,1,2] akan menggabungkan nilai semua nomor kolom yang dikonfigurasi dengan koma untuk membentuk kunci catatan Kafka. Jika tidak ditentukan, kunci catatan Kafka adalah null, dan data ditulis ke partisi topik secara round-robin. Anda hanya dapat menentukan salah satu parameter ini atau keyIndex. | Tidak |
fieldDelimiter | Saat writeMode diatur ke text dan valueIndex tidak dikonfigurasi, semua kolom catatan sumber digabungkan menggunakan pemisah kolom yang ditentukan oleh parameter ini untuk membentuk nilai catatan Kafka. Anda dapat mengonfigurasi satu karakter atau beberapa karakter sebagai pemisah. Anda dapat mengonfigurasi karakter Unicode dalam format \u0001. Karakter escape seperti \t dan \n didukung. Nilai default adalah \t. Jika writeMode tidak diatur ke text atau jika valueIndex dikonfigurasi, parameter ini tidak berlaku. | Tidak |
keyType | Tipe kunci Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT. | Ya |
valueType | Tipe nilai Kafka. Nilai yang valid: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, dan SHORT. | Ya |
nullKeyFormat | Jika nilai kolom sumber yang ditentukan oleh keyIndex atau keyIndexes adalah null, nilai tersebut diganti dengan string yang ditentukan oleh parameter ini. Jika tidak dikonfigurasi, tidak ada penggantian yang dilakukan. | Tidak |
nullValueFormat | Jika nilai kolom sumber adalah null, nilai tersebut diganti dengan string yang ditentukan oleh parameter ini saat merakit nilai catatan Kafka. Jika tidak dikonfigurasi, tidak ada penggantian yang dilakukan. | Tidak |
acks | Konfigurasi acks saat menginisialisasi produsen Kafka. Ini menentukan metode pengakuan untuk penulisan yang berhasil. Secara default, parameter acks diatur ke all. Nilai yang valid untuk acks adalah:
| Tidak |
Lampiran: Definisi format pesan untuk penulisan ke Kafka
Setelah Anda mengonfigurasi dan menjalankan tugas sinkronisasi real-time, data yang dibaca dari database sumber ditulis ke topik Kafka dalam format JSON. Pertama, semua data yang ada di tabel sumber yang ditentukan ditulis ke topik Kafka yang sesuai. Kemudian, tugas memulai sinkronisasi real-time untuk terus menulis data inkremental ke topik tersebut. Informasi perubahan DDL inkremental dari tabel sumber juga ditulis ke topik Kafka dalam format JSON. Anda dapat memperoleh status dan informasi perubahan pesan yang ditulis ke Kafka. Untuk informasi lebih lanjut, lihat Lampiran: Format pesan.
Dalam struktur JSON data yang ditulis ke Kafka oleh tugas sinkronisasi offline, bidang payload.sequenceId, payload.timestamp.eventTime dan payload.timestamp.checkpointTime diatur ke -1.
Lampiran: Tipe bidang JSON
Saat writeMode diatur ke JSON, Anda dapat menggunakan bidang type dalam parameter column untuk menentukan tipe data JSON. Selama operasi tulis, sistem mencoba mengonversi nilai kolom dalam catatan sumber ke tipe yang ditentukan. Jika konversi gagal, data kotor dihasilkan.
Nilai yang valid | Deskripsi |
JSON_STRING | Mengonversi nilai kolom catatan sumber menjadi string dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah bilangan bulat |
JSON_NUMBER | Mengonversi nilai kolom catatan sumber menjadi angka dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string |
JSON_BOOL | Mengonversi nilai kolom catatan sumber menjadi nilai Boolean dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string |
JSON_ARRAY | Mengonversi nilai kolom catatan sumber menjadi array JSON dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string |
JSON_MAP | Mengonversi nilai kolom catatan sumber menjadi objek JSON dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah string |
JSON_BASE64 | Mengonversi konten byte biner dari nilai kolom catatan sumber menjadi string yang dikodekan BASE64 dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah array byte dengan panjang 2, direpresentasikan dalam heksadesimal sebagai |
JSON_HEX | Mengonversi konten byte biner dari nilai kolom catatan sumber menjadi string numerik heksadesimal dan menulisnya ke bidang JSON. Misalnya, jika nilai kolom catatan sumber adalah array byte dengan panjang 2, direpresentasikan dalam heksadesimal sebagai |