全部产品
Search
文档中心

Realtime Compute for Apache Flink:Kelola Katalog JSON Kafka

更新时间:Jan 14, 2026

Setelah mengonfigurasi katalog Kafka JSON, Anda dapat langsung mengakses topik berformat JSON di kluster Kafka tanpa perlu mendefinisikan skema saat mengembangkan pekerjaan di Realtime Compute for Apache Flink. Topik ini menjelaskan cara membuat, melihat, dan menghapus katalog Kafka JSON.

Informasi latar belakang

Katalog Kafka JSON melakukan inferensi skema topik dengan secara otomatis mengurai pesan berformat JSON. Hal ini memungkinkan Anda mengambil informasi bidang tertentu dari pesan tanpa perlu mendeklarasikan skema tabel Kafka dalam Flink SQL. Katalog Kafka JSON memiliki fitur-fitur berikut:

  • Nama tabel dalam katalog Kafka JSON sesuai dengan nama topik Kafka. Anda tidak perlu mendaftarkan tabel Kafka secara manual menggunakan pernyataan Data Definition Language (DDL). Hal ini meningkatkan efisiensi dan akurasi pengembangan.

  • Tabel yang disediakan oleh katalog Kafka JSON dapat langsung digunakan sebagai tabel sumber dalam pekerjaan Flink SQL.

  • Anda dapat menggunakan katalog Kafka JSON bersama pernyataan CREATE TABLE AS (CTAS) untuk menyinkronkan data beserta perubahan skemanya.

Batasan

  • Katalog Kafka JSON hanya mendukung topik yang berisi pesan dalam format JSON.

  • Hanya mesin komputasi yang menggunakan VVR 6.0.2 atau versi lebih baru yang mendukung katalog Kafka JSON.

    Catatan

    Jika Anda menggunakan VVR 4.x, Anda harus melakukan upgrade pekerjaan ke VVR 6.0.2 atau versi lebih baru sebelum dapat menggunakan katalog Kafka JSON.

  • Anda tidak dapat memodifikasi katalog Kafka JSON yang sudah ada menggunakan pernyataan DDL.

  • Anda hanya dapat melakukan kueri pada tabel data. Anda tidak dapat membuat, memodifikasi, atau menghapus database atau tabel.

    Catatan

    Dalam skenario CREATE DATABASE AS (CDAS) atau CREATE TABLE AS (CTAS) yang menggunakan katalog Kafka JSON, topik dapat dibuat secara otomatis.

  • Katalog Kafka JSON tidak dapat membaca dari atau menulis ke kluster Kafka yang telah mengaktifkan autentikasi SSL atau SASL.

  • Tabel yang disediakan oleh katalog Kafka JSON dapat langsung digunakan sebagai tabel sumber dalam pekerjaan Flink SQL. Tabel tersebut tidak dapat digunakan sebagai tabel sink atau tabel dimensi lookup.

  • ApsaraMQ for Kafka saat ini tidak memungkinkan Anda menghapus groups menggunakan operasi API yang sama seperti Apache Kafka open source. Saat membuat katalog Kafka JSON, Anda harus mengonfigurasi parameter aliyun.kafka.instanceId, aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.endpoint, dan aliyun.kafka.regionId untuk menghapus kelompok konsumen secara otomatis. Untuk informasi selengkapnya, lihat Perbandingan antara ApsaraMQ for Kafka dan Apache Kafka open source.

Pencegahan

Katalog Kafka JSON menghasilkan skema tabel dengan mengurai data sampel. Untuk topik dengan format data yang tidak konsisten, katalog akan mengembalikan skema terluas yang mungkin, dengan mempertahankan semua kolom secara default. Jika format data suatu topik berubah, skema tabel yang diinferensi oleh katalog dapat menjadi tidak konsisten pada waktu yang berbeda. Jika skema yang diinferensi berbeda sebelum dan sesudah restart pekerjaan, masalah eksekusi pekerjaan dapat terjadi.

Sebagai contoh, pekerjaan Flink SQL mereferensikan tabel dalam katalog Kafka JSON. Jika pekerjaan tersebut direstart dari titik simpan setelah berjalan selama periode tertentu, skema baru yang berbeda dari skema yang digunakan pada eksekusi sebelumnya mungkin diambil. Namun, rencana eksekusi pekerjaan tetap menggunakan versi yang dihasilkan dengan skema lama. Hal ini dapat menyebabkan ketidaksesuaian pada komponen downstream, seperti kondisi filter atau nilai bidang. Untuk mencegah hal ini, Anda dapat membuat tabel Kafka menggunakan pernyataan CREATE TEMPORARY TABLE dalam pekerjaan Flink SQL Anda untuk menetapkan skema tetap.

