全部产品
Search
文档中心

Realtime Compute for Apache Flink:Kelola Katalog JSON Kafka

更新时间:Jul 06, 2025

Setelah membuat katalog JSON Kafka, Anda dapat mengakses topik berformat JSON dari kluster Kafka di konsol pengembangan Realtime Compute for Apache Flink tanpa perlu mendefinisikan skema. Topik ini menjelaskan cara membuat, melihat, menggunakan, dan menghapus katalog JSON Kafka di konsol tersebut.

Informasi latar belakang

Katalog JSON Kafka secara otomatis mengurai pesan berformat JSON untuk menyimpulkan skema dari sebuah topik. Dengan demikian, Anda dapat menggunakan katalog JSON untuk mendapatkan bidang spesifik dari pesan tanpa perlu mendeklarasikan skema tabel Kafka dalam Flink SQL. Saat menggunakan katalog JSON Kafka, perhatikan poin-poin berikut:

  • Nama tabel katalog JSON Kafka sesuai dengan nama topik kluster Kafka. Dengan cara ini, Anda tidak perlu mengeksekusi pernyataan DDL untuk mendaftarkan tabel Kafka untuk mengakses topik kluster Kafka. Ini meningkatkan efisiensi dan akurasi pengembangan data.

  • Tabel katalog JSON Kafka dapat digunakan sebagai tabel sumber dalam penyebaran Flink SQL.

  • Anda dapat menggunakan katalog JSON Kafka bersama dengan pernyataan CREATE TABLE AS untuk menyinkronkan perubahan skema.

Topik ini menjelaskan operasi yang dapat Anda lakukan untuk mengelola katalog JSON Kafka:

Batasan

  • Katalog JSON Kafka hanya mendukung topik berformat JSON.

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 6.0.2 atau lebih baru yang mendukung katalog JSON Kafka.

    Catatan

    Jika penyebaran Anda menggunakan VVR 4.X, kami sarankan Anda memperbarui versi VVR penyebaran Anda ke VVR 6.0.2 atau lebih baru sebelum menggunakan katalog JSON Kafka.

  • Anda tidak dapat memodifikasi katalog JSON Kafka yang ada dengan mengeksekusi pernyataan DDL.

  • Anda hanya dapat menanyakan tabel data menggunakan katalog JSON Kafka. Anda tidak diperbolehkan membuat, memodifikasi, atau menghapus database dan tabel menggunakan katalog JSON Kafka.

    Catatan

    Jika Anda menggunakan katalog JSON Kafka bersama dengan pernyataan CREATE DATABASE AS atau pernyataan CREATE TABLE AS, topik dapat dibuat secara otomatis.

  • Anda tidak dapat menggunakan katalog JSON Kafka untuk membaca data dari atau menulis data ke kluster Kafka yang memiliki autentikasi berbasis SSL atau Simple Authentication and Security Layer (SASL) diaktifkan.

  • Tabel katalog JSON Kafka dapat digunakan sebagai tabel sumber dalam penyebaran Flink SQL tetapi tidak dapat digunakan sebagai tabel hasil atau tabel pencarian yang digunakan sebagai tabel dimensi.

  • ApsaraMQ for Kafka tidak mengizinkan Anda memanggil Operasi API yang sama dengan Operasi API yang digunakan oleh Apache Kafka untuk menghapus grup. Saat Anda membuat katalog JSON Kafka, Anda harus mengonfigurasi parameter aliyun.kafka.instanceId, aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.endpoint, dan aliyun.kafka.regionId untuk menghapus ID grup secara otomatis. Untuk informasi lebih lanjut, lihat Perbandingan antara ApsaraMQ for Kafka dan Apache Kafka open source.

Pencegahan

