Topik ini menjelaskan konektor Message Queue for Apache Kafka.
Latar Belakang
Apache Kafka adalah sistem Message Queue open-source terdistribusi yang banyak digunakan di bidang data besar untuk Pemrosesan Data berkinerja tinggi, Analitik Streaming, dan Integrasi Data. Konektor Kafka untuk Realtime Compute for Apache Flink menggunakan Apache Kafka Client open-source untuk menyediakan throughput data berkinerja tinggi, mendukung pembacaan dan penulisan berbagai Format Data, serta menawarkan Semantik tepat-sekali.
Kategori | Deskripsi |
Tipe yang didukung | Tabel Sumber, Tabel Sink, dan Sink Ingesti Data |
Mode Jalankan | Mode Streaming |
Format Data | |
Metrik | |
Tipe API | SQL API, DataStream API, dan Data Ingestion YAML API |
Memperbarui atau menghapus data di Tabel Sink | Konektor hanya mendukung penyisipan data ke Tabel Sink. Pembaruan dan penghapusan tidak didukung. Catatan Untuk informasi selengkapnya tentang cara memperbarui atau menghapus data di Tabel Sink, lihat Upsert Kafka. |
Prasyarat
Lengkapi prasyarat berdasarkan tipe kluster Kafka Anda:
Menghubungkan ke kluster ApsaraMQ for Kafka
Versi kluster Kafka minimal 0.11.
Anda telah membuat kluster ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Langkah 3: Buat sumber daya.
Ruang kerja Flink dan kluster Kafka berada dalam Virtual Private Cloud (VPC) yang sama, dan Anda telah menambahkan Blok CIDR ruang kerja Flink ke daftar putih ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih.
PentingBatasan penulisan data ke ApsaraMQ for Kafka:
ApsaraMQ for Kafka tidak mendukung penulisan data dalam format kompresi Zstandard (zstd).
ApsaraMQ for Kafka tidak mendukung penulisan idempoten atau transaksional, sehingga mencegah penggunaan semantik tepat-sekali yang disediakan oleh tabel sink Kafka. Mulai dari Realtime Compute Engine VVR 8.0.0, konektor Kafka menggunakan Kafka client 3.x, di mana properti
properties.enable.idempotencesecara default bernilaitrue. Oleh karena itu, untuk mencegah kegagalan penulisan saat menggunakan Realtime Compute Engine VVR 8.0.0 atau versi setelahnya untuk menulis ke ApsaraMQ for Kafka, Anda harus menambahkan konfigurasiproperties.enable.idempotence=falseke definisi tabel sink Anda. Untuk perbandingan mesin penyimpanan dan batasan fitur ApsaraMQ for Kafka, lihat Perbandingan antara mesin penyimpanan.
Menghubungkan ke kluster Apache Kafka yang dikelola sendiri
Versi kluster Apache Kafka yang dikelola sendiri minimal 0.11.
Terdapat konektivitas jaringan antara ruang kerja Flink dan kluster Apache Kafka yang dikelola sendiri. Untuk detail cara menghubungkan ke kluster melalui internet publik, lihat FAQ tentang konektivitas jaringan.
Hanya opsi konfigurasi klien untuk Apache Kafka versi 2.8 yang didukung. Untuk informasi selengkapnya, lihat dokumentasi Apache Kafka Consumer Configs dan Producer Configs.
Catatan
Penulisan transaksional tidak disarankan karena adanya keterbatasan desain yang diketahui pada Apache Flink dan Apache Kafka. Saat Anda mengatur sink.delivery-guarantee = 'exactly-once', konektor Kafka mengaktifkan penulisan transaksional, dengan masalah yang diketahui sebagai berikut:
ID Transaksi baru dihasilkan untuk setiap checkpoint. Jika interval checkpoint terlalu pendek, jumlah ID Transaksi yang berlebihan dapat dibuat. Hal ini dapat menyebabkan koordinator di kluster Apache Kafka kehabisan memori, sehingga mengganggu stabilitas kluster.
Instance Producer dibuat untuk setiap transaksi. Jika terlalu banyak transaksi yang dikomit secara bersamaan, TaskManager dapat kehabisan memori, sehingga mengganggu stabilitas pekerjaan Apache Flink.
Jika beberapa pekerjaan Apache Flink menggunakan
sink.transactional-id-prefixyang sama, ID Transaksi yang dihasilkan dapat bertabrakan. Ketika operasi penulisan gagal pada satu pekerjaan, hal ini dapat mencegah Log Start Offset (LSO) partisi Apache Kafka maju. Hal ini memengaruhi semua konsumen partisi tersebut.
Jika Anda memerlukan Semantik tepat-sekali, gunakan konektor Upsert Kafka untuk menulis ke tabel kunci primer, memastikan idempotensi. Jika Anda harus menggunakan penulisan transaksional, lihat Catatan penggunaan Semantik tepat-sekali.
Pemecahan Masalah Konektivitas Jaringan
Error Timed out waiting for a node assignment saat Pekerjaan Realtime Compute for Apache Flink gagal dimulai biasanya menunjukkan adanya masalah konektivitas jaringan antara Realtime Compute for Apache Flink dan kluster Kafka.
Klien Kafka terhubung ke broker sebagai berikut:
Klien menggunakan alamat yang ditentukan dalam
bootstrap.serversuntuk membuat koneksi awal ke kluster Kafka.Kluster Kafka mengembalikan metadata untuk setiap broker, termasuk titik akhirnya.
Klien kemudian menggunakan titik akhir ini untuk terhubung ke broker guna membaca atau menulis data.
Meskipun alamat bootstrap.servers dapat dijangkau, klien tidak dapat membaca atau menulis data jika Kafka mengembalikan titik akhir broker yang salah. Masalah ini sering terjadi pada arsitektur jaringan yang menggunakan proxy, penerusan port, atau jalur sewa.
Langkah Pemecahan Masalah
Message Queue for Kafka
Konfirmasi tipe Titik Akhir
Titik Akhir Default (jaringan internal)
Titik Akhir SASL (jaringan internal dengan otentikasi)
Titik Akhir Publik (memerlukan aplikasi terpisah)
Gunakan fitur Network Probe di konsol pengembangan Realtime Compute for Apache Flink untuk mengesampingkan masalah konektivitas dengan alamat
bootstrap.servers.Periksa grup keamanan dan daftar putih
Tambahkan Blok CIDR ruang kerja Realtime Compute for Apache Flink ke daftar putih instance Kafka Anda. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC dan Konfigurasi daftar putih.
Periksa konfigurasi SASL (jika diaktifkan)
Jika Anda menggunakan titik akhir SASL_SSL, pastikan mekanisme JAAS, SSL, dan SASL dikonfigurasi dengan benar di Pekerjaan Realtime Compute for Apache Flink Anda. Otentikasi yang hilang dapat menyebabkan koneksi gagal selama fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan otentikasi.
Kafka yang Dikelola Sendiri
Gunakan fitur Network Probe
Fitur ini membantu Anda mengesampingkan masalah konektivitas dengan alamat
bootstrap.serversdan memverifikasi bahwa titik akhir internal atau publik yang benar digunakan.Periksa grup keamanan dan daftar putih
Grup keamanan untuk instance Elastic Compute Service (ECS) harus mengizinkan lalu lintas masuk pada port titik akhir Kafka, biasanya 9092 atau 9093.
Pastikan firewall apa pun pada instance ECS mengizinkan trafik dari VPC ruang kerja Realtime Compute for Apache Flink Anda. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC.
Periksa konfigurasi
Gunakan tool zkCli.sh atau zookeeper-shell.sh untuk login ke kluster ZooKeeper yang digunakan Kafka.
Jalankan perintah untuk mendapatkan metadata broker. Misalnya, jalankan
get /brokers/ids/0. Di bidang endpoints respons, temukan alamat yang diiklankan Kafka kepada klien.
Gunakan fitur Network Probe di konsol pengembangan Realtime Compute for Apache Flink untuk menguji apakah alamat ini dapat diakses.
CatatanJika alamat tidak dapat diakses, hubungi administrator Kafka Anda untuk memeriksa dan memperbaiki konfigurasi
listenersdanadvertised.listenersagar alamat yang diiklankan dapat diakses dari Realtime Compute for Apache Flink.Untuk informasi selengkapnya tentang koneksi klien Kafka, lihat Troubleshoot Connectivity.
Periksa konfigurasi SASL (jika diaktifkan)
Jika Anda menggunakan titik akhir SASL_SSL, pastikan mekanisme JAAS, SSL, dan SASL dikonfigurasi dengan benar di Pekerjaan Realtime Compute for Apache Flink Anda. Otentikasi yang hilang dapat menyebabkan koneksi gagal selama fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan otentikasi.
SQL
Konektor Kafka dapat digunakan sebagai tabel sumber atau tabel sink dalam pekerjaan SQL.
Sintaksis
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)Kolom metadata
Definisikan kolom metadata di Tabel Sumber atau Tabel Sink untuk mengakses atau menulis metadata pesan Kafka. Misalnya, saat mendefinisikan beberapa topik dalam klausa WITH, gunakan kolom metadata di Tabel Sumber untuk mengidentifikasi topik sumber setiap catatan. Kode berikut menunjukkan contohnya.
CREATE TABLE kafka_source (
-- Baca topik pesan sebagai kolom `record_topic`
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Baca timestamp dari ConsumerRecord sebagai kolom `ts`
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Baca offset pesan sebagai kolom `record_offset`
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Tulis timestamp dari kolom `ts` sebagai timestamp ProducerRecord ke Kafka
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);Tabel berikut mencantumkan kolom metadata yang didukung oleh Tabel Sumber dan Sink Kafka.
Kunci | Tipe | Deskripsi | Cakupan |
topic | STRING NOT NULL METADATA VIRTUAL | Topik pesan. | Tabel sumber |
partition | INT NOT NULL METADATA VIRTUAL | ID partisi pesan. | Tabel sumber |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Header pesan. | Tabel sumber dan tabel sink |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Leader-epoch pesan. | Tabel sumber |
offset | BIGINT NOT NULL METADATA VIRTUAL | Offset pesan. | Tabel sumber |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Timestamp pesan. | Tabel sumber dan tabel sink |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Tipe timestamp pesan. Nilai yang valid adalah:
| Tabel sumber |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | Kunci pesan mentah. | Tabel sumber dan tabel sink Catatan Parameter ini hanya didukung di Ververica Runtime (VVR) 11.4 dan versi setelahnya. |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | Nilai pesan mentah. | Tabel sumber dan tabel sink Catatan Parameter ini hanya didukung di Ververica Runtime (VVR) 11.4 dan versi setelahnya. |
DENGAN parameter
Umum
Parameter
Deskripsi
Tipe
Wajib
Default
Keterangan
connector
Konektor yang digunakan.
String
Ya
N/A
Nilainya harus
kafka.properties.bootstrap.servers
Daftar alamat broker Kafka.
String
Ya
N/A
Format:
host:port,host:port,.... Pisahkan alamat dengan koma (,).properties.*
Properti tambahan untuk klien Kafka.
String
Tidak
N/A
Kunci properti harus merupakan opsi yang valid sebagaimana didefinisikan dalam dokumentasi resmi Apache Kafka untuk Producer Configs dan Consumer Configs.
Realtime Compute for Apache Flink menghapus prefiks properties. dan meneruskan pasangan kunci-nilai yang tersisa ke klien Kafka yang mendasari. Misalnya, Anda dapat mengatur
'properties.allow.auto.create.topics' = 'false'untuk menonaktifkan pembuatan topik otomatis.Anda tidak dapat menggunakan metode ini untuk mengonfigurasi opsi berikut karena Konektor Kafka akan menimpanya:
key.deserializer
value.deserializer
format
Format untuk serialisasi dan deserialisasi nilai pesan Kafka.
String
Tidak
N/A
Format yang didukung:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
CatatanUntuk informasi selengkapnya, lihat Format options.
key.format
Format untuk serialisasi dan deserialisasi kunci pesan Kafka.
String
Tidak
N/A
Format yang didukung:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
CatatanSaat menggunakan konfigurasi ini, key.options wajib diisi.
key.fields
Bidang dari skema tabel yang digunakan sebagai kunci pesan Kafka.
String
Tidak
N/A
Pisahkan beberapa nama bidang dengan titik koma (;). Misalnya,
'field1;field2'.key.fields-prefix
Prefiks khusus untuk semua bidang kunci guna mencegah konflik nama dengan bidang nilai.
String
Tidak
N/A
Prefiks ini digunakan untuk membedakan antara bidang kunci dan bidang nilai. Prefiks ini dihapus sebelum serialisasi kunci atau setelah deserialisasi.
CatatanJika Anda menggunakan opsi ini,
value.fields-includeharus diatur keEXCEPT_KEY.value.format
Format untuk serialisasi dan deserialisasi nilai pesan Kafka.
String
Tidak
N/A
Konfigurasi ini setara dengan
format. Anda hanya dapat mengatur salah satu dariformatatauvalue.format. Jika keduanya dikonfigurasi,value.formatakan menggantikanformat.value.fields-include
Menentukan apakah bidang kunci disertakan dalam format nilai.
String
Tidak
ALL
Nilai yang valid:
ALL: Nilai pesan Kafka mencakup semua kolom tabel.EXCEPT_KEY: Nilai pesan Kafka mencakup semua kolom tabel kecuali yang didefinisikan dalamkey.fields.
Tabel sumber
Parameter
Deskripsi
Type
Wajib
Default
Keterangan
topic
Topik atau topik yang akan dibaca.
String
Tidak
N/A
Untuk berlangganan beberapa topik, pisahkan nama mereka dengan titik koma (;), misalnya,
'topic-1;topic-2'.CatatanAnda dapat menentukan opsi ini atau
topic-pattern, tetapi tidak keduanya.topic-pattern
Ekspresi reguler yang menentukan topik mana yang akan berlangganan. Konsumen berlangganan ke semua topik yang namanya sesuai dengan pola ini.
String
Tidak
N/A
CatatanAnda dapat menentukan opsi ini atau
topic, tetapi tidak keduanya.properties.group.id
ID Kelompok Konsumen untuk sumber Kafka.
String
Tidak
KafkaSource-{Nama-Tabel-Sumber}
Jika Anda menggunakan ID Kelompok Konsumen untuk pertama kalinya, Anda juga harus mengatur properties.auto.offset.reset ke
earliestataulatestuntuk menentukan Offset Startup awal.scan.startup.mode
Offset Startup untuk Konsumen Kafka.
String
Tidak
group-offsets
Nilai yang valid:
earliest-offset: Mulai membaca dari Offset paling awal yang tersedia.latest-offset: Mulai membaca dari Offset terbaru.group-offsets: Mulai membaca dari Offset yang dikomit dari properties.group.id yang ditentukan.timestamp: Mulai membaca dari scan.startup.timestamp-millis yang ditentukan.specific-offsets: Mulai membaca dari Offset yang ditentukan dalam scan.startup.specific-offsets.
CatatanOpsi ini hanya berlaku saat Pekerjaan dimulai dari State bersih. Jika Pekerjaan dimulai ulang dari Checkpoint, selalu dilanjutkan dari Offset yang disimpan dalam State Checkpoint.
scan.startup.specific-offsets
Offset awal untuk setiap partisi saat
scan.startup.modeadalahspecific-offsets.String
Tidak
N/A
Misalnya,
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
Timestamp awal dalam milidetik saat
scan.startup.modediatur ketimestamp.Long
Tidak
N/A
Unitnya adalah milidetik.
scan.topic-partition-discovery.interval
Interval untuk menemukan partisi baru dalam topik.
Duration
Tidak
5 menit
Konektor secara berkala menemukan dan membaca dari partisi baru. Saat menggunakan topic-pattern, Konektor juga menemukan topik baru yang sesuai dengan pola tersebut. Atur interval ke nilai non-positif untuk menonaktifkan fitur ini.
CatatanDi Ververica Runtime (VVR) 6.0.x, penemuan partisi dinamis dinonaktifkan secara default. Mulai dari VVR 8.0, fitur ini diaktifkan secara default dengan interval penemuan 5 menit.
scan.header-filter
Memfilter pesan berdasarkan header pesan Kafka.
String
Tidak
N/A
Kunci header dan nilainya dipisahkan dengan titik dua (:). Beberapa kondisi header dihubungkan menggunakan operator logika (& dan |). Operator logika NOT (!) juga didukung. Misalnya,
depart:toy|depart:book&!env:testmenyimpan data Kafka jika header berisidepart=toyataudepart=bookdan tidak berisienv=test.CatatanOpsi ini hanya didukung di Ververica Runtime (VVR) 8.0.6 dan versi setelahnya.
Tanda kurung dalam ekspresi tidak didukung.
Operasi logika dievaluasi dari kiri ke kanan.
Nilai header dikonversi ke string UTF-8 untuk perbandingan.
scan.check.duplicated.group.id
Menentukan apakah akan memeriksa apakah Konsumen aktif lain menggunakan
properties.group.id.Boolean
Tidak
false
Nilai yang valid:
true: Sebelum memulai Pekerjaan, sistem memeriksa duplikasi Kelompok Konsumen. Jika ditemukan, Pekerjaan gagal untuk mencegah konflik.
false: Memulai Pekerjaan tanpa memeriksa konflik.
CatatanOpsi ini hanya didukung di Ververica Runtime (VVR) 6.0.4 dan versi setelahnya.
Tabel sink
Parameter
Deskripsi
Type
Wajib
Default
Keterangan
topic
Topik yang akan ditulis.
String
Ya
N/A
N/A
sink.partitioner
Menentukan cara memetakan catatan dari instance sink paralel ke partisi Kafka.
String
Tidak
default
Nilai yang valid:
default: Menggunakan partitioner Kafka default.fixed: Setiap instance sink paralel menulis ke partisi Kafka tetap.round-robin: Catatan didistribusikan ke partisi secara round-robin.Partitioner khusus: Untuk menggunakan partitioner khusus, berikan nama kelas lengkap dari subclass
FlinkKafkaPartitioner, misalnya,org.mycompany.MyPartitioner.
sink.delivery-guarantee
Jaminan pengiriman untuk sink.
String
Tidak
at-least-once
Nilai yang valid:
none: Tidak memberikan jaminan. Catatan mungkin hilang atau diduplikasi.at-least-once: Menjamin bahwa tidak ada catatan yang hilang, tetapi mungkin diduplikasi.exactly-once: Menggunakan transaksi Kafka untuk memberikan Semantik tepat-sekali, memastikan catatan tidak hilang maupun diduplikasi.
CatatanSaat menggunakan Semantik
exactly-once, Anda juga harus menentukan sink.transactional-id-prefix.sink.transactional-id-prefix
Prefiks untuk ID transaksi, wajib saat
sink.delivery-guaranteeadalahexactly-once.String
Ya, jika
sink.delivery-guaranteeadalahexactly-onceN/A
Wajib hanya saat sink.delivery-guarantee diatur ke
exactly-once.sink.parallelism
Paralelisme Operator sink.
Integer
Tidak
N/A
Secara default, framework menentukan Paralelisme berdasarkan Operator hulu.
Keamanan dan otentikasi
Jika kluster Kafka memerlukan koneksi aman atau otentikasi, tambahkan prefiks konfigurasi keamanan dan otentikasi terkait dengan properties. dan atur di parameter WITH. Contoh konfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS adalah sebagai berikut.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)Contoh berikut menunjukkan cara menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* Konfigurasi SSL */
/* Path ke truststore untuk sertifikat CA server. */
/* File yang diunggah menggunakan Artifacts disimpan di direktori /flink/usrlib/. */
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* Jika otentikasi klien diperlukan, Anda juga harus mengonfigurasi path ke keystore (kunci privat). */
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* Algoritma yang digunakan untuk memverifikasi hostname server. String kosong menonaktifkan verifikasi hostname. */
'properties.ssl.endpoint.identification.algorithm' = '',
/* Konfigurasi SASL */
/* Atur mekanisme SASL ke SCRAM-SHA-256. */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* Konfigurasi JAAS. */
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)Anda dapat menggunakan fitur Artifacts dari Konsol Komputasi Real-time untuk mengunggah sertifikat CA dan kunci privat yang disebutkan dalam contoh. File yang diunggah disimpan di direktori /flink/usrlib. Untuk menggunakan file sertifikat CA bernama my-truststore.jks, Anda dapat mengatur properti 'properties.ssl.truststore.location' di klausa WITH dengan salah satu dari dua cara berikut:
Atur
'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'. Metode ini menghindari pengunduhan file secara dinamis dari Object Storage Service (OSS) saat runtime, tetapi tidak mendukung Mode Debug.Jika versi mesin Realtime Compute adalah VVR 11.5 atau versi setelahnya, Anda dapat mengonfigurasi
properties.ssl.truststore.locationdanproperties.ssl.keystore.locationke path OSS absolut. Format path file adalah oss://flink-fullymanaged-<ID Ruang Kerja>/artifacts/namespaces/<Nama Namespace>/<nama file>. Metode ini mengunduh file OSS secara dinamis selama runtime Flink dan mendukung Mode Debug.
Verifikasi konfigurasi Anda: Contoh dalam topik ini menunjukkan konfigurasi umum. Sebelum mengonfigurasi konektor Kafka, hubungi tim O&M Kafka Anda untuk mendapatkan pengaturan keamanan dan otentikasi yang benar.
Escaping: Berbeda dengan Apache Flink asli, editor SQL Realtime Compute for Apache Flink secara default melakukan escaping tanda kutip ganda ("). Oleh karena itu, Anda tidak perlu menambahkan backslash (\) untuk melakukan escaping tanda kutip ganda yang digunakan untuk username dan password dalam opsi
properties.sasl.jaas.config.
Offset awal tabel sumber
Startup mode
Anda dapat mengonfigurasi opsi scan.startup.mode untuk menentukan offset dari mana tabel sumber Kafka mulai membaca data. Nilai yang valid meliputi:
earliest-offset: Mulai membaca dari offset paling awal.
latest-offset: Mulai membaca dari offset terbaru.
group-offsets: Mulai membaca dari offset yang dikomit untuk kelompok konsumen yang ditentukan dalam properties.group.id.
timestamp: Mulai membaca dari pesan pertama dengan timestamp lebih besar dari atau sama dengan nilai yang ditentukan dalam scan.startup.timestamp-millis.
specific-offsets: Mulai membaca dari offset partisi tertentu yang ditentukan dalam scan.startup.specific-offsets.
Jika Anda tidak menentukan mode startup, default-nya adalah 'group-offsets'.
Opsi scan.startup.mode hanya berlaku untuk pekerjaan tanpa status. Saat pekerjaan berstatus dimulai, selalu mengonsumsi dari offset yang disimpan dalam statusnya.
Kode berikut memberikan contoh:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Konsumsi dari offset paling awal.
'scan.startup.mode' = 'earliest-offset',
-- Konsumsi dari offset terbaru.
'scan.startup.mode' = 'latest-offset',
-- Konsumsi dari offset yang dikomit kelompok konsumen "my-group".
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- Jika "my-group" digunakan pertama kali, konsumsi dimulai dari offset paling awal.
'properties.auto.offset.reset' = 'latest', -- Jika "my-group" digunakan pertama kali, konsumsi dimulai dari offset terbaru.
-- Konsumsi dari timestamp tertentu dalam milidetik: 1655395200000.
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Konsumsi dari offset tertentu.
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);Prioritas offset awal
Offset awal untuk tabel sumber ditentukan berdasarkan prioritas berikut, dari tertinggi ke terendah:
Prioritas (tertinggi ke terendah) | Offset yang disimpan dalam Checkpoint atau Savepoint. |
Waktu mulai yang dipilih di Konsol Komputasi Real-time selama startup pekerjaan. | |
Offset awal yang ditentukan oleh scan.startup.mode di klausa WITH. | |
Jika scan.startup.mode tidak ditentukan, group-offsets digunakan untuk memulai konsumsi dari offset kelompok konsumen yang sesuai. |
Jika offset yang ditentukan oleh langkah-langkah ini tidak valid, misalnya karena telah kedaluwarsa atau terjadi masalah di kluster Kafka, sistem mereset offset sesuai kebijakan yang ditentukan dalam properties.auto.offset.reset. Jika opsi ini tidak dikonfigurasi, sistem melemparkan exception yang memerlukan intervensi pengguna.
Skenario umum adalah mengonsumsi data dengan ID kelompok konsumen baru. Tabel sumber terlebih dahulu menanyakan kluster Kafka untuk offset yang dikomit kelompok tersebut. Karena ID kelompok baru, tidak ditemukan offset yang valid. Akibatnya, sistem mereset offset sesuai kebijakan yang ditentukan dalam properties.auto.offset.reset. Oleh karena itu, saat mengonsumsi dengan ID kelompok baru, Anda harus mengonfigurasi opsi properties.auto.offset.reset.
Mengkomit offset sumber
Tabel sumber Kafka hanya mengkomit offset konsumen saat ini ke kluster Kafka setelah checkpoint berhasil. Jika interval checkpoint panjang, offset konsumen di kluster Kafka tertinggal. Selama checkpoint, tabel sumber Kafka menyimpan progres bacaannya saat ini dalam statusnya. Sistem menggunakan status ini untuk pemulihan kesalahan dan tidak bergantung pada offset yang dikomit ke kluster Kafka. Offset dikomit hanya untuk memantau progres bacaan di Kafka. Kegagalan komit offset tidak memengaruhi akurasi data.
Pemartisi sink kustom
Jika strategi partisi bawaan Kafka tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan partitioner khusus dengan memperluas kelas FlinkKafkaPartitioner. Setelah pengembangan selesai, kompilasi kode Anda menjadi paket JAR dan unggah menggunakan fitur Artifacts di konsol Realtime Compute. Setelah paket JAR diunggah dan direferensikan, atur parameter sink.partitioner di klausa WITH ke nama kelas lengkap partitioner Anda, misalnya, org.mycompany.MyPartitioner.
Kafka, Upsert Kafka, dan katalog Kafka JSON
Kafka adalah sistem antrian pesan append-only yang tidak mendukung pembaruan atau penghapusan data. Dalam SQL streaming, tabel sink Kafka standar tidak dapat menangani data Change Data Capture (CDC) hulu atau logika penarikan kembali operator seperti agregat dan join. Jika Anda perlu menulis data yang berisi perubahan atau penarikan kembali, gunakan tabel sink Upsert Kafka.
Untuk menyederhanakan sinkronisasi batch data Change Data Capture (CDC) dari satu atau beberapa tabel database hulu ke Kafka, Anda dapat menggunakan katalog Kafka JSON. Jika data yang disimpan di Kafka dalam format JSON, katalog Kafka JSON memungkinkan Anda melewati langkah mendefinisikan skema dan parameter WITH. Untuk detailnya, lihat Kelola katalog Kafka JSON.
Contoh
Contoh 1: Baca dari dan tulis ke Kafka
Contoh ini membaca data dari topik Kafka sumber dan menulisnya ke topik sink. Datanya dalam format CSV.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;Contoh 2: Sinkronisasi skema dan data tabel
Anda dapat menggunakan konektor Kafka untuk menyinkronkan pesan dari topik Kafka ke Hologres secara real time. Untuk mencegah pesan duplikat di Hologres selama failover, Anda dapat menggunakan offset dan ID partisi pesan Kafka sebagai kunci primer komposit.
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- Opsional. Meratakan semua kolom bersarang.
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Contoh 3: Sinkronisasi kunci dan nilai Kafka
Jika kunci pesan Kafka berisi informasi relevan, Anda dapat menyinkronkan kunci dan nilainya.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Kunci pesan Kafka tidak mendukung Evolusi Skema atau penguraian tipe otomatis. Anda harus mendeklarasikan skema secara manual.
Contoh 4: Sinkronisasi data dan lakukan komputasi
Menyinkronkan data dari Kafka ke Hologres sering kali memerlukan komputasi ringan.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
--Gunakan COALESCE untuk menangani nilai null.Contoh 5: Uraikan JSON bersarang
Berikut adalah contoh pesan JSON:
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}Untuk menghindari penggunaan fungsi seperti JSON_VALUE(payload, '$.properties.owner') untuk mengurai bidang, Anda dapat langsung mendefinisikan struktur dalam DDL Sumber:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);Dengan cara ini, Flink mengurai JSON menjadi bidang terstruktur sekaligus selama fase baca. Kueri SQL selanjutnya kemudian dapat langsung menggunakan properties.owner tanpa panggilan fungsi tambahan, yang meningkatkan kinerja keseluruhan.
DataStream API
Untuk membaca atau menulis data dengan DataStream API, gunakan DataStream Connector yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi selengkapnya tentang cara menyiapkan DataStream Connector, lihat Integrasikan konektor DataStream.
Buat Kafka Source
Kafka Source menyediakan kelas builder untuk membuat instance Kafka Source. Kode contoh berikut membangun Kafka Source yang mengonsumsi data dari Offset paling awal dari topik
input-topic. Kelompok Konsumen adalahmy-group, dan Nilai Pesan Kafka dideserialisasi sebagai string.Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");Untuk membangun Kafka Source, Anda harus menentukan properti berikut.
Parameter
Deskripsi
BootstrapServers
Daftar alamat broker Kafka. Atur properti ini dengan memanggil metode
setBootstrapServers(String).GroupId
ID Kelompok Konsumen. Atur properti ini dengan memanggil metode
setGroupId(String).Topik atau Partisi
Topik atau partisi untuk berlangganan. Kafka Source mendukung tiga metode berikut untuk berlangganan ke topik atau partisi:
Berlangganan ke semua partisi topik dalam daftar.
KafkaSource.builder().setTopics("topic-a","topic-b")Pola topik: Berlangganan ke semua partisi topik yang namanya cocok dengan ekspresi reguler yang ditentukan.
KafkaSource.builder().setTopicPattern("topic.*")Daftar partisi, tempat Anda dapat berlangganan ke partisi tertentu.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partisi 0 dari topik "topic-a" new TopicPartition("topic-b", 5))); // Partisi 5 dari topik "topic-b" KafkaSource.builder().setPartitions(partitionSet)
Deserializer
Deserializer yang digunakan untuk mengurai pesan Kafka.
Tentukan deserialisasi menggunakan metode
setDeserializer(KafkaRecordDeserializationSchema).KafkaRecordDeserializationSchemamendefinisikan cara menguraiConsumerRecordKafka. Jika Anda hanya perlu mengurai Value dari Message Kafka, Anda dapat menggunakan salah satu metode berikut:Gunakan metode
setValueOnlyDeserializer(DeserializationSchema)dari kelas pembuat.DeserializationSchemamenentukan cara mengurai data biner dari Message Value Kafka.Gunakan kelas yang mengimplementasikan antarmuka Deserializer Kafka. Misalnya, Anda dapat menggunakan StringDeserializer untuk mengurai Nilai Pesan Kafka menjadi string.
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
CatatanUntuk mengurai
ConsumerRecordlengkap, Anda harus mengimplementasikan antarmukaKafkaRecordDeserializationSchema.POM
Konektor DataStream Kafka tersedia di repositori pusat Maven.
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>Saat menggunakan DataStream Connector Kafka, pertimbangkan properti berikut:
Offset Awal
Kafka Source menentukan offset awalnya menggunakan Offset Initializer (
OffsetsInitializer). Inisialisasi bawaan meliputi:Inisialisasi offset
Kode
Memulai konsumsi dari Offset paling awal.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())Memulai konsumsi dari Offset terbaru.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())Memulai konsumsi data yang timestamp-nya lebih besar dari atau sama dengan waktu yang ditentukan. Unitnya adalah milidetik.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))Memulai konsumsi dari Offset yang dikomit oleh Kelompok Konsumen. Jika tidak ada Offset yang dikomit, menggunakan strategi reset yang ditentukan (misalnya, Offset paling awal).
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))Konsumsi dimulai dari offset yang dikomit oleh kelompok konsumen, dan tidak ada kebijakan reset offset yang ditentukan.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())CatatanJika inisialisasi bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan Offset Initializer khusus.
Jika Anda tidak menentukan Offset Initializer, default-nya adalah
OffsetsInitializer.earliest().
Mode Streaming dan Mode Batch
Kafka Source mendukung Mode Streaming dan Mode Batch. Secara default, beroperasi dalam Mode Streaming, di mana Pekerjaan berjalan tanpa henti hingga gagal atau dibatalkan. Untuk mengonfigurasi Kafka Source agar berjalan dalam Mode Batch, Anda dapat menggunakan
setBounded(OffsetsInitializer)untuk menentukan Offset berhenti. Kafka Source keluar saat semua partisi mencapai offset berhenti yang ditentukan.CatatanKafka Source dalam Mode Streaming biasanya tidak memiliki Offset berhenti. Namun, untuk tujuan pengujian, Anda dapat menggunakan
setUnbounded(OffsetsInitializer)untuk menentukan Offset berhenti bahkan dalam Mode Streaming. Perhatikan perbedaan nama metode untuk menentukan Offset berhenti:setUnboundeduntuk Mode Streaming dansetBoundeduntuk Mode Batch.Penemuan Partisi Dinamis
Untuk menangani penskalaan Topik atau pembuatan topik baru tanpa memulai ulang Pekerjaan Flink, Anda dapat mengaktifkan Penemuan Partisi Dinamis saat berlangganan ke topik berdasarkan pola. Fitur ini dinonaktifkan secara default dan harus diaktifkan secara eksplisit:
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // Temukan partisi baru setiap 10 detik.PentingFitur penemuan partisi dinamis bergantung pada mekanisme pembaruan metadata kluster Kafka. Jika kluster Kafka tidak memperbarui informasi partisi secara tepat waktu, partisi baru mungkin tidak ditemukan. Pastikan konfigurasi partition.discovery.interval.ms kluster Kafka sesuai dengan skenario aktual Anda.
Waktu Peristiwa dan Watermark
Secara default, Kafka Source menggunakan timestamp dari Pesan Kafka sebagai Waktu Event. Anda dapat mendefinisikan strategi Watermark khusus untuk mengekstrak Waktu Event dari isi Pesan dan mengirimkan Watermark ke hulu.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")Untuk mempelajari lebih lanjut tentang strategi Watermark khusus, lihat Generating Watermarks.
CatatanJika subtugas sumber tidak aktif—misalnya, ketika partisi Kafka tidak memiliki data baru atau paralelisme sumber lebih tinggi daripada jumlah partisi Kafka—watermark untuk subtugas tersebut tidak akan maju, yang dapat menghambat komputasi jendela downstream.
Untuk mengatasi masalah ini, pertimbangkan solusi berikut:
Konfigurasikan waktu tunggu idle sumber: Aktifkan properti table.exec.source.idle-timeout untuk menandai sumber idle sebagai sementara idle. Hal ini memungkinkan Watermark hilir maju.
Atur Paralelisme yang sesuai: Pastikan Paralelisme sumber tidak lebih besar dari jumlah partisi Kafka.
Offset Commit
Saat checkpoint diaktifkan, Kafka Source mengkomit Offset konsumen saat ini ke Kafka saat Checkpoint selesai. Hal ini memastikan status Checkpoint Flink konsisten dengan Offset yang dikomit di broker Kafka. Jika checkpoint dinonaktifkan, Kafka Source mengandalkan mekanisme komit periodik otomatis konsumen Kafka internal. Fitur ini dikontrol oleh properti konsumen Kafka
enable.auto.commitdanauto.commit.interval.ms.CatatanKafka Source tidak bergantung pada offset yang dikomit untuk toleransi kesalahan dan pemulihan. Mengkomit offset hanya untuk memantau progres konsumen Kafka dan Kelompok Konsumen.
Properti lainnya
Selain properti yang disebutkan, Anda dapat menggunakan
setProperties(Properties)dansetProperty(String, String)untuk mengatur properti apa pun untuk Kafka Source dan konsumen Kafka yang mendasarinya. Kafka Source menyediakan properti spesifik berikut.Parameter
Deskripsi
client.id.prefix
Menentukan prefiks ID klien untuk konsumen Kafka.
partition.discovery.interval.ms
Menentukan interval dalam milidetik untuk menemukan partisi baru. Nilai
-1menonaktifkan penemuan partisi dinamis.CatatanDalam Batch Mode, properti ini secara otomatis diatur ke
-1.register.consumer.metrics
Menentukan apakah akan mendaftarkan metrik konsumen Kafka di Flink.
Konfigurasi Konsumen Kafka Lainnya
Untuk daftar lengkap konfigurasi konsumen Kafka, lihat dokumentasi resmi Apache Kafka.
PentingUntuk memastikan operasi yang benar, DataStream Connector Kafka menimpa properti yang dikonfigurasi secara manual berikut:
key.deserializerselalu ditimpa ke org.apache.kafka.common.serialization.ByteArrayDeserializer.value.deserializerselalu ditimpa ke org.apache.kafka.common.serialization.ByteArrayDeserializer.auto.offset.reset.strategyditimpa oleh strategi yang disediakan olehOffsetsInitializer.
Contoh berikut menunjukkan cara mengonfigurasi konsumen Kafka untuk menggunakan mekanisme SASL PLAIN dan menyediakan konfigurasi JAAS.
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")Pemantauan
Kafka Source mengekspos metrik melalui sistem metrik Flink untuk pemantauan dan diagnostik.
Cakupan metrik
Semua metrik untuk pembaca sumber Kafka terdaftar di bawah grup metrik
KafkaSourceReader, yang merupakan subgrup dari grup metrik operator. Metrik yang terkait dengan Partisi Topik tertentu terdaftar di subgrupKafkaSourceReader.topic.<topic_name>.partition.<partition_id>.Misalnya, metrik Offset Konsumen saat ini (
currentOffset) untuk Partisi 1 dari topik "my-topic" tersedia di.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. Jumlah komit yang berhasil ( commitsSucceeded) tersedia di.operator.KafkaSourceReader.commitsSucceeded. Daftar metrik
Metrik
Deskripsi
Cakupan
currentOffset
Offset konsumen saat ini pada sebuah partisi.
TopicPartition
committedOffset
Offset terakhir yang dikomit untuk partisi.
TopicPartition
commitsSucceeded
Jumlah total komit offset yang berhasil.
KafkaSourceReader
commitsFailed
Jumlah komit yang gagal
KafkaSourceReader
Metrik konsumen Kafka
Metrik konsumen Kafka yang mendasari terdaftar dalam kelompok metrik KafkaSourceReader.KafkaConsumer. Sebagai contoh, metrik
records-consumed-totalMetrik terdaftar di.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total. Anda dapat menggunakan properti
register.consumer.metricsProperti untuk menentukan apakah akan mendaftarkan metrik konsumen Kafka. Opsi ini diaktifkan secara default (true). Untuk informasi selengkapnya tentang metrik konsumen Kafka, lihat dokumentasi Apache Kafka.
Buat Kafka Sink
Kafka Sink Flink menulis aliran data ke satu atau beberapa topik Kafka.
DataStream<String> stream = ... Properties kafkaProperties = new Properties(); kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); KafkaSink<String> sink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(sink);Untuk membangun Kafka Sink, Anda harus mengonfigurasi properti berikut.
Parameter
Deskripsi
Properti klien Kafka
Properti
bootstrap.serversProperti wajib diisi. Properti ini menentukan daftar broker Kafka yang dipisahkan koma.Serializer catatan
Anda harus menyediakan
KafkaRecordSerializationSchemauntuk mengonversi data input menjadiProducerRecordKafka. Flink menyediakan builder skema yang menawarkan komponen umum, seperti serialisasi untuk kunci dan nilai pesan, pemilihan topik, dan partisi pesan. Anda juga dapat mengimplementasikan antarmuka yang sesuai untuk kontrol lebih rinci. Metode ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) dipanggil untuk setiap catatan masuk untuk menghasilkan ProducerRecord yang akan ditulis ke Kafka.ProducerRecordmemberikan kontrol rinci tentang cara setiap catatan ditulis ke Kafka, memungkinkan Anda untuk:Atur Topic tujuan.
Atur Message Key.
Menentukan Partisi tujuan.
Jaminan pengiriman
Parameter
bootstrap.serverswajib diisi dan menentukan daftar broker Kafka yang dipisahkan koma.Jaminan pengiriman
Saat checkpoint Flink diaktifkan, Kafka Sink Flink dapat memberikan semantik tepat-sekali. Selain mengaktifkan checkpoint, Anda dapat menggunakan parameter DeliveryGuarantee untuk menentukan jaminan pengiriman yang berbeda. Parameter DeliveryGuarantee menyediakan opsi berikut:
DeliveryGuarantee.NONE: (Default) Flink tidak memberikan jaminan. Data mungkin hilang atau diduplikasi.
DeliveryGuarantee.AT_LEAST_ONCE: Menjamin bahwa tidak ada data yang hilang, tetapi duplikasi mungkin terjadi.
DeliveryGuarantee.EXACTLY_ONCE: Menggunakan transaksi Kafka untuk memberikan semantik tepat-sekali.
CatatanSaat menggunakan semantik EXACTLY_ONCE, lihat Pertimbangan untuk semantik EXACTLY_ONCE.
Ingesti Data
Anda dapat menggunakan Konektor Kafka sebagai Sumber atau Sink untuk membuat pekerjaan YAML untuk ingesti data.
Batasan
Gunakan Realtime Compute for Apache Flink (VVR) 11.1 atau versi setelahnya untuk mengingesti data CDC Flink dari Sumber Data Kafka.
Hanya JSON, Debezium JSON, dan Canal JSON yang didukung.
Hanya Realtime Compute for Apache Flink (VVR) 8.0.11 dan versi setelahnya yang mendukung pembacaan data dari satu tabel yang didistribusikan di beberapa Partisi.
Sintaksis
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092Parameter
Umum
Parameter
Deskripsi
Wajib
Jenis
Default
Keterangan
type
Tipe sumber atau sink.
Ya
String
N/A
Nilainya harus
kafka.name
Nama sumber atau sink.
Tidak
String
N/A
N/A
properties.bootstrap.servers
Alamat broker Kafka.
Ya
String
N/A
Formatnya adalah
host:port,host:port,host:port, dipisahkan dengan koma (,).properties.*
Properti konfigurasi untuk klien Kafka.
Tidak
String
N/A
Kunci properti harus merupakan opsi yang valid sebagaimana didefinisikan dalam dokumentasi resmi Apache Kafka untuk Producer Configs dan Consumer Configs.
Realtime Compute for Apache Flink (VVR) menghapus prefiks
properties.sebelum meneruskan pasangan kunci-nilai yang tersisa ke klien Kafka yang mendasari. Misalnya, atur'properties.allow.auto.create.topics' = 'false'untuk menonaktifkan pembuatan topik otomatis.key.format
Format untuk serialisasi dan deserialisasi kunci pesan Kafka.
Tidak
String
N/A
Untuk sumber, hanya format
jsonyang didukung.Untuk sink, nilai yang valid adalah:
csv
json
CatatanOpsi ini hanya didukung di Realtime Compute for Apache Flink (VVR) 11.0.0 dan versi setelahnya.
value.format
Format untuk serialisasi dan deserialisasi nilai pesan Kafka.
Tidak
String
debezium-json
Untuk sumber, nilai yang valid adalah:
debezium-json
canal-json
json
Untuk sink, nilai yang valid adalah:
debezium-json
canal-json
canal-protobuf
CatatanFormat
debezium-jsondancanal-jsonmemerlukan Realtime Compute for Apache Flink (VVR) versi 8.0.10 atau versi setelahnya.Format
jsonmemerlukan Realtime Compute for Apache Flink (VVR) versi 11.0.0 atau versi setelahnya.
Parameter sumber
Parameter
Deskripsi
Wajib
Jenis
Default
Keterangan
topic
Topik atau topik yang akan dibaca.
Tidak
String
N/A
Untuk berlangganan beberapa topik, pisahkan nama mereka dengan titik koma (;), misalnya,
topic-1;topic-2.CatatanTentukan salah satu parameter ini atau
topic-pattern, tetapi tidak keduanya.topic-pattern
Ekspresi reguler yang cocok dengan nama topik untuk berlangganan.
Tidak
String
N/A
CatatanTentukan salah satu parameter ini atau
topic, tetapi tidak keduanya.properties.group.id
ID kelompok konsumen.
Tidak
String
N/A
Saat menentukan ID kelompok konsumen baru, Anda harus mengatur parameter properties.auto.offset.reset ke
earliestataulatestuntuk menentukan offset awal.scan.startup.mode
Offset awal untuk konsumen Kafka.
Tidak
String
group-offsets
Nilai yang valid:
earliest-offset: Mulai membaca dari offset paling awal yang tersedia.
latest-offset: Mulai membaca dari offset terbaru.
group-offsets (Nilai default): Mulai membaca dari offset yang dikomit untuk properties.group.id yang ditentukan.
timestamp: Mulai membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Mulai membaca dari offset yang ditentukan oleh scan.startup.specific-offsets.
CatatanParameter ini hanya berlaku saat pekerjaan dimulai dengan startup tanpa status. Saat pekerjaan berstatus dimulai, selalu mengonsumsi dari offset yang disimpan dalam statusnya.
scan.startup.specific-offsets
Offset awal untuk setiap partisi saat
scan.startup.modediatur kespecific-offsets.Tidak
String
N/A
Misalnya,
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
Timestamp awal dalam milidetik saat
scan.startup.modediatur ketimestamp.Tidak
Long
N/A
Unitnya adalah milidetik.
scan.topic-partition-discovery.interval
Interval untuk menemukan partisi baru dalam topik secara dinamis.
Tidak
Duration
5 menit
Konektor secara berkala menemukan dan membaca dari partisi baru. Saat menggunakan
topic-pattern, konektor juga menemukan topik baru yang cocok dengan pola tersebut. Untuk menonaktifkan penemuan, atur nilai ini ke 0 atau kurang.scan.check.duplicated.group.id
Memeriksa apakah kelompok konsumen yang ditentukan oleh
properties.group.idadalah duplikat.Tidak
Boolean
false
Nilai yang valid:
true: Memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai. Jika ditemukan duplikat, pekerjaan gagal.
false: Memulai pekerjaan tanpa memeriksa konflik.
schema.inference.strategy
Strategi penguraian skema.
Tidak
String
continuous
Nilai yang valid:
continuous: Mengurai skema setiap catatan data. Jika skema tidak kompatibel, sistem menyimpulkan skema yang lebih luas dan menghasilkan event perubahan skema.
static: Melakukan penguraian skema hanya sekali saat pekerjaan dimulai. Data kemudian diurai berdasarkan skema awal ini, dan tidak ada event perubahan skema yang dihasilkan.
CatatanUntuk informasi selengkapnya tentang penguraian skema, lihat Kebijakan untuk penguraian dan evolusi skema.
Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya.
scan.max.pre.fetch.records
Jumlah maksimum pesan yang dikonsumsi dan diurai dari setiap partisi untuk inferensi skema awal.
Tidak
Int
50
Sebelum pemrosesan data dimulai, sistem melakukan pre-fetch dan mengonsumsi jumlah pesan terbaru yang ditentukan dari setiap partisi untuk menginisialisasi skema.
key.fields-prefix
Prefiks khusus yang ditambahkan ke nama bidang dari kunci pesan untuk menghindari konflik nama.
Tidak
String
N/A
Misalnya, jika parameter ini diatur ke
key_, dan kunci pesan berisi bidang bernamaa, nama bidang yang diurai menjadikey_a.CatatanNilai
key.fields-prefixtidak boleh menjadi prefiks dari nilaivalue.fields-prefix.value.fields-prefix
Prefiks khusus yang ditambahkan ke nama bidang dari nilai pesan untuk menghindari konflik nama.
Tidak
String
N/A
Misalnya, jika parameter ini diatur ke
value_, dan nilai pesan berisi bidang bernamab, nama bidang yang diurai menjadivalue_b.CatatanNilai
value.fields-prefixtidak boleh menjadi prefiks dari nilaikey.fields-prefix.metadata.list
Kolom metadata yang akan diteruskan ke sink hilir.
Tidak
String
N/A
Kolom metadata yang tersedia meliputi
topic,partition,offset,timestamp,timestamp-type,headers, danleader-epoch. Pisahkan nama kolom dengan koma.scan.value.initial-schemas.ddls
Pernyataan DDL yang menentukan skema awal untuk tabel tertentu.
Tidak
String
N/A
Gunakan titik koma (
;) untuk memisahkan beberapa pernyataan DDL. Misalnya, gunakanCREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);untuk menentukan skema awal untuk tabel db1.t1 dan db1.t2, masing-masing.Skema tabel yang didefinisikan dalam DDL harus konsisten dengan tabel sink target dan mematuhi sintaksis SQL Flink.
CatatanOpsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.
ingestion.ignore-errors
Menentukan apakah akan mengabaikan error penguraian data.
Tidak
Boolean
false
CatatanOpsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.
ingestion.error-tolerance.max-count
Jika
ingestion.ignore-errorsadalahtrue, parameter ini menentukan jumlah maksimum error penguraian yang dapat ditoleransi sebelum pekerjaan gagal.Tidak
Integer
-1
Parameter ini hanya berlaku saat
ingestion.ignore-errorsdiatur ketrue. Nilai -1 menunjukkan toleransi tak terbatas, artinya exception penguraian tidak akan menyebabkan pekerjaan gagal.CatatanOpsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.
Parameter format Debezium JSON
Parameter
Wajib
Tipe
Default
Deskripsi
debezium-json.distributed-tables
Tidak
Boolean
false
Atur ke
truejika data untuk satu tabel Debezium JSON didistribusikan di beberapa partisi.CatatanOpsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya.
PentingMemodifikasi parameter ini memerlukan startup tanpa status.
debezium-json.schema-include
Tidak
Boolean
false
Menentukan apakah pesan Debezium JSON menyertakan skema. Ini sesuai dengan properti
value.converter.schemas.enabledalam konfigurasi Debezium Kafka Connect.Nilai yang valid:
true: Pesan Debezium JSON berisi skema.
false: Pesan Debezium JSON tidak berisi skema.
debezium-json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewatkan baris yang menyebabkan exception penguraian.
false: Melemparkan error dan pekerjaan gagal.
debezium-json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan mengurai semua tipe primitif sebagai tipe
Stringsaat mengurai skema tabel.Nilai yang valid:
true: Mengurai semua tipe primitif sebagai
String.false: Mengurai tipe berdasarkan aturan default.
Parameter format Canal JSON
Parameter
Wajib
Tipe
Default
Deskripsi
canal-json.distributed-tables
Tidak
Boolean
false
Jika data untuk satu tabel dalam Canal JSON didistribusikan di beberapa partisi, Anda harus mengaktifkan opsi ini.
CatatanOpsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya.
PentingMemodifikasi parameter ini memerlukan startup tanpa status.
canal-json.database.include
Tidak
String
N/A
Ekspresi reguler opsional untuk memfilter changelog berdasarkan bidang metadata
databasedalam catatan Canal. Hanya catatan dari database yang cocok yang diproses. Ekspresi reguler ini kompatibel dengan kelas Pattern Java.canal-json.table.include
Tidak
String
N/A
Ekspresi reguler opsional untuk memfilter changelog berdasarkan bidang metadata
tabledalam catatan Canal. Hanya catatan dari tabel yang cocok yang diproses. Ekspresi reguler ini kompatibel dengan kelas Pattern Java.canal-json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewatkan baris saat ini jika terjadi exception penguraian.
false: Melemparkan error dan pekerjaan gagal dimulai.
canal-json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan mengurai semua tipe primitif sebagai tipe
Stringsaat mengurai skema tabel.Nilai yang valid:
true: Mengurai semua tipe primitif sebagai
String.false: Mengurai tipe berdasarkan aturan default.
canal-json.infer-schema.strategy
Tidak
String
AUTO
Menentukan strategi untuk mengurai skema tabel.
Nilai yang valid:
AUTO: Secara otomatis mengurai skema dari data JSON. Disarankan jika data tidak berisi bidang
sqlType, untuk mencegah kegagalan penguraian.SQL_TYPE: Mengurai skema dari array
sqlTypedalam data Canal JSON. Kami menyarankan mengatur ini ke SQL_TYPE untuk mendapatkan tipe yang lebih tepat jika data berisi bidangsqlType.MYSQL_TYPE: Mengurai skema dari array
mysqlTypedalam data Canal JSON.
Untuk informasi selengkapnya tentang aturan pemetaan tipe
sqlType, lihat Penguraian Skema Canal JSON.CatatanKonfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.1 dan versi setelahnya.
Nilai
MYSQL_TYPEdidukung di Ververica Runtime (VVR) 11.3 dan versi setelahnya.
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
Tidak
Boolean
true
Menentukan apakah akan memetakan tipe MySQL
TIMESTAMPke tipe CDCTIMESTAMP.true: Tipe MySQL
TIMESTAMPdipetakan ke tipe CDCTIMESTAMP.false: Tipe MySQL
TIMESTAMPdipetakan ke tipe CDCTIMESTAMP_LTZ.
canal-json.mysql.treat-tinyint1-as-boolean.enabled
Tidak
Boolean
true
Saat menggunakan strategi penguraian
MYSQL_TYPE, menentukan apakah akan memetakan tipe MySQLTINYINT(1)ke tipe CDCBOOLEAN.true: Tipe MySQL
TINYINT(1)dipetakan ke tipe CDCBOOLEAN.false: Tipe MySQL
TINYINT(1)dipetakan ke tipe CDCTINYINT(1).
Opsi ini hanya berlaku saat
canal-json.infer-schema.strategydiatur keMYSQL_TYPE.Parameter format JSON
Parameter
Wajib
Tipe
Default
Deskripsi
json.timestamp-format.standard
Tidak
String
SQL
Format timestamp untuk data input dan output.
SQL: Mengurai timestamp input dalam format
yyyy-MM-dd HH:mm:ss.s{precision}, seperti2020-12-30 12:13:14.123.ISO-8601: Mengurai timestamp input dalam format
yyyy-MM-ddTHH:mm:ss.s{precision}, seperti2020-12-30T12:13:14.123.
json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewatkan baris saat ini jika terjadi exception penguraian.
false: Melemparkan error dan pekerjaan gagal dimulai.
json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan mengurai semua tipe primitif sebagai tipe
Stringsaat mengurai skema tabel.Nilai yang valid:
true: Mengurai semua tipe primitif sebagai
String.false: Mengurai tipe berdasarkan aturan default.
json.infer-schema.flatten-nested-columns.enable
Tidak
Boolean
false
Menentukan apakah akan memperluas kolom bersarang dalam data JSON secara rekursif. Nilai yang valid:
true: Memperluas kolom bersarang secara rekursif.
false: Memperlakukan kolom bersarang sebagai
String.
json.decode.parser-table-id.fields
Tidak
String
N/A
Menentukan apakah akan menggunakan nilai beberapa bidang JSON untuk menghasilkan tableId saat mengurai data dalam format JSON. Nilai beberapa bidang digabungkan dengan koma Inggris
,. Misalnya, jika data JSON adalah{"col0":"a", "col1","b", "col2","c"}, hasil yang dihasilkan adalah sebagai berikut:Konfigurasi
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
json.infer-schema.fixed-types
Tidak
String
N/A
Saat Anda mengurai data JSON, Anda dapat menentukan tipe data untuk bidang tertentu. Gunakan koma
,untuk memisahkan beberapa bidang. Misalnya,id BIGINT, name VARCHAR(10)menentukan bahwa bidangidbertipe BIGINT dan bidangnamebertipe VARCHAR(10).CatatanOpsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.
Saat menggunakan konfigurasi ini dengan Ververica Runtime (VVR) versi 11.5, Anda juga harus menambahkan konfigurasi
scan.max.pre.fetch.records: 0.
Parameter tabel sink
Parameter
Deskripsi
Wajib
Tipe
Default
Keterangan
type
Tipe sink.
Ya
String
None.
Nilainya harus
kafka.name
Nama sink.
Tidak
String
None.
N/A
topic
Nama topik Kafka.
Tidak
String
None.
Jika parameter ini ditentukan, semua data ditulis ke topik ini.
CatatanJika tidak ditentukan, setiap catatan ditulis ke topik yang dinamai sesuai TableID-nya. TableID dibentuk dengan menggabungkan nama database dan tabel menggunakan titik (
.), misalnyadatabaseName.tableName.partition.strategy
Strategi penulisan data ke partisi Kafka.
Tidak
String
all-to-zero
Nilai yang valid:
all-to-zero(default): Menulis semua data ke Partisi 0.hash-by-key: Menulis data ke partisi berdasarkan nilai hash kunci primer, sehingga catatan dengan kunci primer yang sama selalu ditulis ke partisi yang sama untuk menjaga urutan.
sink.tableId-to-topic.mapping
Pemetaan nama tabel upstream ke nama topik Kafka downstream.
Tidak
String
None.
Pisahkan setiap pemetaan dengan titik koma (
;). Dalam satu pemetaan, pisahkan nama tabel hulu dan nama topik Kafka hulu dengan titik dua (:). Nama tabel dapat menggunakan ekspresi reguler. Untuk memetakan beberapa tabel ke satu topik yang sama, pisahkan nama tabel dengan koma (,). Contoh:mydb.mytable1:topic1;mydb.mytable2:topic2.CatatanParameter ini memungkinkan Anda mengubah topik tujuan sambil tetap mempertahankan informasi nama tabel asli.
Parameter format Debezium JSON
Parameter
Wajib
Type
Default
Deskripsi
debezium-json.include-schema.enabled
Tidak
Boolean
false
Menentukan apakah informasi skema disertakan dalam data Debezium JSON.
debezium-json.emit.full-table-id.enabled
Tidak
Boolean
false
Menentukan apakah ID tabel tiga bagian lengkap dituliskan ke bidang metadata Debezium JSON.
Jika parameter ini diaktifkan, pemetaannya adalah sebagai berikut:
Bagian ID Tabel CDC
Debezium JSON Key
Namespace
dbSkema
schemaTabel
tableJika parameter ini dinonaktifkan, pemetaannya adalah sebagai berikut:
Bagian ID Tabel CDC
Debezium JSON Key
Namespace
Tidak dipetakan
Skema
dbTabel
tableCatatanParameter ini hanya didukung di Ververica Runtime (VVR) 11.6 dan versi setelahnya.
Contoh
Gunakan Kafka sebagai Sumber Ingesti Data:
source: type: kafka name: Kafka source properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADENGunakan Kafka sebagai Sink Ingesti Data:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}Modul
routemenentukan Topik Kafka tujuan untuk Tabel Sumber.
Secara default, fitur pembuatan Topik otomatis dinonaktifkan untuk ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat FAQ tentang pembuatan topik otomatis. Anda harus membuat Topik sebelum menulis data ke ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Langkah 3: Buat sumber daya.
Kebijakan untuk penguraian dan evolusi skema
Konektor Kafka mempertahankan skema semua tabel yang saat ini diketahui.
Inisialisasi skema tabel
Informasi Skema Tabel mencakup informasi kolom dan tipe data, informasi database dan tabel, serta informasi Kunci Primer. Bagian berikut menjelaskan cara menginisialisasi ketiga jenis informasi ini.
Informasi kolom dan tipe data
Pekerjaan Ingesti Data dapat secara otomatis menyimpulkan kolom dan tipe data untuk tabel dari data. Namun, dalam beberapa skenario, Anda mungkin ingin menentukan kolom dan tipe untuk tabel tertentu. Bergantung pada granularitas tipe yang ditentukan pengguna, terdapat tiga strategi untuk menginisialisasi skema tabel:
Inferensi skema otomatis penuh
Sebelum membaca data dari Kafka, konektor Kafka mencoba mengonsumsi hingga scan.max.pre.fetch.records pesan dari setiap partisi, mengurai skema setiap pesan, dan menggabungkan skema ini untuk menginisialisasi skema tabel. Event pembuatan tabel kemudian dihasilkan berdasarkan skema yang diinisialisasi ini sebelum data benar-benar dikonsumsi.
Untuk format Debezium JSON dan Canal JSON, informasi tabel terdapat dalam setiap pesan. Pesan yang diambil sebelumnya berdasarkan parameter scan.max.pre.fetch.records mungkin berisi data dari beberapa tabel. Oleh karena itu, jumlah catatan yang diambil sebelumnya untuk satu tabel tertentu tidak dapat ditentukan. Pengambilan sebelumnya dan inisialisasi skema hanya dilakukan sekali untuk setiap partisi sebelum pesannya dikonsumsi dan diproses. Jika data untuk tabel baru muncul nanti, skema yang diurai dari catatan pertama tabel tersebut digunakan sebagai skema awalnya, dan skema tidak diambil sebelumnya atau diinisialisasi lagi.
Distribusi data dari satu tabel di beberapa partisi hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya, dan mengharuskan Anda mengatur opsi konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.
Menentukan skema tabel awal
Dalam beberapa skenario, Anda mungkin perlu menentukan skema tabel awal secara manual—misalnya, saat menulis data dari Kafka ke tabel hilir yang telah dibuat sebelumnya. Untuk melakukannya, tambahkan parameter scan.value.initial-schemas.ddls. Berikut contoh konfigurasinya:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Set the initial table schema
scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);Pernyataan DDL harus cocok dengan skema tabel target. Konfigurasi ini menentukan tipe awal kolom id sebagai BIGINT dan kolom name sebagai VARCHAR(10) untuk tabel db1.t1, dan tipe awal kolom id sebagai BIGINT untuk tabel db1.t2.
Pernyataan DDL menggunakan sintaksis SQL Flink.
Menetapkan tipe tetap untuk bidang tertentu
Dalam beberapa skenario, Anda mungkin ingin menetapkan tipe data tetap untuk bidang tertentu. Misalnya, untuk bidang yang mungkin disimpulkan sebagai tipe TIMESTAMP, Anda mungkin ingin mengeluarkannya sebagai string. Dalam hal ini, Anda dapat menambahkan parameter json.infer-schema.fixed-types untuk menentukan skema tabel awal. Parameter ini hanya berlaku saat format pesan adalah JSON. Berikut adalah contoh konfigurasi:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Set specific fields to a fixed type
json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
scan.max.pre.fetch.records: 0Konfigurasi ini menentukan bahwa semua bidang id bertipe BIGINT dan semua bidang name bertipe VARCHAR(10).
Tipe data konsisten dengan tipe SQL Flink.
Informasi database dan tabel
Untuk format Canal JSON dan Debezium JSON, konektor mengurai informasi tabel, termasuk nama database dan tabel, dari setiap pesan.
Untuk format JSON, secara default, informasi tabel hanya berisi nama tabel, yaitu nama topik yang berisi data. Jika data Anda berisi informasi database dan tabel, Anda dapat menggunakan parameter json.infer-schema.fixed-types untuk menentukan bidang yang berisi informasi ini. Bidang ini kemudian dipetakan ke nama database dan tabel. Berikut adalah contoh konfigurasi:
source: type: kafka name: Kafka Source properties.bootstrap.servers: host:9092 topic: test-topic value.format: json scan.startup.mode: earliest-offset # Use the value of the col1 field as the database name and the value of the col2 field as the table name json.decode.parser-table-id.fields: col1,col2Dengan konfigurasi ini, konektor mengirim setiap catatan ke tabel di mana nama database adalah nilai bidang
col1dan nama tabel adalah nilai bidangcol2.
Informasi kunci primer
Untuk format Canal JSON, bidang
pkNamesdalam data JSON menentukan Kunci Primer tabel.Untuk format Debezium JSON dan JSON, data tidak berisi informasi Kunci Primer. Anda dapat menambahkan Kunci Primer ke tabel secara manual dengan menggunakan aturan
transform:transform: - source-table: \.*.\.* projection: \* primary-keys: key1, key2
Penguraian skema dan evolusi skema
Setelah skema tabel diinisialisasi, jika schema.inference.strategy diatur ke static, Konektor Kafka mengurai nilai pesan setiap pesan berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, Konektor Kafka mengurai nilai pesan setiap pesan Kafka, mengidentifikasi kolom fisiknya, dan membandingkan skema yang dihasilkan dengan skema yang saat ini dipertahankan. Jika skema tidak konsisten, konektor mencoba menggabungkannya dan menghasilkan event perubahan skema tabel yang sesuai. Aturan penggabungan adalah sebagai berikut:
Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema, dan event dihasilkan untuk menambahkannya sebagai kolom nullable.
Jika kolom fisik yang diurai tidak berisi bidang yang ada dalam skema saat ini, bidang tersebut dipertahankan dan nilainya diisi dengan
NULL. Event penghapusan kolom tidak dihasilkan.Bidang dengan nama yang sama ditangani sebagai berikut:
Jika kolom memiliki tipe data yang sama tetapi presisi berbeda, tipe dengan presisi lebih besar digunakan, dan event perubahan tipe kolom dihasilkan.
Jika kolom memiliki tipe data berbeda, sistem menemukan tipe induk umum terkecil dalam pohon hierarki tipe di bawah ini. Sistem kemudian menggunakan tipe induk umum ini untuk kolom dan menghasilkan event perubahan tipe kolom.