Buat katalog JSON Kafka

  1. Di editor teks pada tab Data Exploration, masukkan pernyataan untuk mengonfigurasi katalog Kafka JSON.

    • Kluster Kafka yang dikelola sendiri atau kluster EMR Kafka

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100'
      );
    • ApsaraMQ for Kafka

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100',
       'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
       'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
       'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
       'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
       'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
      );

    Parameter

    Type

    Deskripsi

    Diperlukan

    Catatan

    YourCatalogName

    String

    Nama untuk katalog Kafka JSON.

    Ya

    Masukkan nama kustom dalam bahasa Inggris.

    Penting

    Hapus tanda kurung sudut (<>) setelah Anda mengganti parameter dengan nama katalog Anda. Jika tidak, akan terjadi error pemeriksaan sintaks.

    type

    String

    Tipe katalog.

    Ya

    Nilainya harus kafka.

    properties.bootstrap.servers

    String

    Alamat broker Kafka.

    Ya

    Format: host1:port1,host2:port2,host3:port3.

    Pisahkan beberapa alamat dengan koma (,).

    format

    String

    Format pesan Kafka.

    Ya

    Saat ini, hanya JSON yang didukung. Flink mengurai pesan Kafka berformat JSON untuk mendapatkan skema.

    default-database

    String

    Nama kluster Kafka.

    Tidak

    Nilai default-nya adalah kafka. Katalog memerlukan nama tiga bagian untuk menemukan tabel: catalog_name.db_name.table_name. Parameter ini menentukan db_name default. Karena Kafka tidak memiliki konsep database, Anda dapat menggunakan string apa pun untuk merepresentasikan kluster Kafka sebagai database.

    key.fields-prefix

    String

    Awalan kustom yang ditambahkan ke nama bidang yang diurai dari kunci pesan. Hal ini menghindari konflik penamaan setelah kunci pesan Kafka diurai.

    Tidak

    Nilai default-nya adalah key_. Misalnya, jika bidang kunci Anda bernama a, sistem akan mengurai nama bidang tersebut menjadi key_a.

    Catatan

    Nilai parameter key.fields-prefix tidak boleh menjadi awalan dari nilai parameter value.fields-prefix. Misalnya, jika Anda menyetel value.fields-prefix ke test1_value_, Anda tidak boleh menyetel key.fields-prefix ke test1_.

    value.fields-prefix

    String

    Awalan kustom yang ditambahkan ke nama bidang yang diurai dari isi pesan (value). Hal ini menghindari konflik penamaan setelah isi pesan Kafka diurai.

    Tidak

    Nilai default-nya adalah value_. Misalnya, jika bidang value Anda bernama b, sistem akan mengurai nama bidang tersebut menjadi value_b.

    Catatan

    Nilai parameter value.fields-prefix tidak boleh menjadi awalan dari nilai parameter key.fields-prefix. Misalnya, jika Anda menyetel key.fields-prefix ke test2_value_, Anda tidak boleh menyetel value.fields-prefix ke test2_.

    timestamp-format.standard

    String

    Format untuk mengurai bidang bertipe Timestamp dalam pesan berformat JSON. Sistem pertama-tama mencoba mengurai menggunakan format yang Anda konfigurasikan. Jika penguraian gagal, sistem secara otomatis mencoba format lainnya.

    Tidak

    Nilai valid:

    • SQL (default)

    • ISO-8601

    infer-schema.flatten-nested-columns.enable

    Boolean

    Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam isi pesan JSON (value) selama penguraian.

    Tidak

    Nilai valid:

    • true: Perluas secara rekursif.

      Untuk kolom yang diperluas, Flink menggunakan jalur yang mengindeks nilai sebagai nama. Misalnya, untuk kolom col dalam {"nested": {"col": true}} , nama yang diperluas adalah nested.col.

      Catatan

      Jika Anda menyetel parameter ini ke true, gunakan bersama pernyataan CREATE TABLE AS (CTAS). Pernyataan DML lainnya tidak mendukung perluasan otomatis kolom bersarang.

    • false (default): Perlakukan tipe bersarang sebagai String.

    infer-schema.primitive-as-string

    Boolean

    Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe String saat mengurai isi pesan JSON (value).

    Tidak

    Nilai valid:

    • true: Inferensi semua tipe primitif sebagai String.

    • false (default): Inferensi tipe berdasarkan aturan dasar.

    infer-schema.parse-key-error.field-name

    String

    Saat mengurai kunci pesan JSON, jika kunci tidak kosong dan penguraian gagal, bidang bertipe VARBINARY ditambahkan ke skema tabel untuk merepresentasikan data kunci pesan. Nama kolom merupakan gabungan dari awalan key.fields-prefix dan nilai parameter ini.

    Tidak

    Nilai default-nya adalah col. Misalnya, jika isi pesan diurai menjadi bidang bernama value_name, dan kunci pesan tidak kosong tetapi gagal diurai, skema yang dikembalikan berisi dua bidang: key_col dan value_name.

    infer-schema.compacted-topic-as-upsert-table

    Boolean

    Menentukan apakah akan menggunakan tabel sebagai tabel Upsert Kafka ketika kebijakan pembersihan topik Kafka adalah compact dan kunci pesan tidak kosong.

    Tidak

    Nilai default-nya adalah true. Setel ke true saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.

    max.fetch.records

    Int

    Jumlah maksimum pesan yang dikonsumsi saat mengurai pesan berformat JSON.

    Tidak

    Nilai default-nya adalah 100.

    aliyun.kafka.accessKeyId

    String

    ID AccessKey Akun Alibaba Cloud Anda. Untuk informasi selengkapnya, lihat Buat Pasangan Kunci Akses.

    Tidak

    Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.

    aliyun.kafka.accessKeySecret

    String

    Rahasia AccessKey Akun Alibaba Cloud Anda. Untuk informasi selengkapnya, lihat Buat Pasangan Kunci Akses.

    Tidak

    Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.

    aliyun.kafka.instanceId

    String

    ID instans ApsaraMQ for Kafka. Anda dapat melihat ID tersebut di halaman detail instans di Konsol ApsaraMQ for Kafka.

    Tidak

    Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.

    aliyun.kafka.endpoint

    String

    Titik akhir API ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Titik Akhir.

    Tidak

    Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.

    aliyun.kafka.regionId

    String

    ID wilayah instans tempat topik berada. Untuk informasi selengkapnya, lihat Titik Akhir.

    Tidak

    Konfigurasikan parameter ini saat menggunakan sintaks CTAS atau CDAS untuk menyinkronkan data ke ApsaraMQ for Kafka.

    Catatan

    Hanya mesin komputasi VVR 6.0.2 atau versi lebih baru yang mendukung parameter ini.

  2. Pilih pernyataan untuk membuat katalog, lalu klik Run di sebelah nomor baris di sebelah kiri.

    image.png

  3. Di area Metadata di sebelah kiri, Anda dapat melihat katalog yang telah dibuat.