Katalog JSON Kafka mengurai data sampel untuk menghasilkan skema tabel. Untuk topik yang memiliki format data tidak konsisten, katalog JSON Kafka secara otomatis mempertahankan skema yang berisi semua bidang di semua kolom. Jika format data topik berubah, skema tabel yang dihasilkan oleh katalog JSON Kafka mungkin tidak konsisten pada titik waktu yang berbeda. Akibatnya, kesalahan saat menjalankan penyebaran mungkin terjadi jika skema berbeda disimpulkan sebelum dan sesudah penyebaran di-restart. Sebagai contoh, jika penyebaran SQL Realtime Compute for Apache Flink yang merujuk tabel dalam katalog JSON Kafka berjalan selama periode waktu tertentu dan kemudian di-restart dari savepoint, skema tabel yang berbeda dari skema tabel dari run terakhir mungkin dihasilkan. Penyebaran SQL menggunakan skema tabel yang dihasilkan oleh run terakhir. Akibatnya, objek seperti kondisi filter dan nilai bidang di penyimpanan hilir mungkin tidak cocok. Oleh karena itu, kami sarankan Anda menggunakan Create Temporary Table untuk membuat tabel Kafka dalam penyebaran SQL. Dalam hal ini, penyebaran SQL dapat menggunakan skema tabel tetap.

