Setelah membuat katalog JSON Kafka, Anda dapat mengakses topik berformat JSON dari kluster Kafka di konsol pengembangan Realtime Compute for Apache Flink tanpa perlu mendefinisikan skema. Topik ini menjelaskan cara membuat, melihat, menggunakan, dan menghapus katalog JSON Kafka di konsol tersebut.
Informasi latar belakang
Katalog JSON Kafka secara otomatis mengurai pesan berformat JSON untuk menyimpulkan skema dari sebuah topik. Dengan demikian, Anda dapat menggunakan katalog JSON untuk mendapatkan bidang spesifik dari pesan tanpa perlu mendeklarasikan skema tabel Kafka dalam Flink SQL. Saat menggunakan katalog JSON Kafka, perhatikan poin-poin berikut:
Nama tabel katalog JSON Kafka sesuai dengan nama topik kluster Kafka. Dengan cara ini, Anda tidak perlu mengeksekusi pernyataan DDL untuk mendaftarkan tabel Kafka untuk mengakses topik kluster Kafka. Ini meningkatkan efisiensi dan akurasi pengembangan data.
Tabel katalog JSON Kafka dapat digunakan sebagai tabel sumber dalam penyebaran Flink SQL.
Anda dapat menggunakan katalog JSON Kafka bersama dengan pernyataan CREATE TABLE AS untuk menyinkronkan perubahan skema.
Topik ini menjelaskan operasi yang dapat Anda lakukan untuk mengelola katalog JSON Kafka:
Batasan
Katalog JSON Kafka hanya mendukung topik berformat JSON.
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 6.0.2 atau lebih baru yang mendukung katalog JSON Kafka.
CatatanJika penyebaran Anda menggunakan VVR 4.X, kami sarankan Anda memperbarui versi VVR penyebaran Anda ke VVR 6.0.2 atau lebih baru sebelum menggunakan katalog JSON Kafka.
Anda tidak dapat memodifikasi katalog JSON Kafka yang ada dengan mengeksekusi pernyataan DDL.
Anda hanya dapat menanyakan tabel data menggunakan katalog JSON Kafka. Anda tidak diperbolehkan membuat, memodifikasi, atau menghapus database dan tabel menggunakan katalog JSON Kafka.
CatatanJika Anda menggunakan katalog JSON Kafka bersama dengan pernyataan CREATE DATABASE AS atau pernyataan CREATE TABLE AS, topik dapat dibuat secara otomatis.
Anda tidak dapat menggunakan katalog JSON Kafka untuk membaca data dari atau menulis data ke kluster Kafka yang memiliki autentikasi berbasis SSL atau Simple Authentication and Security Layer (SASL) diaktifkan.
Tabel katalog JSON Kafka dapat digunakan sebagai tabel sumber dalam penyebaran Flink SQL tetapi tidak dapat digunakan sebagai tabel hasil atau tabel pencarian yang digunakan sebagai tabel dimensi.
ApsaraMQ for Kafka tidak mengizinkan Anda memanggil Operasi API yang sama dengan Operasi API yang digunakan oleh Apache Kafka untuk menghapus grup. Saat Anda membuat katalog JSON Kafka, Anda harus mengonfigurasi parameter aliyun.kafka.instanceId, aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.endpoint, dan aliyun.kafka.regionId untuk menghapus ID grup secara otomatis. Untuk informasi lebih lanjut, lihat Perbandingan antara ApsaraMQ for Kafka dan Apache Kafka open source.
Pencegahan
Katalog JSON Kafka mengurai data sampel untuk menghasilkan skema tabel. Untuk topik yang memiliki format data tidak konsisten, katalog JSON Kafka secara otomatis mempertahankan skema yang berisi semua bidang di semua kolom. Jika format data topik berubah, skema tabel yang dihasilkan oleh katalog JSON Kafka mungkin tidak konsisten pada titik waktu yang berbeda. Akibatnya, kesalahan saat menjalankan penyebaran mungkin terjadi jika skema berbeda disimpulkan sebelum dan sesudah penyebaran di-restart. Sebagai contoh, jika penyebaran SQL Realtime Compute for Apache Flink yang merujuk tabel dalam katalog JSON Kafka berjalan selama periode waktu tertentu dan kemudian di-restart dari savepoint, skema tabel yang berbeda dari skema tabel dari run terakhir mungkin dihasilkan. Penyebaran SQL menggunakan skema tabel yang dihasilkan oleh run terakhir. Akibatnya, objek seperti kondisi filter dan nilai bidang di penyimpanan hilir mungkin tidak cocok. Oleh karena itu, kami sarankan Anda menggunakan Create Temporary Table untuk membuat tabel Kafka dalam penyebaran SQL. Dalam hal ini, penyebaran SQL dapat menggunakan skema tabel tetap.
Buat katalog JSON Kafka
Di editor kode tab Scripts pada halaman Scripts, masukkan pernyataan berikut untuk membuat katalog JSON Kafka:
Pernyataan yang digunakan untuk membuat katalog JSON Kafka untuk kluster Kafka yang dikelola sendiri atau kluster E-MapReduce (EMR) Kafka
CREATE CATALOG <YourCatalogName> WITH( 'type'='kafka', 'properties.bootstrap.servers'='<brokers>', 'format'='json', 'default-database'='<dbName>', 'key.fields-prefix'='<keyPrefix>', 'value.fields-prefix'='<valuePrefix>', 'timestamp-format.standard'='<timestampFormat>', 'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>', 'infer-schema.primitive-as-string'='<primitiveAsString>', 'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>', 'infer-schema.compacted-topic-as-upsert-table'='true', 'max.fetch.records'='100' );Pernyataan yang digunakan untuk membuat katalog JSON Kafka untuk instance ApsaraMQ for Kafka
CREATE CATALOG <YourCatalogName> WITH( 'type'='kafka', 'properties.bootstrap.servers'='<brokers>', 'format'='json', 'default-database'='<dbName>', 'key.fields-prefix'='<keyPrefix>', 'value.fields-prefix'='<valuePrefix>', 'timestamp-format.standard'='<timestampFormat>', 'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>', 'infer-schema.primitive-as-string'='<primitiveAsString>', 'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>', 'infer-schema.compacted-topic-as-upsert-table'='true', 'max.fetch.records'='100', 'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>', 'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>', 'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>', 'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>', 'aliyun.kafka.regionId'='<aliyunKafkaRegionId>' );
Parameter
Tipe Data
Deskripsi
Diperlukan
Catatan
YourCatalogName
STRING
Nama katalog JSON Kafka.
Ya
Masukkan nama kustom.
PentingAnda harus menghapus tanda kurung sudut (<>) saat mengganti nilai parameter dengan nama katalog Anda. Jika tidak, kesalahan akan dikembalikan selama pemeriksaan sintaksis.
type
STRING
Tipe katalog.
Ya
Atur nilainya menjadi kafka.
properties.bootstrap.servers
STRING
Alamat IP atau endpoint broker Kafka.
Ya
Format:
host1:port1,host2:port2,host3:port3.Pisahkan beberapa pasangan host:port dengan koma (,).
format
STRING
Format pesan Kafka.
Ya
Hanya format JSON yang didukung. Realtime Compute for Apache Flink mengurai pesan Kafka berformat JSON untuk mendapatkan skema.
default-database
STRING
Nama kluster Kafka.
Tidak
Nilai default: kafka. Katalog mendefinisikan tabel berdasarkan catalog_name.db_name.table_name. Nilai default db_name digunakan dalam catalog_name.db_name.table_name. Kafka tidak menyediakan database. Anda dapat menggunakan string untuk mengubah nilai db_name untuk kluster Kafka.
key.fields-prefix
STRING
Awalan yang ditambahkan ke bidang yang diurai dari bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama setelah bidang kunci dalam pesan Kafka diurai.
Tidak
Nilai default: key_. Jika nama bidang kunci adalah a, nama kunci yang diperoleh setelah bidang kunci dalam pesan Kafka diurai adalah key_a.
CatatanNilai parameter key.fields-prefix tidak boleh sama dengan awalan dalam nilai parameter value.fields-prefix. Sebagai contoh, jika parameter value.fields-prefix diatur ke test1_value_, Anda tidak dapat mengatur parameter key.fields-prefix ke test1_.
value.fields-prefix
STRING
Awalan yang ditambahkan ke bidang yang diurai dari bidang nilai dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama setelah bidang nilai dalam pesan Kafka diurai.
Tidak
Nilai default: value_. Jika nama bidang nilai adalah b, nama nilai yang diperoleh setelah bidang nilai dalam pesan Kafka diurai adalah value_b.
CatatanNilai parameter value.fields-prefix tidak boleh sama dengan awalan dalam nilai parameter key.fields-prefix. Sebagai contoh, jika parameter key.fields-prefix diatur ke test2_value_, Anda tidak dapat mengatur parameter value.fields-prefix ke test2_.
timestamp-format.standard
STRING
Format bidang tipe TIMESTAMP dalam pesan Kafka berformat JSON. Realtime Compute for Apache Flink mengurai bidang dalam format yang Anda konfigurasikan. Jika Realtime Compute for Apache Flink gagal mengurai bidang dalam format yang Anda konfigurasikan, Realtime Compute for Apache Flink mencoba mengurai bidang dalam format lain.
Tidak
Nilai valid:
SQL (nilai default)
ISO-8601
infer-schema.flatten-nested-columns.enable
BOOLEAN
Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam teks JSON saat bidang nilai dalam pesan Kafka berformat JSON diurai.
Tidak
Nilai valid:
true: Kolom bersarang diperluas secara rekursif.
Realtime Compute for Apache Flink menggunakan jalur yang mengindeks nilai kolom yang diperluas sebagai nama kolom. Sebagai contoh, kolom col dalam
{"nested": {"col": true}}dinamai nested.col setelah kolom diperluas.CatatanJika Anda mengatur parameter ini ke true, kami sarankan Anda menggunakan parameter ini bersama dengan pernyataan CREATE TABLE AS. Pernyataan DML lainnya tidak dapat digunakan untuk memperluas kolom bersarang secara otomatis.
false: Tipe bersarang diurai sebagai tipe STRING. Ini adalah nilai default.
infer-schema.primitive-as-string
BOOLEAN
Menentukan apakah akan menyimpulkan semua tipe dasar sebagai tipe STRING saat bidang nilai dalam pesan Kafka berformat JSON diurai.
Tidak
Nilai valid:
true: Semua tipe dasar disimpulkan sebagai tipe STRING.
false: Tipe data disimpulkan berdasarkan pemetaan tipe data. Ini adalah nilai default.
infer-schema.parse-key-error.field-name
STRING
Data bidang kunci. Saat bidang kunci dalam pesan Kafka berformat JSON diurai, jika bidang kunci ditentukan tetapi gagal diurai, kolom yang namanya adalah awalan key.fields-prefix dan nilai parameter ini ditambahkan ke skema tabel yang sesuai dengan topik. Kolom ini bertipe VARBINARY dan menunjukkan data bidang kunci.
Tidak
Nilai default: col. Sebagai contoh, jika bidang nilai dalam pesan Kafka berformat JSON diurai sebagai value_name dan bidang kunci ditentukan tetapi gagal diurai, skema tabel yang dikembalikan yang sesuai dengan topik berisi dua bidang: key_col dan value_name.
infer-schema.compacted-topic-as-upsert-table
BOOLEAN
Menentukan apakah akan menggunakan tabel sebagai Tabel Upsert Kafka saat kebijakan pembersihan log topik Kafka adalah kompak dan bidang kunci ditentukan.
Tidak
Nilai default: true. Anda harus mengatur parameter ini ke true saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.
max.fetch.records
INT
Jumlah maksimum pesan berformat JSON yang sistem coba konsumsi saat pesan tersebut diurai.
Tidak
Nilai default: 100.
aliyun.kafka.accessKeyId
STRING
ID AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan ID AccessKey, lihat Buat Pasangan AccessKey.
Tidak
Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.
aliyun.kafka.accessKeySecret
STRING
Rahasia AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey, lihat Buat Pasangan AccessKey.
Tidak
Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.
aliyun.kafka.instanceId
STRING
ID instance ApsaraMQ for Kafka. Anda dapat melihat ID tersebut di halaman Detail Instance konsol ApsaraMQ for Kafka.
Tidak
Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.
aliyun.kafka.endpoint
STRING
Endpoint ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Endpoints.
Tidak
Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.
aliyun.kafka.regionId
STRING
ID wilayah instance ApsaraMQ for Kafka tempat topik berada. Untuk informasi lebih lanjut, lihat Endpoints.
Tidak
Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.
CatatanHanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.
Pilih kode yang digunakan untuk membuat katalog dan klik Run yang muncul di sisi kiri kode.