Lihat katalog JSON Kafka

  1. Di editor teks pada tab Data Exploration, masukkan pernyataan berikut.

    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;

    Parameter

    Deskripsi

    ${catalog_name}

    Nama katalog JSON Kafka.

    ${db_name}

    Nama kluster Kafka.

    ${topic_name}

    Nama topik Kafka.

  2. Pilih pernyataan untuk melihat katalog, lalu klik Run di sebelah nomor baris di sebelah kiri.

    Setelah pernyataan berhasil dijalankan, Anda dapat melihat detail tabel di hasil eksekusi.Table information

Gunakan katalog JSON Kafka

  • Gunakan katalog sebagai tabel sumber untuk membaca data dari topik Kafka.

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    Catatan

    Untuk menentukan parameter WITH lainnya saat menggunakan tabel dari katalog Kafka JSON, Anda dapat menggunakan Petunjuk SQL untuk menambahkannya. Sebagai contoh, pernyataan SQL di atas menggunakan Petunjuk SQL untuk menentukan bahwa konsumsi dimulai dari data paling awal. Untuk informasi selengkapnya tentang parameter lainnya, lihat Tabel sumber ApsaraMQ for Kafka dan Tabel sink ApsaraMQ for Kafka.

  • Anda dapat menggunakan pernyataan CREATE TABLE AS (CTAS) untuk menyinkronkan data dari topik Kafka ke tabel target, dengan menggunakan topik Kafka sebagai tabel sumber.

    • Menyinkronkan satu tabel secara real-time.

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    • Menyinkronkan beberapa tabel dalam satu pekerjaan.

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;

      Dengan katalog Kafka JSON, Anda dapat menyinkronkan beberapa tabel Kafka dalam satu tugas yang sama. Namun, kondisi berikut harus dipenuhi:

      • Parameter topic-pattern tidak dikonfigurasi untuk salah satu tabel Kafka.

      • Konfigurasi Kafka untuk setiap tabel harus identik. Artinya, semua pengaturan properties.*, termasuk properties.bootstrap.servers dan properties.group.id, harus sama.

      • Pengaturan scan.startup.mode untuk setiap tabel harus identik. Nilainya hanya dapat diatur ke group-offsets, latest-offset, atau earliest-offset.

      Sebagai contoh, pada gambar berikut, dua tabel di atas memenuhi kondisi tersebut, sedangkan dua tabel di bawahnya melanggarnya.Example