Buat katalog JSON Kafka

  1. Di editor kode tab Scripts pada halaman Scripts, masukkan pernyataan berikut untuk membuat katalog JSON Kafka:

    • Pernyataan yang digunakan untuk membuat katalog JSON Kafka untuk kluster Kafka yang dikelola sendiri atau kluster E-MapReduce (EMR) Kafka

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100'
      );
    • Pernyataan yang digunakan untuk membuat katalog JSON Kafka untuk instance ApsaraMQ for Kafka

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100',
       'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
       'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
       'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
       'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
       'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
      );

    Parameter

    Tipe Data

    Deskripsi

    Diperlukan

    Catatan

    YourCatalogName

    STRING

    Nama katalog JSON Kafka.

    Ya

    Masukkan nama kustom.

    Penting

    Anda harus menghapus tanda kurung sudut (<>) saat mengganti nilai parameter dengan nama katalog Anda. Jika tidak, kesalahan akan dikembalikan selama pemeriksaan sintaksis.

    type

    STRING

    Tipe katalog.

    Ya

    Atur nilainya menjadi kafka.

    properties.bootstrap.servers

    STRING

    Alamat IP atau endpoint broker Kafka.

    Ya

    Format: host1:port1,host2:port2,host3:port3.

    Pisahkan beberapa pasangan host:port dengan koma (,).

    format

    STRING

    Format pesan Kafka.

    Ya

    Hanya format JSON yang didukung. Realtime Compute for Apache Flink mengurai pesan Kafka berformat JSON untuk mendapatkan skema.

    default-database

    STRING

    Nama kluster Kafka.

    Tidak

    Nilai default: kafka. Katalog mendefinisikan tabel berdasarkan catalog_name.db_name.table_name. Nilai default db_name digunakan dalam catalog_name.db_name.table_name. Kafka tidak menyediakan database. Anda dapat menggunakan string untuk mengubah nilai db_name untuk kluster Kafka.

    key.fields-prefix

    STRING

    Awalan yang ditambahkan ke bidang yang diurai dari bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama setelah bidang kunci dalam pesan Kafka diurai.

    Tidak

    Nilai default: key_. Jika nama bidang kunci adalah a, nama kunci yang diperoleh setelah bidang kunci dalam pesan Kafka diurai adalah key_a.

    Catatan

    Nilai parameter key.fields-prefix tidak boleh sama dengan awalan dalam nilai parameter value.fields-prefix. Sebagai contoh, jika parameter value.fields-prefix diatur ke test1_value_, Anda tidak dapat mengatur parameter key.fields-prefix ke test1_.

    value.fields-prefix

    STRING

    Awalan yang ditambahkan ke bidang yang diurai dari bidang nilai dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama setelah bidang nilai dalam pesan Kafka diurai.

    Tidak

    Nilai default: value_. Jika nama bidang nilai adalah b, nama nilai yang diperoleh setelah bidang nilai dalam pesan Kafka diurai adalah value_b.

    Catatan

    Nilai parameter value.fields-prefix tidak boleh sama dengan awalan dalam nilai parameter key.fields-prefix. Sebagai contoh, jika parameter key.fields-prefix diatur ke test2_value_, Anda tidak dapat mengatur parameter value.fields-prefix ke test2_.

    timestamp-format.standard

    STRING

    Format bidang tipe TIMESTAMP dalam pesan Kafka berformat JSON. Realtime Compute for Apache Flink mengurai bidang dalam format yang Anda konfigurasikan. Jika Realtime Compute for Apache Flink gagal mengurai bidang dalam format yang Anda konfigurasikan, Realtime Compute for Apache Flink mencoba mengurai bidang dalam format lain.

    Tidak

    Nilai valid:

    • SQL (nilai default)

    • ISO-8601

    infer-schema.flatten-nested-columns.enable

    BOOLEAN

    Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam teks JSON saat bidang nilai dalam pesan Kafka berformat JSON diurai.

    Tidak

    Nilai valid:

    • true: Kolom bersarang diperluas secara rekursif.

      Realtime Compute for Apache Flink menggunakan jalur yang mengindeks nilai kolom yang diperluas sebagai nama kolom. Sebagai contoh, kolom col dalam {"nested": {"col": true}} dinamai nested.col setelah kolom diperluas.

      Catatan

      Jika Anda mengatur parameter ini ke true, kami sarankan Anda menggunakan parameter ini bersama dengan pernyataan CREATE TABLE AS. Pernyataan DML lainnya tidak dapat digunakan untuk memperluas kolom bersarang secara otomatis.

    • false: Tipe bersarang diurai sebagai tipe STRING. Ini adalah nilai default.

    infer-schema.primitive-as-string

    BOOLEAN

    Menentukan apakah akan menyimpulkan semua tipe dasar sebagai tipe STRING saat bidang nilai dalam pesan Kafka berformat JSON diurai.

    Tidak

    Nilai valid:

    • true: Semua tipe dasar disimpulkan sebagai tipe STRING.

    • false: Tipe data disimpulkan berdasarkan pemetaan tipe data. Ini adalah nilai default.

    infer-schema.parse-key-error.field-name

    STRING

    Data bidang kunci. Saat bidang kunci dalam pesan Kafka berformat JSON diurai, jika bidang kunci ditentukan tetapi gagal diurai, kolom yang namanya adalah awalan key.fields-prefix dan nilai parameter ini ditambahkan ke skema tabel yang sesuai dengan topik. Kolom ini bertipe VARBINARY dan menunjukkan data bidang kunci.

    Tidak

    Nilai default: col. Sebagai contoh, jika bidang nilai dalam pesan Kafka berformat JSON diurai sebagai value_name dan bidang kunci ditentukan tetapi gagal diurai, skema tabel yang dikembalikan yang sesuai dengan topik berisi dua bidang: key_col dan value_name.

    infer-schema.compacted-topic-as-upsert-table

    BOOLEAN

    Menentukan apakah akan menggunakan tabel sebagai Tabel Upsert Kafka saat kebijakan pembersihan log topik Kafka adalah kompak dan bidang kunci ditentukan.

    Tidak

    Nilai default: true. Anda harus mengatur parameter ini ke true saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.

    max.fetch.records

    INT

    Jumlah maksimum pesan berformat JSON yang sistem coba konsumsi saat pesan tersebut diurai.

    Tidak

    Nilai default: 100.

    aliyun.kafka.accessKeyId

    STRING

    ID AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan ID AccessKey, lihat Buat Pasangan AccessKey.

    Tidak

    Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.

    aliyun.kafka.accessKeySecret

    STRING

    Rahasia AccessKey akun Alibaba Cloud Anda. Untuk informasi lebih lanjut tentang cara mendapatkan Rahasia AccessKey, lihat Buat Pasangan AccessKey.

    Tidak

    Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.

    aliyun.kafka.instanceId

    STRING

    ID instance ApsaraMQ for Kafka. Anda dapat melihat ID tersebut di halaman Detail Instance konsol ApsaraMQ for Kafka.

    Tidak

    Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.

    aliyun.kafka.endpoint

    STRING

    Endpoint ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Endpoints.

    Tidak

    Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.

    aliyun.kafka.regionId

    STRING

    ID wilayah instance ApsaraMQ for Kafka tempat topik berada. Untuk informasi lebih lanjut, lihat Endpoints.

    Tidak

    Anda harus mengonfigurasi parameter ini saat mengeksekusi pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya Realtime Compute for Apache Flink yang menggunakan VVR 6.0.2 atau lebih baru yang mendukung parameter ini.

  2. Pilih kode yang digunakan untuk membuat katalog dan klik Run yang muncul di sisi kiri kode.

    image.png

  3. Di panel Catalogs di sisi kiri tab Catalogs, periksa katalog yang telah Anda buat.