Di panel Catalogs di sisi kiri tab Catalogs, periksa katalog yang telah Anda buat.
Lihat katalog JSON Kafka
Di editor kode tab Scripts pada halaman Scripts, masukkan pernyataan berikut:
DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;Parameter
Deskripsi
${catalog_name}
Nama katalog JSON Kafka.
${db_name}
Nama kluster Kafka.
${topic_name}
Nama topik Kafka.
Pilih kode yang digunakan untuk melihat katalog dan klik Run yang muncul di sisi kiri kode.
Setelah pernyataan dieksekusi, Anda dapat melihat informasi tentang tabel yang sesuai dengan topik dalam hasil.

Gunakan katalog JSON Kafka
Jika tabel katalog JSON Kafka digunakan sebagai tabel sumber, Anda dapat membaca data dari topik Kafka yang sesuai dengan tabel tersebut.
INSERT INTO ${other_sink_table} SELECT... FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;CatatanJika Anda perlu menentukan parameter lain dalam klausa WITH saat menggunakan katalog JSON Kafka, kami sarankan Anda menggunakan petunjuk SQL untuk menambahkan parameter lain. Dalam pernyataan SQL sebelumnya, petunjuk SQL digunakan untuk menentukan bahwa konsumsi dimulai dari data paling awal. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Buat tabel sumber ApsaraMQ for Kafka dan Buat tabel hasil ApsaraMQ for Kafka.
Jika tabel katalog JSON Kafka digunakan sebagai tabel sumber Message Queue for Apache Kafka, Anda dapat menyinkronkan data dari topik Kafka yang sesuai dengan tabel ke tabel tujuan menggunakan pernyataan CREATE TABLE AS.
Sinkronkan data dari satu topik secara real-time.
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH(...) AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;Sinkronkan data dari beberapa topik dalam penyebaran.
BEGIN STATEMENT SET; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0` AS TABLE `kafka-catalog`.`kafka`.`topic0` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1` AS TABLE `kafka-catalog`.`kafka`.`topic1` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2` AS TABLE `kafka-catalog`.`kafka`.`topic2` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; END;Anda dapat menggunakan pernyataan CREATE TABLE AS bersama dengan katalog JSON Kafka untuk menyinkronkan data dari beberapa topik Kafka dalam penyebaran. Untuk menyinkronkan data dari beberapa topik Kafka dalam penyebaran, pastikan kondisi berikut terpenuhi:
topic-pattern tidak dikonfigurasikan untuk semua tabel yang sesuai dengan topik.
Nilai parameter Kafka di setiap tabel sama. Nilai parameter dengan awalan properties. sama. Parameter tersebut mencakup properties.bootstrap.servers dan properties.group.id.
Nilai parameter scan.startup.mode sama untuk semua tabel. Parameter scan.startup.mode hanya dapat diatur ke group-offsets, latest-offset, atau earliest-offset.
Gambar berikut menunjukkan contohnya. Pada gambar berikut, dua tabel di atas memenuhi kondisi sebelumnya dan dua tabel di bawah tidak memenuhi kondisi.

Untuk informasi lebih lanjut tentang cara menyinkronkan data dari tabel sumber dalam katalog JSON Kafka ke tabel tujuan dalam katalog tujuan, lihat Ingest data log ke gudang data secara real-time.
Hapus katalog JSON Kafka
Setelah Anda menghapus katalog JSON Kafka, penyebaran yang sedang berjalan tidak terpengaruh. Namun, penyebaran yang menggunakan tabel dari katalog tersebut tidak dapat lagi menemukan tabel saat penyebaran diterbitkan atau di-restart. Berhati-hatilah saat menghapus katalog JSON Kafka.
Di editor kode tab Scripts pada halaman Scripts, masukkan pernyataan berikut:
DROP CATALOG ${catalog_name};catalog_name menentukan nama katalog JSON Kafka yang ingin Anda hapus.
Klik kanan pernyataan yang digunakan untuk menghapus katalog dan pilih Run dari menu pintasan.
Lihat panel Catalogs di sisi kiri halaman Daftar Katalog untuk memeriksa apakah katalog telah dihapus.
Deskripsi inferensi skema
Untuk memudahkan Anda menggunakan tabel yang diperoleh setelah mengonfigurasi katalog JSON Kafka, sistem secara otomatis menambahkan parameter konfigurasi default, kolom metadata, dan kunci utama ke tabel. Bagian ini menjelaskan informasi tentang tabel yang diperoleh setelah Anda mengonfigurasi katalog JSON Kafka.
Skema Tabel
Saat pesan Kafka berformat JSON diurai untuk mendapatkan skema topik, sistem mencoba mengonsumsi pesan yang kurang dari atau sama dengan nilai parameter max.fetch.records. Sistem mengurai skema setiap catatan data dan menggabungkan skema tersebut sebagai skema topik. Sistem mengurai pesan berdasarkan pemetaan tipe data yang digunakan saat Anda menggunakan pernyataan CREATE TABLE AS untuk menyinkronkan data tabel Kafka.
PentingSaat katalog JSON Kafka digunakan untuk menyimpulkan skema topik, grup konsumen dibuat untuk mengonsumsi data topik. Jika nama grup konsumen mencakup awalan, grup konsumen dibuat menggunakan katalog.
Jika Anda ingin mendapatkan data dari tabel ApsaraMQ for Kafka, kami sarankan Anda menggunakan katalog JSON Kafka dari Realtime Compute for Apache Flink yang menggunakan VVR 6.0.7 atau lebih baru. Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi lebih lama dari 6.0.7, grup konsumen tidak dihapus secara otomatis. Akibatnya, Anda mungkin menerima notifikasi peringatan tentang akumulasi pesan dalam grup konsumen.
Skema topik terdiri dari bagian-bagian berikut:
Kolom Fisik
Secara default, kolom fisik diurai berdasarkan bidang kunci dan nilai pesan Kafka. Awalan ditambahkan ke nama kolom yang diperoleh.
Jika bidang kunci ditentukan tetapi gagal diurai, kolom yang namanya adalah awalan key.fields-prefix dan nilai parameter infer-schema.parse-key-error.field-name dikembalikan. Tipe kolom adalah VARBINARY.
Setelah katalog JSON Kafka mendapatkan sekelompok pesan Kafka, katalog JSON Kafka mengurai pesan Kafka secara berurutan dan menggabungkan kolom fisik yang diperoleh setelah penguraian sebagai skema topik berdasarkan aturan berikut: Fungsi ini menggabungkan dokumen JSON berdasarkan aturan berikut:
Jika kolom fisik tertentu yang diperoleh setelah penguraian tidak ada dalam skema topik, katalog JSON Kafka secara otomatis menambahkan kolom tersebut ke skema topik.
Jika kolom fisik tertentu yang diperoleh setelah penguraian memiliki nama yang sama dengan kolom tertentu dalam skema topik, lakukan operasi berdasarkan skenario bisnis Anda:
Jika kolom-kolom tersebut memiliki tipe data yang sama tetapi presisi berbeda, katalog JSON Kafka menggabungkan kolom dengan presisi lebih besar.
Jika kolom-kolom tersebut memiliki tipe data yang berbeda, katalog JSON Kafka menggunakan simpul induk terkecil dalam struktur pohon yang ditunjukkan dalam gambar berikut sebagai tipe kolom yang memiliki nama yang sama. Jika kolom bertipe DECIMAL dan FLOAT digabungkan, kolom tersebut digabungkan menjadi tipe DOUBLE untuk mempertahankan presisi.

Sebagai contoh, jika topik Kafka berisi tiga catatan data, skema yang ditunjukkan dalam gambar berikut dikembalikan.

Kolom Metadata
Secara default, kolom metadata bernama partition, offset, dan timestamp ditambahkan. Tabel berikut menjelaskan kolom metadata.
Nama Metadata
Nama Kolom
Tipe
Deskripsi
partition
partition
INT NOT NULL
Nilai dalam kolom kunci partisi.
offset
offset
BIGINT NOT NULL
Offset.
timestamp
timestamp
TIMESTAMP_LTZ(3) NOT NULL
Timestamp pesan.
Aturan untuk Kunci Utama Default yang Ditambahkan
Jika tabel yang diperoleh setelah Anda mengonfigurasi katalog JSON Kafka dikonsumsi sebagai tabel sumber, kolom metadata partition dan offset digunakan sebagai kunci utama. Ini memastikan bahwa data tidak diduplikasi.
CatatanJika skema tabel yang disimpulkan dari katalog JSON Kafka tidak sesuai harapan, Anda dapat menggunakan sintaksis CREATE TEMPORARY TABLE ... LIKE untuk mendeklarasikan tabel sementara untuk menentukan skema tabel yang diinginkan. Sebagai contoh, data JSON berisi bidang ts dalam format '2023-01-01 12:00:01'. Katalog JSON Kafka secara otomatis menyimpulkan bidang ts sebagai tipe data TIMESTAMP. Jika Anda ingin bidang ts digunakan sebagai tipe data STRING, Anda dapat menggunakan sintaksis CREATE TEMPORARY TABLE... LIKE untuk mendeklarasikan tabel. Dalam kode sampel berikut, bidang value_ts digunakan karena awalan value_ ditambahkan ke bidang nilai dalam konfigurasi default.
CREATE TEMPORARY TABLE tempTable ( value_name STRING, value_ts STRING ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;Parameter Tabel yang Ditambahkan Secara Default
Parameter
Deskripsi
Catatan
connector
Tipe konektor.
Atur nilainya menjadi kafka atau upsert-kafka.
topic
Nama topik.
Atur nilainya menjadi nama tabel yang Anda deklarasikan dalam katalog JSON Kafka.
properties.bootstrap.servers
Alamat IP atau endpoint broker Kafka.
Atur nilainya sama dengan nilai parameter properties.bootstrap.servers dari katalog JSON Kafka.
value.format
Format yang digunakan oleh konektor Kafka Flink untuk meng-serialisasi atau meng-deserialisasi bidang nilai dalam pesan Kafka.
Atur nilainya menjadi json.
value.fields-prefix
Awalan kustom untuk semua bidang nilai dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama dengan bidang kunci atau bidang metadata.
Atur nilainya sama dengan nilai parameter value.fields-prefix dari katalog JSON Kafka.
value.json.infer-schema.flatten-nested-columns.enable
Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam teks JSON saat bidang nilai dalam pesan Kafka berformat JSON diurai.
Atur nilainya sama dengan nilai parameter infer-schema.flatten-nested-columns.enable dari katalog JSON Kafka.
value.json.infer-schema.primitive-as-string
Menentukan apakah akan menyimpulkan semua tipe dasar sebagai tipe STRING saat bidang nilai dalam pesan Kafka berformat JSON diurai.
Atur nilainya sama dengan nilai parameter infer-schema.primitive-as-string dari katalog JSON Kafka.
value.fields-include
Kebijakan yang digunakan untuk memproses bidang kunci saat bidang nilai diurai.
Atur nilainya menjadi EXCEPT_KEY. Jika parameter ini diatur ke EXCEPT_KEY, bidang kunci dikecualikan saat bidang nilai diurai.
Jika bidang kunci ditentukan, Anda harus mengonfigurasi parameter ini.
key.format
Format yang digunakan oleh konektor Kafka Flink untuk meng-serialisasi atau meng-deserialisasi bidang kunci dalam pesan Kafka.
Atur nilainya menjadi json atau raw.
Jika bidang kunci ditentukan, Anda harus mengonfigurasi parameter ini.
Jika bidang kunci ditentukan tetapi gagal diurai, atur nilai parameter ini menjadi raw. Jika bidang kunci ditentukan dan diurai, atur nilai parameter ini menjadi json.
key.fields-prefix
Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama dengan bidang nilai.
Atur nilainya sama dengan nilai parameter key.fields-prefix dari katalog JSON Kafka.
Jika bidang kunci ditentukan, Anda harus mengonfigurasi parameter ini.
key.fields
Bidang yang diurai dari bidang kunci dalam pesan Kafka.
Sistem secara otomatis memasukkan bidang kunci dalam tabel.
Jika bidang kunci ditentukan dan tabel bukan tabel Upsert Kafka, Anda harus mengonfigurasi parameter ini.