Kebijakan evolusi skema yang didukung:
Menambahkan kolom: Konektor menambahkan kolom baru ke akhir skema dan menyinkronkan datanya. Kolom baru diatur sebagai nullable.
Menghapus kolom: Event penghapusan kolom tidak dihasilkan. Sebaliknya, data selanjutnya untuk kolom tersebut diisi dengan
NULL.Mengganti nama kolom: Konektor memperlakukannya sebagai menghapus kolom lama dan menambahkan yang baru. Kolom baru ditambahkan ke akhir skema, dan nilai untuk kolom asli diisi dengan
NULL.Mengubah tipe kolom:
Untuk Sink hilir yang mendukung perubahan tipe kolom, Pekerjaan Ingesti Data dapat menangani perubahan tipe (misalnya, dari
INTkeBIGINT) jika Sink hilir dikonfigurasi untuk memprosesnya. Kemampuan ini bergantung pada aturan perubahan tipe kolom yang didukung oleh sink tertentu. Lihat dokumentasi sink Anda untuk mengetahui aturan yang didukung.Untuk Sink Hilir yang tidak mendukung perubahan tipe kolom, seperti Hologres, Anda dapat menggunakan Pemetaan Tipe Lebar. Fitur ini membuat tabel dengan tipe data yang lebih lebar di Sink Hilir saat pekerjaan dimulai. Sistem kemudian dapat menoleransi perubahan tipe kolom selama tipe baru sesuai dengan tipe yang lebih lebar yang telah didefinisikan di Sink Hilir.
Perubahan skema yang tidak didukung:
Perubahan pada kendala, seperti Kunci Primer atau indeks.
Mengubah kolom dari
NOT NULLkeNULLABLE.
Penguraian skema Canal JSON
Data Canal JSON mungkin berisi bidang
sqlTypeopsional, yang mencatat informasi tipe yang tepat untuk kolom data. Untuk mendapatkan skema yang lebih akurat, Anda dapat mengatur canal-json.infer-schema.strategy keSQL_TYPEuntuk menggunakan tipe dari bidangsqlType. Pemetaan tipe adalah sebagai berikut:Tipe JDBC
Type code
Tipe CDC
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
NUMERIC
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
BINARY
-2
BYTES
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
DATE
91
DATE
TIME
92
TIME
TIMESTAMP
93
TIMESTAMP
CHAR
1
STRING
VARCHAR
12
LONGVARCHAR
-1
Tipe data lainnya
Toleransi dan pengumpulan data kotor
Sumber Data Kafka Anda mungkin berisi catatan yang rusak, juga dikenal sebagai Data Kotor. Untuk mencegah Pekerjaan Anda sering gagal dan dimulai ulang, Anda dapat mengonfigurasinya untuk mengabaikan catatan tidak valid ini. Misalnya:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Aktifkan Toleransi Data Kotor
ingestion.ignore-errors: true
# Toleransi hingga 1000 catatan data kotor
ingestion.error-tolerance.max-count: 1000Konfigurasi ini mentoleransi hingga 1.000 catatan kotor, memungkinkan Pekerjaan berjalan normal. Jika jumlah catatan kotor melebihi ambang batas ini, Pekerjaan gagal, mendorong Anda untuk memvalidasi data Anda.
Untuk memastikan Pekerjaan Anda tidak pernah gagal karena Data Kotor, gunakan konfigurasi berikut:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Aktifkan Toleransi Data Kotor
ingestion.ignore-errors: true
# Toleransi semua catatan data kotor
ingestion.error-tolerance.max-count: -1Kebijakan Toleransi Data Kotor memastikan Pekerjaan Anda tidak gagal karena catatan tidak valid. Anda mungkin juga ingin menganalisis Data Kotor ini untuk menyesuaikan perilaku produsen data Kafka Anda. Seperti dijelaskan dalam Pengumpulan Data Kotor, Anda dapat melihat Data Kotor Pekerjaan di log TaskManager. Misalnya:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Aktifkan Toleransi Data Kotor
ingestion.ignore-errors: true
# Toleransi semua catatan data kotor
ingestion.error-tolerance.max-count: -1
pipeline:
dirty-data.collector:
# Tulis data kotor ke file log TaskManager
type: loggerPemetaan nama tabel dan topik
Saat menggunakan Kafka sebagai Sink untuk Job Pengambilan Data, format pesan—seperti JSON Debezium atau JSON Canal—mencakup informasi nama tabel asli. Sistem downstream yang mengonsumsi pesan tersebut umumnya menggunakan nama tabel yang disematkan sebagai pengidentifikasi tabel aktual, bukan nama topik. Oleh karena itu, Anda harus mengonfigurasi strategi pemetaan antara nama tabel dan topik dengan cermat.
Anggaplah Anda perlu menyinkronkan dua tabel dari database MySQL: mydb.mytable1 dan mydb.mytable2. Strategi pemetaan berikut tersedia:
1. Tanpa strategi pemetaan
Tanpa strategi pemetaan apa pun, data untuk setiap tabel ditulis ke topik yang dinamai dalam format <Nama Database>.<Nama Tabel>. Oleh karena itu, data dari mydb.mytable1 ditulis ke topik bernama mydb.mytable1, dan data dari mydb.mytable2 ditulis ke topik bernama mydb.mytable2. Berikut adalah contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. Pemetaan aturan rute (Tidak disarankan)
Anda mungkin ingin menulis data ke topik tertentu alih-alih menggunakan format default <Nama Database>.<Nama Tabel>. Untuk melakukan ini, Anda dapat mengonfigurasi aturan rute. Berikut adalah contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytableDalam hal ini, semua data dari mydb.mytable1 dan mydb.mytable2 ditulis ke satu topik bernama mytable.
Namun, saat Anda menggunakan aturan rute untuk mengubah topik tujuan, hal ini juga memodifikasi nama tabel dalam pesan Kafka (dalam format Debezium JSON atau Canal JSON). Dalam kasus ini, nama tabel dalam semua pesan Kafka menjadi mytable. Hal ini dapat menyebabkan perilaku tak terduga pada sistem yang mengonsumsi pesan dari topik ini.
3. Pemetaan dengan sink.tableId-to-topic.mapping (Direkomendasikan)
Untuk memetakan nama tabel ke topik sambil mempertahankan nama tabel sumber asli, gunakan parameter sink.tableId-to-topic.mapping. Berikut adalah contoh konfigurasi:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}Atau, Anda dapat menggunakan konfigurasi berikut:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}Dalam kasus ini, seluruh data dari mydb.mytable1 dan mydb.mytable2 ditulis ke topik mytable, sedangkan nama tabel dalam pesan Kafka (dalam format JSON Debezium atau JSON Canal) tetap dipertahankan sebagai mydb.mytable1 atau mydb.mytable2. Hal ini memastikan bahwa sistem downstream dapat mengidentifikasi nama tabel sumber asli secara akurat.
Semantik EXACTLY_ONCE
Konfigurasikan Tingkat Isolasi Konsumen
Semua aplikasi yang mengonsumsi data Kafka harus mengatur properti
isolation.level:read_committed: Hanya membaca data yang dikomit.read_uncommitted(Default): Dapat membaca data yang belum dikomit.
EXACTLY_ONCE bergantung pada
read_committed. Jika tidak, konsumen mungkin melihat data yang belum dikomit, sehingga mengganggu konsistensi.Timeout Transaksi dan Kehilangan Data
Saat memulihkan dari Checkpoint, Realtime Compute for Apache Flink hanya mempertimbangkan Transaksi yang dikomit sebelum Checkpoint tersebut dimulai. Jika durasi antara Kegagalan Pekerjaan dan dimulainya ulang melebihi Timeout Transaksi Kafka, Kafka secara otomatis membatalkan Transaksi yang terbuka, yang dapat menyebabkan Kehilangan Data.
transaction.max.timeout.msdefault untuk broker Kafka adalah 15 menit.Secara default, Kafka Sink Flink mengatur parameter
transaction.timeout.mske 1 jam.Anda harus meningkatkan
transaction.max.timeout.mspada broker agar lebih besar dari atau sama dengan pengaturan di Flink.
Kolam Produsen dan Checkpoint Bersamaan
Mode
EXACTLY_ONCEmenggunakan Kolam Produsen Kafka dengan ukuran tetap. Setiap checkpoint menggunakan satu produsen dari kolam ini. Jika jumlah checkpoint bersamaan melebihi ukuran kolam, Pekerjaan gagal.Konfigurasikan ukuran kolam produsen berdasarkan jumlah maksimum checkpoint bersamaan.
Batasan Pengurangan Paralelisme
Jika Pekerjaan gagal sebelum Checkpoint pertama selesai, informasi Kolam Produsen asli hilang saat dimulai ulang. Oleh karena itu, jangan Mengurangi Paralelisme Pekerjaan sebelum Checkpoint pertama selesai. Jika pengurangan diperlukan, Paralelisme baru tidak boleh kurang dari
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.Transaksi Memblokir Pembacaan
Dalam mode
read_committed, transaksi apa pun yang belum dikomit atau dibatalkan akan memblokir operasi baca pada seluruh Topik.Misalnya:
Transaksi 1 menulis data.
Transaksi 2 menulis lebih banyak data dan dikomit.
Selama Transaksi 1 tetap terbuka, data dari Transaksi 2 yang telah dikomit tidak terlihat oleh Konsumen.
Hal ini memiliki implikasi berikut:
Selama operasi normal, latensi visibilitas data kira-kira sama dengan interval checkpoint.
Jika Pekerjaan gagal, Topik apa pun yang sedang ditulisnya akan diblokir untuk Konsumen hingga Pekerjaan dimulai ulang atau Transaksi habis waktunya. Dalam kasus ekstrem, proses Timeout Transaksi itu sendiri juga dapat memengaruhi operasi baca.