全部产品
Search
文档中心

Realtime Compute for Apache Flink:Kelola katalog MongoDB

更新时间:Jul 06, 2025

Setelah membuat katalog MongoDB, Anda dapat mengakses koleksi MongoDB di konsol Realtime Compute for Apache Flink tanpa perlu mendefinisikan skema koleksi. Topik ini menjelaskan cara membuat, melihat, menggunakan, dan menghapus katalog MongoDB di konsol Realtime Compute for Apache Flink.

Informasi latar belakang

Katalog MongoDB secara otomatis mengurai dokumen berformat Binary JSON (BSON) untuk menyimpulkan skema koleksi. Dengan demikian, Anda dapat menggunakan katalog MongoDB untuk mendapatkan bidang spesifik dari koleksi tanpa harus mendeklarasikan skema koleksi dalam SQL Realtime Compute for Apache Flink. Saat menggunakan katalog MongoDB, perhatikan poin-poin berikut:

  • Nama tabel katalog MongoDB sesuai dengan nama koleksi MongoDB. Dengan cara ini, Anda tidak perlu mengeksekusi pernyataan DDL untuk mendaftarkan tabel MongoDB agar dapat mengakses koleksi MongoDB. Hal ini meningkatkan efisiensi pengembangan data dan akurasi data.

  • Tabel katalog MongoDB dapat digunakan sebagai tabel sumber, tabel dimensi, dan tabel hasil dalam penyebaran SQL Realtime Compute for Apache Flink.

  • Dalam Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.6 atau lebih baru, Anda dapat menggunakan katalog MongoDB bersama dengan CREATE TABLE AS (CTAS) atau CREATE DATABASE AS (CDAS) untuk menyinkronkan perubahan skema tabel.

Topik ini menjelaskan operasi yang dapat Anda lakukan untuk mengelola katalog MongoDB:

Batasan

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.5 atau lebih baru yang mendukung katalog MongoDB.

  • Anda tidak dapat memodifikasi katalog MongoDB yang ada dengan mengeksekusi pernyataan DDL.

  • Anda hanya dapat mengambil data dari tabel menggunakan katalog MongoDB. Anda tidak diizinkan untuk membuat, memodifikasi, atau menghapus database dan tabel menggunakan katalog MongoDB.

Buat katalog MongoDB

  1. Di editor kode tab Scripts pada halaman SQL Editor, masukkan pernyataan untuk membuat katalog MongoDB.

    CREATE CATALOG <yourcatalogname> WITH(
     'type'='mongodb',
     'default-database'='<dbName>',
     'hosts'='<hosts>',
     'scheme'='<scheme>',
     'username'='<username>',
     'password'='<password>',
     'connection.options'='<connectionOptions>',
     'max.fetch.records'='100',
     'scan.flatten-nested-columns.enable'='<flattenNestedColumns>',
     'scan.primitive-as-string'='<primitiveAsString>'
    );

    Parameter

    Tipe data

    Deskripsi

    Diperlukan

    Catatan

    yourcatalogname

    STRING

    Nama katalog MongoDB.

    Ya

    Masukkan nama kustom.

    Penting

    Anda harus menghapus tanda kurung sudut (<>) saat mengganti nilai parameter dengan nama katalog Anda. Jika tidak, kesalahan akan dikembalikan selama pemeriksaan sintaksis.

    type

    STRING

    Jenis katalog.

    Ya

    Atur nilainya menjadi mongodb.

    hosts

    STRING

    Nama host tempat instance MongoDB berada.

    Ya

    Pisahkan beberapa nama host dengan koma (,).

    default-database

    STRING

    Nama database MongoDB default.

    Ya

    N/A.

    scheme

    STRING

    Protokol koneksi yang digunakan oleh database MongoDB.

    Tidak

    Nilai valid:

    • mongodb: Protokol MongoDB default digunakan untuk mengakses database MongoDB. Ini adalah nilai default.

    • mongodb+srv: Protokol DNS SRV record digunakan untuk mengakses database MongoDB.

    username

    STRING

    Nama pengguna yang digunakan untuk terhubung ke database MongoDB.

    Tidak

    Parameter ini diperlukan jika fitur verifikasi identitas diaktifkan untuk database MongoDB.

    password

    STRING

    Kata sandi yang digunakan untuk terhubung ke database MongoDB.

    Tidak

    Parameter ini diperlukan jika fitur verifikasi identitas diaktifkan untuk database MongoDB.

    Catatan

    Untuk mencegah kebocoran kata sandi, kami sarankan Anda menggunakan metode manajemen kunci untuk menentukan kata sandi Anda. Untuk informasi lebih lanjut, lihat Kelola variabel.

    connection.options

    STRING

    Parameter yang dikonfigurasikan untuk koneksi ke database MongoDB.

    Tidak

    Parameter adalah pasangan kunci-nilai dalam format key=value dan dipisahkan oleh ampersand (&), seperti connectTimeoutMS=12000&socketTimeoutMS=13000.

    max.fetch.records

    INT

    Jumlah maksimum dokumen yang dapat dicoba diperoleh katalog MongoDB saat katalog MongoDB menguraikan dokumen berformat BSON.

    Tidak

    Nilai default: 100.

    scan.flatten-nested-columns.enabled

    BOOLEAN

    Menentukan apakah akan secara rekursif memperluas kolom bertingkat dalam dokumen saat dokumen berformat BSON diuraikan.

    Tidak

    Nilai valid:

    • true: Kolom bertingkat diperluas secara rekursif. Realtime Compute for Apache Flink menggunakan jalur yang mengindeks nilai kolom yang diperluas sebagai nama kolom. Misalnya, kolom col dalam {"nested": {"col": true}} dinamai nested.col setelah kolom diperluas.

    • false: Dokumen berformat BSON bertingkat diuraikan sebagai tipe STRING. Ini adalah nilai default.

    Penting

    Hanya tabel katalog MongoDB yang digunakan sebagai tabel sumber dalam penyebaran SQL Realtime Compute for Apache Flink yang mendukung parameter ini.

    scan.primitive-as-string

    BOOLEAN

    Menentukan apakah semua tipe data dasar disimpulkan sebagai tipe STRING saat dokumen berformat BSON diuraikan.

    Tidak

    Nilai valid:

    • true: Semua tipe data dasar disimpulkan sebagai tipe STRING.

    • false: Tipe data disimpulkan berdasarkan pemetaan tipe data. Ini adalah nilai default. Untuk informasi lebih lanjut tentang pemetaan tipe data, lihat Pelajari detail tabel katalog MongoDB.

  2. Pilih kode yang digunakan untuk membuat katalog dan klik Run di sisi kiri kode.

    image.png

  3. Di panel Catalogs di sisi kiri halaman Daftar Katalog, lihat katalog yang telah Anda buat.

