Topik ini menjelaskan cara menggunakan konektor Upsert Kafka.
Informasi latar belakang
Konektor Upsert Kafka membaca data dari dan menulis data ke topik Kafka menggunakan operasi upsert.
-
Sebagai tabel sumber, konektor dapat mengonversi data yang disimpan di Kafka menjadi aliran changelog. Setiap catatan dalam aliran tersebut merepresentasikan event pembaruan atau penghapusan. Nilai dalam sebuah catatan data diinterpretasikan sebagai UPDATE terhadap nilai terakhir dengan kunci yang sama, jika kunci tersebut ada. Jika kunci tidak ada, pembaruan tersebut diperlakukan sebagai INSERT. Dalam istilah tabel, sebuah catatan dalam aliran changelog merupakan UPSERT—juga dikenal sebagai INSERT atau UPDATE—karena setiap baris yang ada dengan kunci yang sama akan ditimpa. Pesan dengan nilai kosong diperlakukan sebagai pesan DELETE.
-
Sebagai tabel sink atau sink ingesti data, konektor dapat mengonsumsi aliran changelog yang dihasilkan oleh pekerjaan hulu. Konektor menulis data INSERT atau UPDATE_AFTER sebagai pesan Kafka biasa, sedangkan data DELETE ditulis sebagai pesan Kafka dengan nilai kosong, yang menunjukkan bahwa pesan untuk kunci yang bersangkutan telah dihapus. Flink mempartisi data berdasarkan nilai kolom kunci primer, sehingga memastikan bahwa pesan dengan kunci primer yang sama tetap terurut. Akibatnya, pesan pembaruan atau penghapusan untuk kunci primer yang sama ditulis ke partisi yang sama.
|
Kategori |
Deskripsi |
|
Jenis yang didukung |
Tabel sumber, tabel sink, dan sink ingesti data |
|
Mode eksekusi |
Mode streaming |
|
Format data |
avro, avro-confluent, csv, json, dan raw |
|
Metrik pemantauan spesifik |
|
|
Jenis API |
SQL dan pekerjaan YAML ingesti data |
|
Pembaruan atau penghapusan data dalam tabel sink |
Ya |
Prasyarat
-
Buat kluster Kafka. Untuk informasi selengkapnya, lihat Buat kluster DataFlow Kafka atau Buat sumber daya di Kafka.
-
Buat koneksi jaringan antara kluster Flink dan kluster Kafka Anda. Untuk Kafka di EMR, lihat Buat dan kelola VPC dan Ikhtisar grup keamanan. Untuk ApsaraMQ for Kafka, Anda harus mengonfigurasi daftar putih.
Batasan
-
Konektor Kafka hanya didukung oleh Flink yang menggunakan Ververica Runtime (VVR) 2.0.0 atau versi yang lebih baru.
-
Konektor hanya mendukung pembacaan dari dan penulisan ke Apache Kafka 0.10 atau versi yang lebih baru.
-
Konektor hanya mendukung parameter client Apache Kafka 2.8. Untuk informasi selengkapnya, lihat dokumentasi konfigurasi consumer dan producer dari Apache Kafka.
-
Jika tabel sink Upsert Kafka menggunakan semantik tepat-sekali (exactly-once semantics), fitur transaksi harus diaktifkan pada kluster Kafka tujuan. Kluster tersebut harus merupakan Apache Kafka 0.11 atau versi yang lebih baru.
-
Tabel sumber Upsert Kafka hanya mendukung mode startup earliest-offset. Mode ini tidak dapat dikonfigurasi. Konektor membaca semua data perubahan historis untuk mendapatkan changelog lengkap, sehingga memastikan Anda dapat memproses changelog tersebut secara utuh dalam SQL dan seluruh pipeline menyediakan semantik tepat-sekali. Jika Anda menentukan mode startup lain, seperti berdasarkan timestamp atau latest-offset, konektor akan membaca changelog yang tidak lengkap, yang dapat menyebabkan masalah kebenaran data dalam komputasi hilir.
SQL
Konektor Upsert Kafka membaca data dari dan menulis data ke topik Kafka menggunakan operasi upsert.
Sintaksis
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
DENGAN parameter
-
Umum
Parameter
Deskripsi
Tipe data
Wajib
Bawaan
Keterangan
connector
Jenis tabel.
String
Ya
None
Atur nilainya ke `upsert-kafka`.
properties.bootstrap.servers
Alamat broker Kafka.
String
Ya
None
Formatnya adalah
host:port,host:port,host:port. Pisahkan beberapa alamat dengan koma (,).properties.*
Parameter untuk client Kafka.
String
Tidak
None
Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk producer dan consumer.
Flink menghapus awalan `properties.` dan meneruskan konfigurasi yang tersisa ke client Kafka. Misalnya, Anda dapat menggunakan
'properties.allow.auto.create.topics' = 'false'untuk menonaktifkan pembuatan topik otomatis.Jangan ubah konfigurasi berikut dengan cara ini, karena konektor Kafka akan menimpanya:
-
key.deserializer
-
value.deserializer
key.format
Format untuk bagian kunci pesan Kafka.
String
Ya
None
Jika Anda mengonfigurasi parameter ini, Anda juga harus mengonfigurasi key.fields atau key.fields-prefix.
Nilai yang valid:
-
csv
-
json
-
avro
-
debezium-json
-
canal-json
-
maxwell-json
-
avro-confluent
-
raw
key.fields-prefix
Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Hal ini menghindari konflik nama dengan bidang dalam format nilai.
String
Tidak
None
Parameter ini hanya digunakan untuk membedakan nama kolom antara tabel sumber dan tabel sink. Awalan dihapus saat bagian kunci pesan Kafka diurai dan dihasilkan.
CatatanJika Anda mengonfigurasi parameter ini, Anda harus mengatur value.fields-include ke `EXCEPT_KEY`.
value.format
Format untuk bagian nilai pesan Kafka.
String
Ya
None
Parameter ini setara dengan format. Anda hanya dapat mengonfigurasi salah satunya. Konflik terjadi jika Anda mengonfigurasi kedua parameter format dan value.format.
value.fields-include
Menentukan apakah bidang yang sesuai dengan kunci pesan disertakan saat bagian nilai pesan Kafka diurai atau dihasilkan.
String
Ya
ALL
Nilai yang valid:
-
ALL (Default): Semua kolom diproses sebagai bagian nilai pesan Kafka.
-
EXCEPT_KEY: Semua kolom kecuali bidang yang didefinisikan oleh `key.fields` diproses sebagai bagian nilai pesan Kafka.
topic
Nama topik untuk dibaca atau ditulis.
String
Ya
None
None.
-
-
Parameter khusus sink
Parameter
Deskripsi
Tipe data
Wajib
Bawaan
Keterangan
sink.parallelism
Konkurensi operator sink Kafka.
Integer
Tidak
Konkurensi operator hulu, yang ditentukan oleh framework.
None.
sink.buffer-flush.max-rows
Jumlah maksimum catatan yang dapat di-cache sebelum cache dikosongkan.
Integer
Tidak
0 (dinonaktifkan)
Jika tabel sink menerima banyak pembaruan pada kunci yang sama, cache hanya menyimpan catatan terakhir untuk kunci tersebut. Hal ini membantu mengurangi jumlah data yang dikirim ke topik Kafka dan menghindari potensi pesan tombstone.
CatatanUntuk mengaktifkan caching sink, atur kedua parameter sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari nol.
sink.buffer-flush.interval
Interval pengosongan cache.
Duration
Tidak
0 (dinonaktifkan)
Unitnya dapat berupa milidetik (ms), detik (s), menit (min), atau jam (h). Contohnya,
'sink.buffer-flush.interval'='1 s'.Jika tabel sink menerima banyak pembaruan pada kunci yang sama, cache hanya menyimpan catatan terakhir untuk kunci tersebut. Hal ini membantu mengurangi jumlah data yang dikirim ke topik Kafka dan menghindari potensi pesan tombstone.
CatatanUntuk mengaktifkan caching sink, atur kedua parameter sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari nol.
Ingesti data
Konektor Upsert Kafka dapat digunakan sebagai sink dalam pekerjaan ingesti data YAML. Data ditulis dalam format JSON, dan bidang kunci primer juga disertakan dalam isi pesan.
Sintaksis
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# ApsaraMQ for Kafka
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}
Parameter
|
Parameter |
Deskripsi |
Tipe data |
Wajib |
Bawaan |
Keterangan |
|
type |
Jenis sink. |
STRING |
Ya |
None |
Atur nilainya ke `upsert-kafka`. |
|
name |
Nama sink. |
STRING |
Tidak |
None |
None. |
|
properties.bootstrap.servers |
Alamat broker Kafka. |
STRING |
Ya |
None |
Formatnya adalah |
|
properties.* |
Parameter untuk client Kafka. |
STRING |
Tidak |
None |
Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk producer. Flink menghapus awalan `properties.` dan meneruskan konfigurasi yang tersisa ke client Kafka. Misalnya, Anda dapat menggunakan |
|
sink.delivery-guarantee |
Pola semantik untuk operasi penulisan. |
STRING |
Tidak |
at-least-once |
Nilai yang valid:
|
|
sink.add-tableId-to-header-enabled |
Menentukan apakah informasi tabel ditulis ke header. |
BOOLEAN |
Tidak |
false |
Jika diaktifkan, `namespace`, `schemaName`, dan `tableName` ditulis ke header. |
|
aliyun.kafka.accessKeyId |
ID AccessKey Akun Alibaba Cloud Anda. |
STRING |
Tidak |
None |
Untuk informasi selengkapnya, lihat Buat pasangan Kunci Akses. Catatan
Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka. |
|
aliyun.kafka.accessKeySecret |
Rahasia AccessKey Akun Alibaba Cloud Anda. |
STRING |
Tidak |
None |
Untuk informasi selengkapnya, lihat Buat pasangan Kunci Akses. Catatan
Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka. |
|
aliyun.kafka.instanceId |
ID instans ApsaraMQ for Kafka. |
STRING |
Tidak |
None |
Anda dapat melihat detail instans di antarmuka Alibaba Cloud Kafka. Catatan
Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka. |
|
aliyun.kafka.endpoint |
Titik akhir API untuk ApsaraMQ for Kafka. |
STRING |
Tidak |
None |
Untuk informasi selengkapnya, lihat Titik akhir. Catatan
Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka. |
|
aliyun.kafka.regionId |
ID wilayah instans tempat topik berada. |
STRING |
Tidak |
None |
Untuk informasi selengkapnya, lihat Titik akhir. Catatan
Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka. |
Perubahan tipe yang didukung
Konektor Upsert Kafka untuk ingesti data mendukung semua jenis operasi perubahan. Namun, untuk membaca data tersebut, Anda harus menggunakan konektor SQL Flink Upsert Kafka dengan skema tetap.
Contoh
-
Tabel sumber
Buat tabel sumber Kafka yang berisi data penelusuran pengguna website.
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' ); -
Tabel sink
-
Buat tabel sink Upsert Kafka.
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' ); -
Tulis data penelusuran pengguna website ke tabel sink.
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCT user_id) FROM pageviews GROUP BY user_region;
-
-
sink pengambilan data
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}