All Products
Search
Document Center

Realtime Compute for Apache Flink:Manage MongoDB catalogs

Last Updated:Mar 18, 2026

Katalog MongoDB memungkinkan Anda mengakses koleksi MongoDB dari Realtime Compute for Apache Flink tanpa perlu menentukan skema koleksi secara manual. Katalog ini secara otomatis mengurai dokumen berformat Binary JSON (BSON) untuk menginferensi skema setiap koleksi, sehingga Anda dapat langsung melakukan kueri terhadap bidang tertentu dalam Flink SQL tanpa menulis pernyataan DDL. Topik ini menjelaskan cara membuat, melihat, menggunakan, dan menghapus katalog MongoDB.

Informasi latar belakang

Katalog MongoDB secara otomatis menginferensi skema koleksi dengan mengurai dokumen berformat BSON. Karena nama tabel dalam katalog MongoDB sesuai dengan nama koleksi MongoDB yang bersangkutan, Anda tidak perlu mendaftarkan tabel MongoDB menggunakan pernyataan DDL. Hal ini mengurangi beban konfigurasi awal dan meningkatkan akurasi data.

Tabel dalam katalog MongoDB dapat berfungsi sebagai tabel sumber, tabel dimensi, dan tabel hasil dalam penerapan SQL Realtime Compute for Apache Flink.

Pada Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.6 atau versi yang lebih baru, Anda dapat menggunakan katalog MongoDB bersama dengan pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan perubahan skema tabel.

Topik ini mencakup operasi berikut:

Sebelum memulai

Konfirmasikan hal-hal berikut sebelum bekerja dengan katalog MongoDB:

  • Ruang kerja Realtime Compute for Apache Flink Anda menggunakan VVR 8.0.5 atau versi yang lebih baru. Diperlukan VVR 8.0.6 atau versi yang lebih baru jika Anda berencana menggunakan pernyataan CREATE TABLE AS atau CREATE DATABASE AS.

Batasan

  • Hanya Realtime Compute for Apache Flink dengan VVR 8.0.5 atau versi yang lebih baru yang mendukung katalog MongoDB.

  • Anda tidak dapat memodifikasi katalog MongoDB yang sudah ada menggunakan pernyataan DDL. Untuk memperbarui pengaturan koneksi, hapus katalog tersebut lalu buat ulang.

  • Katalog MongoDB hanya mendukung akses baca. Anda tidak dapat membuat, memodifikasi, atau menghapus database dan tabel melalui katalog MongoDB.

Buat katalog MongoDB

Penting

Jika database MongoDB Anda memerlukan autentikasi, parameter password menerima nilai teks biasa secara default. Untuk mencegah paparan kredensial, gunakan fitur manajemen kunci untuk menyimpan password sebagai variabel. Untuk informasi selengkapnya, lihat Manage variables and keys.

Catatan