Lihat katalog JSON Kafka

  1. Di editor kode tab Scripts pada halaman Scripts, masukkan pernyataan berikut:

    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;

    Parameter

    Deskripsi

    ${catalog_name}

    Nama katalog JSON Kafka.

    ${db_name}

    Nama kluster Kafka.

    ${topic_name}

    Nama topik Kafka.

  2. Pilih kode yang digunakan untuk melihat katalog dan klik Run yang muncul di sisi kiri kode.

    Setelah pernyataan dieksekusi, Anda dapat melihat informasi tentang tabel yang sesuai dengan topik dalam hasil.表信息

Gunakan katalog JSON Kafka

  • Jika tabel katalog JSON Kafka digunakan sebagai tabel sumber, Anda dapat membaca data dari topik Kafka yang sesuai dengan tabel tersebut.

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    Catatan

    Jika Anda perlu menentukan parameter lain dalam klausa WITH saat menggunakan katalog JSON Kafka, kami sarankan Anda menggunakan petunjuk SQL untuk menambahkan parameter lain. Dalam pernyataan SQL sebelumnya, petunjuk SQL digunakan untuk menentukan bahwa konsumsi dimulai dari data paling awal. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Buat tabel sumber ApsaraMQ for Kafka dan Buat tabel hasil ApsaraMQ for Kafka.

  • Jika tabel katalog JSON Kafka digunakan sebagai tabel sumber Message Queue for Apache Kafka, Anda dapat menyinkronkan data dari topik Kafka yang sesuai dengan tabel ke tabel tujuan menggunakan pernyataan CREATE TABLE AS.

    • Sinkronkan data dari satu topik secara real-time.

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    • Sinkronkan data dari beberapa topik dalam penyebaran.

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;

      Anda dapat menggunakan pernyataan CREATE TABLE AS bersama dengan katalog JSON Kafka untuk menyinkronkan data dari beberapa topik Kafka dalam penyebaran. Untuk menyinkronkan data dari beberapa topik Kafka dalam penyebaran, pastikan kondisi berikut terpenuhi:

      • topic-pattern tidak dikonfigurasikan untuk semua tabel yang sesuai dengan topik.

      • Nilai parameter Kafka di setiap tabel sama. Nilai parameter dengan awalan properties. sama. Parameter tersebut mencakup properties.bootstrap.servers dan properties.group.id.

      • Nilai parameter scan.startup.mode sama untuk semua tabel. Parameter scan.startup.mode hanya dapat diatur ke group-offsets, latest-offset, atau earliest-offset.

      Gambar berikut menunjukkan contohnya. Pada gambar berikut, dua tabel di atas memenuhi kondisi sebelumnya dan dua tabel di bawah tidak memenuhi kondisi.示例

Catatan

Untuk informasi lebih lanjut tentang cara menyinkronkan data dari tabel sumber dalam katalog JSON Kafka ke tabel tujuan dalam katalog tujuan, lihat Ingest data log ke gudang data secara real-time.

Hapus katalog JSON Kafka

Peringatan

Setelah Anda menghapus katalog JSON Kafka, penyebaran yang sedang berjalan tidak terpengaruh. Namun, penyebaran yang menggunakan tabel dari katalog tersebut tidak dapat lagi menemukan tabel saat penyebaran diterbitkan atau di-restart. Berhati-hatilah saat menghapus katalog JSON Kafka.

  1. Di editor kode tab Scripts pada halaman Scripts, masukkan pernyataan berikut:

    DROP CATALOG ${catalog_name};

    catalog_name menentukan nama katalog JSON Kafka yang ingin Anda hapus.

  2. Klik kanan pernyataan yang digunakan untuk menghapus katalog dan pilih Run dari menu pintasan.

  3. Lihat panel Catalogs di sisi kiri halaman Daftar Katalog untuk memeriksa apakah katalog telah dihapus.

Deskripsi inferensi skema

