Topik ini menjelaskan cara menggunakan konektor ApsaraMQ for Kafka.
Informasi latar belakang
Apache Kafka adalah sistem antrian pesan terdistribusi open source yang banyak digunakan dalam bidang data besar, seperti pemrosesan data berkinerja tinggi, analitik aliran, dan integrasi data. Konektor Kafka didasarkan pada klien Apache Kafka open source, menyediakan throughput data tinggi, mendukung pembacaan dan penulisan berbagai format data, serta menawarkan semantik tepat-sekali (exactly-once semantics) untuk Realtime Compute for Apache Flink.
Kategori | Detail |
Jenis yang didukung | Tabel sumber, tabel sink, dan target integrasi data |
Mode runtime | Mode streaming |
Format data | |
Metrik pemantauan spesifik | |
Jenis API | SQL, DataStream, dan YAML integrasi data |
Pembaruan atau penghapusan data di tabel sink | Konektor tidak mendukung pembaruan atau penghapusan data di tabel sink. Konektor hanya mendukung penyisipan data. Catatan Untuk fitur terkait pembaruan dan penghapusan data, lihat Upsert Kafka. |
Prasyarat
Anda dapat menghubungkan ke kluster dengan salah satu cara berikut:
Menghubungkan ke kluster Alibaba Cloud ApsaraMQ for Kafka
Versi kluster Kafka minimal 0.11.
Anda telah membuat kluster ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Buat sumber daya.
Ruang kerja Flink dan kluster Kafka berada dalam VPC yang sama, serta kluster ApsaraMQ for Kafka telah menambahkan Flink ke daftar putihnya. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih.
PentingBatasan untuk menulis data ke ApsaraMQ for Kafka:
ApsaraMQ for Kafka tidak mendukung penulisan data dalam format kompresi zstd.
ApsaraMQ for Kafka tidak mendukung penulisan idempoten atau transaksional. Oleh karena itu, Anda tidak dapat menggunakan fitur semantik tepat-sekali dari tabel sink Kafka. Jika Anda menggunakan Ververica Runtime (VVR) 8.0.0 atau versi lebih baru, Anda harus menambahkan item konfigurasi
properties.enable.idempotence=falseke tabel sink untuk menonaktifkan penulisan idempoten. Untuk perbandingan mesin penyimpanan dan batasan fitur ApsaraMQ for Kafka, lihat Perbandingan mesin penyimpanan.
Menghubungkan ke kluster Apache Kafka yang dikelola sendiri
Versi kluster Apache Kafka yang dikelola sendiri minimal 0.11.
Anda telah menetapkan konektivitas jaringan antara Flink dan kluster Apache Kafka yang dikelola sendiri. Untuk informasi tentang cara menghubungkan ke kluster yang dikelola sendiri melalui jaringan publik, lihat Pilih jenis koneksi jaringan.
Hanya item konfigurasi klien untuk Apache Kafka 2.8 yang didukung. Untuk informasi selengkapnya, lihat dokumentasi Apache Kafka untuk konfigurasi konsumen dan produsen.
Perhatian
Saat ini, penulisan transaksional tidak disarankan karena bug desain pada Flink dan Kafka. Saat Anda mengatur sink.delivery-guarantee = exactly-once, konektor Kafka mengaktifkan penulisan transaksional, dan terdapat tiga masalah yang diketahui:
Setiap checkpoint menghasilkan ID transaksi. Jika interval checkpoint terlalu singkat, terlalu banyak ID transaksi yang dihasilkan. Koordinator kluster Kafka mungkin kehabisan memori, sehingga mengganggu stabilitas kluster Kafka.
Setiap transaksi membuat instance produsen. Jika terlalu banyak transaksi dikomit secara bersamaan, Pengelola Tugas (TaskManager) mungkin kehabisan memori, sehingga mengganggu stabilitas pekerjaan Flink.
Jika beberapa pekerjaan Flink menggunakan
sink.transactional-id-prefixyang sama, ID transaksi yang dihasilkan mungkin bertabrakan. Jika satu pekerjaan gagal menulis, hal ini akan memblokir Log Start Offset (LSO) partisi Kafka agar tidak maju, yang memengaruhi semua konsumen yang membaca data dari partisi tersebut.
Jika Anda memerlukan semantik tepat-sekali, gunakan Upsert Kafka untuk menulis ke tabel kunci primer dan pastikan idempotensi dengan kunci primer tersebut. Jika Anda harus menggunakan penulisan transaksional, lihat Pertimbangan untuk semantik EXACTLY_ONCE.
Pemecahan masalah konektivitas jaringan
Jika pekerjaan Flink melaporkan error Timed out waiting for a node assignment saat startup, penyebabnya biasanya adalah masalah konektivitas jaringan antara Flink dan Kafka.
Klien Kafka menghubungkan ke server sebagai berikut:
Klien menggunakan alamat dalam
bootstrap.serversuntuk menghubungkan ke Kafka.Kafka mengembalikan metadata setiap broker dalam kluster, termasuk alamat koneksi mereka.
Klien kemudian menggunakan alamat yang dikembalikan tersebut untuk menghubungkan ke setiap broker guna melakukan operasi baca dan tulis.
Meskipun alamat bootstrap.servers dapat diakses, klien tidak dapat membaca atau menulis data jika Kafka mengembalikan alamat broker yang salah. Masalah ini sering terjadi pada arsitektur jaringan yang menggunakan proxy, penerusan port, atau jalur sewa.
Langkah pemecahan masalah
ApsaraMQ for Kafka
Konfirmasi jenis endpoint
Endpoint default (jaringan internal)
Endpoint SASL (jaringan internal + autentikasi)
Endpoint jaringan publik (memerlukan permintaan terpisah)
Anda dapat menggunakan Konsol pengembangan Flink untuk melakukan diagnostik jaringan dan mengesampingkan masalah konektivitas dengan alamat
bootstrap.servers.Periksa grup keamanan dan daftar putih
Instans Kafka harus menambahkan Blok CIDR VPC tempat Flink berada ke daftar putihnya. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC dan Konfigurasikan daftar putih.
Periksa konfigurasi SASL (jika diaktifkan)
Jika Anda menggunakan endpoint SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Autentikasi yang tidak lengkap dapat menyebabkan koneksi gagal pada fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan autentikasi.
Kafka yang dikelola sendiri di ECS
Gunakan Konsol pengembangan Flink untuk melakukan diagnostik jaringan.
Evaluasi masalah konektivitas dengan alamat
bootstrap.serversdan konfirmasi kebenaran endpoint jaringan internal dan publik.Periksa grup keamanan dan daftar putih
Grup keamanan ECS harus mengizinkan trafik pada port endpoint Kafka (biasanya 9092 atau 9093).
Instans ECS harus menambahkan Blok CIDR VPC tempat Flink berada ke daftar putihnya. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC.
Periksa konfigurasi
Masuk ke kluster ZooKeeper yang digunakan oleh Kafka. Anda dapat menggunakan tool zkCli.sh atau zookeeper-shell.sh.
Jalankan perintah untuk mengambil metadata broker. Contohnya:
get /brokers/ids/0. Dalam hasil yang dikembalikan, temukan alamat yang diiklankan Kafka kepada klien di bidang endpoints.
Gunakan Konsol pengembangan Flink untuk melakukan diagnostik jaringan untuk menguji apakah alamat tersebut dapat dijangkau.
CatatanJika alamat tersebut tidak dapat dijangkau, hubungi insinyur O&M Kafka untuk memeriksa dan memperbaiki konfigurasi
listenersdanadvertised.listeners. Pastikan alamat yang dikembalikan dapat diakses oleh Flink.Untuk informasi selengkapnya tentang cara klien Kafka menghubungkan ke server, lihat Pemecahan Masalah Konektivitas.
Periksa konfigurasi SASL (jika diaktifkan)
Jika Anda menggunakan endpoint SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Autentikasi yang tidak lengkap dapat menyebabkan koneksi gagal pada fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan autentikasi.
SQL
Konektor Kafka dapat digunakan dalam pekerjaan SQL sebagai tabel sumber atau tabel sink.
Sintaks
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)Kolom metadata
Anda dapat mendefinisikan kolom metadata di tabel sumber dan sink untuk mengakses atau menulis metadata pesan Kafka. Misalnya, jika Anda mendefinisikan beberapa topik dalam parameter WITH dan mendefinisikan kolom metadata di tabel sumber Kafka, data yang dibaca Flink ditandai dengan topik asalnya. Contoh berikut menunjukkan cara menggunakan kolom metadata.
CREATE TABLE kafka_source (
-- Baca topik pesan sebagai bidang `record_topic`.
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Baca timestamp dari ConsumerRecord sebagai bidang `ts`.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Baca offset pesan sebagai bidang `record_offset`.
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Tulis timestamp dari bidang `ts` sebagai timestamp ProducerRecord ke Kafka.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);Tabel berikut mencantumkan kolom metadata yang didukung untuk tabel sumber dan sink Kafka.
Kunci | Tipe data | Deskripsi | Tabel sumber atau sink |
topic | STRING NOT NULL METADATA VIRTUAL | Nama topik yang berisi pesan Kafka. | Tabel sumber |
partition | INT NOT NULL METADATA VIRTUAL | ID partisi yang berisi pesan Kafka. | Tabel sumber |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Header pesan Kafka. | Tabel sumber dan sink |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Leader epoch pesan Kafka. | Tabel sumber |
offset | BIGINT NOT NULL METADATA VIRTUAL | Offset pesan Kafka. | Tabel sumber |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Timestamp pesan Kafka. | Tabel sumber dan sink |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Jenis timestamp pesan Kafka:
| Tabel sumber |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | Bidang kunci pesan Kafka mentah. | Tabel sumber dan sink |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | Bidang nilai pesan Kafka mentah. | Tabel sumber dan sink |
Parameter WITH
Umum
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
connector
Jenis tabel.
String
Ya
Tidak ada
Nilainya tetap Kafka.
properties.bootstrap.servers
Alamat broker Kafka.
String
Ya
Tidak ada
Formatnya host:port,host:port,host:port. Pisahkan alamat dengan koma (,).
properties.*
Konfigurasi langsung untuk klien Kafka.
String
Tidak
Tidak ada
Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk produsen dan konsumen.
Flink menghapus awalan properties. dan meneruskan konfigurasi yang tersisa ke klien Kafka. Misalnya, Anda dapat menggunakan
'properties.allow.auto.create.topics'='false'untuk menonaktifkan pembuatan topik otomatis.Jangan gunakan metode ini untuk memodifikasi konfigurasi berikut, karena akan ditimpa oleh konektor Kafka:
key.deserializer
value.deserializer
format
Format yang digunakan untuk membaca atau menulis bagian nilai pesan Kafka.
String
Tidak
Tidak ada
Format yang didukung:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
CatatanUntuk informasi selengkapnya tentang pengaturan parameter format, lihat Parameter Format.
key.format
Format yang digunakan untuk membaca atau menulis bagian kunci pesan Kafka.
String
Tidak
Tidak ada
Format yang didukung:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
CatatanJika Anda menggunakan konfigurasi ini, konfigurasi key.options wajib ditentukan.
key.fields
Bidang di tabel sumber atau sink yang sesuai dengan bagian kunci pesan Kafka.
String
Tidak
Tidak ada
Pisahkan beberapa nama bidang dengan titik koma (;). Contohnya:
field1;field2key.fields-prefix
Menentukan awalan khusus untuk semua bidang kunci pesan Kafka guna menghindari konflik nama dengan bidang di bagian nilai pesan.
String
Tidak
Tidak ada
Item konfigurasi ini hanya digunakan untuk membedakan nama kolom di tabel sumber dan sink. Awalan dihapus saat mengurai dan menghasilkan bagian kunci pesan Kafka.
CatatanJika Anda menggunakan konfigurasi ini, Anda harus mengatur
value.fields-includeke EXCEPT_KEY.value.format
Format yang digunakan untuk membaca atau menulis bagian nilai pesan Kafka.
String
Tidak
Tidak ada
Konfigurasi ini setara dengan
format. Anda hanya dapat mengatur salah satu dariformatatauvalue.format. Jika keduanya dikonfigurasi,value.formatakan menimpaformat.value.fields-include
Menentukan apakah akan menyertakan bidang yang sesuai dengan bagian kunci pesan saat mengurai atau menghasilkan bagian nilai pesan Kafka.
String
Tidak
ALL
Nilai yang valid:
ALL(default): Semua kolom diproses sebagai bagian nilai pesan Kafka.EXCEPT_KEY: Bidang yang tersisa, tidak termasuk yang didefinisikan di key.fields, diproses sebagai bagian nilai pesan Kafka.
Tabel sumber
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
topic
Nama topik untuk dibaca.
String
Tidak
Tidak ada
Pisahkan beberapa nama topik dengan titik koma (;), misalnya topic-1;topic-2.
CatatanAnda hanya dapat menentukan salah satu opsi topic atau topic-pattern.
topic-pattern
Ekspresi reguler yang cocok dengan nama topik untuk dibaca. Semua topik yang cocok dengan ekspresi reguler ini akan dibaca saat pekerjaan berjalan.
String
Tidak
Tidak ada
CatatanAnda hanya dapat menentukan salah satu opsi topic atau topic-pattern.
properties.group.id
ID kelompok konsumen.
String
Tidak
KafkaSource-{source_table_name}
Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.
scan.startup.mode
Offset awal untuk membaca data dari Kafka.
String
Tidak
group-offsets
Nilai yang valid:
earliest-offset: Mulai membaca dari partisi paling awal di Kafka.latest-offset: Mulai membaca dari offset terbaru di Kafka.group-offsets(default): Mulai membaca dari offset yang dikomit oleh properties.group.id yang ditentukan.timestamp: Mulai membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.specific-offsets: Mulai membaca dari offset yang ditentukan oleh scan.startup.specific-offsets.
CatatanParameter ini berlaku saat pekerjaan dimulai tanpa state. Saat pekerjaan dimulai ulang dari checkpoint atau pulih dari state, pekerjaan akan menggunakan progres yang disimpan dalam state untuk melanjutkan pembacaan.
scan.startup.specific-offsets
Dalam mode startup specific-offsets, menentukan offset awal untuk setiap partisi.
String
Tidak
Tidak ada
Contoh:
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
Dalam mode startup timestamp, menentukan timestamp offset awal.
Long
Tidak
Tidak ada
Unitnya milidetik.
scan.topic-partition-discovery.interval
Interval untuk menemukan topik dan partisi Kafka secara dinamis.
Duration
Tidak
5 menit
Interval pemeriksaan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur interval pemeriksaan partisi ke nilai non-positif. Saat penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data darinya. Dalam mode topic-pattern, sumber Kafka tidak hanya membaca data dari partisi baru topik yang ada, tetapi juga membaca data dari semua partisi topik baru yang cocok dengan ekspresi reguler.
CatatanDalam Ververica Runtime (VVR) 6.0.x, penemuan partisi dinamis dinonaktifkan secara default. Mulai dari VVR 8.0, fitur ini diaktifkan secara default dengan interval penemuan 5 menit.
scan.header-filter
Memfilter data berdasarkan apakah data Kafka berisi header tertentu.
String
Tidak
Tidak ada
Pisahkan kunci header dan nilai dengan titik dua (:). Hubungkan beberapa kondisi header dengan operator logika (&, |). Operator logika NOT (!) juga didukung. Misalnya,
depart:toy|depart:book&!env:testmenyimpan data Kafka yang headernya berisi depart=toy atau depart=book, dan tidak berisi env=test.CatatanParameter ini hanya didukung di Ververica Runtime (VVR) 8.0.6 dan versi lebih baru.
Tanda kurung tidak didukung dalam operasi logika.
Operasi logika dilakukan dari kiri ke kanan.
Nilai header dikonversi ke string dalam format UTF-8 untuk perbandingan dengan nilai header yang ditentukan.
scan.check.duplicated.group.id
Menentukan apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh
properties.group.id.Boolean
Tidak
false
Nilai yang valid:
true: Sebelum memulai pekerjaan, sistem memeriksa duplikasi kelompok konsumen. Jika ditemukan duplikasi, pekerjaan melaporkan error dan berhenti, mencegah konflik dengan kelompok konsumen yang ada.
false: Pekerjaan langsung dimulai tanpa memeriksa konflik kelompok konsumen.
CatatanParameter ini hanya didukung di VVR 6.0.4 dan versi lebih baru.
Tabel sink
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
topic
Nama topik untuk ditulis.
String
Ya
Tidak ada
Tidak ada
sink.partitioner
Mode pemetaan dari konkurensi Flink ke partisi Kafka.
String
Tidak
default
Nilai yang valid:
default: Menggunakan partitioner Kafka default.
fixed: Setiap konkurensi Flink sesuai dengan partisi Kafka tetap.
round-robin: Data dari konkurensi Flink dialokasikan ke partisi Kafka secara round-robin.
Partitioner kustom: Jika fixed dan round-robin tidak memenuhi kebutuhan Anda, Anda dapat membuat kelas turunan dari FlinkKafkaPartitioner untuk mendefinisikan partitioner kustom. Contohnya: org.mycompany.MyPartitioner
sink.delivery-guarantee
Semantik pengiriman untuk tabel sink Kafka.
String
Tidak
at-least-once
Nilai yang valid:
none: Tanpa jaminan. Data mungkin hilang atau diduplikasi.
at-least-once (default): Menjamin tidak ada data yang hilang, tetapi data mungkin diduplikasi.
exactly-once: Menggunakan transaksi Kafka untuk menjamin data tidak hilang atau diduplikasi.
CatatanSaat menggunakan semantik exactly-once, parameter sink.transactional-id-prefix wajib ditentukan.
sink.transactional-id-prefix
Awalan untuk ID transaksi Kafka yang digunakan dalam semantik exactly-once.
String
Tidak
Tidak ada
Konfigurasi ini hanya berlaku saat sink.delivery-guarantee diatur ke exactly-once.
sink.parallelism
Tingkat paralelisme untuk operator tabel sink Kafka.
Integer
Tidak
Tidak ada
Konkurensi operator hulu ditentukan oleh framework.
Keamanan dan autentikasi
Jika kluster Kafka memerlukan koneksi aman atau autentikasi, Anda dapat menambahkan konfigurasi keamanan dan autentikasi terkait ke parameter WITH dengan awalan properties.. Contoh berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)Contoh berikut menunjukkan cara menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*Konfigurasi SSL*/
/*Konfigurasikan path ke truststore (sertifikat CA) yang disediakan oleh server.*/
/*File yang diunggah melalui Manajemen File disimpan di path /flink/usrlib/.*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*Jika autentikasi klien diperlukan, konfigurasikan path ke keystore (kunci privat).*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*Algoritma bagi klien untuk memverifikasi alamat server. Nilai kosong menonaktifkan verifikasi alamat server.*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*Konfigurasi SASL*/
/*Atur mekanisme SASL ke SCRAM-SHA-256.*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*Konfigurasikan JAAS*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)Anda dapat mengunggah sertifikat CA dan kunci privat yang disebutkan dalam contoh ke platform menggunakan fitur Manajemen File di Konsol Realtime Compute. Setelah diunggah, file disimpan di direktori /flink/usrlib. Jika file sertifikat CA yang akan digunakan bernama my-truststore.jks, tentukan 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' dalam parameter WITH untuk menggunakan sertifikat ini.
Contoh di atas berlaku untuk sebagian besar skenario konfigurasi. Sebelum mengonfigurasi konektor Kafka, hubungi insinyur O&M server Kafka untuk mendapatkan konfigurasi keamanan dan autentikasi yang benar.
Berbeda dengan Flink open source, editor SQL Realtime Compute for Apache Flink secara otomatis meng-escape tanda kutip ganda ("). Oleh karena itu, Anda tidak perlu menambahkan karakter escape (\) tambahan untuk tanda kutip ganda dalam username dan password saat mengonfigurasi
properties.sasl.jaas.config.
Offset awal tabel sumber
Mode startup
Anda dapat menentukan offset baca awal untuk tabel sumber Kafka dengan mengonfigurasi scan.startup.mode:
earliest-offset: Mulai membaca dari offset paling awal partisi saat ini.
latest-offset: Mulai membaca dari offset terbaru partisi saat ini.
group-offsets: Mulai membaca dari offset yang dikomit oleh ID grup yang ditentukan. ID grup ditentukan oleh properties.group.id.
timestamp: Mulai membaca dari pesan pertama yang timestamp-nya lebih besar dari atau sama dengan waktu yang ditentukan. Timestamp ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Mulai mengonsumsi dari offset partisi yang ditentukan. Offset ditentukan oleh scan.startup.specific-offsets.
Jika Anda tidak menentukan offset awal, konsumsi dimulai dari offset yang dikomit (group-offsets) secara default.
Parameter scan.startup.mode hanya berlaku untuk pekerjaan yang dimulai tanpa state. Saat pekerjaan stateful dimulai, pekerjaan mulai mengonsumsi dari offset yang disimpan dalam statenya.
Kode berikut memberikan contoh:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Mulai mengonsumsi dari offset paling awal.
'scan.startup.mode' = 'earliest-offset',
-- Mulai mengonsumsi dari offset terbaru.
'scan.startup.mode' = 'latest-offset',
-- Mulai mengonsumsi dari offset yang dikomit oleh kelompok konsumen "my-group".
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- Jika "my-group" digunakan untuk pertama kalinya, mulai mengonsumsi dari offset paling awal.
'properties.auto.offset.reset' = 'latest', -- Jika "my-group" digunakan untuk pertama kalinya, mulai mengonsumsi dari offset terbaru.
-- Mulai mengonsumsi dari timestamp milidetik yang ditentukan 1655395200000.
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Mulai mengonsumsi dari offset yang ditentukan.
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);Prioritas offset awal
Prioritas offset awal tabel sumber adalah sebagai berikut:
Prioritas dari tinggi ke rendah | Offset yang disimpan dalam checkpoint atau titik simpan |
Waktu mulai yang dipilih di Konsol Realtime Compute saat memulai pekerjaan | |
Offset awal yang ditentukan oleh scan.startup.mode dalam parameter WITH | |
Jika scan.startup.mode tidak ditentukan, group-offsets digunakan, dan offset kelompok konsumen yang sesuai digunakan |
Jika offset menjadi tidak valid pada langkah apa pun (misalnya, karena kedaluwarsa atau masalah kluster Kafka), sistem menggunakan kebijakan yang ditetapkan dalam properties.auto.offset.reset untuk mengatur ulang offset. Jika parameter ini tidak dikonfigurasi, terjadi exception dan memerlukan intervensi manual.
Skenario umum adalah memulai konsumsi dengan ID grup baru. Pertama, tabel sumber menanyakan kluster Kafka untuk offset yang dikomit oleh grup tersebut. Karena ini pertama kalinya ID grup digunakan, tidak ditemukan offset yang valid. Oleh karena itu, offset diatur ulang sesuai dengan kebijakan yang dikonfigurasi dalam parameter properties.auto.offset.reset. Saat mengonsumsi dengan ID grup baru, Anda harus mengonfigurasi properties.auto.offset.reset untuk menentukan kebijakan pengaturan ulang offset.
Komit offset tabel sumber
Tabel sumber Kafka hanya mengkomit offset konsumen saat ini ke kluster Kafka setelah checkpoint berhasil. Jika interval checkpoint panjang, offset konsumen yang diamati di kluster Kafka akan tertinggal. Selama checkpoint, tabel sumber Kafka menyimpan progres baca saat ini dalam statenya dan tidak bergantung pada offset yang dikomit ke kluster untuk pemulihan kesalahan. Pengkomitan offset hanya untuk memantau progres baca di sisi Kafka. Kegagalan komit offset tidak memengaruhi kebenaran data.
Partitioner kustom untuk tabel sink
Jika partitioner produsen Kafka bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan partitioner kustom untuk menulis data ke partisi yang sesuai. Partitioner kustom harus mewarisi dari FlinkKafkaPartitioner. Setelah pengembangan, kompilasi paket JAR dan unggah ke Konsol Realtime Compute menggunakan fitur Manajemen File. Setelah diunggah dan direferensikan, atur parameter sink.partitioner dalam klausa WITH ke path kelas lengkap partitioner tersebut, seperti org.mycompany.MyPartitioner.
Memilih antara Kafka, Upsert Kafka, dan katalog Kafka JSON
Kafka adalah sistem antrian pesan append-only yang tidak mendukung pembaruan atau penghapusan data. Oleh karena itu, Kafka tidak dapat menangani data Change Data Capture (CDC) hulu atau logika penarikan dari operator seperti agregasi dan join dalam SQL streaming. Untuk menulis data dengan perubahan atau penarikan ke Kafka, gunakan tabel sink Upsert Kafka, yang dirancang khusus untuk menangani data perubahan.
Untuk menyinkronkan data perubahan dari satu atau beberapa tabel di database hulu ke Kafka secara batch dengan mudah, Anda dapat menggunakan katalog Kafka JSON. Jika data yang disimpan di Kafka dalam format JSON, penggunaan katalog Kafka JSON menghilangkan kebutuhan untuk mendefinisikan skema dan parameter WITH. Untuk informasi selengkapnya, lihat Kelola katalog Kafka JSON.
Contoh
Contoh 1: Baca data dari Kafka dan tulis ke Kafka
Baca data Kafka dari topik bernama `source` dan tulis ke topik bernama `sink`. Datanya dalam format CSV.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;Contoh 2: Sinkronisasi skema dan data tabel
Sinkronisasi pesan dari topik Kafka ke Hologres secara real time. Dalam kasus ini, Anda dapat menggunakan offset dan ID partisi pesan Kafka sebagai kunci primer untuk memastikan tidak ada pesan duplikat di Hologres selama failover.
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- Opsional. Meratakan semua kolom bersarang.
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Contoh 3: Sinkronisasi skema tabel dan data kunci serta nilai pesan Kafka
Jika bagian kunci pesan Kafka sudah menyimpan informasi terkait, Anda dapat menyinkronkan kunci dan nilai dari Kafka.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Bagian kunci pesan Kafka tidak mendukung perubahan skema tabel atau penguraian tipe. Anda harus mendeklarasikannya secara manual.
Contoh 4: Sinkronisasi skema dan data tabel serta melakukan komputasi
Saat menyinkronkan data Kafka ke Hologres, Anda sering perlu melakukan komputasi ringan.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Gunakan COALESCE untuk menangani nilai null.Contoh 5: Uraikan JSON bersarang
Contoh pesan JSON
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}Untuk menghindari penggunaan fungsi seperti JSON_VALUE(payload, '$.properties.owner') untuk mengurai bidang nanti, Anda dapat mendefinisikan struktur langsung dalam DDL Sumber:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);Dengan cara ini, Flink mengurai JSON menjadi bidang terstruktur pada tahap baca. Kueri SQL selanjutnya dapat langsung menggunakan properties.owner tanpa panggilan fungsi tambahan, menghasilkan kinerja keseluruhan yang lebih baik.
API Datastream
Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk menghubungkan ke Realtime Compute for Apache Flink. Untuk informasi tentang cara menyiapkan konektor DataStream, lihat Gunakan konektor DataStream.
Buat sumber Kafka
Sumber Kafka menyediakan kelas builder untuk membuat instance KafkaSource. Contoh kode berikut menunjukkan cara membuat sumber Kafka untuk mengonsumsi data dari offset paling awal `input-topic`. Kelompok konsumen bernama `my-group`, dan isi pesan Kafka dideserialisasi menjadi string.
Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");Saat membuat KafkaSource, Anda harus menentukan parameter berikut.
Parameter
Deskripsi
BootstrapServers
Alamat broker Kafka. Konfigurasikan ini menggunakan metode setBootstrapServers(String).
GroupId
ID kelompok konsumen. Konfigurasikan ini menggunakan metode setGroupId(String).
Topics or Partition
Nama topik atau partisi yang berlangganan. Sumber Kafka menyediakan tiga cara berikut untuk berlangganan topik atau partisi:
Daftar topik: Berlangganan semua partisi dalam daftar topik.
KafkaSource.builder().setTopics("topic-a","topic-b")Pencocokan ekspresi reguler: Berlangganan semua partisi topik yang cocok dengan ekspresi reguler.
KafkaSource.builder().setTopicPattern("topic.*")Daftar partisi: Berlangganan partisi yang ditentukan.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partisi 0 dari topik "topic-a" new TopicPartition("topic-b", 5))); // Partisi 5 dari topik "topic-b" KafkaSource.builder().setPartitions(partitionSet)
Deserializer
Deserializer untuk mengurai pesan Kafka.
Tentukan deserializer menggunakan setDeserializer(KafkaRecordDeserializationSchema), di mana KafkaRecordDeserializationSchema mendefinisikan cara mengurai ConsumerRecord Kafka. Jika Anda hanya perlu mengurai data dalam isi pesan (nilai) pesan Kafka, Anda dapat melakukannya dengan salah satu cara berikut:
Gunakan metode setValueOnlyDeserializer(DeserializationSchema) dalam kelas builder KafkaSource yang disediakan oleh Flink. DeserializationSchema mendefinisikan cara mengurai data biner dalam isi pesan Kafka.
Gunakan parser yang disediakan oleh Kafka, yang mencakup beberapa kelas implementasi. Misalnya, Anda dapat menggunakan StringDeserializer untuk mengurai isi pesan Kafka menjadi string.
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
CatatanUntuk mengurai ConsumerRecord sepenuhnya, Anda harus mengimplementasikan antarmuka KafkaRecordDeserializationSchema sendiri.
XML
Konektor DataStream Kafka tersedia di Repositori Maven Central.
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>Saat menggunakan konektor DataStream Kafka, Anda perlu memahami properti Kafka berikut:
Offset awal konsumen
Sumber Kafka dapat menentukan offset awal untuk konsumsi melalui inisialisator offset (OffsetsInitializer). Inisialisator offset bawaan meliputi yang berikut.
Inisialisator offset
Pengaturan kode
Mulai mengonsumsi dari offset paling awal.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())Mulai mengonsumsi dari offset terbaru.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())Mulai mengonsumsi dari data dengan timestamp lebih besar dari atau sama dengan waktu yang ditentukan, dalam milidetik.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))Mulai mengonsumsi dari offset yang dikomit oleh kelompok konsumen. Jika offset yang dikomit tidak ada, gunakan offset paling awal.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))Mulai mengonsumsi dari offset yang dikomit oleh kelompok konsumen, tanpa menentukan strategi pengaturan ulang offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())CatatanJika inisialisator bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan inisialisator offset kustom.
Jika tidak ada inisialisator offset yang ditentukan, OffsetsInitializer.earliest() (offset paling awal) digunakan secara default.
Mode streaming dan batch
Sumber Kafka mendukung mode runtime streaming dan batch. Secara default, sumber Kafka diatur untuk berjalan dalam mode streaming, sehingga pekerjaan tidak pernah berhenti sampai pekerjaan Flink gagal atau dibatalkan. Untuk mengonfigurasi sumber Kafka agar berjalan dalam mode batch, Anda dapat menggunakan setBounded(OffsetsInitializer) untuk menentukan offset berhenti. Saat semua partisi mencapai offset berhentinya, sumber Kafka keluar.
CatatanUmumnya, tidak ada offset berhenti dalam mode streaming. Untuk memudahkan debugging kode, Anda dapat menggunakan setUnbounded(OffsetsInitializer) untuk menentukan offset berhenti dalam mode streaming. Perhatikan bahwa nama metode untuk menentukan offset berhenti dalam mode streaming dan batch (setUnbounded dan setBounded) berbeda.
Penemuan partisi dinamis
Untuk menangani skenario seperti penskalaan topik atau pembuatan topik baru tanpa memulai ulang pekerjaan Flink, Anda dapat mengaktifkan fitur penemuan partisi dinamis dalam mode langganan topik atau partisi yang disediakan.
CatatanFitur penemuan partisi dinamis diaktifkan secara default, dan interval pemeriksaan partisi adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur interval pemeriksaan partisi ke nilai non-positif. Kode berikut memberikan contoh.
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // Periksa partisi baru setiap 10 detik.PentingFitur penemuan partisi dinamis bergantung pada mekanisme pembaruan metadata kluster Kafka. Jika kluster Kafka tidak memperbarui informasi partisi secara tepat waktu, partisi baru mungkin tidak ditemukan. Pastikan konfigurasi partition.discovery.interval.ms kluster Kafka sesuai dengan situasi aktual.
Waktu event dan watermark
Secara default, sumber Kafka menggunakan timestamp dalam pesan Kafka sebagai waktu event. Anda dapat mendefinisikan strategi watermark kustom untuk mengekstrak waktu event dari pesan dan mengirim watermark ke downstream.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")Untuk informasi selengkapnya tentang strategi watermark kustom, lihat Generating Watermarks.
CatatanJika beberapa tugas sumber paralel menganggur untuk waktu yang lama (misalnya, jika partisi Kafka tidak memiliki input data untuk waktu yang lama, atau jika konkurensi sumber melebihi jumlah partisi Kafka), mekanisme pembuatan watermark mungkin gagal. Dalam kasus ini, sistem tidak dapat memicu perhitungan window secara normal, yang menyebabkan aliran pemrosesan data macet.
Untuk mengatasi masalah ini, Anda dapat melakukan penyesuaian berikut:
Konfigurasikan mekanisme timeout watermark: Aktifkan parameter table.exec.source.idle-timeout untuk memaksa sistem menghasilkan watermark setelah periode timeout tertentu. Ini memastikan siklus perhitungan window berjalan.
Optimalkan sumber data: Kami merekomendasikan mempertahankan rasio partisi Kafka terhadap konkurensi sumber yang wajar (disarankan: jumlah partisi ≥ tingkat paralelisme sumber).
Consumer Offset Commit
Sumber Kafka mengkomit offset konsumen saat ini ketika checkpoint selesai. Ini memastikan bahwa state checkpoint Flink konsisten dengan offset yang dikomit di broker Kafka. Jika checkpointing tidak diaktifkan, sumber Kafka bergantung pada logika komit offset otomatis internal konsumen Kafka. Fitur komit otomatis dikonfigurasi oleh item konfigurasi konsumen Kafka enable.auto.commit dan auto.commit.interval.ms.
CatatanSumber Kafka tidak bergantung pada offset yang dikomit di broker untuk memulihkan pekerjaan yang gagal. Pengkomitan offset hanya untuk melaporkan progres konsumsi konsumen Kafka dan kelompok konsumen untuk pemantauan di sisi broker.
Properti lainnya
Selain properti yang disebutkan di atas, Anda dapat menggunakan setProperties(Properties) dan setProperty(String, String) untuk mengatur properti apa pun untuk sumber Kafka dan konsumen Kafka. KafkaSource biasanya memiliki item konfigurasi berikut.
Item konfigurasi
Deskripsi
client.id.prefix
Menentukan awalan ID klien untuk konsumen Kafka.
partition.discovery.interval.ms
Menentukan interval di mana sumber Kafka memeriksa partisi baru.
CatatanNilai partition.discovery.interval.ms ditimpa menjadi -1 dalam mode Batch.
register.consumer.metrics
Menentukan apakah akan mendaftarkan metrik konsumen Kafka di Flink.
Konfigurasi konsumen Kafka lainnya
Untuk informasi selengkapnya tentang konfigurasi konsumen Kafka, lihat Apache Kafka.
PentingKonektor Kafka secara paksa menimpa beberapa parameter yang dikonfigurasi secara manual sebagai berikut:
key.deserializer selalu ditimpa ke ByteArrayDeserializer.
value.deserializer selalu ditimpa ke ByteArrayDeserializer.
auto.offset.reset.strategy ditimpa ke OffsetsInitializer#getAutoOffsetResetStrategy().
Contoh berikut menunjukkan cara mengonfigurasi konsumen Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS.
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")Pemantauan
Sumber Kafka mendaftarkan metrik di Flink untuk pemantauan dan diagnostik.
Cakupan metrik
Semua metrik pembaca sumber Kafka didaftarkan di bawah grup metrik KafkaSourceReader, yang merupakan subgrup dari grup metrik operator. Metrik yang terkait dengan partisi topik tertentu didaftarkan di grup metrik KafkaSourceReader.topic.<topic_name>.partition.<partition_id>.
Misalnya, offset konsumen saat ini (currentOffset) untuk partisi 1 topik "my-topic" didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. Jumlah komit offset yang berhasil (commitsSucceeded) didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.
Daftar metrik
Nama metrik
Deskripsi
Cakupan
currentOffset
Offset konsumen saat ini.
TopicPartition
committedOffset
Offset yang dikomit saat ini.
TopicPartition
commitsSucceeded
Jumlah komit yang berhasil.
KafkaSourceReader
commitsFailed
Jumlah komit yang gagal.
KafkaSourceReader
Metrik konsumen Kafka
Metrik konsumen Kafka didaftarkan di grup metrik KafkaSourceReader.KafkaConsumer. Misalnya, metrik konsumen Kafka records-consumed-total didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.
Anda dapat menggunakan item konfigurasi register.consumer.metrics untuk menentukan apakah akan mendaftarkan metrik konsumen Kafka. Secara default, opsi ini diatur ke true. Untuk informasi selengkapnya tentang metrik konsumen Kafka, lihat Apache Kafka.
Buat sink Kafka
Sink Kafka Flink dapat menulis data stream ke satu atau beberapa topik Kafka.
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) // konfigurasi produsen .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") // topik target .setKafkaValueSerializer(StringSerializer.class) // skema serialisasi .build()) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // toleransi kesalahan .build(); stream.sinkTo(kafkaSink);Anda perlu mengonfigurasi parameter berikut.
Parameter
Deskripsi
Topik
Nama topik default tempat data ditulis.
Serialisasi data
Saat membangun, Anda perlu menyediakan
KafkaRecordSerializationSchemauntuk mengonversi data input menjadiProducerRecordKafka. Flink menyediakan builder skema untuk menawarkan beberapa komponen umum, seperti serialisasi kunci/nilai pesan, pemilihan topik, dan partisi pesan. Anda juga dapat mengimplementasikan antarmuka yang sesuai untuk kontrol lebih lanjut. Metode ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) dipanggil untuk setiap catatan data yang masuk untuk menghasilkan ProducerRecord yang akan ditulis ke Kafka.Anda dapat memiliki kontrol detail halus atas cara setiap catatan data ditulis ke Kafka. Dengan ProducerRecord, Anda dapat melakukan operasi berikut:
Atur nama topik tempat menulis.
Tentukan kunci pesan.
Tentukan partisi tempat data ditulis.
Properti klien Kafka
bootstrap.servers wajib diisi. Ini adalah daftar broker Kafka yang dipisahkan koma.
Semantik toleransi kesalahan
Saat checkpointing Flink diaktifkan, sink Kafka Flink dapat menjamin semantik tepat-sekali. Selain mengaktifkan checkpointing Flink, Anda juga dapat menentukan semantik toleransi kesalahan yang berbeda melalui parameter DeliveryGuarantee. Detail parameter DeliveryGuarantee adalah sebagai berikut:
DeliveryGuarantee.NONE: (Pengaturan default) Flink tidak memberikan jaminan apa pun. Data mungkin hilang atau diduplikasi.
DeliveryGuarantee.AT_LEAST_ONCE: Menjamin tidak ada data yang hilang, tetapi data mungkin diduplikasi.
DeliveryGuarantee.EXACTLY_ONCE: Menggunakan transaksi Kafka untuk menyediakan semantik tepat-sekali.
CatatanUntuk pertimbangan saat menggunakan semantik EXACTLY_ONCE, lihat Pertimbangan untuk semantik EXACTLY_ONCE.
Integrasi data
Konektor Kafka dapat digunakan dalam pengembangan pekerjaan YAML integrasi data untuk membaca dari sumber atau menulis ke target.
Batasan
Kami merekomendasikan menggunakan Kafka sebagai sumber data sinkron untuk integrasi data Flink Change Data Capture (CDC) di Ververica Runtime (VVR) 11.1 dan versi lebih baru.
Hanya format JSON, Debezium JSON, dan Canal JSON yang didukung. Format data lain tidak didukung saat ini.
Untuk sumber data, hanya Ververica Runtime (VVR) 8.0.11 dan versi lebih baru yang mendukung pendistribusian data dari satu tabel ke beberapa partisi.
Sintaks
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092Item konfigurasi
Umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
type
Jenis sumber atau target.
Ya
String
Tidak ada
Atur parameter ini ke kafka.
name
Nama sumber atau target.
Tidak
String
Tidak ada
Tidak ada
properties.bootstrap.servers
Alamat broker Kafka.
Ya
String
Tidak ada
Formatnya
host:port,host:port,host:port. Pisahkan alamat dengan koma (,).properties.*
Konfigurasi langsung untuk klien Kafka.
Tidak
String
Tidak ada
Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk produsen dan konsumen.
Flink menghapus awalan properties. dan meneruskan konfigurasi yang tersisa ke klien Kafka. Misalnya, Anda dapat menggunakan
'properties.allow.auto.create.topics' = 'false'untuk menonaktifkan pembuatan topik otomatis.key.format
Format yang digunakan untuk membaca atau menulis bagian kunci pesan Kafka.
Tidak
String
Tidak ada
Untuk sumber, hanya json yang didukung.
Untuk sink, nilai yang valid adalah:
csv
json
CatatanParameter ini hanya didukung di Ververica Runtime (VVR) 11.0.0 dan versi lebih baru.
value.format
Format yang digunakan untuk membaca atau menulis bagian nilai pesan Kafka.
Tidak
String
debezium-json
Nilai yang valid:
debezium-json
canal-json
json
CatatanFormat debezium-json dan canal-json hanya didukung di Ververica Runtime (VVR) 8.0.10 dan versi lebih baru.
Format json hanya didukung di Ververica Runtime (VVR) 11.0.0 dan versi lebih baru.
Tabel sumber
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
topic
Nama topik untuk dibaca.
Tidak
String
Tidak ada
Pisahkan beberapa nama topik dengan titik koma (;), misalnya topic-1;topic-2.
CatatanAnda hanya dapat menentukan salah satu opsi topic atau topic-pattern.
topic-pattern
Ekspresi reguler yang cocok dengan nama topik untuk dibaca. Semua topik yang cocok dengan ekspresi reguler ini akan dibaca saat pekerjaan berjalan.
Tidak
String
Tidak ada
CatatanAnda hanya dapat menentukan salah satu opsi topic atau topic-pattern.
properties.group.id
ID kelompok konsumen.
Tidak
String
Tidak ada
Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.
scan.startup.mode
Offset awal untuk membaca data dari Kafka.
Tidak
String
group-offsets
Nilai yang valid:
earliest-offset: Mulai membaca dari partisi Kafka paling awal.
latest-offset: Mulai membaca dari offset Kafka terbaru.
group-offsets (default): Mulai membaca dari offset yang dikomit oleh properties.group.id yang ditentukan.
timestamp: Mulai membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Mulai membaca dari offset yang ditentukan oleh scan.startup.specific-offsets.
CatatanParameter ini berlaku saat pekerjaan dimulai tanpa state. Saat pekerjaan dimulai ulang dari checkpoint atau pulih dari state, pekerjaan akan menggunakan progres yang disimpan dalam state untuk melanjutkan pembacaan.
scan.startup.specific-offsets
Dalam mode startup specific-offsets, menentukan offset awal untuk setiap partisi.
Tidak
String
Tidak ada
Contoh:
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
Dalam mode startup timestamp, menentukan timestamp offset awal.
Tidak
Long
Tidak ada
Unitnya milidetik.
scan.topic-partition-discovery.interval
Interval untuk menemukan topik dan partisi Kafka secara dinamis.
Tidak
Duration
5 menit
Interval pemeriksaan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur interval pemeriksaan partisi ke nilai non-positif. Saat penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data darinya. Dalam mode topic-pattern, sumber Kafka tidak hanya membaca data dari partisi baru topik yang ada, tetapi juga membaca data dari semua partisi topik baru yang cocok dengan ekspresi reguler.
scan.check.duplicated.group.id
Menentukan apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh
properties.group.id.Tidak
Boolean
false
Nilai yang valid:
true: Sebelum memulai pekerjaan, sistem memeriksa duplikasi kelompok konsumen. Jika ditemukan duplikasi, pekerjaan melaporkan error, mencegah konflik dengan kelompok konsumen yang ada.
false: Pekerjaan langsung dimulai tanpa memeriksa konflik kelompok konsumen.
schema.inference.strategy
Strategi penguraian skema.
Tidak
String
continuous
Nilai yang valid:
continuous: Mengurai skema untuk setiap catatan data. Jika skema sebelumnya dan sesudahnya tidak kompatibel, skema yang lebih luas diurai, dan event perubahan skema dihasilkan.
static: Mengurai skema hanya sekali saat pekerjaan dimulai. Data selanjutnya diurai berdasarkan skema awal, dan tidak ada event perubahan skema yang dihasilkan.
CatatanUntuk informasi selengkapnya tentang penguraian skema, lihat Strategi penguraian dan sinkronisasi perubahan skema tabel.
Item konfigurasi ini hanya didukung di VVR 8.0.11 dan versi lebih baru.
scan.max.pre.fetch.records
Jumlah maksimum pesan yang dicoba dikonsumsi dan diurai untuk setiap partisi selama penguraian skema awal.
Tidak
Int
50
Sebelum pekerjaan benar-benar membaca dan memproses data, pekerjaan mencoba pra-mengonsumsi jumlah pesan terbaru yang ditentukan untuk setiap partisi untuk menginisialisasi informasi skema.
key.fields-prefix
Awalan kustom yang ditambahkan ke nama bidang yang diurai dari kunci pesan untuk menghindari konflik penamaan setelah mengurai kunci pesan Kafka.
Tidak
String
Tidak ada
Asumsikan item konfigurasi ini diatur ke key_. Jika kunci berisi bidang bernama `a`, nama bidang setelah mengurai kunci akan menjadi `key_a`.
CatatanNilai key.fields-prefix tidak boleh menjadi awalan dari value.fields-prefix.
value.fields-prefix
Awalan kustom yang ditambahkan ke nama bidang yang diurai dari nilai pesan untuk menghindari konflik penamaan setelah mengurai isi pesan Kafka.
Tidak
String
Tidak ada
Asumsikan item konfigurasi ini diatur ke value_. Jika nilai berisi bidang bernama `b`, nama bidang setelah mengurai nilai akan menjadi `value_b`.
CatatanNilai value.fields-prefix tidak boleh menjadi awalan dari key.fields-prefix.
metadata.list
Kolom metadata yang akan diteruskan ke downstream.
Tidak
String
Tidak ada
Kolom metadata yang tersedia meliputi
topic,partition,offset,timestamp,timestamp-type,headers,leader-epoch,__raw_key__, dan__raw_value__. Pisahkan dengan koma.Format Debezium JSON tabel sumber
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
debezium-json.distributed-tables
Tidak
Boolean
false
Jika data untuk satu tabel dalam Debezium JSON muncul di beberapa partisi, Anda perlu mengaktifkan opsi ini.
CatatanItem konfigurasi ini hanya didukung di VVR 8.0.11 dan versi lebih baru.
PentingSetelah memodifikasi item konfigurasi ini, Anda perlu memulai pekerjaan tanpa state.
debezium-json.schema-include
Tidak
Boolean
false
Saat menyiapkan Debezium Kafka Connect, Anda dapat mengaktifkan konfigurasi Kafka value.converter.schemas.enable untuk menyertakan skema dalam pesan. Opsi ini menunjukkan apakah pesan Debezium JSON menyertakan skema.
Nilai yang valid:
true: Pesan Debezium JSON menyertakan skema.
false: Pesan Debezium JSON tidak menyertakan skema.
debezium-json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewatkan baris saat ini ketika terjadi error penguraian.
false (default): Melaporkan error, dan pekerjaan gagal dimulai.
debezium-json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan mengurai semua tipe sebagai String saat mengurai skema tabel.
Nilai yang valid:
true: Mengurai semua tipe data primitif sebagai String.
false (default): Mengurai sesuai aturan dasar.
Format Canal JSON tabel sumber
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
canal-json.distributed-tables
Tidak
Boolean
false
Jika data untuk satu tabel dalam Canal JSON muncul di beberapa partisi, Anda perlu mengaktifkan opsi ini.
CatatanItem konfigurasi ini hanya didukung di VVR 8.0.11 dan versi lebih baru.
PentingSetelah memodifikasi item konfigurasi ini, Anda perlu memulai pekerjaan tanpa state.
canal-json.database.include
Tidak
String
Tidak ada
Ekspresi reguler opsional yang cocok dengan bidang metadata `database` dalam catatan Canal. Ini hanya membaca catatan changelog dari database yang ditentukan. String ekspresi reguler kompatibel dengan Pattern Java.
canal-json.table.include
Tidak
String
Tidak ada
Ekspresi reguler opsional yang cocok dengan bidang metadata `table` dalam catatan Canal. Ini hanya membaca catatan changelog dari tabel yang ditentukan. String ekspresi reguler kompatibel dengan Pattern Java.
canal-json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewatkan baris saat ini ketika terjadi error penguraian.
false (default): Melaporkan error, dan pekerjaan gagal dimulai.
canal-json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan mengurai semua tipe sebagai String saat mengurai skema tabel.
Nilai yang valid:
true: Mengurai semua tipe data primitif sebagai String.
false (default): Mengurai sesuai aturan dasar.
canal-json.infer-schema.strategy
Tidak
String
AUTO
Strategi penguraian untuk skema tabel.
Nilai yang valid:
AUTO (default): Mengurai secara otomatis dengan menganalisis data JSON. Jika data tidak berisi bidang `sqlType`, kami merekomendasikan menggunakan AUTO untuk menghindari kegagalan penguraian.
SQL_TYPE: Mengurai menggunakan array `sqlType` dalam data Canal JSON. Jika data berisi bidang `sqlType`, kami merekomendasikan mengatur canal-json.infer-schema.strategy ke SQL_TYPE untuk mendapatkan tipe yang lebih tepat.
MYSQL_TYPE: Mengurai menggunakan array `mysqlType` dalam data Canal JSON.
Saat data Canal JSON di Kafka berisi bidang `sqlType` dan diperlukan pemetaan tipe yang lebih tepat, kami merekomendasikan mengatur canal-json.infer-schema.strategy ke SQL_TYPE.
Untuk aturan pemetaan `sqlType`, lihat Penguraian skema untuk Canal JSON.
CatatanKonfigurasi ini didukung di VVR 11.1 dan versi lebih baru.
MYSQL_TYPE didukung di VVR 11.3 dan versi lebih baru.
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
Tidak
Boolean
true
Menentukan apakah akan memetakan tipe MySQL `timestamp` ke tipe CDC `timestamp`:
true (default): Tipe MySQL `timestamp` dipetakan ke tipe CDC `timestamp`.
false: Tipe MySQL `timestamp` dipetakan ke tipe CDC `timestamp_ltz`.
canal-json.mysql.treat-tinyint1-as-boolean.enabled
Tidak
Boolean
true
Saat mengurai dengan MYSQL_TYPE, menentukan apakah akan memetakan tipe MySQL `tinyint(1)` ke tipe CDC `boolean`:
true (default): Tipe MySQL `tinyint(1)` dipetakan ke tipe CDC `boolean`.
false: Tipe MySQL `tinyint(1)` dipetakan ke tipe CDC `tinyint(1)`.
Konfigurasi ini hanya berlaku saat canal-json.infer-schema.strategy diatur ke MYSQL_TYPE.
Format JSON tabel sumber
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
json.timestamp-format.standard
Tidak
String
SQL
Menentukan format timestamp input dan output. Nilai yang valid:
SQL: Mengurai timestamp input dalam format yyyy-MM-dd HH:mm:ss.s{precision}, misalnya, 2020-12-30 12:13:14.123.
ISO-8601: Mengurai timestamp input dalam format yyyy-MM-ddTHH:mm:ss.s{precision}, misalnya, 2020-12-30T12:13:14.123.
json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewatkan baris saat ini ketika terjadi error penguraian.
false (default): Melaporkan error, dan pekerjaan gagal dimulai.
json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan mengurai semua tipe sebagai String saat mengurai skema tabel.
Nilai yang valid:
true: Mengurai semua tipe data primitif sebagai String.
false (default): Mengurai sesuai aturan dasar.
json.infer-schema.flatten-nested-columns.enable
Tidak
Boolean
false
Saat mengurai data JSON, menentukan apakah akan memperluas kolom bersarang dalam JSON secara rekursif. Nilai yang valid:
true: Memperluas secara rekursif.
false (default): Memperlakukan kolom bersarang sebagai String.
json.decode.parser-table-id.fields
Tidak
String
Tidak ada
Saat mengurai data JSON, menentukan apakah akan menggunakan beberapa nilai bidang JSON untuk menghasilkan tableId. Hubungkan beberapa bidang dengan koma
,. Misalnya, jika data JSON adalah{"col0":"a", "col1","b", "col2","c"}, hasilnya adalah sebagai berikut:Konfigurasi
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
Tabel sink
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
type
Jenis target.
Ya
String
Tidak ada
Atur parameter ini ke kafka.
name
Nama target.
Tidak
String
Tidak ada
Tidak ada
topic
Nama topik Kafka.
Tidak
String
Tidak ada
Jika parameter ini ditentukan, semua data ditulis ke topik ini.
CatatanJika tidak ditentukan, setiap catatan data ditulis ke topik yang sesuai dengan string TableID-nya. TableID dibentuk dengan menggabungkan nama database dan nama tabel menggunakan titik (
.), contohnya:databaseName.tableName.partition.strategy
Strategi penulisan data ke partisi Kafka.
Tidak
String
all-to-zero
Nilai yang valid:
all-to-zero (default): Menulis semua data ke partisi 0.
hash-by-key: Menulis data ke partisi berdasarkan nilai hash kunci primer. Strategi ini memastikan bahwa catatan data dengan kunci primer yang sama ditulis ke partisi yang sama dan urutannya dipertahankan.
sink.tableId-to-topic.mapping
Pemetaan dari nama tabel hulu ke nama topik Kafka downstream.
Tidak
String
Tidak ada
Gunakan titik koma (
;) untuk memisahkan beberapa pemetaan. Dalam setiap pemetaan, gunakan titik dua (:) untuk memisahkan nama tabel hulu dari nama topik downstream. Nama tabel dapat menggunakan ekspresi reguler. Untuk memetakan beberapa tabel ke satu topik yang sama, pisahkan nama tabel dengan koma (,). Contoh:mydb.mytable1:topic1;mydb.mytable2:topic2.CatatanParameter ini memungkinkan Anda mengubah topik tujuan sambil tetap mempertahankan informasi nama tabel asli.
Format Debezium JSON tabel sink
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
debezium-json.include-schema.enabled
Tidak
Boolean
false
Menentukan apakah data Debezium JSON menyertakan informasi skema.
Contoh
Gunakan Kafka sebagai sumber integrasi data:
source: type: kafka name: Kafka source properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADENGunakan Kafka sebagai target integrasi data:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}Di sini, modul `route` digunakan untuk mengatur nama topik untuk menulis dari tabel sumber ke Kafka.
ApsaraMQ for Kafka tidak mengaktifkan pembuatan topik otomatis secara default. Untuk informasi selengkapnya, lihat Masalah terkait pembuatan topik otomatis. Saat menulis ke ApsaraMQ for Kafka, Anda perlu membuat topik yang sesuai terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 3: Buat sumber daya.
Strategi penguraian skema tabel dan sinkronisasi perubahan
Pra-konsumsi pesan partisi dan inisialisasi skema tabel
Konektor Kafka memelihara skema semua tabel yang diketahui saat ini. Sebelum membaca data Kafka, konektor Kafka mencoba pra-mengonsumsi hingga scan.max.pre.fetch.records pesan di setiap partisi. Konektor mengurai skema setiap catatan data dan kemudian menggabungkan skema tersebut untuk menginisialisasi informasi skema tabel. Selanjutnya, sebelum mengonsumsi data, konektor menghasilkan event pembuatan tabel yang sesuai berdasarkan skema yang diinisialisasi.
CatatanUntuk format Debezium JSON dan Canal JSON, informasi tabel diperoleh dari pesan tertentu. Pesan scan.max.pre.fetch.records yang pra-dikonsumsi mungkin berisi data dari beberapa tabel. Oleh karena itu, jumlah catatan data yang pra-dikonsumsi untuk setiap tabel tidak dapat ditentukan. Pra-konsumsi dan inisialisasi skema tabel hanya dilakukan sekali sebelum mengonsumsi dan memproses pesan dari setiap partisi. Jika data tabel baru muncul nanti, skema tabel yang diurai dari catatan data pertama tabel tersebut digunakan sebagai skema awal. Skema untuk tabel tersebut tidak akan diinisialisasi ulang melalui pra-konsumsi.
PentingHanya VVR 8.0.11 dan versi lebih baru yang mendukung pendistribusian data dari satu tabel ke beberapa partisi. Untuk skenario ini, Anda perlu mengatur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.
Informasi tabel
Untuk format Canal JSON dan Debezium JSON, informasi tabel, termasuk database dan nama tabel, diurai dari pesan tertentu.
Untuk format JSON, informasi tabel hanya mencakup nama tabel, yaitu nama topik tempat data berada.
Informasi kunci primer
Untuk format Canal JSON, kunci primer tabel didefinisikan berdasarkan bidang `pkNames` dalam JSON.
Untuk format Debezium JSON dan JSON, JSON tidak berisi informasi kunci primer. Anda dapat menambahkan kunci primer ke tabel secara manual menggunakan aturan transformasi:
transform: - source-table: \.*.\.* projection: \* primary-keys: key1, key2
Penguraian skema dan perubahan skema
Setelah skema tabel diinisialisasi, jika schema.inference.strategy diatur ke `static`, konektor Kafka mengurai nilai setiap pesan berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke `continuous`, konektor Kafka mengurai isi setiap pesan Kafka untuk mengekstrak kolom fisik dan membandingkannya dengan skema yang dipelihara saat ini. Jika skema yang diurai tidak konsisten dengan skema saat ini, konektor mencoba menggabungkan skema dan menghasilkan event perubahan skema tabel yang sesuai. Aturan penggabungan adalah sebagai berikut:
Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema, dan event penambahan kolom nullable dihasilkan.
Jika kolom fisik yang diurai tidak berisi bidang yang sudah ada dalam skema saat ini, bidang tersebut dipertahankan, dan datanya diisi dengan NULL. Tidak ada event penghapusan kolom yang dihasilkan.
Jika ada kolom dengan nama yang sama di kedua skema, ditangani sesuai skenario berikut:
Jika tipenya sama tetapi presisinya berbeda, tipe dengan presisi lebih besar digunakan, dan event perubahan tipe kolom dihasilkan.
Jika tipenya berbeda, sistem menemukan node induk paling rendah dalam struktur pohon yang ditunjukkan pada gambar berikut untuk digunakan sebagai tipe kolom dengan nama yang sama, dan event perubahan tipe kolom dihasilkan.

Strategi perubahan skema yang didukung saat ini adalah sebagai berikut:
Tambah kolom: Menambahkan kolom yang sesuai ke akhir skema saat ini dan menyinkronkan data kolom baru. Kolom baru diatur sebagai nullable.
Hapus kolom: Tidak menghasilkan event hapus kolom. Sebagai gantinya, data untuk kolom tersebut secara otomatis diisi dengan nilai NULL.
Ganti nama kolom: Ini dianggap sebagai penambahan dan penghapusan kolom. Kolom yang diganti namanya ditambahkan ke akhir skema saat ini, dan data kolom sebelum penggantian nama diisi dengan nilai NULL.
Perubahan tipe kolom:
Untuk sistem downstream yang mendukung perubahan tipe kolom, setelah sink downstream mendukung penanganan perubahan tipe kolom, pekerjaan integrasi data mendukung perubahan tipe kolom biasa, misalnya, dari INT ke BIGINT. Perubahan tersebut bergantung pada aturan perubahan tipe kolom yang didukung oleh sink downstream. Tabel sink yang berbeda mendukung aturan perubahan tipe kolom yang berbeda. Untuk informasi selengkapnya, lihat dokumentasi untuk tabel sink tertentu.
Untuk sistem downstream yang tidak mendukung perubahan tipe kolom, seperti Hologres, Anda dapat menggunakan pemetaan tipe luas. Ini berarti membuat tabel dengan tipe yang lebih luas di sistem downstream saat pekerjaan dimulai. Saat terjadi perubahan tipe kolom, sistem menentukan apakah sink downstream dapat menerima perubahan tersebut, sehingga memberikan dukungan toleran untuk perubahan tipe kolom.
Perubahan skema yang saat ini tidak didukung:
Perubahan pada kendala seperti kunci primer atau indeks.
Mengubah dari NOT NULL ke NULLABLE.
Penguraian skema untuk Canal JSON
Data Canal JSON mungkin berisi bidang `sqlType` opsional, yang mencatat informasi tipe yang tepat untuk kolom data. Untuk mengambil skema yang lebih akurat, Anda dapat menggunakan tipe dalam `sqlType` dengan mengatur canal-json.infer-schema.strategy ke SQL_TYPE. Hubungan pemetaan tipe adalah sebagai berikut:
Tipe JDBC
Kode Tipe
Tipe CDC
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
-5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
NUMERIC
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
BINARY
-2
BYTES
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
DATE
91
DATE
TIME
92
TIME
TIMESTAMP
93
TIMESTAMP
CHAR
1
STRING
VARCHAR
12
LONGVARCHAR
-1
Tipe lainnya
Strategi pemetaan nama tabel ke topik
Saat menggunakan Kafka sebagai target untuk pekerjaan integrasi data, Anda perlu mengonfigurasi strategi pemetaan nama tabel ke topik dengan hati-hati. Hal ini karena format pesan Kafka yang ditulis (Debezium JSON atau Canal JSON) juga berisi informasi nama tabel, dan konsumsi pesan Kafka selanjutnya sering menggunakan informasi nama tabel dalam data sebagai nama tabel aktual (bukan nama topik).
Anggaplah Anda perlu menyinkronkan dua tabel, `mydb.mytable1` dan `mydb.mytable2`, dari MySQL. Strategi konfigurasi yang mungkin adalah sebagai berikut:
1. Jangan mengonfigurasi strategi pemetaan apa pun
Tanpa strategi pemetaan apa pun, setiap tabel ditulis ke topik dengan format "nama_database.nama_tabel". Oleh karena itu, data dari `mydb.mytable1` ditulis ke topik bernama `mydb.mytable1`, dan data dari `mydb.mytable2` ditulis ke topik bernama `mydb.mytable2`. Kode berikut memberikan contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. Konfigurasikan aturan route untuk pemetaan (tidak disarankan)
Dalam banyak skenario, pengguna tidak ingin topik yang ditulis berada dalam format "nama_database.nama_tabel". Mereka ingin menulis data ke topik tertentu. Oleh karena itu, mereka mengonfigurasi aturan route untuk pemetaan. Kode berikut memberikan contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytable1Dalam kasus ini, semua data dari `mydb.mytable1` dan `mydb.mytable2` ditulis ke topik `mytable1`.
Namun, saat Anda menggunakan aturan route untuk mengubah nama topik yang ditulis, hal ini juga memodifikasi informasi nama tabel dalam pesan Kafka (format Debezium JSON atau Canal JSON). Dalam kasus ini, semua nama tabel dalam pesan Kafka menjadi `mytable1`. Hal ini dapat menyebabkan perilaku yang tidak diinginkan saat sistem lain mengonsumsi pesan Kafka dari topik ini.
3. Konfigurasikan parameter sink.tableId-to-topic.mapping untuk pemetaan (disarankan)
Untuk mengonfigurasi aturan pemetaan nama tabel ke topik sambil mempertahankan informasi nama tabel sumber, gunakan parameter `sink.tableId-to-topic.mapping`. Kode berikut memberikan contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}atau
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}Dalam kasus ini, semua data dari `mydb.mytable1` dan `mydb.mytable2` ditulis ke topik `mytable1`. Informasi nama tabel dalam pesan Kafka (format Debezium JSON atau Canal JSON) tetap `mydb.mytable1` atau `mydb.mytable2`. Saat sistem lain mengonsumsi pesan Kafka dari topik ini, mereka dapat memperoleh informasi nama tabel sumber dengan benar.
Pertimbangan untuk semantik EXACTLY_ONCE
Konfigurasikan tingkat isolasi konsumen
Semua aplikasi yang mengonsumsi data Kafka harus mengatur `isolation.level`:
read_committed: Hanya membaca data yang telah dikomit.read_uncommitted(default): Dapat membaca data yang belum dikomit.
EXACTLY_ONCE bergantung pada
read_committed. Jika tidak, konsumen mungkin melihat data yang belum dikomit, yang merusak konsistensi.Timeout transaksi dan kehilangan data
Saat Flink pulih dari checkpoint, Flink hanya bergantung pada transaksi yang telah dikomit sebelum checkpoint tersebut dimulai. Jika waktu antara kegagalan pekerjaan dan restart-nya melebihi timeout transaksi Kafka, Kafka secara otomatis membatalkan transaksi tersebut, menyebabkan kehilangan data.
transaction.max.timeout.msdefault broker Kafka adalah 15 menit.transaction.timeout.msdefault sink Kafka Flink adalah 1 jam.Anda harus meningkatkan
transaction.max.timeout.msdi sisi broker agar tidak kurang dari pengaturan Flink.
Kolam produsen dan checkpoint konkuren
Mode EXACTLY_ONCE menggunakan kolam produsen Kafka dengan ukuran tetap. Setiap checkpoint menempati satu produsen dari kolam. Jika jumlah checkpoint konkuren melebihi ukuran kolam, pekerjaan akan gagal.
Sesuaikan ukuran kolam produsen berdasarkan jumlah maksimum checkpoint konkuren.
Batasan penskalaan turun tingkat paralelisme
Jika pekerjaan gagal sebelum checkpoint pertama, informasi kolam produsen asli tidak dipertahankan saat restart. Oleh karena itu, jangan mengurangi tingkat paralelisme pekerjaan sebelum checkpoint pertama selesai. Jika Anda harus menskala turun, tingkat paralelisme tidak boleh lebih rendah dari
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.Pembacaan pemblokiran transaksi
Dalam mode
read_committed, transaksi terbuka apa pun (tidak dikomit maupun dibatalkan) memblokir pembacaan dari seluruh topik.Misalnya:
Transaksi 1 menulis data.
Transaksi 2 menulis dan mengkomit data.
Selama Transaksi 1 belum selesai, data dari Transaksi 2 tidak terlihat oleh konsumen.
Oleh karena itu:
Saat operasi normal, latensi visibilitas data kira-kira sama dengan interval checkpoint.
Saat pekerjaan gagal, topik yang sedang ditulis akan memblokir konsumen sampai pekerjaan dijalankan ulang atau transaksi timeout. Dalam kasus ekstrem, timeout transaksi bahkan dapat memengaruhi pembacaan.