Setelah mengonfigurasi katalog Kafka JSON, Anda dapat langsung mengakses topik berformat JSON di kluster Kafka tanpa perlu mendefinisikan skema saat mengembangkan pekerjaan di Realtime Compute for Apache Flink. Topik ini menjelaskan cara membuat, melihat, dan menghapus katalog Kafka JSON.
Informasi latar belakang
Katalog Kafka JSON melakukan inferensi skema topik dengan secara otomatis mengurai pesan berformat JSON. Hal ini memungkinkan Anda mengambil informasi bidang tertentu dari pesan tanpa perlu mendeklarasikan skema tabel Kafka dalam Flink SQL. Katalog Kafka JSON memiliki fitur-fitur berikut:
Nama tabel dalam katalog Kafka JSON sesuai dengan nama topik Kafka. Anda tidak perlu mendaftarkan tabel Kafka secara manual menggunakan pernyataan Data Definition Language (DDL). Hal ini meningkatkan efisiensi dan akurasi pengembangan.
Tabel yang disediakan oleh katalog Kafka JSON dapat langsung digunakan sebagai tabel sumber dalam pekerjaan Flink SQL.
Anda dapat menggunakan katalog Kafka JSON bersama pernyataan CREATE TABLE AS (CTAS) untuk menyinkronkan data beserta perubahan skemanya.
Topik ini menjelaskan cara mengelola katalog Kafka JSON:
Batasan
Katalog Kafka JSON hanya mendukung topik yang berisi pesan dalam format JSON.
Hanya mesin komputasi yang menggunakan VVR 6.0.2 atau versi lebih baru yang mendukung katalog Kafka JSON.
CatatanJika Anda menggunakan VVR 4.x, Anda harus melakukan upgrade pekerjaan ke VVR 6.0.2 atau versi lebih baru sebelum dapat menggunakan katalog Kafka JSON.
Anda tidak dapat memodifikasi katalog Kafka JSON yang sudah ada menggunakan pernyataan DDL.
Anda hanya dapat melakukan kueri pada tabel data. Anda tidak dapat membuat, memodifikasi, atau menghapus database atau tabel.
CatatanDalam skenario CREATE DATABASE AS (CDAS) atau CREATE TABLE AS (CTAS) yang menggunakan katalog Kafka JSON, topik dapat dibuat secara otomatis.
Katalog Kafka JSON tidak dapat membaca dari atau menulis ke kluster Kafka yang telah mengaktifkan autentikasi SSL atau SASL.
Tabel yang disediakan oleh katalog Kafka JSON dapat langsung digunakan sebagai tabel sumber dalam pekerjaan Flink SQL. Tabel tersebut tidak dapat digunakan sebagai tabel sink atau tabel dimensi lookup.
ApsaraMQ for Kafka saat ini tidak memungkinkan Anda menghapus groups menggunakan operasi API yang sama seperti Apache Kafka open source. Saat membuat katalog Kafka JSON, Anda harus mengonfigurasi parameter aliyun.kafka.instanceId, aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.endpoint, dan aliyun.kafka.regionId untuk menghapus kelompok konsumen secara otomatis. Untuk informasi selengkapnya, lihat Perbandingan antara ApsaraMQ for Kafka dan Apache Kafka open source.
Pencegahan
Katalog Kafka JSON menghasilkan skema tabel dengan mengurai data sampel. Untuk topik dengan format data yang tidak konsisten, katalog akan mengembalikan skema terluas yang mungkin, dengan mempertahankan semua kolom secara default. Jika format data suatu topik berubah, skema tabel yang diinferensi oleh katalog dapat menjadi tidak konsisten pada waktu yang berbeda. Jika skema yang diinferensi berbeda sebelum dan sesudah restart pekerjaan, masalah eksekusi pekerjaan dapat terjadi.
Sebagai contoh, pekerjaan Flink SQL mereferensikan tabel dalam katalog Kafka JSON. Jika pekerjaan tersebut direstart dari titik simpan setelah berjalan selama periode tertentu, skema baru yang berbeda dari skema yang digunakan pada eksekusi sebelumnya mungkin diambil. Namun, rencana eksekusi pekerjaan tetap menggunakan versi yang dihasilkan dengan skema lama. Hal ini dapat menyebabkan ketidaksesuaian pada komponen downstream, seperti kondisi filter atau nilai bidang. Untuk mencegah hal ini, Anda dapat membuat tabel Kafka menggunakan pernyataan CREATE TEMPORARY TABLE dalam pekerjaan Flink SQL Anda untuk menetapkan skema tetap.
Buat katalog JSON Kafka
Di editor teks pada tab Data Exploration, masukkan pernyataan untuk mengonfigurasi katalog Kafka JSON.
Kluster Kafka yang dikelola sendiri atau kluster EMR Kafka
CREATE CATALOG <YourCatalogName> WITH( 'type'='kafka', 'properties.bootstrap.servers'='<brokers>', 'format'='json', 'default-database'='<dbName>', 'key.fields-prefix'='<keyPrefix>', 'value.fields-prefix'='<valuePrefix>', 'timestamp-format.standard'='<timestampFormat>', 'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>', 'infer-schema.primitive-as-string'='<primitiveAsString>', 'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>', 'infer-schema.compacted-topic-as-upsert-table'='true', 'max.fetch.records'='100' );CREATE CATALOG <YourCatalogName> WITH( 'type'='kafka', 'properties.bootstrap.servers'='<brokers>', 'format'='json', 'default-database'='<dbName>', 'key.fields-prefix'='<keyPrefix>', 'value.fields-prefix'='<valuePrefix>', 'timestamp-format.standard'='<timestampFormat>', 'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>', 'infer-schema.primitive-as-string'='<primitiveAsString>', 'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>', 'infer-schema.compacted-topic-as-upsert-table'='true', 'max.fetch.records'='100', 'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>', 'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>', 'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>', 'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>', 'aliyun.kafka.regionId'='<aliyunKafkaRegionId>' );
Parameter
Type
Deskripsi
Diperlukan
Catatan
YourCatalogName
String
Nama untuk katalog Kafka JSON.
Ya
Masukkan nama kustom dalam bahasa Inggris.
PentingHapus tanda kurung sudut (<>) setelah Anda mengganti parameter dengan nama katalog Anda. Jika tidak, akan terjadi error pemeriksaan sintaks.
type
String
Tipe katalog.
Ya
Nilainya harus kafka.
properties.bootstrap.servers
String
Alamat broker Kafka.
Ya
Format:
host1:port1,host2:port2,host3:port3.Pisahkan beberapa alamat dengan koma (,).
format
String
Format pesan Kafka.
Ya
Saat ini, hanya JSON yang didukung. Flink mengurai pesan Kafka berformat JSON untuk mendapatkan skema.
default-database
String
Nama kluster Kafka.
Tidak
Nilai default-nya adalah kafka. Katalog memerlukan nama tiga bagian untuk menemukan tabel: catalog_name.db_name.table_name. Parameter ini menentukan db_name default. Karena Kafka tidak memiliki konsep database, Anda dapat menggunakan string apa pun untuk merepresentasikan kluster Kafka sebagai database.
key.fields-prefix
String
Awalan kustom yang ditambahkan ke nama bidang yang diurai dari kunci pesan. Hal ini menghindari konflik penamaan setelah kunci pesan Kafka diurai.
Tidak
Nilai default-nya adalah key_. Misalnya, jika bidang kunci Anda bernama a, sistem akan mengurai nama bidang tersebut menjadi key_a.
CatatanNilai parameter key.fields-prefix tidak boleh menjadi awalan dari nilai parameter value.fields-prefix. Misalnya, jika Anda menyetel value.fields-prefix ke test1_value_, Anda tidak boleh menyetel key.fields-prefix ke test1_.
value.fields-prefix
String
Awalan kustom yang ditambahkan ke nama bidang yang diurai dari isi pesan (value). Hal ini menghindari konflik penamaan setelah isi pesan Kafka diurai.
Tidak
Nilai default-nya adalah value_. Misalnya, jika bidang value Anda bernama b, sistem akan mengurai nama bidang tersebut menjadi value_b.
CatatanNilai parameter value.fields-prefix tidak boleh menjadi awalan dari nilai parameter key.fields-prefix. Misalnya, jika Anda menyetel key.fields-prefix ke test2_value_, Anda tidak boleh menyetel value.fields-prefix ke test2_.
timestamp-format.standard
String
Format untuk mengurai bidang bertipe Timestamp dalam pesan berformat JSON. Sistem pertama-tama mencoba mengurai menggunakan format yang Anda konfigurasikan. Jika penguraian gagal, sistem secara otomatis mencoba format lainnya.
Tidak
Nilai valid:
SQL (default)
ISO-8601
infer-schema.flatten-nested-columns.enable
Boolean
Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam isi pesan JSON (value) selama penguraian.
Tidak
Nilai valid:
true: Perluas secara rekursif.
Untuk kolom yang diperluas, Flink menggunakan jalur yang mengindeks nilai sebagai nama. Misalnya, untuk kolom col dalam
{"nested": {"col": true}}, nama yang diperluas adalah nested.col.CatatanJika Anda menyetel parameter ini ke true, gunakan bersama pernyataan CREATE TABLE AS (CTAS). Pernyataan DML lainnya tidak mendukung perluasan otomatis kolom bersarang.
false (default): Perlakukan tipe bersarang sebagai String.
infer-schema.primitive-as-string
Boolean
Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe String saat mengurai isi pesan JSON (value).
Tidak
Nilai valid:
true: Inferensi semua tipe primitif sebagai String.
false (default): Inferensi tipe berdasarkan aturan dasar.
infer-schema.parse-key-error.field-name
String
Saat mengurai kunci pesan JSON, jika kunci tidak kosong dan penguraian gagal, bidang bertipe VARBINARY ditambahkan ke skema tabel untuk merepresentasikan data kunci pesan. Nama kolom merupakan gabungan dari awalan key.fields-prefix dan nilai parameter ini.
Tidak
Nilai default-nya adalah col. Misalnya, jika isi pesan diurai menjadi bidang bernama value_name, dan kunci pesan tidak kosong tetapi gagal diurai, skema yang dikembalikan berisi dua bidang: key_col dan value_name.
infer-schema.compacted-topic-as-upsert-table
Boolean
Menentukan apakah akan menggunakan tabel sebagai tabel Upsert Kafka ketika kebijakan pembersihan topik Kafka adalah compact dan kunci pesan tidak kosong.
Tidak
Nilai default-nya adalah true. Setel ke true saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.
max.fetch.records
Int
Jumlah maksimum pesan yang dikonsumsi saat mengurai pesan berformat JSON.
Tidak
Nilai default-nya adalah 100.
aliyun.kafka.accessKeyId
String
ID AccessKey Akun Alibaba Cloud Anda. Untuk informasi selengkapnya, lihat Buat Pasangan Kunci Akses.
Tidak
Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.
aliyun.kafka.accessKeySecret
String
Rahasia AccessKey Akun Alibaba Cloud Anda. Untuk informasi selengkapnya, lihat Buat Pasangan Kunci Akses.
Tidak
Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.
aliyun.kafka.instanceId
String
ID instans ApsaraMQ for Kafka. Anda dapat melihat ID tersebut di halaman detail instans di Konsol ApsaraMQ for Kafka.
Tidak
Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.
aliyun.kafka.endpoint
String
Titik akhir API ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Titik Akhir.
Tidak
Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.
aliyun.kafka.regionId
String
ID wilayah instans tempat topik berada. Untuk informasi selengkapnya, lihat Titik Akhir.
Tidak
Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.
Pilih pernyataan untuk membuat katalog, lalu klik Run di sebelah nomor baris di sebelah kiri.