Pada pernyataan SQL berikut, ganti semua nilai placeholder yang diapit tanda kurung lancip (< >). Anda harus menghapus tanda kurung lancip tersebut saat menggantinya dengan nilai aktual Anda. Membiarkan tanda kurung lancip akan menyebabkan error pemeriksaan sintaksis.

  1. Pada editor kode di tab Scripts pada halaman SQL Editor, masukkan pernyataan untuk membuat katalog MongoDB.

    CREATE CATALOG <yourcatalogname> WITH(
     'type'='mongodb',
     'default-database'='<dbName>',
     'hosts'='<hosts>',
     'scheme'='<scheme>',
     'username'='<username>',
     'password'='<password>',
     'connection.options'='<connectionOptions>',
     'max.fetch.records'='100',
     'scan.flatten-nested-columns.enabled'='<flattenNestedColumns>',
     'scan.primitive-as-string'='<primitiveAsString>'
    );

    Tabel berikut menjelaskan parameter-parameter tersebut.

    Parameter

    Tipe data

    Deskripsi

    Wajib

    Nilai default

    Keterangan

    yourcatalogname

    STRING

    Nama katalog MongoDB.

    Ya

    N/A

    Masukkan nama kustom.

    type

    STRING

    Jenis katalog.

    Ya

    N/A

    Tetapkan nilainya ke mongodb.

    hosts

    STRING

    Hostname instans MongoDB.

    Ya

    N/A

    Pisahkan beberapa hostname dengan koma (,).

    default-database

    STRING

    Nama database MongoDB default.

    Ya

    N/A

    N/A.

    scheme

    STRING

    Protokol koneksi untuk database MongoDB.

    Tidak

    mongodb

    Nilai yang valid:

    • mongodb (menggunakan protokol MongoDB standar)

    • mongodb+srv (menggunakan protokol rekaman DNS SRV)

    username

    STRING

    Username untuk menghubungkan ke database MongoDB.

    Tidak

    N/A

    Diperlukan jika autentikasi diaktifkan pada database MongoDB.

    password

    STRING

    Password untuk menghubungkan ke database MongoDB.

    Tidak

    N/A

    Diperlukan jika autentikasi diaktifkan pada database MongoDB. Untuk mencegah paparan kredensial, gunakan fitur manajemen kunci alih-alih menentukan password dalam bentuk teks biasa. Lihat Manage variables and keys.

    connection.options

    STRING

    Parameter koneksi tambahan untuk database MongoDB.

    Tidak

    N/A

    Tentukan sebagai pasangan kunci-nilai dalam format key=value, dipisahkan oleh tanda ampersand (&). Contoh: connectTimeoutMS=12000&socketTimeoutMS=13000.

    max.fetch.records

    INT

    Jumlah maksimum dokumen yang dicoba dibaca oleh katalog MongoDB saat menginferensi skema koleksi.

    Tidak

    100

    N/A.

    scan.flatten-nested-columns.enabled

    BOOLEAN

    Menentukan apakah kolom bersarang diperluas secara rekursif saat mengurai dokumen berformat BSON.

    Tidak

    false

    Nilai yang valid:

    • true: Kolom bersarang diperluas; jalur kolom menjadi nama kolom, misalnya {"nested": {"col": true}} menjadi nested.col.

    • false: Dokumen bersarang diurai sebagai STRING.

    Penting

    Parameter ini hanya berlaku untuk tabel katalog MongoDB yang digunakan sebagai tabel sumber dalam penerapan Flink SQL.

    scan.primitive-as-string

    BOOLEAN

    Menentukan apakah semua tipe data primitif diinferensi sebagai STRING saat mengurai dokumen berformat BSON.

    Tidak

    false

    Nilai yang valid:

    • true: Semua tipe primitif diinferensi sebagai STRING.

    • false: Tipe diinferensi berdasarkan pemetaan tipe BSON ke Flink. Untuk pemetaan tipe, lihat Deskripsi inferensi skema.

  2. Pilih kode untuk membuat katalog dan klik Run di sisi kiri editor kode.

    image.png

  3. Di panel Catalogs di sisi kiri halaman Catalog List, verifikasi bahwa katalog baru muncul.

Tampilkan katalog MongoDB

  1. Pada editor kode di tab Scripts pada halaman SQL Editor, masukkan pernyataan berikut:

    DESCRIBE `${catalog_name}`.`${db_name}`.`${collection_name}`;

    Parameter

    Deskripsi

    ${catalog_name}

    Nama katalog MongoDB.

    ${db_name}

    Nama database ApsaraDB for MongoDB.

    ${collection_name}

    Nama koleksi ApsaraDB for MongoDB.

  2. Pilih kode tersebut dan klik Run di sisi kiri editor kode.

    Setelah pernyataan dijalankan, detail tabel akan muncul di panel hasil.

    image.png

Gunakan katalog MongoDB

Gunakan sebagai tabel sumber

Gunakan pola berikut untuk membaca data dari koleksi MongoDB melalui tabel katalog:

INSERT INTO ${other_sink_table}
SELECT...
FROM `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
/*+OPTIONS('scan.incremental.snapshot.enabled'='true')*/;
Catatan

Untuk menentukan parameter tambahan klausa WITH saat menggunakan katalog MongoDB, gunakan Petunjuk SQL alih-alih mengubah definisi katalog. Contoh di atas menggunakan Petunjuk SQL untuk mengaktifkan pembacaan paralel selama fase snapshot awal. Untuk semua parameter yang tersedia, lihat MongoDB connector.

Gunakan dengan CTAS/CDAS untuk sinkronisasi data

Gunakan pernyataan CREATE TABLE AS atau pernyataan CREATE DATABASE AS untuk menyinkronkan data dari koleksi MongoDB ke tabel tujuan.

Penting

Sebelum menggunakan pernyataan ini, pastikan semua kondisi berikut terpenuhi:

  • Versi VVR adalah 8.0.6 atau lebih baru, dan versi database MongoDB adalah 6.0 atau lebih baru.

  • Parameter scan.incremental.snapshot.enabled dan scan.full-changelog ditetapkan ke true dalam Petunjuk SQL.

  • Fitur preimage diaktifkan di database MongoDB. Untuk petunjuknya, lihat Document Preimages.

Sinkronkan data dari satu koleksi secara real time:

CREATE TABLE IF NOT EXISTS `${target_table_name}`
WITH(...)
AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;

Sinkronkan data dari beberapa koleksi dalam satu penerapan:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
AS TABLE `mongodb-catalog`.`database`.`collection0`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
AS TABLE `mongodb-catalog`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
AS TABLE `mongodb-catalog`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;

END;

Saat menyinkronkan beberapa koleksi MongoDB dalam satu penerapan menggunakan pernyataan CREATE TABLE AS, parameter berikut harus dikonfigurasi secara identik untuk semua tabel:

  • Parameter koneksi database MongoDB: hosts, scheme, username, password, dan connection.options

  • scan.startup.mode

Sinkronkan data dari seluruh database MongoDB:

CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database`
AS DATABASE `mongodb-catalog`.`database`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;