Lihat katalog MongoDB

  1. Di editor kode tab Scripts pada halaman SQL Editor, masukkan pernyataan berikut:

    DESCRIBE `${catalog_name}`.`${db_name}`.`${collection_name}`;

    Parameter

    Deskripsi

    ${catalog_name}

    Nama katalog MongoDB.

    ${db_name}

    Nama database ApsaraDB for MongoDB.

    ${collection_name}

    Nama koleksi ApsaraDB for MongoDB.

  2. Pilih kode yang digunakan untuk melihat katalog dan klik Run di sisi kiri kode.

    Setelah kode dijalankan, Anda dapat melihat informasi tentang tabel di hasil.

    image.png

Gunakan katalog MongoDB

  • Jika tabel katalog MongoDB digunakan sebagai tabel sumber, Anda dapat membaca data dari koleksi MongoDB yang sesuai dengan tabel tersebut.

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
    /*+OPTIONS('scan.incremental.snapshot.enabled'='true')*/;
    Catatan

    Jika Anda ingin menentukan parameter lain dalam klausa WITH saat menggunakan katalog MongoDB, kami sarankan Anda menggunakan petunjuk SQL untuk menambahkan parameter. Dalam pernyataan SQL sebelumnya, petunjuk SQL digunakan untuk mengaktifkan mode pembacaan paralel dalam fase snapshot awal. Untuk informasi lebih lanjut tentang parameter lainnya, lihat Konektor MongoDB.

  • Jika tabel katalog MongoDB digunakan sebagai tabel sumber, Anda dapat mengeksekusi CREATE TABLE AS (CTAS) atau CREATE DATABASE AS (CDAS) untuk menyinkronkan data dari koleksi MongoDB yang sesuai dengan tabel ke tabel tujuan.

    Penting

    Saat menggunakan pernyataan CREATE TABLE AS atau CREATE DATABASE AS untuk menyinkronkan data dari koleksi MongoDB yang sesuai dengan tabel ke tabel tujuan, pastikan persyaratan bisnis berikut terpenuhi:

    • Versi VVR adalah 8.0.6 atau lebih baru. Versi database MongoDB adalah 6.0 atau lebih baru.

    • Parameter scan.incremental.snapshot.enabled dan scan.full-changelog diatur ke true dalam petunjuk SQL.

    • Fitur preimage diaktifkan dalam database MongoDB. Untuk informasi lebih lanjut tentang cara mengaktifkan fitur preimage, lihat Preimage Dokumen.

    • Sinkronkan data dari topik tunggal secara real-time.

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
    • Sinkronkan data dari beberapa topik dalam penyebaran.

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `mongodb-catalog`.`database`.`collection0`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `mongodb-catalog`.`database`.`collection1`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `mongodb-catalog`.`database`.`collection2`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
      
      END;

      Anda dapat menggunakan pernyataan CREATE TABLE AS bersama dengan katalog MongoDB untuk menyinkronkan data dari beberapa koleksi MongoDB dalam penyebaran. Untuk menyinkronkan data dari beberapa koleksi MongoDB dalam penyebaran, pastikan konfigurasi parameter berikut untuk semua tabel dalam penyebaran sama:

      • Parameter terkait database MongoDB, termasuk hosts, scheme, username, password, dan connectionOptions

      • scan.startup.mode

    • Sinkronkan data dari seluruh database MongoDB.

      CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database`
      AS DATABASE `mongodb-catalog`.`database`
      /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
  • Baca data dari tabel dimensi MongoDB.

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM ${other_source_table} AS e
    JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Tulis data hasil ke tabel MongoDB.

    INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
    SELECT ...
    FROM ${other_source_table}

Hapus katalog MongoDB

Peringatan

Setelah menghapus katalog MongoDB, penyebaran yang sedang berjalan tidak terpengaruh. Namun, penyebaran yang menggunakan tabel katalog tidak dapat menemukan tabel saat penyebaran diterbitkan atau dimulai ulang. Lanjutkan dengan hati-hati saat menghapus katalog MongoDB.

  1. Di editor kode tab Scripts pada halaman SQL Editor, masukkan pernyataan berikut:

    DROP CATALOG ${catalog_name};

    ${catalog_name} menentukan nama katalog MongoDB yang ingin Anda hapus.

  2. Klik kanan pernyataan yang digunakan untuk menghapus katalog dan pilih Run dari menu pintasan.

  3. Lihat panel Catalogs di sisi kiri halaman Daftar Katalog untuk memeriksa apakah katalog telah dihapus.

Deskripsi inferensi skema

Saat katalog MongoDB menyimpulkan skema tabel, katalog MongoDB secara otomatis menambahkan parameter default dan informasi kunci utama ke tabel. Ini membantu Anda dengan mudah mempelajari detail tabel. Saat katalog MongoDB mengurai dokumen berformat BSON untuk mendapatkan skema koleksi, katalog MongoDB mencoba mengonsumsi catatan data. Jumlah maksimum catatan data yang dapat dicoba dikonsumsi oleh katalog MongoDB ditentukan oleh parameter max.fetch.records. Katalog mengurai skema setiap catatan data dan menggabungkan skema catatan data menjadi skema koleksi. Skema koleksi terdiri dari bagian-bagian berikut:

  • Kolom fisik

    Katalog MongoDB menyimpulkan kolom fisik berdasarkan dokumen berformat BSON.

  • Kunci utama default yang ditambahkan

    Untuk tabel katalog MongoDB, kolom _id digunakan sebagai kunci utama untuk mencegah duplikasi data.

Setelah katalog MongoDB mendapatkan sekelompok dokumen berformat BSON, katalog MongoDB mengurai dokumen secara berurutan dan menggabungkan kolom fisik yang diperoleh untuk mendapatkan skema koleksi berdasarkan aturan berikut: Fungsi ini menggabungkan dokumen JSON berdasarkan aturan berikut:

  • Jika bidang dalam kolom fisik yang diperoleh tidak terkandung dalam skema koleksi, katalog MongoDB secara otomatis menambahkan bidang ke skema koleksi.

  • Jika kolom fisik tertentu yang diperoleh setelah penguraian memiliki nama yang sama dengan kolom tertentu dalam skema topik, lakukan operasi berdasarkan skenario bisnis Anda:

    • Jika kolom tersebut memiliki tipe data yang sama tetapi presisi berbeda, katalog JSON Kafka menggabungkan kolom dengan presisi yang lebih besar.

    • Jika kolom tersebut memiliki tipe data yang berbeda, katalog JSON Kafka menggunakan node induk terkecil dalam struktur pohon yang ditampilkan dalam gambar berikut sebagai tipe kolom yang memiliki nama yang sama. Jika kolom tipe DECIMAL dan FLOAT digabungkan, kolom tersebut digabungkan menjadi tipe DOUBLE untuk mempertahankan presisi.

Tabel berikut menjelaskan pemetaan tipe data antara BSON dan SQL Realtime Compute for Apache Flink saat skema koleksi disimpulkan.

Tipe data BSON

Tipe data SQL Realtime Compute for Apache Flink

Boolean

BOOLEAN

Int32

INT

Int64

BIGINT

Binary

BYTES

Double

DOUBLE

Decimal128

DECIMAL

String

STRING

ObjectId

STRING

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

Array

STRING

Document

STRING

Referensi

  • Untuk informasi lebih lanjut tentang cara menggunakan konektor MongoDB, lihat Konektor MongoDB.

  • Jika katalog bawaan Realtime Compute for Apache Flink tidak dapat memenuhi kebutuhan bisnis Anda, Anda dapat menggunakan katalog kustom. Untuk informasi lebih lanjut, lihat Kelola Katalog Kustom.