Topik ini menjelaskan cara menggunakan Kafka connector.
Informasi latar belakang
Apache Kafka adalah layanan message queue terdistribusi open source yang banyak digunakan dalam aplikasi big data, seperti pemrosesan data berkinerja tinggi, analitik streaming, dan integrasi data. Kafka connector mendukung ingesti dan egress data ber-throughput tinggi, operasi baca-tulis data dalam berbagai format, serta semantik tepat-sekali (exactly-once semantics) untuk Realtime Compute for Apache Flink dengan memanfaatkan klien Apache Kafka.
Kategori | Deskripsi |
Tipe yang didukung | Tabel sumber, tabel sink, sink ingesti data |
Mode eksekusi | Mode streaming |
Format data | |
Metrik Pemantauan Unik | |
Tipe API | SQL API, DataStream API, dan data ingestion YAML API |
Apakah Anda dapat memperbarui atau menghapus data di tabel sink? | Anda tidak dapat memperbarui atau menghapus data di tabel sink. Anda hanya dapat menyisipkan data ke tabel sink. Catatan Untuk informasi selengkapnya tentang fitur terkait pembaruan atau penghapusan data, lihat Upsert Kafka. |
Prasyarat
Anda dapat menghubungkan ke kluster menggunakan salah satu metode berikut, tergantung pada kebutuhan Anda:
Hubungkan ke kluster ApsaraMQ for Kafka
Versi kluster ApsaraMQ for Kafka adalah 0.11 atau lebih baru.
Kluster ApsaraMQ for Kafka telah dibuat. Untuk informasi selengkapnya, lihat Buat resource.
Ruang kerja Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama dengan kluster ApsaraMQ for Kafka, dan blok CIDR VPC Realtime Compute for Apache Flink telah ditambahkan ke daftar putih kluster ApsaraMQ for Kafka. Untuk informasi selengkapnya tentang cara mengonfigurasi daftar putih ApsaraMQ for Kafka, lihat Konfigurasi daftar putih.
PentingSaat menulis data ke ApsaraMQ for Kafka, perhatikan hal berikut:
ApsaraMQ for Kafka tidak mendukung algoritma kompresi Zstandard untuk penulisan data.
ApsaraMQ for Kafka tidak mendukung operasi penulisan idempoten atau transaksional. Oleh karena itu, Anda tidak dapat menggunakan semantik tepat-sekali (exactly-once semantics) yang didukung oleh tabel sink Kafka. Jika pekerjaan Realtime Compute for Apache Flink Anda menggunakan Ververica Runtime (VVR) 8.0.0 atau versi lebih baru, nonaktifkan fitur penulisan idempoten dengan mengatur
properties.enable.idempotence=falseuntuk tabel sink. Untuk informasi selengkapnya tentang perbandingan dan batasan mesin penyimpanan untuk ApsaraMQ for Kafka, lihat Perbandingan antar mesin penyimpanan.
Hubungkan ke kluster Apache Kafka yang dikelola sendiri
Versi kluster Apache Kafka yang dikelola sendiri adalah 0.11 atau lebih baru.
Terdapat koneksi jaringan antara Flink dan kluster Apache Kafka yang dikelola sendiri. Untuk informasi selengkapnya tentang menghubungkan kluster Apache Kafka yang dikelola sendiri melalui jaringan publik, lihat Opsi Konektivitas Jaringan.
Hanya opsi klien Apache Kafka 2.8 yang didukung. Untuk informasi selengkapnya, lihat dokumentasi Apache Kafka Consumer Configs dan Producer Configs.
Catatan penggunaan
Saat ini, penulisan transaksional tidak disarankan karena keterbatasan desain di komunitas Flink dan Kafka. Saat Anda mengatur sink.delivery-guarantee = exactly-once, Kafka connector mengaktifkan penulisan transaksional, yang memiliki tiga masalah yang diketahui:
Setiap checkpoint menghasilkan ID transaksi baru. Jika interval checkpoint terlalu pendek, jumlah ID transaksi menjadi berlebihan. Akibatnya, koordinator Kafka mungkin kehabisan memori, sehingga mengganggu stabilitas kluster Kafka.
Setiap transaksi membuat instance produsen. Jika terlalu banyak transaksi dikomit secara bersamaan, TaskManager mungkin kehabisan memori, sehingga mengganggu pekerjaan Flink.
Jika beberapa pekerjaan Flink menggunakan
sink.transactional-id-prefixyang sama, ID transaksi yang dihasilkan mungkin bertabrakan. Jika suatu pekerjaan gagal menulis, hal ini akan memblokir Log Start Offset (LSO) partisi Kafka agar tidak maju, sehingga memengaruhi semua konsumen yang membaca dari partisi tersebut.
Jika Anda memerlukan semantik tepat-sekali, gunakan Upsert Kafka untuk menulis ke tabel dengan primary key dan mengandalkan primary key tersebut untuk memastikan idempotensi. Untuk menggunakan penulisan transaksional, lihat Catatan tentang Semantik Tepat-Sekali.
Pemecahan masalah konektivitas jaringan
Jika pekerjaan Flink melaporkan error Timed out waiting for a node assignment saat startup, hal ini biasanya menunjukkan adanya masalah konektivitas jaringan antara Flink dan Kafka.
Klien Kafka terhubung ke server seperti dijelaskan di bawah ini:
Anda terhubung ke Kafka menggunakan alamat di
bootstrap.servers.Kafka mengembalikan metadata untuk setiap broker di kluster, seperti titik akhirnya.
Klien kemudian terhubung ke setiap broker menggunakan titik akhir yang dikembalikan untuk memproduksi atau mengonsumsi data.
Bahkan jika alamat bootstrap.servers dapat dijangkau, klien tidak dapat membaca atau menulis ke broker jika Kafka mengembalikan alamat broker yang salah. Masalah ini umum terjadi pada arsitektur jaringan yang menggunakan proxy, penerusan port, atau jalur sewa.
Langkah-langkah pemecahan masalah
ApsaraMQ for Kafka
Konfirmasi jenis titik akses
Titik akses default (jaringan internal)
Titik akses SASL (jaringan internal dengan otentikasi)
Titik akses jaringan publik (memerlukan pengajuan terpisah)
Gunakan konsol pengembangan Flink untuk menjalankan probe jaringan guna mengesampingkan masalah konektivitas dengan alamat
bootstrap.servers.Periksa grup keamanan dan daftar putih
Instans Kafka harus menambahkan VPC Flink ke daftar putihnya. Untuk informasi selengkapnya, lihat Lihat blok CIDR VPC dan Konfigurasi daftar putih.
Periksa konfigurasi SASL (jika diaktifkan)
Jika Anda menggunakan titik akhir SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Otentikasi yang tidak lengkap menyebabkan koneksi gagal selama fase handshake dan dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan Otentikasi.
Kafka yang dikelola sendiri (ECS)
Gunakan konsol pengembangan Flink untuk melakukanprobe jaringan.
Esampingkan masalah konektivitas dengan alamat
bootstrap.serversdan verifikasi bahwa titik akhir publik dan internal sudah benar.Periksa grup keamanan dan daftar putih
Grup keamanan ECS harus mengizinkan traffic pada port titik akses Kafka (biasanya 9092 atau 9093).
VPC tempat Flink berada harus ditambahkan ke daftar putih instans ECS. Untuk informasi selengkapnya, lihat Lihat blok CIDR VPC.
Pemecahan masalah konfigurasi
Login ke kluster ZooKeeper yang digunakan oleh Kafka dan gunakan tool zkCli.sh atau zookeeper-shell.sh.
Jalankan perintah untuk mengambil metadata broker. Contoh:
get /brokers/ids/0. Di bidangendpointsrespons, temukan alamat yang diiklankan Kafka kepada klien.
Lakukan probe jaringan di konsol pengembangan Flink untuk menguji apakah alamat tersebut dapat dijangkau.
CatatanJika alamat tidak dapat dijangkau, hubungi engineer O&M Kafka untuk memeriksa dan memperbaiki konfigurasi
listenersdanadvertised.listenersagar alamat yang dikembalikan dapat diakses oleh Flink.Untuk informasi selengkapnya tentang konektivitas klien dan server Kafka, lihat Pemecahan Masalah Konektivitas.
Periksa konfigurasi SASL (jika diaktifkan)
Jika Anda menggunakan titik akhir SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Otentikasi yang tidak lengkap menyebabkan koneksi gagal selama fase handshake atau muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan Otentikasi.
SQL
Anda dapat menggunakan Kafka connector dalam pekerjaan SQL sebagai tabel sumber atau tabel sink.
Sintaksis
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)Kolom metadata
Anda dapat mendefinisikan kolom metadata di tabel sumber Kafka atau tabel sink Kafka untuk mengambil metadata pesan Kafka. Misalnya, jika beberapa topik didefinisikan dalam klausa WITH untuk tabel sumber Kafka dan kolom metadata didefinisikan di tabel sumber, topik tempat Flink membaca data akan ditandai. Potongan kode berikut memberikan contoh cara menggunakan kolom metadata:
CREATE TABLE kafka_source (
-- Baca topik tempat pesan tersebut berada sebagai nilai bidang record_topic.
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Baca timestamp di ConsumerRecord sebagai nilai bidang ts.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Baca offset pesan sebagai nilai bidang record_offset.
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Tulis timestamp di bidang ts sebagai timestamp ProducerRecord ke Kafka.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);Tabel berikut menjelaskan kolom metadata yang didukung oleh tabel sumber dan tabel sink Kafka.
Kunci | Tipe data | Deskripsi | Tabel sumber atau tabel sink |
topic | STRING NOT NULL METADATA VIRTUAL | Nama topik tempat pesan Kafka berada. | Tabel sumber |
partition | INT NOT NULL METADATA VIRTUAL | ID partisi tempat pesan Kafka berada. | Tabel sumber |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Header pesan Kafka. | Tabel sumber dan tabel sink |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Leader epoch pesan Kafka. | Tabel sumber |
offset | BIGINT NOT NULL METADATA VIRTUAL | Offset pesan Kafka. | Tabel sumber |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Timestamp pesan Kafka. | Tabel sumber dan tabel sink |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Tipe timestamp pesan Kafka:
| Tabel sumber |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | Bidang kunci mentah pesan Kafka. | Tabel sumber dan tabel sink Catatan Parameter ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.4 atau versi lebih baru. |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | Bidang nilai mentah pesan Kafka. | Tabel sumber dan tabel sink Catatan Parameter ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.4 atau versi lebih baru. |
dengan parameter
Umum
parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
connector
Tipe tabel.
String
Ya
Tidak ada
Atur nilainya ke kafka.
properties.bootstrap.servers
Alamat IP dan nomor port broker Kafka.
String
Ya
Tidak ada
Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).
properties.*
Opsi yang dikonfigurasi untuk klien Kafka.
String
Tidak
Tidak ada
Akhiran harus mematuhi aturan yang ditentukan dalam Producer Configs dan Consumer Configs.
Flink menghapus awalan "properties." dan meneruskan konfigurasi yang tersisa ke klien Kafka. Misalnya, Anda dapat menonaktifkan pembuatan topik otomatis menggunakan
'properties.allow.auto.create.topics'='false'.Konfigurasi berikut tidak dapat dimodifikasi dengan cara ini karena Kafka connector menimpa konfigurasi tersebut:
key.deserializer
value.deserializer
format
Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.
String
Tidak
Tidak ada nilai default.
Format yang didukung
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
CatatanUntuk informasi selengkapnya tentang opsi format, lihat Opsi Format.
key.format
Format yang digunakan untuk membaca atau menulis bidang kunci pesan Kafka.
String
Tidak
Tidak ada nilai default.
Format yang didukung
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
CatatanJika Anda menggunakan konfigurasi ini, konfigurasi key.options wajib diisi.
key.fields
Bidang kunci di tabel sumber atau tabel sink yang sesuai dengan bidang kunci pesan Kafka.
String
Tidak
Tidak ada
Pisahkan beberapa nama bidang dengan titik koma (;), seperti
field1;field2.key.fields-prefix
Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi opsi ini untuk mencegah konflik nama dengan bidang nilai.
String
Tidak
Tidak ada
Opsi ini hanya digunakan untuk membedakan nama kolom tabel sumber dan tabel sink. Awalan dihapus dari nama kolom saat bidang kunci pesan Kafka diurai atau dihasilkan.
CatatanSaat Anda menggunakan konfigurasi ini, Anda harus mengatur opsi
value.fields-includeke EXCEPT_KEY.value.format
Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.
String
Tidak
Tidak ada
Konfigurasi ini setara dengan
format, dan Anda hanya dapat mengonfigurasi salah satu dariformatatauvalue.format. Jika Anda mengonfigurasi keduanya,value.formatakan menimpaformat.value.fields-include
Menentukan apakah kunci pesan yang sesuai disertakan saat mengurai atau menghasilkan nilai pesan Kafka.
String
Tidak
ALL
Nilai yang valid:
ALL(default): Semua bidang diproses sebagai nilai pesan Kafka.EXCEPT_KEY: Semua bidang kecuali bidang yang ditentukan oleh opsi key.fields diproses sebagai nilai pesan Kafka.
Tabel sumber
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
topic
Nama topik tempat Anda ingin membaca data.
String
Tidak
Tidak ada
Pisahkan beberapa nama topik dengan titik koma (;), seperti topic-1 dan topic-2.
CatatanAnda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.
topic-pattern
Ekspresi reguler yang digunakan untuk mencocokkan topik. Data dari semua topik yang namanya cocok dengan ekspresi reguler yang ditentukan akan dibaca saat deployment berjalan.
String
Tidak
Tidak ada
CatatanAnda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.
properties.group.id
ID kelompok konsumen.
String
Tidak
KafkaSource-{Nama tabel sumber}
Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.
scan.startup.mode
Offset awal untuk Kafka membaca data.
String
Tidak
group-offsets
Nilai yang valid:
earliest-offset: Kafka membaca data dari partisi paling awal.latest-offset: Kafka membaca data dari offset terbaru.group-offsets(default): membaca data dari offset yang dikomit oleh kelompok konsumen dengan ID yang ditentukan oleh opsi properties.group.id.timestamp: Membaca data dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.specific-offsets: membaca data dari offset yang ditentukan oleh opsi scan.startup.specific-offsets.
CatatanOpsi ini berlaku saat deployment dimulai tanpa state. Saat deployment dimulai ulang dari checkpoint atau dilanjutkan dari state tertentu, deployment akan memprioritaskan membaca data dari progres yang disimpan dalam data state.
scan.startup.specific-offsets
Offset awal setiap partisi saat opsi scan.startup.mode diatur ke specific-offsets.
String
Tidak
Tidak ada
Contoh:
partition:0,offset:42;partition:1,offset:300.scan.startup.timestamp-millis
Timestamp offset awal saat opsi scan.startup.mode diatur ke timestamp.
Long
Tidak
Tidak ada
Unit: milidetik.
scan.topic-partition-discovery.interval
Interval waktu untuk mendeteksi topik dan partisi Kafka secara dinamis.
Duration
Tidak
5 menit
Interval penemuan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur opsi ini ke nilai non-positif. Setelah fitur penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data dari partisi tersebut. Dalam mode topic-pattern, sumber Kafka membaca data dari partisi baru topik yang ada dan data dari semua partisi topik baru yang cocok dengan ekspresi reguler.
CatatanDalam Realtime Compute for Apache Flink yang menggunakan VVR 6.0.X, fitur penemuan partisi dinamis dinonaktifkan secara default. Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0 atau versi lebih baru, fitur ini diaktifkan secara default. Interval penemuan partisi default adalah 5 menit.
scan.header-filter
Penyaringan data Kafka berdasarkan apakah data tersebut mengandung header pesan tertentu.
String
Tidak
Tidak ada
Pisahkan kunci header dan nilai dengan titik dua (:). Pisahkan beberapa header dengan operator logika seperti AND (&) atau OR (|). Operator logika NOT (!) didukung. Misalnya,
depart:toy|depart:book&!env:testmenunjukkan bahwa data Kafka yang headernya mengandung depart=toy atau depart=book dan tidak mengandung env=test akan dipertahankan.CatatanOpsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau versi lebih baru.
Operasi tanda kurung tidak didukung.
Operasi logika dilakukan dari kiri ke kanan secara berurutan.
Nilai header dalam format UTF-8 dikonversi menjadi string dan dibandingkan dengan nilai header yang ditentukan oleh opsi scan.header-filter.
scan.check.duplicated.group.id
Menentukan apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh parameter
properties.group.id.Boolean
Tidak
false
Nilai yang valid:
true: Memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai. Jika terdapat duplikasi kelompok konsumen, melaporkan error dan menangguhkan pekerjaan untuk mencegah konflik.
false: Tidak memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai.
CatatanOpsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 6.0.4 atau versi lebih baru.
Khusus sink
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
topic
Nama topik tempat data ditulis.
String
Ya
Tidak ada
N/A
sink.partitioner
Pola untuk memetakan konkurensi Flink ke partisi Kafka.
String
Tidak
default
Nilai yang valid:
default (default): Menggunakan partitioner Kafka default untuk mempartisi data.
fixed: Setiap partisi Flink sesuai dengan partisi Kafka tetap.
round-robin: Data dalam partisi Flink didistribusikan ke partisi Kafka secara round-robin.
Pola pemetaan partisi kustom: Anda dapat membuat subclass FlinkKafkaPartitioner untuk mengonfigurasi pola pemetaan partisi kustom, seperti org.mycompany.MyPartitioner.
sink.delivery-guarantee
Pola semantik untuk tabel sink Kafka.
String
Tidak
at-least-once
Nilai yang valid:
none: Jaminan pengiriman tidak dipastikan. Data mungkin hilang atau diduplikasi.
at-least-once (default): Memastikan data tidak hilang. Namun, data mungkin diduplikasi.
exactly-once: Transaksi Kafka digunakan untuk memastikan semantik tepat-sekali. Ini memastikan data tidak hilang atau diduplikasi.
CatatanAnda harus mengonfigurasi opsi sink.transactional-id-prefix jika Anda mengatur opsi ini ke exactly-once.
sink.transactional-id-prefix
Awalan ID transaksi Kafka yang digunakan dalam semantik tepat-sekali.
String
Tidak
Tidak ada
Opsi ini hanya berlaku saat opsi sink.delivery-guarantee diatur ke exactly-once.
sink.parallelism
Konkurensi operator untuk tabel sink Kafka.
Integer
Tidak
Tidak ada
Konkurensi operator hulu, yang ditentukan oleh framework.
Keamanan dan otentikasi
Jika kluster Kafka Anda memerlukan koneksi aman atau otentikasi, tambahkan awalan properties. ke nama opsi keamanan dan otentikasi serta konfigurasikan dalam klausa WITH. Potongan kode berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme Simple Authentication and Security Layer (SASL) dan menyediakan konfigurasi Java Authentication and Authorization Service (JAAS).
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)Potongan kode berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*Konfigurasi Secure Sockets Layer (SSL).*/
/*Tentukan jalur truststore sertifikat CA yang disediakan oleh server.*/
/*Artefak yang diunggah disimpan di /flink/usrlib/.*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*Tentukan jalur file keystore kunci privat jika otentikasi klien diperlukan.*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*Algoritma yang digunakan klien untuk memverifikasi alamat server. Nilai null menunjukkan bahwa verifikasi alamat server dinonaktifkan.*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*Konfigurasi SASL.*/
/*Konfigurasi SCRAM-SHA-256 sebagai mekanisme SASL.*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*Konfigurasi JAAS.*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)Anda dapat menggunakan fitur Artefak di konsol pengembangan untuk mengunggah sertifikat CA dan kunci privat dari contoh. Setelah diunggah, file tersebut disimpan di direktori /flink/usrlib. Jika file sertifikat CA yang ingin Anda gunakan bernama my-truststore.jks, Anda dapat mengatur parameter 'properties.ssl.truststore.location' dalam klausa WITH dengan dua cara berikut untuk menggunakan sertifikat ini:
Jika Anda mengatur
'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks', Flink tidak perlu mengunduh file OSS secara dinamis selama runtime, tetapi mode debug tidak didukung.Untuk versi mesin komputasi waktu nyata Ververica Runtime (VVR) 11.5 dan versi lebih baru, Anda dapat mengonfigurasi
properties.ssl.truststore.locationdanproperties.ssl.keystore.locationdengan jalur mutlak OSS. Format jalur file adalah oss://flink-fullymanaged-<ID ruang kerja>/artifacts/namespaces/<nama proyek>/<nama file>. Metode ini mengunduh file OSS secara dinamis selama runtime Flink dan mendukung mode debug.
Konfirmasi konfigurasi: Potongan kode di atas berlaku untuk sebagian besar skenario konfigurasi. Sebelum mengonfigurasi Kafka connector, hubungi personel O&M server Kafka untuk mendapatkan informasi konfigurasi keamanan dan otentikasi yang benar.
Catatan escape: Berbeda dengan Apache Flink, editor SQL Realtime Compute for Apache Flink secara default melakukan escape tanda kutip ganda (") . Oleh karena itu, Anda tidak perlu menambahkan backslash (\) sebagai karakter escape untuk tanda kutip ganda (") yang digunakan untuk mengapit username dan password saat mengonfigurasi opsi
properties.sasl.jaas.config.
Offset awal untuk tabel sumber Kafka
Mode startup
Anda dapat mengonfigurasi parameter scan.startup.mode untuk menentukan offset baca awal untuk tabel sumber Kafka:
earliest-offset: Membaca data dari offset paling awal partisi saat ini.
latest-offset: Membaca data dari offset terbaru partisi saat ini.
group-offsets: Membaca data dari offset yang dikomit oleh kelompok konsumen dengan ID yang ditentukan oleh opsi properties.group.id.
timestamp: Membaca data dari pesan pertama yang timestamp-nya lebih besar dari atau sama dengan timestamp yang ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Membaca data dari offset partisi yang ditentukan oleh opsi scan.startup.specific-offsets.
Jika Anda tidak menentukan offset awal, group-offsets digunakan secara default.
scan.startup.mode hanya berlaku untuk pekerjaan tanpa status. Untuk pekerjaan berstatus, konsumsi dimulai dari offset yang disimpan dalam status.
Kode contoh:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Mengonsumsi data dari offset paling awal.
'scan.startup.mode' = 'earliest-offset',
-- Mengonsumsi data dari offset terbaru.
'scan.startup.mode' = 'latest-offset',
-- Mengonsumsi data dari offset yang dikomit oleh kelompok konsumen my-group.
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- Jika my-group digunakan untuk pertama kalinya, konsumsi dimulai dari offset paling awal.
'properties.auto.offset.reset' = 'latest', -- Jika my-group digunakan untuk pertama kalinya, konsumsi dimulai dari offset terbaru.
-- Mengonsumsi data dari timestamp 1655395200000, dalam milidetik.
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Mengonsumsi data dari offset yang ditentukan.
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);Prioritas offset awal
Urutan prioritas untuk offset awal tabel sumber adalah sebagai berikut:
Prioritas (tertinggi ke terendah) | Offset yang disimpan dalam checkpoint atau titik simpan. |
Waktu mulai yang ditentukan di konsol. | |
Offset awal yang ditentukan oleh parameter scan.startup.mode dalam parameter WITH. | |
Jika scan.startup.mode tidak ditentukan, sistem menggunakan group-offsets dan mengonsumsi data dari offset kelompok konsumen yang sesuai. |
Jika offset menjadi tidak valid dalam salah satu langkah di atas karena kedaluwarsa atau masalah di kluster Kafka, kebijakan reset yang ditentukan oleh properties.auto.offset.reset akan digunakan. Jika item konfigurasi ini tidak diatur, akan dilemparkan exception yang memerlukan intervensi pengguna.
Dalam kebanyakan kasus, tabel sumber Kafka mulai membaca data dari offset yang dikomit oleh kelompok konsumen dengan ID grup baru. Saat tabel sumber Kafka menanyakan offset yang dikomit oleh kelompok konsumen di kluster Kafka, tidak ada offset valid yang dikembalikan karena ID grup digunakan untuk pertama kalinya. Dalam kasus ini, strategi reset yang dikonfigurasi oleh parameter properties.auto.offset.reset digunakan untuk mereset offset. Oleh karena itu, Anda harus mengonfigurasi parameter properties.auto.offset.reset untuk menentukan strategi reset offset.
Pengiriman Offset Tabel Sumber
Tabel sumber Kafka hanya mengirimkan offset konsumen ke kluster Kafka setelah operasi checkpoint berhasil. Jika interval checkpoint yang Anda tentukan terlalu besar, offset konsumen akan dikirimkan dengan penundaan ke kluster Kafka. Selama operasi checkpoint, tabel sumber Kafka menyimpan progres pembacaan data saat ini di backend status. Offset yang dikirimkan ke kluster Kafka tidak digunakan untuk pemulihan kesalahan. Offset yang dikirimkan hanya digunakan untuk memantau progres pembacaan data di Kafka. Akurasi data tidak terpengaruh bahkan jika offset gagal dikirimkan.
Partitioner kustom untuk tabel sink
Jika partitioner produsen Kafka bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan partitioner kustom untuk menulis data ke partisi tertentu. Partitioner kustom harus mewarisi FlinkKafkaPartitioner. Setelah pengembangan, kompilasi paket JAR dan gunakan fitur Manajemen File untuk mengunggahnya ke konsol Komputasi Waktu Nyata. Setelah paket JAR diunggah dan direferensikan, atur parameter sink.partitioner dalam klausa WITH. Nilai parameter harus berupa jalur kelas lengkap partitioner, seperti org.mycompany.MyPartitioner.
Perbandingan antara Kafka, Upsert Kafka, dan katalog JSON Kafka
Kafka adalah sistem antrian pesan yang hanya mendukung penyisipan data dan tidak mendukung pembaruan atau penghapusan. Oleh karena itu, Kafka tidak dapat memproses data Change Data Capture (CDC) dari sistem hulu atau logika penarikan dari operator seperti agregat dan join selama komputasi SQL streaming. Jika Anda ingin menulis data yang mengandung data perubahan atau data penarikan ke Kafka, gunakan tabel sink Upsert Kafka, yang melakukan pemrosesan khusus pada data perubahan.
Jika Anda ingin menyinkronkan data perubahan dari satu atau beberapa tabel data di database hulu ke Kafka secara batch, Anda dapat menggunakan katalog JSON Kafka. Jika data yang disimpan di Kafka dalam format JSON, Anda dapat menggunakan katalog JSON Kafka. Hal ini menghilangkan kebutuhan untuk mengonfigurasi skema dan opsi dalam klausa WITH. Untuk informasi selengkapnya, lihat Kelola katalog JSON Kafka.
Contoh
Contoh 1: Baca data dari topik Kafka dan tulis data tersebut ke topik Kafka lain
Kode contoh berikut membaca data dari topik Kafka sumber dan menulisnya ke topik Kafka sink. Data dalam format CSV.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;Contoh 2: Sinkronisasi skema dan data tabel
Gunakan Kafka connector untuk menyinkronkan pesan dari topik Kafka ke Hologres secara real-time. Dengan mengonfigurasi offset dan ID partisi pesan Kafka sebagai primary key, Anda menghindari pesan duplikat di Hologres jika terjadi failover.
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- Opsional. Perluas semua kolom bersarang.
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Contoh 3: Sinkronisasi skema dan data dalam kolom kunci dan nilai pesan Kafka
Bidang kunci pesan Kafka menyimpan informasi relevan. Anda dapat menyinkronkan data dalam kolom kunci dan nilai pesan Kafka secara bersamaan.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Kunci pesan Kafka tidak mendukung evolusi skema dan penguraian tipe. Deklarasi manual diperlukan.
Contoh 4: Sinkronisasi skema dan data serta lakukan komputasi
Saat Anda menyinkronkan data dari Kafka ke Hologres, diperlukan perhitungan ringan.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Gunakan COALESCE untuk menangani nilai null.Contoh 5: Uraikan data JSON bersarang
Pesan JSON contoh
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}Untuk menghindari penggunaan fungsi seperti JSON_VALUE(payload, '$.properties.owner') untuk mengurai bidang, Anda dapat langsung mendefinisikan struktur dalam DDL Sumber:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);Akibatnya, Flink mengurai JSON menjadi bidang terstruktur selama fase baca, dan query SQL selanjutnya langsung menggunakan properties.owner, tanpa memerlukan pemanggilan fungsi tambahan, meningkatkan kinerja keseluruhan.
DataStream API
Jika Anda ingin menggunakan DataStream API untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream dari tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi selengkapnya tentang cara mengonfigurasi konektor DataStream, lihat Cara menggunakan konektor DataStream.
Buat sumber Kafka
Sumber Kafka menyediakan kelas builder untuk membuat instance KafkaSource. Kode contoh berikut menunjukkan cara membuat sumber Kafka untuk mengonsumsi pesan dari offset paling awal topik "input-topic", dengan kelompok konsumen bernama my-group, dan mendeserialisasi isi pesan Kafka sebagai string.
Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");Saat membuat KafkaSource, Anda harus menentukan parameter berikut.
Parameter
Deskripsi
BootstrapServers
Alamat broker Kafka. Anda dapat memanggil operasi setBootstrapServers(String) untuk mengonfigurasi alamat.
GroupId
ID kelompok konsumen. Anda dapat memanggil metode setGroupId(String) untuk mengonfigurasi ID.
Topik atau Partisi
Topik atau nama partisi tempat Anda berlangganan. Anda dapat mengonfigurasi sumber Kafka untuk berlangganan ke topik atau partisi menggunakan salah satu pola langganan berikut:
Daftar topik. Setelah Anda mengonfigurasi daftar topik, sumber Kafka berlangganan ke semua partisi dari topik yang ditentukan.
KafkaSource.builder().setTopics("topic-a","topic-b")Pola topik. Setelah Anda menentukan ekspresi reguler, sumber Kafka berlangganan ke semua partisi dari topik yang cocok dengan ekspresi reguler yang ditentukan.
KafkaSource.builder().setTopicPattern("topic.*")Daftar partisi. Setelah Anda mengonfigurasi daftar partisi, sumber Kafka berlangganan ke partisi yang ditentukan.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partisi 0 dari topik "topic-a" new TopicPartition("topic-b", 5))); // Partisi 5 dari topik "topic-b" KafkaSource.builder().setPartitions(partitionSet)
Deserializer
Deserializer yang mendeserialisasi pesan Kafka.
Anda dapat memanggil metode setDeserializer(KafkaRecordDeserializationSchema) untuk menentukan deserializer. Antarmuka KafkaRecordDeserializationSchema mendefinisikan cara objek ConsumerRecord dideserialisasi. Anda dapat menggunakan salah satu metode berikut untuk hanya mendeserialisasi bidang Value dalam pesan Kafka objek ConsumerRecord:
Sumber Kafka menyediakan metode setValueOnlyDeserializer(DeserializationSchema). Kelas DeserializationSchema mendefinisikan cara pesan Kafka yang disimpan sebagai nilai biner dideserialisasi.
Gunakan kelas yang mengimplementasikan Antarmuka Deserializer Kafka. Misalnya, Anda dapat menggunakan kelas StringDeserializer untuk mendeserialisasi pesan menjadi string.
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
CatatanJika Anda ingin mendeserialisasi objek ConsumerRecord, Anda harus membuat kelas yang mengimplementasikan antarmuka KafkaRecordDeserializationSchema.
XML
Konektor DataStream Kafka disimpan di repositori pusat Maven.
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>Saat menggunakan konektor DataStream Kafka, Anda harus memahami properti Kafka berikut:
Offset awal
Anda dapat menggunakan inisialisasi offset untuk menentukan offset untuk sumber Kafka saat sumber Kafka mulai membaca data. Inisialisasi offset adalah objek yang mengimplementasikan antarmuka OffsetsInitializer. Kelas KafkaSource menyediakan inisialisasi offset bawaan berikut.
Inisialisasi offset
Pengaturan Kode
Membaca data dari offset paling awal.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())Membaca data dari offset terbaru.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())Memulai mengonsumsi data dengan timestamp lebih besar dari atau sama dengan waktu yang ditentukan (dalam milidetik).
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))Mengonsumsi dari offset yang dikomit oleh kelompok konsumen. Jika tidak ada offset tersebut, gunakan offset paling awal.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))Membaca data dari offset yang dikomit setiap partisi dan tidak ada strategi reset yang ditentukan.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())CatatanJika inisialisasi offset bawaan tidak memenuhi kebutuhan bisnis Anda, Anda dapat membuat inisialisasi offset kustom.
Jika Anda tidak menentukan inisialisasi offset, inisialisasi offset OffsetsInitializer.earliest() digunakan secara default.
Mode eksekusi streaming dan mode eksekusi batch
Sumber Kafka dapat beroperasi dalam mode streaming atau mode batch. Secara default, sumber Kafka beroperasi dalam mode streaming. Dalam mode ini, deployment terus berjalan hingga deployment gagal atau dibatalkan. Jika Anda ingin sumber Kafka beroperasi dalam mode batch, Anda dapat memanggil metode setBounded(OffsetsInitializer) untuk menentukan offset berhenti. Saat semua partisi mencapai offset berhentinya, sumber Kafka keluar.
CatatanDalam kebanyakan kasus, sumber Kafka yang beroperasi dalam mode streaming tidak memiliki offset berhenti. Jika Anda ingin men-debug sumber Kafka yang beroperasi dalam mode streaming, Anda dapat memanggil metode setUnbounded(OffsetsInitializer) untuk menentukan offset berhenti. Metode yang Anda gunakan untuk menentukan offset berhenti bervariasi tergantung pada apakah Anda menggunakan mode streaming atau mode batch.
Penemuan partisi dinamis
Jika Anda ingin deployment yang sedang berjalan memproses data dari topik baru dan partisi baru yang cocok dengan pola langganan Anda tanpa me-restart deployment, Anda dapat mengaktifkan fitur penemuan partisi dinamis pada sumber Kafka. Dalam konektor DataStream, fitur ini dinonaktifkan secara default dan harus diaktifkan secara manual:
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // Temukan partisi baru setiap 10 detik.PentingFitur penemuan partisi dinamis bergantung pada mekanisme pembaruan metadata kluster Kafka. Jika kluster Kafka tidak segera memperbarui informasi partisi, partisi baru mungkin tidak ditemukan. Pastikan konfigurasi partition.discovery.interval.ms kluster Kafka sesuai dengan situasi aktual.
Waktu event dan watermark
Secara default, sumber Kafka menggunakan timestamp yang dilampirkan pada catatan sebagai waktu event untuk catatan tersebut. Anda dapat mendefinisikan strategi watermark berdasarkan waktu event setiap catatan dan mengirimkan watermark ke layanan hilir.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")Untuk informasi selengkapnya tentang cara mendefinisikan strategi watermark kustom, lihat Generating Watermarks.
CatatanJika beberapa subtugas sumber tetap menganggur untuk jangka waktu yang lama, seperti partisi Kafka yang tidak menerima pesan baru atau konkurensi sumber melebihi jumlah partisi Kafka, pembuatan watermark mungkin gagal. Dalam kasus ini, komputasi window tidak dapat dipicu, dan pemrosesan data akan berhenti.
Solusinya adalah sebagai berikut:
Konfigurasi mekanisme timeout watermark: Aktifkan parameter table.exec.source.idle-timeout untuk memaksa sistem menghasilkan watermark setelah periode timeout yang ditentukan, memastikan progres epoch komputasi window.
Optimalkan sumber data: Atur konkurensi sumber agar sama dengan atau kurang dari jumlah partisi Kafka.
Consumer Offset Commit
Saat checkpoint dihasilkan, sumber Kafka mengirimkan offset konsumen Kafka setiap partisi ke broker Kafka. Hal ini memastikan bahwa offset konsumen Kafka yang dicatat di broker Kafka konsisten dengan status checkpoint. Konsumen Kafka dapat secara otomatis mengirimkan offset pada setiap partisi ke broker Kafka secara berkala. Anda dapat mengonfigurasi fitur pengiriman offset otomatis menggunakan opsi enable.auto.commit dan auto.commit.interval.ms. Jika Anda menonaktifkan fitur checkpointing, sumber Kafka mengandalkan konsumen Kafka untuk mengirimkan offset ke broker Kafka.
CatatanSumber Kafka tidak menggunakan offset yang dikirimkan yang dicatat di broker Kafka untuk toleransi kesalahan. Saat Anda mengirimkan offset, broker Kafka dapat memantau progres konsumsi catatan pada setiap partisi.
Properti tambahan
Anda dapat memanggil metode setProperties(Properties) dan setProperty(String, String) untuk mengonfigurasi properti tambahan untuk sumber Kafka dan konsumen Kafka. Tabel berikut menjelaskan properti sumber Kafka.
Item konfigurasi
Deskripsi
client.id.prefix
Menentukan awalan untuk ID klien konsumen Kafka.
partition.discovery.interval.ms
Menentukan interval waktu saat sumber Kafka memeriksa partisi baru.
CatatanProperti partition.discovery.interval.ms ditimpa menjadi -1 dalam mode batch.
register.consumer.metrics
Menentukan apakah akan mendaftarkan metrik untuk konsumen Kafka di Realtime Compute for Apache Flink.
Konfigurasi Konsumen Kafka lainnya
Untuk informasi selengkapnya tentang properti konsumen Kafka, lihat Apache Kafka.
PentingKonektor DataStream Kafka menimpa nilai properti berikut:
key.deserializer: Nilai properti ini diatur ke ByteArrayDeserializer.
value.deserializer: Nilai properti ini diatur ke ByteArrayDeserializer.
auto.offset.reset.strategy: Nilai properti ini diatur ke OffsetsInitializer#getAutoOffsetResetStrategy().
Kode contoh berikut menunjukkan cara konsumen Kafka terhubung ke kluster Kafka menggunakan konfigurasi JAAS dan mekanisme otentikasi SASL/PLAIN.
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")Pemantauan
Sumber Kafka mendaftarkan metrik di Realtime Compute for Apache Flink untuk pemantauan dan diagnosis.
Cakupan metrik
Semua metrik sumber Kafka didaftarkan di bawah grup metrik KafkaSourceReader. KafkaSourceReader adalah subgrup dari grup metrik operator. Metrik untuk partisi tertentu didaftarkan di grup metrik KafkaSourceReader.topic.<nama_topik>.partition.<id_partisi>.
Misalnya, topik bernama my-topic dan partisi topik bernama 1. Offset konsumen partisi dilaporkan oleh metrik <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. Jumlah pengiriman offset konsumen yang berhasil diukur oleh metrik <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.
Metrik
Metrik
Deskripsi
Cakupan
currentOffset
Offset Konsumen Saat Ini
TopicPartition
committedOffset
Offset komit saat ini
TopicPartition
commitsSucceeded
Jumlah pengiriman yang berhasil
KafkaSourceReader
commitsFailed
Jumlah Pengiriman Gagal
KafkaSourceReader
Metrik Konsumen Kafka
Metrik untuk konsumen Kafka didaftarkan di grup metrik KafkaSourceReader.KafkaConsumer. Misalnya, metrik records-consumed-total didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.
Anda dapat mengonfigurasi opsi register.consumer.metrics untuk menentukan apakah akan mendaftarkan metrik untuk konsumen Kafka. Secara default, opsi register.consumer.metrics diatur ke true. Untuk informasi selengkapnya tentang metrik untuk konsumen Kafka, lihat Apache Kafka.
Buat sink Kafka
Sink Kafka dapat menulis data dari beberapa stream ke satu atau beberapa topik Kafka.
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) // // konfigurasi produsen .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") // topik tujuan .setKafkaValueSerializer(StringSerializer.class) // skema serialisasi .build()) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // toleransi kesalahan .build(); stream.sinkTo(kafkaSink);Anda harus mengonfigurasi parameter berikut.
Parameter
Deskripsi
Topik
Nama topik tempat data ditulis.
Serialisasi data
Saat Anda membuat sink Kafka, Anda harus menyediakan
KafkaRecordSerializationSchemauntuk mengonversi data input menjadi objekProducerRecordKafka. Flink menyediakan builder skema yang menawarkan komponen umum, seperti serialisasi kunci dan nilai pesan, pemilihan topik, dan partisi pesan. Anda juga dapat mengimplementasikan antarmuka yang sesuai untuk kontrol lanjutan. Sink Kafka memanggil metode ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) untuk setiap catatan masuk untuk menghasilkan objek ProducerRecord yang merepresentasikan catatan yang telah diserialisasi. Kemudian, sink Kafka menulis objek ProducerRecord ke topik yang diperlukan.Anda dapat mengontrol cara setiap catatan ditulis ke Kafka secara detail. Menggunakan ProducerRecord, Anda dapat melakukan tindakan berikut:
Menyetel nama topik tujuan.
Menentukan kunci pesan.
Menentukan partisi tujuan.
Properti klien Kafka
Properti bootstrap.servers wajib diisi. Tentukan daftar alamat broker Kafka yang dipisahkan koma.
Semantik toleransi kesalahan
Setelah Anda mengaktifkan fitur checkpointing, sink Kafka dapat memastikan pengiriman tepat-sekali. Anda juga dapat mengonfigurasi parameter DeliveryGuarantee untuk menentukan semantik toleransi kesalahan yang berbeda. Detail tentang parameter DeliveryGuarantee adalah sebagai berikut:
DeliveryGuarantee.NONE: Tidak ada jaminan pengiriman yang disediakan oleh Flink. Data mungkin hilang atau diduplikasi.
DeliveryGuarantee.AT_LEAST_ONCE: Sink Kafka memastikan data tidak hilang. Namun, data mungkin diduplikasi.
DeliveryGuarantee.EXACTLY_ONCE: Sink Kafka memastikan data tidak hilang atau diduplikasi. Mekanisme transaksi Kafka digunakan untuk memastikan pengiriman tepat-sekali.
CatatanUntuk informasi selengkapnya tentang semantik tepat-sekali, lihat Catatan penggunaan Semantic.EXACTLY_ONCE.
Ingesti Data
Anda dapat menggunakan Kafka connector dalam pekerjaan ingesti data berbasis YAML sebagai sumber atau sink.
Batasan
Kami menyarankan Anda menggunakan Kafka sebagai sumber data sinkron untuk ingesti data Flink CDC di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi lebih baru.
Hanya format JSON, Debezium JSON, dan Canal JSON yang didukung. Format data lain tidak didukung.
Untuk sumber, data dalam tabel yang sama dapat didistribusikan ke beberapa partisi hanya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.
Sintaksis
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092Opsi konfigurasi
Umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
type
Tipe sumber atau sink.
Ya
String
Tidak ada
Atur nilainya ke kafka.
name
Nama sumber atau sink.
Tidak
String
Tidak ada nilai default.
N/A
properties.bootstrap.servers
Alamat IP dan nomor port broker Kafka.
Ya
String
Tidak ada
Format:
host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).properties.*
Opsi yang dikonfigurasi untuk klien Kafka.
Tidak
String
Tidak ada
Akhiran harus berupa konfigurasi produsen atau konsumen yang ditentukan dalam dokumentasi resmi Kafka.
Flink menghapus awalan properties. dan meneruskan kunci dan nilai yang diubah ke klien Kafka. Misalnya, Anda dapat mengatur
properties.allow.auto.create.topicske false untuk menonaktifkan pembuatan topik otomatis.key.format
Format yang digunakan untuk membaca atau menulis bidang kunci pesan Kafka.
Tidak
String
Tidak ada
Untuk sumber, hanya json yang didukung.
Untuk sink, nilai yang valid adalah:
csv
json
CatatanOpsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.0.0 atau versi lebih baru.
value.format
Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.
Tidak
String
debezium-json
Nilai yang valid:
debezium-json
canal-json
json
CatatanFormat debezium-json dan canal-json hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.10 atau versi lebih baru.
Format json hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.0.0 atau versi lebih baru.
tabel sumber
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
topic
Nama topik tempat Anda ingin membaca data.
Tidak
String
Tidak ada nilai default.
Pisahkan beberapa nama topik dengan titik koma (;), seperti topic-1 dan topic-2.
CatatanAnda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.
topic-pattern
Ekspresi reguler yang digunakan untuk mencocokkan topik. Data dari semua topik yang namanya cocok dengan ekspresi reguler yang ditentukan akan dibaca saat pekerjaan berjalan.
Tidak
String
Tidak ada nilai default.
CatatanAnda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.
properties.group.id
ID kelompok konsumen.
Tidak
String
Tidak ada
Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.
scan.startup.mode
Offset awal untuk Kafka membaca data.
Tidak
String
group-offsets
Nilai yang valid:
earliest-offset: Membaca data dari partisi paling awal.
latest-offset: Membaca data dari offset terbaru.
group-offsets (default): Mulai membaca dari offset yang dikomit untuk grup yang ditentukan di properties.group.id.
timestamp: Membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.
specific-offsets: Baca dari offset yang ditentukan oleh scan.startup.specific-offsets.
CatatanOpsi ini berlaku saat pekerjaan dimulai tanpa state. Saat pekerjaan dimulai ulang dari checkpoint atau dilanjutkan dari state tertentu, pekerjaan akan memprioritaskan membaca data dari progres yang disimpan dalam data state.
scan.startup.specific-offsets
Offset awal setiap partisi saat opsi scan.startup.mode diatur ke specific-offsets.
Tidak
String
Tidak ada
Misalnya
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
Timestamp offset awal saat opsi scan.startup.mode diatur ke timestamp.
Tidak
Long
Tidak ada
Unit: milidetik.
scan.topic-partition-discovery.interval
Interval waktu untuk mendeteksi topik dan partisi Kafka secara dinamis.
Tidak
Duration
5 menit
Interval penemuan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur opsi ini ke nilai non-positif. Setelah fitur penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data dari partisi tersebut. Dalam mode topic-pattern, sumber Kafka membaca data dari partisi baru topik yang ada dan data dari semua partisi topik baru yang cocok dengan ekspresi reguler.
scan.check.duplicated.group.id
Apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh
properties.group.id.Tidak
Boolean
false
Nilai yang valid:
true: Memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai. Jika terdapat duplikasi kelompok konsumen, melaporkan error dan menangguhkan pekerjaan untuk mencegah konflik.
false: Tidak memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai.
schema.inference.strategy
Strategi inferensi skema.
Tidak
String
continuous
Nilai yang valid:
continuous: Menyimpulkan skema untuk setiap catatan. Jika skema tidak kompatibel, menyimpulkan skema yang lebih luas dan menghasilkan event perubahan skema.
static: Melakukan inferensi skema hanya sekali saat pekerjaan dimulai. Catatan selanjutnya diurai berdasarkan skema awal. Event perubahan skema tidak dihasilkan.
CatatanUntuk detail tentang penguraian skema, lihat Kebijakan penguraian skema tabel dan sinkronisasi perubahan.
Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.
scan.max.pre.fetch.records
Jumlah maksimum pesan yang sistem coba konsumsi dan uraikan dalam partisi selama inferensi skema awal.
Tidak
Int
50
Sebelum pekerjaan membaca dan memproses data, sistem mencoba mengonsumsi jumlah tertentu pesan terbaru di muka dalam partisi untuk menginisialisasi informasi skema.
key.fields-prefix
Awalan yang ditambahkan ke bidang yang diurai dari bidang kunci dalam pesan Kafka. Konfigurasikan opsi ini untuk mencegah konflik penamaan setelah bidang kunci dalam pesan Kafka diurai.
Tidak
String
Tidak ada
Misalnya, jika opsi ini diatur ke key_, dan bidang kunci berisi bidang bernama a, maka nama bidang setelah penguraian adalah key_a.
CatatanNilai opsi key.fields-prefix tidak boleh menjadi awalan dari opsi value.fields-prefix.
value.fields-prefix
Awalan yang ditambahkan ke bidang yang diurai dari bidang nilai dalam pesan Kafka. Anda dapat mengonfigurasi opsi ini untuk mencegah konflik penamaan setelah bidang nilai dalam pesan Kafka diurai.
Tidak
String
Tidak ada
Misalnya, jika opsi ini diatur ke value_, dan bidang nilai berisi bidang bernama b, maka nama bidang setelah penguraian adalah value_b.
CatatanNilai opsi value.fields-prefix tidak boleh menjadi awalan dari opsi key.fields-prefix.
metadata.list
Kolom metadata untuk diteruskan ke penyimpanan hilir.
Tidak
String
Tidak ada
Kolom metadata yang tersedia meliputi
topic,partition,offset,timestamp,timestamp-type,headers,leader-epoch,__raw_key__, dan__raw_value__, dipisahkan koma.scan.value.initial-schemas.ddls
Tentukan skema awal untuk tabel tertentu menggunakan pernyataan DDL.
Tidak
String
Tidak ada
Beberapa pernyataan DDL dihubungkan dengan titik koma Inggris (
;). Misalnya, gunakanCREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);untuk menentukan skema awal untuk tabel db1.t1 dan db1.t2.Struktur tabel DDL di sini harus sesuai dengan tabel target dan mematuhi aturan sintaksis SQL Flink.
CatatanVersi VVR 11.5 dan versi lebih baru mendukung konfigurasi ini.
ingestion.ignore-errors
Menentukan apakah akan mengabaikan error selama penguraian data.
Tidak
Boolean
false
CatatanKonfigurasi ini didukung di VVR 11.5 dan versi lebih baru.
ingestion.error-tolerance.max-count
Jumlah error penguraian setelah pekerjaan gagal, jika error diabaikan selama penguraian data.
Tidak
Integer
-1
Opsi ini hanya berlaku saat ingestion.ignore-errors diaktifkan. Nilai default -1 berarti bahwa exception penguraian tidak memicu kegagalan pekerjaan.
CatatanVerverica Runtime (VVR) versi 11.5 atau versi lebih baru mendukung konfigurasi ini.
Tabel sumber dalam format Debezium JSON
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
debezium-json.distributed-tables
Tidak
Boolean
false
Jika data tabel tunggal dalam Debezium JSON muncul di beberapa partisi, Anda harus mengaktifkan opsi ini.
CatatanOpsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.
PentingSetelah Anda mengonfigurasi opsi ini, Anda harus memulai deployment tanpa state.
debezium-json.schema-include
Tidak
Boolean
false
Saat mengonfigurasi Debezium Kafka Connect, Anda dapat mengaktifkan konfigurasi Kafka value.converter.schemas.enable untuk menyertakan informasi skema dalam pesan. Opsi ini menentukan apakah pesan Debezium JSON menyertakan informasi skema.
Nilai yang valid:
true: Pesan Debezium JSON menyertakan informasi skema.
false: Pesan Debezium JSON tidak menyertakan informasi skema.
debezium-json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewati baris saat ini jika terjadi exception penguraian.
false (default): Mengembalikan error dan deployment gagal dimulai.
debezium-json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan menginterpretasikan semua tipe data sebagai STRING saat mengurai skema tabel.
Nilai yang valid:
true: Menginterpretasikan semua tipe dasar sebagai STRING.
Saat false (default), penguraian mengikuti aturan dasar.
Format JSON Canal untuk Tabel Sumber
parameter
Wajib
Tipe data
Nilai default
Deskripsi
canal-json.distributed-tables
Tidak
Boolean
false
Jika data tabel tunggal dalam Canal JSON muncul di beberapa partisi, Anda harus mengaktifkan opsi ini.
CatatanOpsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.
PentingSetelah Anda mengonfigurasi opsi ini, Anda harus memulai deployment tanpa state.
canal-json.database.include
Tidak
String
Tidak ada
Ekspresi reguler opsional yang cocok dengan bidang metadata database dalam catatan Canal. Hanya changelog dari database yang ditentukan yang dibaca. String ekspresi reguler kompatibel dengan Pattern Java.
canal-json.table.include
Tidak
String
Tidak ada
Ekspresi reguler opsional yang cocok dengan bidang metadata tabel dalam catatan Canal. Hanya catatan changelog dari tabel yang ditentukan yang dibaca. Ekspresi reguler kompatibel dengan Pattern Java.
canal-json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewati baris saat ini jika terjadi exception penguraian.
false (default): Mengembalikan error dan deployment gagal dimulai.
canal-json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan menginterpretasikan semua tipe data sebagai STRING saat mengurai skema tabel.
Nilai yang valid:
true: Menginterpretasikan semua tipe dasar sebagai STRING.
false (default): Parser mengikuti aturan dasar.
canal-json.infer-schema.strategy
Tidak
String
AUTO
Strategi inferensi skema.
Nilai yang valid:
AUTO (default): Secara otomatis menyimpulkan skema dengan mengurai data JSON. Gunakan AUTO jika data Anda tidak mengandung bidang sqlType untuk menghindari kegagalan penguraian.
SQL_TYPE: Menyimpulkan skema menggunakan array sqlType dalam data Canal JSON. Jika data Anda mengandung bidang sqlType, kami sarankan mengatur canal-json.infer-schema.strategy ke SQL_TYPE untuk inferensi tipe yang lebih akurat.
MYSQL_TYPE: Menyimpulkan skema menggunakan array mysqlType dalam data Canal JSON.
Jika data Canal JSON Anda di Kafka mengandung bidang sqlType dan Anda memerlukan pemetaan tipe yang lebih akurat, atur canal-json.infer-schema.strategy ke SQL_TYPE.
Untuk informasi selengkapnya tentang aturan pemetaan sqlType, lihat Penguraian skema Canal JSON.
CatatanVerverica Runtime (VVR) 11.1 dan versi lebih baru mendukung konfigurasi ini.
MYSQL_TYPE hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.3 atau versi lebih baru.
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
Tidak
Boolean
true
Menentukan apakah akan memetakan TIMESTAMP MySQL ke TIMESTAMP CDC:
true (default): Memetakan TIMESTAMP MySQL ke TIMESTAMP CDC.
false: Memetakan TIMESTAMP MySQL ke TIMESTAMP_LTZ CDC.
canal-json.mysql.treat-tinyint1-as-boolean.enabled
Tidak
Boolean
true
Saat MYSQL_TYPE digunakan untuk penguraian, menentukan apakah akan memetakan TINYINT(1) MySQL ke BOOLEAN CDC:
true (default): Memetakan TINYINT(1) MySQL ke BOOLEAN CDC.
false: Memetakan TINYINT(1) MySQL ke TINYINT(1) CDC.
Opsi ini hanya berlaku saat canal-json.infer-schema.strategy diatur ke MYSQL_TYPE.
Format JSON tabel sumber
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
json.timestamp-format.standard
Tidak
String
SQL
Menentukan format timestamp input dan output. Nilai yang valid:
SQL: Mengurai timestamp input dalam format yyyy-MM-dd HH:mm:ss.s{precision}, seperti 2020-12-30 12:13:14.123.
ISO-8601: Mengurai timestamp input dalam format yyyy-MM-ddTHH:mm:ss.s{precision}, seperti 2020-12-30T12:13:14.123.
json.ignore-parse-errors
Tidak
Boolean
false
Nilai yang valid:
true: Melewati baris saat ini jika terjadi exception penguraian.
false (default): Mengembalikan error dan deployment gagal dimulai.
json.infer-schema.primitive-as-string
Tidak
Boolean
false
Menentukan apakah akan menginterpretasikan semua tipe data sebagai STRING saat mengurai skema tabel.
Nilai yang valid:
true: Menginterpretasikan semua tipe dasar sebagai STRING.
false (default): Mengurai sesuai aturan dasar.
json.infer-schema.flatten-nested-columns.enable
Tidak
Boolean
false
Saat mengurai data berformat JSON, Anda harus menentukan apakah akan memperluas kolom bersarang secara rekursif. Nilai parameter ini adalah sebagai berikut:
true: Memperluas kolom bersarang secara rekursif.
false (default): Memperlakukan tipe bersarang sebagai STRING.
json.decode.parser-table-id.fields
Tidak
String
Tidak ada
Menentukan apakah akan menghasilkan tableId berdasarkan nilai bidang JSON tertentu, dengan beberapa bidang dipisahkan koma
,. Misalnya, jika data JSON adalah{"col0":"a", "col1","b", "col2","c"}, hasil yang dihasilkan adalah sebagai berikut:Konfigurasi
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
json.infer-schema.fixed-types
Tidak
String
Tidak ada nilai default.
Saat mengurai data berformat JSON, tentukan tipe data eksak untuk bidang tertentu. Pisahkan beberapa bidang dengan koma Inggris (
,). Misalnya,id BIGINT, name VARCHAR(10)menentukan bidang id dalam data JSON sebagai BIGINT dan bidang name sebagai VARCHAR(10).Saat menggunakan konfigurasi ini, Anda juga harus menambahkan konfigurasi
scan.max.pre.fetch.records: 0.CatatanOpsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.5 atau versi lebih baru.
Khusus sink
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
type
Tipe sink.
Ya
String
Tidak ada
Atur nilainya ke Kafka.
name
Nama sink.
Tidak
String
Tidak ada
N/A
topic
Nama topik Kafka.
Tidak
String
Tidak ada
Jika opsi ini diaktifkan, semua data ditulis ke topik ini.
CatatanJika opsi ini tidak diaktifkan, setiap catatan data ditulis ke topik yang namanya berasal dari string ID tabelnya (gabungan nama database dan nama tabel, dipisahkan oleh titik (
.)), sepertidatabaseName.tableName.partition.strategy
Strategi partisi Kafka.
Tidak
String
all-to-zero
Nilai yang valid:
all-to-zero (default): Menulis semua data ke partisi 0.
hash-by-key: Menulis data ke partisi berdasarkan nilai hash primary key. Hal ini memastikan bahwa data dengan primary key yang sama berada di partisi yang sama dan terurut.
sink.tableId-to-topic.mapping
Pemetaan antara nama tabel leluhur dan nama topik Kafka hilir.
Tidak
String
Tidak ada
Setiap hubungan pemetaan dipisahkan oleh
;. Nama tabel leluhur dan nama topik Kafka hilir yang sesuai dipisahkan oleh:. Anda dapat menggunakan ekspresi reguler untuk nama tabel, serta menggabungkan beberapa tabel yang dipetakan ke topik yang sama menggunakan,. Contoh:mydb.mytable1:topic1;mydb.mytable2:topic2.CatatanMengonfigurasi parameter ini memungkinkan Anda memodifikasi topik yang dipetakan sambil mempertahankan informasi nama tabel asli.
Format JSON Debezium untuk tabel sink
Parameter
Wajib
Tipe data
Nilai default
Deskripsi
debezium-json.include-schema.enabled
Tidak
Boolean
false
Menentukan apakah informasi skema disertakan dalam data JSON Debezium.
Contoh
Ingest data dari Kafka:
source: type: kafka name: Kafka source properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADENIngest data ke Kafka:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}Di bagian route, tentukan nama topik Kafka tujuan.
Secara default, fitur pembuatan topik otomatis dinonaktifkan untuk Kafka Alibaba Cloud. Untuk informasi selengkapnya, lihat FAQ tentang pembuatan topik otomatis. Saat Anda menulis data ke Kafka Alibaba Cloud, Anda harus membuat topik yang sesuai terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 3: Buat resource.
Kebijakan untuk penguraian dan evolusi skema
Kafka connector memelihara skema semua tabel yang diketahui.
Inisialisasi skema
Informasi skema mencakup informasi bidang dan tipe data, informasi database dan tabel, serta informasi primary key. Berikut ini menjelaskan cara menginisialisasi ketiga jenis informasi tersebut:
Informasi bidang dan tipe data
Pekerjaan ingesti data dapat menyimpulkan informasi bidang dan tipe data dari data secara otomatis. Namun, dalam beberapa skenario, Anda mungkin ingin menentukan informasi bidang dan tipe untuk tabel tertentu. Berdasarkan granularitas tipe bidang yang ditentukan pengguna, inisialisasi skema mendukung tiga strategi berikut:
Skema disimpulkan sepenuhnya oleh sistem
Sebelum pesan Kafka dibaca, Kafka connector mencoba mengonsumsi pesan di setiap partisi, mengurai skema setiap catatan data, lalu menggabungkan skema untuk menginisialisasi informasi skema tabel. Jumlah pesan yang dapat dikonsumsi tidak lebih dari nilai opsi scan.max.pre.fetch.records. Sebelum data dikonsumsi, event pembuatan tabel dihasilkan berdasarkan skema yang diinisialisasi.
Untuk format Debezium JSON dan Canal JSON, informasi tabel disertakan dalam pesan tertentu. Jumlah pesan yang akan dikonsumsi di muka ditentukan oleh parameter scan.max.pre.fetch.records. Pesan yang dikonsumsi di muka ini mungkin berisi data dari beberapa tabel. Oleh karena itu, jumlah catatan data yang dikonsumsi di muka untuk setiap tabel tidak dapat ditentukan. Konsumsi pesan di muka partisi dan inisialisasi skema tabel hanya dilakukan sekali sebelum konsumsi dan pemrosesan pesan yang sebenarnya untuk setiap partisi. Jika data tabel selanjutnya ada, skema tabel yang diurai dari catatan data pertama tabel tersebut digunakan sebagai skema tabel awal. Dalam kasus ini, konsumsi pesan di muka partisi dan inisialisasi skema tabel tidak akan dilakukan lagi.
Data dalam tabel tunggal dapat didistribusikan ke beberapa partisi hanya di Ververica Runtime (VVR) 8.0.11 atau versi lebih baru. Dalam skenario ini, Anda harus mengatur opsi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.
Tentukan skema awal
Dalam beberapa skenario, Anda mungkin ingin menentukan skema tabel awal sendiri, misalnya saat Anda menulis data dari Kafka ke tabel turunan yang telah dibuat sebelumnya. Untuk melakukan ini, Anda dapat menambahkan parameter scan.value.initial-schemas.ddls. Kode berikut menunjukkan contoh konfigurasi:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Set the initial schema.
scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);Pernyataan CREATE TABLE harus sesuai dengan skema tabel target. Di sini, tipe awal bidang id di tabel db1.t1 diatur ke BIGINT, dan tipe awal bidang name diatur ke VARCHAR(10). Demikian pula, tipe awal bidang id di tabel db1.t2 diatur ke BIGINT.
Pernyataan CREATE TABLE menggunakan sintaksis SQL Flink.
Perbaiki tipe bidang
Dalam beberapa skenario, Anda mungkin ingin memperbaiki tipe data bidang tertentu—misalnya, untuk bidang tertentu yang mungkin disimpulkan sebagai tipe TIMESTAMP, Anda ingin mengirimkannya sebagai string. Dalam kasus ini, Anda dapat menentukan skema tabel awal dengan menambahkan parameter json.infer-schema.fixed-types (hanya berlaku saat format pesan adalah JSON). Konfigurasi contoh:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Fix specific fields to static types.
json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
scan.max.pre.fetch.records: 0Hal ini memperbaiki tipe semua bidang id ke BIGINT dan semua bidang name ke VARCHAR(10).
Tipe di sini sesuai dengan tipe data SQL Flink.
Informasi database dan tabel
Untuk format Canal JSON dan Debezium JSON, nama database dan tabel diurai dari pesan individual.
Secara default, untuk pesan dalam format JSON, informasi tabel hanya berisi nama tabel—nama topik yang berisi data. Jika data Anda mencakup informasi database dan tabel, Anda dapat menggunakan parameter json.infer-schema.fixed-types untuk menentukan bidang yang berisi informasi ini. Kami memetakan bidang ini ke nama database dan nama tabel. Kode berikut menunjukkan contoh konfigurasi:
source: type: kafka name: Kafka Source properties.bootstrap.servers: host:9092 topic: test-topic value.format: json scan.startup.mode: earliest-offset # Use the value of the col1 field as the database name and the value of the col2 field as the table name. json.decode.parser-table-id.fields: col1,col2Hal ini menulis setiap catatan ke tabel yang nama databasenya adalah nilai bidang col1 dan nama tabelnya adalah nilai bidang col2.
kunci utama
Untuk format Canal JSON, primary key tabel didefinisikan berdasarkan bidang pkNames dalam JSON.
Untuk format Debezium JSON dan JSON, JSON tidak berisi informasi primary key. Anda dapat menambahkan primary key ke tabel secara manual menggunakan aturan transformasi:
transform: - source-table: \.*\.\.* projection: \* primary-keys: key1, key2
Penguraian skema dan evolusi skema
Setelah sinkronisasi skema awal selesai, jika schema.inference.strategy diatur ke static, Kafka connector mengurai nilai setiap pesan berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, Kafka connector mengurai bagian nilai setiap pesan Kafka menjadi kolom fisik dan membandingkan kolom tersebut dengan skema yang saat ini dipelihara. Jika skema yang diurai tidak konsisten dengan skema saat ini, Kafka connector mencoba menggabungkan skema dan menghasilkan event perubahan skema tabel yang sesuai. Aturan penggabungan adalah sebagai berikut:
Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema saat ini, Kafka connector menambahkan bidang tersebut ke skema dan menghasilkan event penambahan kolom nullable.
Jika kolom fisik yang diurai tidak berisi bidang yang sudah ada dalam skema saat ini, bidang tersebut dipertahankan dan nilainya diisi dengan NULL. Event penghapusan kolom tidak dihasilkan.
Jika kolom fisik yang diurai dan skema saat ini berisi kolom dengan nama yang sama, tangani sebagai berikut:
Jika tipe datanya sama tetapi presisinya berbeda, gunakan tipe presisi yang lebih tinggi dan hasilkan event perubahan tipe kolom.
Jika tipe datanya berbeda, temukan node induk terkecil dalam struktur pohon sebagai tipe untuk kolom dengan nama yang sama dan hasilkan event perubahan tipe kolom.

Opsi evolusi skema yang didukung:
Menambahkan kolom: Menambahkan kolom baru ke akhir skema saat ini dan menyinkronkan data kolom baru. Kolom baru diatur sebagai nullable.
Menghapus kolom: Tidak menghasilkan event penghapusan kolom. Sebaliknya, data selanjutnya untuk kolom tersebut secara otomatis diisi dengan nilai NULL.
Mengganti nama kolom: Dianggap sebagai menambahkan kolom dan menghapus kolom. Menambahkan kolom yang diganti namanya ke akhir skema dan mengisi data kolom asli dengan nilai NULL.
Mengubah tipe data kolom:
Jika sistem hilir mendukung perubahan tipe kolom, pekerjaan ingesti data mendukung perubahan tipe kolom biasa setelah sink hilir mendukung penanganan perubahan tipe kolom—misalnya, mengubah dari INT ke BIGINT. Perubahan tersebut bergantung pada aturan perubahan tipe kolom yang didukung oleh sink hilir. Tabel sink yang berbeda mendukung aturan perubahan tipe kolom yang berbeda. Lihat dokumentasi untuk tabel sink terkait untuk mempelajari aturan perubahan tipe kolom yang didukungnya.
Untuk sistem hilir yang tidak mendukung perubahan tipe kolom, seperti Hologres, Anda dapat menggunakan pemetaan tipe luas. Metode ini membuat tabel dengan tipe data yang lebih umum di sistem hilir saat pekerjaan dimulai. Saat terjadi perubahan tipe kolom, sistem menentukan apakah sink hilir dapat menerima perubahan tersebut, yang memungkinkan dukungan toleran terhadap perubahan tipe kolom.
Perubahan skema yang tidak didukung:
Perubahan pada constraint, seperti primary key atau indeks.
Perubahan dari NOT NULL ke NULLABLE.
Penguraian skema untuk Canal JSON
Data berformat Canal JSON mungkin berisi bidang sqlType opsional, yang berisi informasi tipe yang tepat untuk kolom data. Untuk mendapatkan skema yang lebih akurat, Anda dapat mengatur konfigurasi canal-json.infer-schema.strategy ke SQL_TYPE untuk menggunakan tipe dari sqlType. Hubungan pemetaan tipe adalah sebagai berikut:
Tipe data JDBC
Kode tipe
Tipe data CDC
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
-5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
NUMERIC
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
BINARY
-2
BYTES
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
DATE
91
DATE
TIME
92
TIME
TIMESTAMP
93
TIMESTAMP
CHAR
1
STRING
VARCHAR
12
LONGVARCHAR
-1
Tipe data lainnya
Toleransi dan pengumpulan data kotor
Dalam beberapa kasus, sumber data Kafka Anda mungkin berisi data rusak (data kotor). Untuk mencegah restart pekerjaan yang sering karena data kotor tersebut, Anda dapat mengonfigurasi pekerjaan untuk mengabaikan exception tersebut. Konfigurasi contoh:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Enable dirty data tolerance.
ingestion.ignore-errors: true
# Tolerate up to 1000 dirty data records.
ingestion.error-tolerance.max-count: 1000Konfigurasi ini mentoleransi hingga 1000 catatan data kotor, memungkinkan pekerjaan Anda berjalan normal saat terdapat sedikit data kotor. Saat jumlah catatan data kotor melebihi ambang batas ini, pekerjaan gagal, mendorong Anda untuk memvalidasi data Anda.
Untuk memastikan pekerjaan Anda tidak pernah gagal karena data kotor, gunakan konfigurasi berikut:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Enable dirty data tolerance.
ingestion.ignore-errors: true
# Tolerate all dirty data.
ingestion.error-tolerance.max-count: -1Kebijakan toleransi data kotor mencegah pekerjaan gagal secara sering karena data abnormal. Anda mungkin juga ingin mempelajari lebih lanjut tentang data kotor untuk menyesuaikan perilaku produsen Kafka. Untuk proses yang dijelaskan dalam Pengumpulan Data Kotor, Anda dapat melihat data kotor pekerjaan Anda di log TaskManager. Kode berikut menunjukkan contoh konfigurasi:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# Enable dirty data tolerance.
ingestion.ignore-errors: true
# Tolerate all dirty data.
ingestion.error-tolerance.max-count: -1
pipeline:
dirty-data.collector:
# Write dirty data to TaskManager log files.
type: loggerStrategi pemetaan nama tabel dan topik
Saat menggunakan Kafka sebagai sink ingesti data, format pesan (debezium-json atau canal-json) sering kali mencakup informasi nama tabel. Konsumen biasanya menggunakan nama tabel ini—bukan nama topik—sebagai nama tabel sebenarnya. Oleh karena itu, konfigurasikan strategi pemetaan nama tabel dan topik dengan hati-hati.
Asumsikan Anda perlu menyinkronkan dua tabel—mydb.mytable1 dan mydb.mytable2—dari MySQL. Strategi pemetaan yang mungkin termasuk yang berikut:
1. Jangan mengonfigurasi strategi pemetaan apa pun
Tanpa strategi pemetaan apa pun, setiap tabel ditulis ke topik yang dinamai berdasarkan database dan tabel (misalnya, mydb.mytable1). Dengan demikian, data mydb.mytable1 masuk ke topik mydb.mytable1, dan data mydb.mytable2 masuk ke topik mydb.mytable2. Konfigurasi contoh:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. Konfigurasikan aturan rute untuk pemetaan (tidak disarankan)
Dalam banyak skenario, pengguna tidak ingin topik dinamai berdasarkan database dan tabel. Sebaliknya, mereka mengonfigurasi aturan rute untuk memetakan data ke topik tertentu. Konfigurasi contoh:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytable1Dalam kasus ini, semua data dari mydb.mytable1 dan mydb.mytable2 ditulis ke topik mytable1.
Namun, memodifikasi nama topik melalui aturan rute juga memodifikasi nama tabel dalam pesan Kafka (dalam format debezium-json atau canal-json). Akibatnya, semua pesan dalam topik ini memiliki nama tabel mytable1. Sistem lain yang mengonsumsi topik ini mungkin berperilaku tidak terduga.
3. Konfigurasikan parameter sink.tableId-to-topic.mapping untuk pemetaan (disarankan)
Untuk mempertahankan informasi nama tabel asli sambil memetakan ke topik kustom, gunakan parameter sink.tableId-to-topic.mapping. Konfigurasi contoh:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}Atau:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}Dalam konfigurasi ini, semua data dari `mydb.mytable1` dan `mydb.mytable2` ditulis ke topik `mytable1`, tetapi nama tabel asli (`mydb.mytable1` atau `mydb.mytable2`) dipertahankan dalam format pesan Kafka (`debezium-json` atau `canal-json`). Hal ini memungkinkan sistem lain yang mengonsumsi pesan dari topik ini untuk mengambil informasi nama tabel sumber dengan benar.
Pertimbangan semantik EXACTLY_ONCE
Konfigurasikan tingkat isolasi konsumen
Semua aplikasi yang mengonsumsi data Kafka harus mengatur isolation.level:
read_committed: Hanya membaca data yang telah dikomit.read_uncommitted(default): Memungkinkan Anda membaca data yang belum dikomit.
EXACTLY_ONCE bergantung pada
read_committed. Jika tidak, konsumen mungkin melihat data yang belum dikomit, yang merusak konsistensi.Timeout transaksi dan kehilangan data
Saat memulihkan dari checkpoint, Flink hanya bergantung pada transaksi yang dikomit sebelum checkpoint dimulai. Jika waktu antara crash pekerjaan dan restart melebihi timeout transaksi Kafka, Kafka secara otomatis membatalkan transaksi, menyebabkan kehilangan data.
Default broker Kafka
transaction.max.timeout.ms= 15 menit.Sink Kafka Flink mengatur
transaction.timeout.mske 1 jam secara default.Anda harus meningkatkan
transaction.max.timeout.msdi broker agar lebih besar dari atau sama dengan pengaturan Flink.
Kolam produsen Kafka dan checkpoint konkuren
Mode EXACTLY_ONCE menggunakan kolam produsen Kafka berukuran tetap. Setiap checkpoint mengonsumsi satu produsen dari kolam. Jika checkpoint konkuren melebihi ukuran kolam, pekerjaan gagal.
Sesuaikan ukuran kolam produsen sesuai dengan jumlah maksimum checkpoint konkuren Anda.
Scale-in restrictions
Jika pekerjaan gagal sebelum checkpoint pertama, informasi kolam produsen tidak dipertahankan setelah restart. Oleh karena itu, sebelum checkpoint pertama selesai, jangan menurunkan konkurensi pekerjaan. Jika Anda harus menurunkan skala, konkurensi tidak boleh kurang dari
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.Pemblokiran pembacaan transaksional
Dalam mode
read_committed, transaksi yang belum selesai (tidak dikomit maupun dibatalkan) memblokir pembacaan dari seluruh topik.Misalnya:
Transaksi 1 menulis data.
Transaksi 2 menulis dan melakukan commit data.
Hingga Transaksi 1 selesai, data Transaksi 2 tidak terlihat oleh konsumen.
Oleh karena itu:
Selama operasi normal, visibilitas data tertunda sekitar interval rata-rata checkpoint.
Saat pekerjaan gagal, topik yang sedang ditulis memblokir konsumen hingga pekerjaan dimulai ulang atau transaksi timeout. Dalam kasus ekstrem, timeout transaksi bahkan dapat memengaruhi pembacaan.