Gunakan sebagai tabel dimensi

INSERT INTO ${other_sink_table}
SELECT ...
FROM ${other_source_table} AS e
JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;

Gunakan sebagai tabel hasil

INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
SELECT ...
FROM ${other_source_table}

Hapus katalog MongoDB

Peringatan

Setelah Anda menghapus katalog MongoDB, penerapan yang sedang berjalan tidak terpengaruh. Namun, penerapan apa pun yang mereferensikan tabel dalam katalog yang dihapus akan gagal menemukan tabel tersebut saat penerapan tersebut dipublikasikan ulang atau dimulai ulang. Lakukan dengan hati-hati.

  1. Pada editor kode di tab Scripts pada halaman SQL Editor, masukkan pernyataan berikut:

    DROP CATALOG ${catalog_name};

    ${catalog_name} menentukan nama katalog MongoDB yang ingin Anda hapus.

  2. Klik kanan pernyataan tersebut dan pilih Run dari menu pintasan untuk menghapus katalog.

  3. Di panel Catalogs di sisi kiri halaman Catalog List, verifikasi bahwa katalog tersebut tidak lagi muncul.

Deskripsi inferensi skema

Saat katalog MongoDB menginferensi skema tabel, katalog tersebut secara otomatis menambahkan parameter default dan informasi primary key. Saat katalog mengurai dokumen berformat BSON untuk mendapatkan skema koleksi, katalog tersebut mencoba membaca hingga jumlah catatan data yang ditentukan oleh parameter max.fetch.records. Katalog tersebut mengurai skema setiap catatan dan menggabungkan semua skema yang diurai menjadi satu skema koleksi.

Skema koleksi terdiri dari bagian-bagian berikut:

  • Kolom fisik: Katalog MongoDB menginferensi kolom fisik dari dokumen berformat BSON.

  • Primary key default: Untuk setiap tabel dalam katalog MongoDB, kolom _id berfungsi sebagai primary key untuk mencegah data duplikat.

Setelah katalog MongoDB membaca serangkaian dokumen berformat BSON, katalog tersebut menguraikannya secara berurutan dan menggabungkan kolom fisik ke dalam skema koleksi menggunakan aturan berikut. Katalog MongoDB menggabungkan dokumen berformat BSON berdasarkan aturan berikut:

  • Jika suatu bidang dalam kolom fisik yang diurai tidak ada dalam skema koleksi saat ini, katalog MongoDB menambahkan bidang tersebut ke skema.

  • Jika kolom fisik yang diurai memiliki nama yang sama dengan kolom yang sudah ada dalam skema koleksi, terapkan logika penggabungan berikut berdasarkan tipe data Anda:

    • Jika kolom memiliki tipe data yang sama tetapi berbeda dalam presisi, katalog MongoDB mempertahankan kolom dengan presisi yang lebih besar.

    • Jika kolom memiliki tipe data yang berbeda, katalog MongoDB menggunakan tipe induk umum terkecil dari hierarki tipe yang ditunjukkan dalam gambar berikut. Jika kolom bertipe DECIMAL dan FLOAT digabung, hasilnya adalah DOUBLE untuk mempertahankan presisi.

      Type hierarchy diagram

Tabel berikut menjelaskan pemetaan tipe data antara BSON dan Realtime Compute for Apache Flink SQL saat skema koleksi diinferensi.

Tipe data BSON

Tipe data Realtime Compute for Apache Flink SQL

Boolean

BOOLEAN

Int32

INT

Int64

BIGINT

Binary

BYTES

Double

DOUBLE

Decimal128

DECIMAL

String

STRING

ObjectId

STRING

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

Array

STRING

Document

STRING

Referensi

  • Untuk informasi selengkapnya tentang cara menggunakan MongoDB connector, lihat MongoDB connector.

  • Jika katalog bawaan Realtime Compute for Apache Flink tidak memenuhi kebutuhan Anda, Anda dapat menggunakan katalog kustom. Untuk informasi selengkapnya, lihat Manage custom catalogs.