Topik ini menjelaskan cara menggunakan Konektor Upsert Kafka.
Informasi latar belakang
Konektor Upsert Kafka digunakan untuk membaca dan menulis data ke topik Kafka dengan metode upsert.
Untuk tabel sumber, Konektor Upsert Kafka mengonversi data dalam topik Kafka menjadi aliran log perubahan. Setiap catatan dalam aliran log mewakili pembaruan atau penghapusan. Jika kunci dalam topik Kafka cocok dengan kunci dalam catatan data, nilai dari catatan tersebut akan menimpa nilai yang ada, diinterpretasikan sebagai UPDATE. Jika tidak ada kunci yang cocok, nilai baru dimasukkan, diinterpretasikan sebagai INSERT. Semua catatan diinterpretasikan sebagai UPSERT (INSERT atau UPDATE). Jika nilai kunci adalah null, catatan diinterpretasikan sebagai DELETE.
Untuk tabel sink atau sink ingest data, Konektor Upsert Kafka mengonsumsi aliran log perubahan dari sumber. Data INSERT dan UPDATE_AFTER ditulis sebagai pesan Kafka normal, sedangkan data DELETE ditulis sebagai pesan Kafka dengan nilai null. Jika nilai kunci dalam catatan adalah null, pesan Kafka terkait dihapus. Flink mempartisi data berdasarkan kolom kunci utama, memastikan bahwa pesan dengan kunci utama yang sama diurutkan dan ditulis ke partisi yang sama.
Item | Deskripsi |
Tipe tabel | Tabel sumber, tabel sink, sink ingest data |
Mode operasi | Mode streaming |
Format data | avro, avro-confluent, csv, json, dan raw |
Metrik |
|
Tipe API | SQL API dan YAML API ingest data |
Pembaruan atau penghapusan data dalam tabel sink | Ya |
Prasyarat
Kluster Kafka telah dibuat. Untuk detail lebih lanjut, lihat Buat kluster Dataflow Kafka atau Buat sumber daya di Kafka.
Koneksi jaringan antara Realtime Compute for Apache Flink dan kluster Kafka telah dibuat. Untuk informasi lebih lanjut tentang koneksi antara Realtime Compute for Apache Flink dan kluster Kafka di E-MapReduce (EMR), lihat Buat dan kelola VPC dan Ikhtisar. Untuk koneksi dengan kluster ApsaraMQ for Kafka, lihat Konfigurasikan daftar putih.
Batasan
Hanya Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru yang mendukung konektor Apache Kafka.
Konektor Upsert Kafka hanya mendukung pembacaan dan penulisan data dari Apache Kafka 0.10 atau versi lebih baru.
Konektor Upsert Kafka hanya mendukung opsi klien Apache Kafka 2.8. Untuk detail konfigurasi produsen dan konsumen Kafka, lihat Konfigurasi Konsumen dan Konfigurasi Produsen.
Jika tabel sink Upsert Kafka menggunakan semantik exactly-once, mekanisme transaksi Kafka harus diaktifkan. Versi kluster Kafka harus Apache Kafka 0.11 atau lebih baru.
SQL
Konektor Upsert Kafka digunakan untuk membaca dan menulis data ke topik Kafka dengan metode upsert.
Sintaks
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);Opsi konektor dalam klausa WITH
Opsi Umum
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Tipe tabel.
String
Ya
Tidak ada nilai default
Setel nilainya menjadi upsert-kafka.
properties.bootstrap.servers
Alamat IP atau titik akhir dan nomor port broker Kafka.
String
Ya
Tidak ada nilai default
Format:
host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).properties.*
Opsi yang dikonfigurasi untuk klien Kafka.
String
Tidak
Tidak ada nilai default
Akhiran opsi ini harus sesuai dengan aturan yang didefinisikan di Konfigurasi Produsen dan Konfigurasi Konsumen.
Flink menghapus awalan properties. dan meneruskan kunci dan nilai yang telah diubah ke klien Kafka. Sebagai contoh, Anda dapat mengatur
properties.allow.auto.create.topicsmenjadi false untuk menonaktifkan pembuatan topik otomatis.Anda tidak dapat memodifikasi konfigurasi opsi berikut dengan menambahkan awalan properties., karena nilai opsi tersebut ditimpa setelah Anda menggunakan konektor Upsert Kafka:
key.deserializer
value.deserializer
key.format
Format yang digunakan untuk membaca atau menulis bidang kunci pesan Kafka.
String
Ya
Tidak ada nilai default
Anda harus mengonfigurasi opsi key.fields atau key.fields-prefix jika Anda mengonfigurasi opsi ini.
Nilai valid:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama dengan bidang nilai.
String
Tidak
Tidak ada nilai default
Opsi ini hanya digunakan untuk membedakan nama kolom tabel sumber dan tabel sink. Awalan dihapus dari nama kolom saat bidang kunci pesan Kafka diurai dan dibuat.
CatatanJika Anda mengonfigurasi opsi ini, Anda harus mengatur opsi value.fields-include menjadi EXCEPT_KEY.
value.format
Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.
String
Ya
Tidak ada nilai default
Konfigurasi opsi ini setara dengan konfigurasi opsi format. Opsi format tidak dapat digunakan bersama dengan opsi value.format. Jika Anda mengonfigurasi kedua opsi tersebut, terjadi konflik.
value.fields-include
Menentukan apakah akan menyertakan bidang yang sesuai dengan kunci pesan saat bidang nilai pesan Kafka diurai atau dibuat.
String
Ya
ALL
Nilai valid:
ALL: Semua bidang diproses sebagai bidang nilai pesan Kafka. Ini adalah nilai default.
EXCEPT_KEY: Semua bidang kecuali bidang yang ditentukan oleh opsi key.fields diproses sebagai bidang nilai pesan Kafka.
topic
Nama topik tempat data dibaca atau ke mana data ditulis.
String
Ya
Tidak ada nilai default
N/A
Opsi Eksklusif Sink
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
sink.parallelism
Paralelisme operator dalam tabel sink Kafka.
Integer
Tidak
Paralelisme operator hulu, yang ditentukan oleh kerangka kerja.
N/A
sink.buffer-flush.max-rows
Jumlah maksimum catatan data yang dapat di-cache sebelum cache diperbarui.
Integer
Tidak
0 (dinonaktifkan)
Jika tabel sink menerima sejumlah besar pembaruan pada kunci yang sama, hanya catatan data terakhir dari kunci tersebut yang disimpan dalam cache. Dalam hal ini, caching data dalam tabel sink membantu mengurangi jumlah data yang ditulis ke topik Kafka. Ini mencegah pesan nisan potensial dikirim ke topik Kafka.
CatatanJika Anda ingin mengaktifkan caching data untuk tabel sink, Anda harus mengatur opsi sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari 0.
sink.buffer-flush.interval
Interval pembaruan cache.
Durasi
Tidak
0 (dinonaktifkan)
Unit bisa milidetik, detik, menit, atau jam. Sebagai contoh, Anda dapat mengonfigurasi
'sink.buffer-flush.interval'='1 s'.Jika tabel sink menerima sejumlah besar pembaruan pada kunci yang sama, hanya catatan data terakhir dari kunci tersebut yang disimpan dalam cache. Dalam hal ini, caching data dalam tabel sink membantu mengurangi jumlah data yang ditulis ke topik Kafka. Ini mencegah pesan nisan potensial dikirim ke topik Kafka.
CatatanJika Anda ingin mengaktifkan caching data untuk tabel sink, Anda harus mengatur opsi sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari 0.
Ingest data
Konektor Upsert Kafka dapat digunakan untuk mengembangkan draft YAML untuk ingest data dan sebagai sink untuk menulis data dalam format JSON, termasuk bidang kunci utama dalam tubuh pesan.
Sintaks
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# ApsaraMQ for Kafka
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}Opsi konektor
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
type | Tipe konektor sink | STRING | Ya | Tidak ada nilai default | Setel opsi menjadi upsert-kafka. |
name | Nama konektor sink | STRING | Tidak | Tidak ada nilai default | N/A |
properties.bootstrap.servers | Alamat IP atau titik akhir dan nomor port broker Kafka. | STRING | Ya | Tidak ada nilai default | Format: |
properties.* | Opsi yang dikonfigurasi untuk klien Kafka. | STRING | Tidak | Tidak ada nilai default | Akhiran opsi ini harus sesuai dengan aturan yang didefinisikan di Konfigurasi Produsen. Flink menghapus awalan |
sink.delivery-guarantee | Semantik pengiriman untuk sink Kafka | STRING | Tidak | at-least-once | Nilai valid:
|
sink.add-tableId-to-header-enabled | Apakah akan menulis informasi tabel ke header. | BOOLEAN | Tidak | false | Jika Anda mengatur opsi menjadi |
aliyun.kafka.accessKeyId | ID AccessKey akun Alibaba Cloud Anda. | STRING | Tidak | Tidak ada nilai default | Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey. Catatan Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka. |
aliyun.kafka.accessKeySecret | Rahasia AccessKey akun Alibaba Cloud. | STRING | Tidak | Tidak ada nilai default | Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey. Catatan Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka. |
aliyun.kafka.instanceId | ID instance ApsaraMQ for Kafka | STRING | Tidak | Tidak ada nilai default | Anda dapat melihat ID instance di halaman Detail Instance. Catatan Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka. |
aliyun.kafka.endpoint | Titik akhir ApsaraMQ for Kafka. | STRING | Tidak | Tidak ada nilai default | Untuk informasi lebih lanjut, lihat Titik Akhir. Catatan Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka. |
aliyun.kafka.regionId | ID wilayah instance tempat Anda ingin membuat topik. | STRING | Tidak | Tidak ada nilai default | Untuk informasi lebih lanjut, lihat Titik Akhir. Catatan Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka. |
Tipe perubahan yang didukung
Konektor Upsert Kafka mendukung semua tipe perubahan untuk pemasukan data. Namun, perlu diperhatikan bahwa data harus dimasukkan dari tabel sumber dengan skema tetap melalui pekerjaan Flink SQL menggunakan Konektor Upsert Kafka.
Kode sampel
Kode sampel untuk tabel sumber
Buat tabel sumber Kafka yang berisi data penjelajahan pengguna situs web.
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );Kode sampel untuk tabel sink
Buat tabel sink Upsert Kafka.
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );Tulis data penjelajahan pengguna situs web ke tabel sink.
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;
Sink Ingest Data
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}