Catatan

Untuk contoh lengkap end-to-end penggunaan katalog Kafka JSON, lihat Panduan Cepat untuk gudang log real-time.

Hapus katalog Kafka JSON

Peringatan

Menghapus katalog Kafka JSON tidak memengaruhi pekerjaan yang sedang berjalan. Namun, pekerjaan yang menggunakan tabel dari katalog tersebut akan gagal dengan error "table not found" saat dipublikasikan atau direstart. Lakukan dengan hati-hati.

  1. Di editor teks pada tab Data Exploration, masukkan pernyataan berikut.

    DROP CATALOG ${catalog_name};

    Ganti ${catalog_name} dengan nama katalog Kafka JSON yang ingin Anda hapus.

  2. Pilih pernyataan untuk menghapus katalog, klik kanan, lalu pilih Run.

  3. Di area Metadata di sebelah kiri, periksa apakah katalog telah dihapus.

Detail informasi tabel dari katalog Kafka JSON

Untuk mempermudah penggunaan tabel dari katalog Kafka JSON, katalog tersebut menambahkan parameter konfigurasi default, metadata, dan informasi kunci utama ke tabel yang diinferensi. Bagian berikut menjelaskan detail informasi tabel dari katalog Kafka JSON:

  • Inferensi Skema untuk Tabel Kafka

    Saat katalog Kafka JSON mengurai pesan berformat JSON untuk mengambil skema topik, katalog tersebut mencoba mengonsumsi hingga max.fetch.records pesan. Katalog tersebut mengurai skema setiap catatan data sesuai dengan aturan dasar yang sama seperti saat Kafka digunakan sebagai sumber data CTAS. Kemudian, katalog tersebut menggabungkan skema-skema tersebut untuk membentuk skema akhir.

    Penting
    • Saat katalog Kafka JSON melakukan inferensi skema, katalog tersebut membuat kelompok konsumen untuk mengonsumsi data topik. Nama kelompok konsumen menggunakan awalan untuk menunjukkan bahwa kelompok tersebut dibuat oleh katalog.

    • Untuk ApsaraMQ for Kafka, gunakan katalog Kafka JSON dengan VVR 6.0.7 atau versi lebih baru. Pada versi sebelum 6.0.7, kelompok konsumen tidak dihapus secara otomatis. Hal ini dapat menyebabkan peringatan karena penumpukan pesan di kelompok konsumen.

    Skema tersebut terutama mencakup bagian-bagian berikut:

    • Kolom Fisik yang Diinferensi

      Katalog Kafka JSON melakukan inferensi kolom fisik pesan dari kunci pesan Kafka dan isi pesan (value). Awalan yang sesuai ditambahkan ke nama kolom.

      Jika kunci pesan tidak kosong tetapi tidak dapat diurai, kolom bertipe VARBINARY dibuat. Nama kolom merupakan gabungan dari awalan key.fields-prefix dan nilai parameter infer-schema.parse-key-error.field-name.

      Setelah menarik sekelompok pesan Kafka, katalog tersebut mengurai setiap pesan dan menggabungkan kolom fisik yang diurai untuk membuat skema seluruh topik. Aturan penggabungan adalah sebagai berikut:

      • Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema hasil, katalog Kafka JSON secara otomatis menambahkan bidang tersebut ke skema hasil.

      • Jika kolom dengan nama yang sama muncul, penanganannya berdasarkan skenario berikut:

        • Jika tipenya sama tetapi presisinya berbeda, tipe dengan presisi lebih tinggi yang digunakan.

        • Jika tipenya berbeda, sistem menemukan node induk terkecil dalam struktur pohon yang ditunjukkan pada gambar berikut dan menggunakannya sebagai tipe untuk kolom dengan nama yang sama. Namun, saat tipe Decimal dan Float digabung, keduanya dikonversi ke tipe Double untuk mempertahankan presisi.Schema merge

      Sebagai contoh, untuk topik Kafka yang berisi tiga catatan data di bawah ini, katalog Kafka JSON menghasilkan skema seperti yang ditunjukkan pada gambar.Schema

    • Kolom Metadata Default

      Katalog Kafka JSON menambahkan tiga kolom metadata berguna secara default: partition, offset, dan timestamp. Detailnya ditunjukkan pada tabel berikut.

      Nama Metadata

      Nama Kolom

      Tipe

      Deskripsi

      partition

      partition

      INT NOT NULL

      Nilai partisi.

      offset

      offset

      BIGINT NOT NULL

      Offset.

      timestamp

      timestamp

      TIMESTAMP_LTZ(3) NOT NULL

      Timestamp pesan.

    • Kendala KUNCI UTAMA Default

      Saat tabel dari katalog Kafka JSON dikonsumsi sebagai tabel sumber, kolom metadata partition dan offset digunakan sebagai kunci utama secara default untuk memastikan keunikan data.

    Catatan

    Jika skema tabel yang diinferensi oleh katalog Kafka JSON tidak memenuhi kebutuhan Anda, Anda dapat mendeklarasikan tabel temporary menggunakan sintaks CREATE TEMPORARY TABLE ... LIKE untuk menentukan skema tabel yang diinginkan. Sebagai contoh, jika data JSON berisi bidang bernama ts dalam format '2023-01-01 12:00:01', katalog Kafka JSON secara otomatis menginferensi bidang ts sebagai tipe TIMESTAMP. Jika Anda ingin menggunakan bidang ts sebagai tipe STRING, Anda dapat mendeklarasikan tabel menggunakan sintaks CREATE TEMPORARY TABLE ... LIKE. Seperti yang ditunjukkan pada contoh berikut, karena awalan value_ ditambahkan ke bidang isi pesan dalam konfigurasi default, nama bidangnya menjadi value_ts.

    CREATE TEMPORARY TABLE tempTable (
        value_name STRING,
        value_ts STRING
    ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
  • Parameter Tabel Default

    Parameter

    Deskripsi

    Catatan

    connector

    Tipe konektor.

    Nilainya harus kafka atau upsert-kafka.

    topic

    Nama topik yang sesuai.

    Nama tabel.

    properties.bootstrap.servers

    Alamat broker Kafka.

    Sesuai dengan nilai parameter properties.bootstrap.servers dalam konfigurasi katalog.

    value.format

    Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi isi pesan Kafka (value).

    Nilainya tetap JSON.

    value.fields-prefix

    Menentukan awalan kustom untuk semua bidang isi pesan Kafka (value) untuk menghindari konflik nama dengan bidang kunci pesan atau metadata.

    Sesuai dengan nilai parameter value.fields-prefix dalam konfigurasi katalog.

    value.json.infer-schema.flatten-nested-columns.enable

    Menentukan apakah akan memperluas kolom bersarang secara rekursif dalam JSON isi pesan Kafka (value).

    Sesuai dengan nilai parameter infer-schema.flatten-nested-columns.enable dalam konfigurasi katalog.

    value.json.infer-schema.primitive-as-string

    Menentukan apakah akan menginferensi semua tipe data primitif sebagai tipe String untuk isi pesan Kafka (value).

    Sesuai dengan nilai parameter infer-schema.primitive-as-string dalam konfigurasi katalog.

    value.fields-include

    Menentukan kebijakan penanganan bidang kunci pesan dalam isi pesan.

    Nilainya harus EXCEPT_KEY. Ini menunjukkan bahwa isi pesan tidak berisi bidang kunci pesan.

    Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong. Jangan konfigurasi parameter ini jika kunci pesan kosong.

    key.format

    Format yang digunakan oleh konektor Flink Kafka untuk serialisasi atau deserialisasi kunci pesan Kafka.

    Nilainya harus json atau raw.

    Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong. Jangan konfigurasi parameter ini jika kunci pesan kosong.

    Jika kunci pesan tidak kosong tetapi tidak dapat diurai, setel parameter ini ke raw. Jika penguraian berhasil, setel parameter ini ke json.

    key.fields-prefix

    Menentukan awalan kustom untuk semua bidang kunci pesan Kafka untuk menghindari konflik nama dengan bidang format isi pesan.

    Sesuai dengan nilai parameter key.fields-prefix dalam konfigurasi katalog.

    Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong. Jangan konfigurasi parameter ini jika kunci pesan kosong.

    key.fields

    Bidang tempat data yang diurai dari kunci pesan Kafka disimpan.

    Daftar bidang kunci yang diurai diisi secara otomatis.

    Anda harus mengonfigurasi parameter ini jika kunci pesan tidak kosong dan tabel bukan tabel Upsert Kafka. Jika tidak, jangan konfigurasi parameter ini.