Di area Metadata di sebelah kiri, Anda dapat melihat katalog yang telah dibuat.
Lihat katalog JSON Kafka
Di editor teks pada tab Data Exploration, masukkan pernyataan berikut.
DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;Parameter
Deskripsi
${catalog_name}
Nama katalog JSON Kafka.
${db_name}
Nama kluster Kafka.
${topic_name}
Nama topik Kafka.
Pilih pernyataan untuk melihat katalog, lalu klik Run di sebelah nomor baris di sebelah kiri.
Setelah pernyataan berhasil dijalankan, Anda dapat melihat detail tabel di hasil eksekusi.

Gunakan katalog JSON Kafka
Gunakan katalog sebagai tabel sumber untuk membaca data dari topik Kafka.
INSERT INTO ${other_sink_table} SELECT... FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;CatatanUntuk menentukan parameter WITH lainnya saat menggunakan tabel dari katalog Kafka JSON, Anda dapat menggunakan Petunjuk SQL untuk menambahkannya. Sebagai contoh, pernyataan SQL di atas menggunakan Petunjuk SQL untuk menentukan bahwa konsumsi dimulai dari data paling awal. Untuk informasi selengkapnya tentang parameter lainnya, lihat Tabel sumber ApsaraMQ for Kafka dan Tabel sink ApsaraMQ for Kafka.
Anda dapat menggunakan pernyataan CREATE TABLE AS (CTAS) untuk menyinkronkan data dari topik Kafka ke tabel target, dengan menggunakan topik Kafka sebagai tabel sumber.
Menyinkronkan satu tabel secara real-time.
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH(...) AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;Menyinkronkan beberapa tabel dalam satu pekerjaan.
BEGIN STATEMENT SET; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0` AS TABLE `kafka-catalog`.`kafka`.`topic0` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1` AS TABLE `kafka-catalog`.`kafka`.`topic1` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2` AS TABLE `kafka-catalog`.`kafka`.`topic2` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; END;Dengan katalog Kafka JSON, Anda dapat menyinkronkan beberapa tabel Kafka dalam satu tugas yang sama. Namun, kondisi berikut harus dipenuhi:
Parameter topic-pattern tidak dikonfigurasi untuk salah satu tabel Kafka.
Konfigurasi Kafka untuk setiap tabel harus identik. Artinya, semua pengaturan properties.*, termasuk properties.bootstrap.servers dan properties.group.id, harus sama.
Pengaturan scan.startup.mode untuk setiap tabel harus identik. Nilainya hanya dapat diatur ke group-offsets, latest-offset, atau earliest-offset.
Sebagai contoh, pada gambar berikut, dua tabel di atas memenuhi kondisi tersebut, sedangkan dua tabel di bawahnya melanggarnya.

Untuk contoh lengkap end-to-end penggunaan katalog Kafka JSON, lihat Panduan Cepat untuk gudang log real-time.
Hapus katalog Kafka JSON
Menghapus katalog Kafka JSON tidak memengaruhi pekerjaan yang sedang berjalan. Namun, pekerjaan yang menggunakan tabel dari katalog tersebut akan gagal dengan error "table not found" saat dipublikasikan atau direstart. Lakukan dengan hati-hati.
Di editor teks pada tab Data Exploration, masukkan pernyataan berikut.
DROP CATALOG ${catalog_name};Ganti ${catalog_name} dengan nama katalog Kafka JSON yang ingin Anda hapus.
Pilih pernyataan untuk menghapus katalog, klik kanan, lalu pilih Run.
Di area Metadata di sebelah kiri, periksa apakah katalog telah dihapus.
Detail informasi tabel dari katalog Kafka JSON
Untuk mempermudah penggunaan tabel dari katalog Kafka JSON, katalog tersebut menambahkan parameter konfigurasi default, metadata, dan informasi kunci utama ke tabel yang diinferensi. Bagian berikut menjelaskan detail informasi tabel dari katalog Kafka JSON:
Inferensi Skema untuk Tabel Kafka
Saat katalog Kafka JSON mengurai pesan berformat JSON untuk mengambil skema topik, katalog tersebut mencoba mengonsumsi hingga max.fetch.records pesan. Katalog tersebut mengurai skema setiap catatan data sesuai dengan aturan dasar yang sama seperti saat Kafka digunakan sebagai sumber data CTAS. Kemudian, katalog tersebut menggabungkan skema-skema tersebut untuk membentuk skema akhir.
PentingSaat katalog Kafka JSON melakukan inferensi skema, katalog tersebut membuat kelompok konsumen untuk mengonsumsi data topik. Nama kelompok konsumen menggunakan awalan untuk menunjukkan bahwa kelompok tersebut dibuat oleh katalog.
Untuk ApsaraMQ for Kafka, gunakan katalog Kafka JSON dengan VVR 6.0.7 atau versi lebih baru. Pada versi sebelum 6.0.7, kelompok konsumen tidak dihapus secara otomatis. Hal ini dapat menyebabkan peringatan karena penumpukan pesan di kelompok konsumen.
Skema tersebut terutama mencakup bagian-bagian berikut:
Kolom Fisik yang Diinferensi
Katalog Kafka JSON melakukan inferensi kolom fisik pesan dari kunci pesan Kafka dan isi pesan (value). Awalan yang sesuai ditambahkan ke nama kolom.
Jika kunci pesan tidak kosong tetapi tidak dapat diurai, kolom bertipe VARBINARY dibuat. Nama kolom merupakan gabungan dari awalan key.fields-prefix dan nilai parameter infer-schema.parse-key-error.field-name.
Setelah menarik sekelompok pesan Kafka, katalog tersebut mengurai setiap pesan dan menggabungkan kolom fisik yang diurai untuk membuat skema seluruh topik. Aturan penggabungan adalah sebagai berikut:
Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema hasil, katalog Kafka JSON secara otomatis menambahkan bidang tersebut ke skema hasil.
Jika kolom dengan nama yang sama muncul, penanganannya berdasarkan skenario berikut:
Jika tipenya sama tetapi presisinya berbeda, tipe dengan presisi lebih tinggi yang digunakan.
Jika tipenya berbeda, sistem menemukan node induk terkecil dalam struktur pohon yang ditunjukkan pada gambar berikut dan menggunakannya sebagai tipe untuk kolom dengan nama yang sama. Namun, saat tipe Decimal dan Float digabung, keduanya dikonversi ke tipe Double untuk mempertahankan presisi.

Sebagai contoh, untuk topik Kafka yang berisi tiga catatan data di bawah ini, katalog Kafka JSON menghasilkan skema seperti yang ditunjukkan pada gambar.

Kolom Metadata Default
Katalog Kafka JSON menambahkan tiga kolom metadata berguna secara default: partition, offset, dan timestamp. Detailnya ditunjukkan pada tabel berikut.
Nama Metadata
Nama Kolom
Tipe
Deskripsi
partition
partition
INT NOT NULL
Nilai partisi.
offset
offset
BIGINT NOT NULL
Offset.
timestamp
timestamp
TIMESTAMP_LTZ(3) NOT NULL
Timestamp pesan.
Kendala KUNCI UTAMA Default
Saat tabel dari katalog Kafka JSON dikonsumsi sebagai tabel sumber, kolom metadata partition dan offset digunakan sebagai kunci utama secara default untuk memastikan keunikan data.
CatatanJika skema tabel yang diinferensi oleh katalog Kafka JSON tidak memenuhi kebutuhan Anda, Anda dapat mendeklarasikan tabel temporary menggunakan sintaks CREATE TEMPORARY TABLE ... LIKE untuk menentukan skema tabel yang diinginkan. Sebagai contoh, jika data JSON berisi bidang bernama ts dalam format '2023-01-01 12:00:01', katalog Kafka JSON secara otomatis menginferensi bidang ts sebagai tipe TIMESTAMP. Jika Anda ingin menggunakan bidang ts sebagai tipe STRING, Anda dapat mendeklarasikan tabel menggunakan sintaks CREATE TEMPORARY TABLE ... LIKE. Seperti yang ditunjukkan pada contoh berikut, karena awalan value_ ditambahkan ke bidang isi pesan dalam konfigurasi default, nama bidangnya menjadi value_ts.
CREATE TEMPORARY TABLE tempTable ( value_name STRING, value_ts STRING ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;Parameter Tabel Default
Parameter
Deskripsi
Catatan
connector
Tipe konektor.
Nilainya harus kafka atau upsert-kafka.
topic
Nama topik yang sesuai.
Nama tabel.
properties.bootstrap.servers
Alamat broker Kafka.
Sesuai dengan nilai parameter properties.bootstrap.servers dalam konfigurasi katalog.
value.format
Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi isi pesan Kafka (value).
Nilainya tetap JSON.
value.fields-prefix
Menentukan awalan kustom untuk semua bidang isi pesan Kafka (value) untuk menghindari konflik nama dengan bidang kunci pesan atau metadata.
Sesuai dengan nilai parameter value.fields-prefix dalam konfigurasi katalog.
value.json.infer-schema.flatten-nested-columns.enable
Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam JSON isi pesan Kafka (value).
Sesuai dengan nilai parameter infer-schema.flatten-nested-columns.enable dalam konfigurasi katalog.
value.json.infer-schema.primitive-as-string
Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe String untuk isi pesan Kafka (value).
Sesuai dengan nilai parameter infer-schema.primitive-as-string dalam konfigurasi katalog.
value.fields-include
Menentukan kebijakan penanganan bidang kunci pesan dalam isi pesan.
Nilainya harus EXCEPT_KEY. Ini menunjukkan bahwa isi pesan tidak berisi bidang kunci pesan.
Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong. Jangan konfigurasi parameter ini jika kunci pesan kosong.
key.format
Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi kunci pesan Kafka.
Nilainya harus json atau raw.
Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong. Jangan konfigurasi parameter ini jika kunci pesan kosong.
Jika kunci pesan tidak kosong tetapi tidak dapat diurai, setel parameter ini ke raw. Jika penguraian berhasil, setel parameter ini ke json.
key.fields-prefix
Menentukan awalan kustom untuk semua bidang kunci pesan Kafka untuk menghindari konflik nama dengan bidang format isi pesan.
Sesuai dengan nilai parameter key.fields-prefix dalam konfigurasi katalog.
Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong. Jangan konfigurasi parameter ini jika kunci pesan kosong.
key.fields
Bidang tempat data yang diurai dari kunci pesan Kafka disimpan.
Daftar bidang kunci yang diurai diisi secara otomatis.
Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong dan tabel bukan tabel Upsert Kafka. Jika tidak, jangan konfigurasi parameter ini.