Untuk memudahkan Anda menggunakan tabel yang diperoleh setelah mengonfigurasi katalog JSON Kafka, sistem secara otomatis menambahkan parameter konfigurasi default, kolom metadata, dan kunci utama ke tabel. Bagian ini menjelaskan informasi tentang tabel yang diperoleh setelah Anda mengonfigurasi katalog JSON Kafka.

  • Skema Tabel

    Saat pesan Kafka berformat JSON diurai untuk mendapatkan skema topik, sistem mencoba mengonsumsi pesan yang kurang dari atau sama dengan nilai parameter max.fetch.records. Sistem mengurai skema setiap catatan data dan menggabungkan skema tersebut sebagai skema topik. Sistem mengurai pesan berdasarkan pemetaan tipe data yang digunakan saat Anda menggunakan pernyataan CREATE TABLE AS untuk menyinkronkan data tabel Kafka.

    Penting
    • Saat katalog JSON Kafka digunakan untuk menyimpulkan skema topik, grup konsumen dibuat untuk mengonsumsi data topik. Jika nama grup konsumen mencakup awalan, grup konsumen dibuat menggunakan katalog.

    • Jika Anda ingin mendapatkan data dari tabel ApsaraMQ for Kafka, kami sarankan Anda menggunakan katalog JSON Kafka dari Realtime Compute for Apache Flink yang menggunakan VVR 6.0.7 atau lebih baru. Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi lebih lama dari 6.0.7, grup konsumen tidak dihapus secara otomatis. Akibatnya, Anda mungkin menerima notifikasi peringatan tentang akumulasi pesan dalam grup konsumen.

    Skema topik terdiri dari bagian-bagian berikut:

    • Kolom Fisik

      Secara default, kolom fisik diurai berdasarkan bidang kunci dan nilai pesan Kafka. Awalan ditambahkan ke nama kolom yang diperoleh.

      Jika bidang kunci ditentukan tetapi gagal diurai, kolom yang namanya adalah awalan key.fields-prefix dan nilai parameter infer-schema.parse-key-error.field-name dikembalikan. Tipe kolom adalah VARBINARY.

      Setelah katalog JSON Kafka mendapatkan sekelompok pesan Kafka, katalog JSON Kafka mengurai pesan Kafka secara berurutan dan menggabungkan kolom fisik yang diperoleh setelah penguraian sebagai skema topik berdasarkan aturan berikut: Fungsi ini menggabungkan dokumen JSON berdasarkan aturan berikut:

      • Jika kolom fisik tertentu yang diperoleh setelah penguraian tidak ada dalam skema topik, katalog JSON Kafka secara otomatis menambahkan kolom tersebut ke skema topik.

      • Jika kolom fisik tertentu yang diperoleh setelah penguraian memiliki nama yang sama dengan kolom tertentu dalam skema topik, lakukan operasi berdasarkan skenario bisnis Anda:

        • Jika kolom-kolom tersebut memiliki tipe data yang sama tetapi presisi berbeda, katalog JSON Kafka menggabungkan kolom dengan presisi lebih besar.

        • Jika kolom-kolom tersebut memiliki tipe data yang berbeda, katalog JSON Kafka menggunakan simpul induk terkecil dalam struktur pohon yang ditunjukkan dalam gambar berikut sebagai tipe kolom yang memiliki nama yang sama. Jika kolom bertipe DECIMAL dan FLOAT digabungkan, kolom tersebut digabungkan menjadi tipe DOUBLE untuk mempertahankan presisi.Schema合并

      Sebagai contoh, jika topik Kafka berisi tiga catatan data, skema yang ditunjukkan dalam gambar berikut dikembalikan.Schema

    • Kolom Metadata

      Secara default, kolom metadata bernama partition, offset, dan timestamp ditambahkan. Tabel berikut menjelaskan kolom metadata.

      Nama Metadata

      Nama Kolom

      Tipe

      Deskripsi

      partition

      partition

      INT NOT NULL

      Nilai dalam kolom kunci partisi.

      offset

      offset

      BIGINT NOT NULL

      Offset.

      timestamp

      timestamp

      TIMESTAMP_LTZ(3) NOT NULL

      Timestamp pesan.

    • Aturan untuk Kunci Utama Default yang Ditambahkan

      Jika tabel yang diperoleh setelah Anda mengonfigurasi katalog JSON Kafka dikonsumsi sebagai tabel sumber, kolom metadata partition dan offset digunakan sebagai kunci utama. Ini memastikan bahwa data tidak diduplikasi.

    Catatan

    Jika skema tabel yang disimpulkan dari katalog JSON Kafka tidak sesuai harapan, Anda dapat menggunakan sintaksis CREATE TEMPORARY TABLE ... LIKE untuk mendeklarasikan tabel sementara untuk menentukan skema tabel yang diinginkan. Sebagai contoh, data JSON berisi bidang ts dalam format '2023-01-01 12:00:01'. Katalog JSON Kafka secara otomatis menyimpulkan bidang ts sebagai tipe data TIMESTAMP. Jika Anda ingin bidang ts digunakan sebagai tipe data STRING, Anda dapat menggunakan sintaksis CREATE TEMPORARY TABLE... LIKE untuk mendeklarasikan tabel. Dalam kode sampel berikut, bidang value_ts digunakan karena awalan value_ ditambahkan ke bidang nilai dalam konfigurasi default.

    CREATE TEMPORARY TABLE tempTable (
        value_name STRING,
        value_ts STRING
    ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
  • Parameter Tabel yang Ditambahkan Secara Default

    Parameter

    Deskripsi

    Catatan

    connector

    Tipe konektor.

    Atur nilainya menjadi kafka atau upsert-kafka.

    topic

    Nama topik.

    Atur nilainya menjadi nama tabel yang Anda deklarasikan dalam katalog JSON Kafka.

    properties.bootstrap.servers

    Alamat IP atau endpoint broker Kafka.

    Atur nilainya sama dengan nilai parameter properties.bootstrap.servers dari katalog JSON Kafka.

    value.format

    Format yang digunakan oleh konektor Kafka Flink untuk meng-serialisasi atau meng-deserialisasi bidang nilai dalam pesan Kafka.

    Atur nilainya menjadi json.

    value.fields-prefix

    Awalan kustom untuk semua bidang nilai dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama dengan bidang kunci atau bidang metadata.

    Atur nilainya sama dengan nilai parameter value.fields-prefix dari katalog JSON Kafka.

    value.json.infer-schema.flatten-nested-columns.enable

    Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam teks JSON saat bidang nilai dalam pesan Kafka berformat JSON diurai.

    Atur nilainya sama dengan nilai parameter infer-schema.flatten-nested-columns.enable dari katalog JSON Kafka.

    value.json.infer-schema.primitive-as-string

    Menentukan apakah akan menyimpulkan semua tipe dasar sebagai tipe STRING saat bidang nilai dalam pesan Kafka berformat JSON diurai.

    Atur nilainya sama dengan nilai parameter infer-schema.primitive-as-string dari katalog JSON Kafka.

    value.fields-include

    Kebijakan yang digunakan untuk memproses bidang kunci saat bidang nilai diurai.

    Atur nilainya menjadi EXCEPT_KEY. Jika parameter ini diatur ke EXCEPT_KEY, bidang kunci dikecualikan saat bidang nilai diurai.

    Jika bidang kunci ditentukan, Anda harus mengonfigurasi parameter ini.

    key.format

    Format yang digunakan oleh konektor Kafka Flink untuk meng-serialisasi atau meng-deserialisasi bidang kunci dalam pesan Kafka.

    Atur nilainya menjadi json atau raw.

    Jika bidang kunci ditentukan, Anda harus mengonfigurasi parameter ini.

    Jika bidang kunci ditentukan tetapi gagal diurai, atur nilai parameter ini menjadi raw. Jika bidang kunci ditentukan dan diurai, atur nilai parameter ini menjadi json.

    key.fields-prefix

    Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama dengan bidang nilai.

    Atur nilainya sama dengan nilai parameter key.fields-prefix dari katalog JSON Kafka.

    Jika bidang kunci ditentukan, Anda harus mengonfigurasi parameter ini.

    key.fields

    Bidang yang diurai dari bidang kunci dalam pesan Kafka.

    Sistem secara otomatis memasukkan bidang kunci dalam tabel.

    Jika bidang kunci ditentukan dan tabel bukan tabel Upsert Kafka, Anda harus mengonfigurasi parameter ini.