全部产品
Search
文档中心

Realtime Compute for Apache Flink:MongoDB

更新时间:Dec 04, 2025

Topik ini menjelaskan cara menggunakan Konektor MongoDB.

Informasi latar belakang

MongoDB adalah database berorientasi dokumen untuk data tidak terstruktur yang menyederhanakan pengembangan dan penskalaan aplikasi. Konektor MongoDB mendukung fitur-fitur berikut:

Kategori

Rincian

Jenis yang didukung

Tabel sumber, tabel dimensi, tabel sink, dan Ingesti Data

Mode eksekusi

Hanya mode stream

Metrik spesifik

Metrik pemantauan

  • Tabel sumber

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • Tabel dimensi dan tabel sink: Tidak ada.

Catatan

Untuk deskripsi setiap metrik, lihat Deskripsi metrik pemantauan.

Jenis API

DataStream, SQL, dan YAML ingesti data

Mendukung pembaruan atau penghapusan di tabel sink

Ya

Fitur khusus

Tabel sumber Change Data Capture (CDC) MongoDB menggunakan API Change Stream untuk menangkap data lengkap dan inkremental secara terintegrasi. Pertama-tama, semua data historis dibaca sebagai Snapshot, lalu beralih secara mulus ke pembacaan oplog inkremental. Proses ini memastikan tidak ada data yang hilang atau duplikat. Fitur ini juga mendukung semantik tepat-sekali untuk menjamin konsistensi data selama pemulihan kesalahan.

  • Berdasarkan API Change Stream

    Menggunakan API Change Stream MongoDB 3.6 untuk menangkap event perubahan secara efisien, seperti insert, update, replacement, dan delete, dari database atau koleksi. Event-event tersebut kemudian dikonversi menjadi aliran changelog yang dapat diproses oleh Flink.

  • Pembacaan lengkap dan inkremental terintegrasi

    Membaca Snapshot awal secara otomatis dan beralih dengan lancar ke mode inkremental tanpa intervensi manual.

  • Pembacaan Snapshot paralel

    Mendukung pembacaan paralel data historis untuk meningkatkan performa. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.

  • Beberapa mode startup

    • initial: Melakukan Snapshot lengkap saat startup awal, lalu terus-menerus membaca oplog.

    • latest-offset: Mulai membaca dari akhir oplog saat ini tanpa membaca data historis.

    • timestamp: Mulai membaca oplog dari timestamp tertentu dan melewati Snapshot. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.

  • Dukungan changelog lengkap

    Mendukung output changelog lengkap yang mencakup pra-gambar dan post-image dari perubahan. Fitur ini memerlukan MongoDB 6.0 atau versi lebih baru serta fitur perekaman pra-gambar/post-image harus diaktifkan.

Peningkatan integrasi Flink

Prasyarat

  • Persyaratan instans MongoDB

    • Hanya MongoDB Alibaba Cloud 3.6 atau versi lebih baru (ReplicaSet atau kluster sharded) dan instans MongoDB yang dikelola sendiri yang didukung.

    • Anda harus mengaktifkan fitur Replica Set untuk database MongoDB yang ingin dipantau. Untuk informasi selengkapnya, lihat Replication.

  • Ketergantungan fitur MongoDB

    • Untuk menggunakan fitur aliran event changelog lengkap, Anda harus mengaktifkan fitur perekaman pra-gambar/post-image.

    • Jika autentikasi diaktifkan untuk MongoDB, pengguna memerlukan izin database berikut.

      Daftar izin

      • izin splitVector

      • izin listDatabases

      • izin listCollections

      • izin collStats

      • Cari izin

      • izin changeStream

      • Izin akses untuk koleksi config.collections dan config.chunks

  • Persiapan jaringan dan lainnya untuk MongoDB

    • Konfigurasikan Daftar putih alamat IP agar Flink dapat mengakses MongoDB.

    • Koleksi MongoDB target telah dibuat.

Batasan

  • Tabel sumber CDC

    • MongoDB 4.0 dan versi lebih baru mendukung pembacaan paralel selama fase Snapshot awal. Untuk mengaktifkan mode paralel untuk Snapshot awal, atur parameter scan.incremental.snapshot.enabled ke true.

    • Karena keterbatasan langganan Change Stream MongoDB, Anda tidak dapat membaca data dari database admin, local, atau config, maupun dari koleksi sistem. Untuk informasi selengkapnya, lihat dokumentasi MongoDB.

  • Tabel sink

    • Ververica Runtime (VVR) versi sebelum 8.0.5 hanya mendukung penyisipan data.

    • Pada VVR 8.0.5 dan versi lebih baru, jika primary key dideklarasikan di tabel sink, operasi insert, update, dan delete didukung. Jika tidak ada primary key yang dideklarasikan, hanya operasi insert yang didukung.

  • Tabel dimensi

    • VVR 8.0.5 dan versi lebih baru mendukung penggunaan tabel dimensi MongoDB.

SQL

Sintaksis

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Catatan

Saat membuat tabel sumber CDC, Anda harus mendeklarasikan kolom _id STRING dan menetapkannya sebagai kunci primer unik.

Parameter WITH

Umum

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

connector

Nama konektor.

String

Ya

Tidak ada

  • Sebagai tabel sumber:

    • Untuk Ververica Runtime (VVR) 8.0.4 dan versi sebelumnya, atur parameter ini ke mongodb-cdc.

    • Untuk VVR 8.0.5 dan versi lebih baru, atur parameter ini ke mongodb atau mongodb-cdc.

  • Sebagai tabel dimensi atau tabel sink, parameter ini tetap mongodb.

uri

URI koneksi MongoDB.

String

Tidak

Tidak ada

Catatan

Anda harus menentukan salah satu antara parameter uri atau hosts. Jika Anda menentukan uri, Anda tidak perlu menentukan scheme, hosts, username, password, atau connector.options. Jika kedua parameter uri dan hosts ditentukan, uri digunakan untuk koneksi.

hosts

Hostname server MongoDB.

String

Tidak

Tidak ada

Anda dapat menentukan beberapa hostname yang dipisahkan koma (,).

scheme

Protokol koneksi yang digunakan oleh MongoDB.

String

Tidak

mongodb

Nilai valid:

  • mongodb: Menghubungkan menggunakan protokol MongoDB default.

  • mongodb+srv: Menghubungkan menggunakan protokol rekaman SRV DNS.

username

Nama pengguna yang digunakan untuk menghubungkan ke MongoDB.

String

Tidak

Tidak ada

Parameter ini diperlukan jika autentikasi diaktifkan.

password

Kata sandi yang digunakan untuk menghubungkan ke MongoDB.

String

Tidak

Tidak ada

Parameter ini diperlukan jika autentikasi diaktifkan.

Penting

Untuk mencegah kebocoran kata sandi, kami menyarankan Anda menggunakan variabel untuk nilai kata sandi. Untuk informasi lebih lanjut, lihat Variabel proyek.

database

Nama database MongoDB.

String

Tidak

Tidak ada

  • Sebagai tabel sumber, nama database mendukung ekspresi reguler.

  • Jika Anda tidak mengonfigurasi parameter ini, semua database dipantau.

Penting

Anda tidak dapat memantau data di database admin, local, atau config.

collection

Nama koleksi MongoDB.

String

Tidak

Tidak ada

  • Sebagai tabel sumber, nama koleksi mendukung ekspresi reguler.

    Penting

    Jika nama koleksi yang ingin dipantau mengandung karakter khusus ekspresi reguler, Anda harus memberikan namespace lengkap (database_name.collection_name). Jika tidak, perubahan pada koleksi tersebut tidak dapat ditangkap.

  • Jika Anda tidak mengonfigurasi parameter ini, semua koleksi dipantau.

Penting

Anda tidak dapat memantau data di koleksi sistem.

connection.options

Parameter koneksi tambahan untuk MongoDB.

String

Tidak

Tidak ada

Parameter koneksi tambahan dalam format key=value, dipisahkan dengan ampersand (&). Contoh: connectTimeoutMS=12000&socketTimeoutMS=13000.

Penting

Secara default, MongoDB CDC tidak secara otomatis menetapkan timeout koneksi socket. Hal ini dapat menyebabkan gangguan panjang selama fluktuasi jaringan.

Kami menyarankan Anda selalu menetapkan socketTimeoutMS ke nilai yang wajar untuk menghindari masalah ini.

Khusus tabel sumber

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

scan.startup.mode

Mode startup untuk sumber CDC MongoDB.

String

Tidak

initial

Nilai valid:

  • initial: Menarik semua data dari offset awal.

  • latest-offset: Menarik data perubahan dari offset saat ini.

  • timestamp: Menarik data perubahan dari timestamp tertentu.

Untuk informasi lebih lanjut, lihat Properti Startup.

scan.startup.timestamp-millis

Timestamp awal untuk konsumsi.

Long

Bergantung pada nilai scan.startup.mode

  • initial: Tidak

  • latest-offset: Tidak

  • timestamp: Ya

Tidak ada

Nilainya adalah jumlah milidetik sejak epoch Linux.

Parameter ini hanya berlaku untuk mode startup timestamp.

initial.snapshotting.queue.size

Batas ukuran antrian untuk Snapshot awal.

Integer

Tidak

10240

Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial.

batch.size

Ukuran batch untuk kursor.

Integer

Tidak

1024

Tidak ada.

poll.max.batch.size

Jumlah maksimum dokumen perubahan yang diproses dalam satu batch.

Integer

Tidak

1024

Parameter ini mengontrol jumlah maksimum dokumen perubahan yang ditarik sekaligus selama pemrosesan aliran. Nilai yang lebih besar mengalokasikan buffer yang lebih besar di dalam konektor.

poll.await.time.ms

Interval waktu antara dua penarikan data.

Integer

Tidak

1000

Unitnya adalah milidetik.

heartbeat.interval.ms

Interval untuk mengirim paket heartbeat.

Integer

Tidak

0

Unitnya adalah milidetik.

Konektor CDC MongoDB secara aktif mengirim paket heartbeat ke database untuk memastikan status backtrace tetap mutakhir. Nilai 0 menunjukkan bahwa tidak ada paket heartbeat yang dikirim.

Penting

Untuk koleksi yang jarang diperbarui, kami sangat menyarankan Anda mengatur opsi ini.

scan.incremental.snapshot.enabled

Menentukan apakah akan mengaktifkan mode paralel untuk Snapshot awal.

Boolean

Tidak

false

Ini adalah fitur eksperimental.

scan.incremental.snapshot.chunk.size.mb

Ukuran chunk untuk membaca Snapshot dalam mode paralel.

Integer

Tidak

64

Ini adalah fitur eksperimental.

Unitnya adalah MB.

Parameter ini hanya berlaku ketika Snapshot paralel diaktifkan.

scan.full-changelog

Menghasilkan aliran acara changelog lengkap.

Boolean

Tidak

false

Ini adalah fitur eksperimental.

Catatan

Database MongoDB harus versi 6.0 atau lebih baru, dan fitur pra-gambar dan post-image harus diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages.

scan.flatten-nested-columns.enabled

Menentukan apakah akan mengurai nama field yang dipisahkan titik (.) sebagai dokumen BSON bersarang.

Boolean

Tidak

false

Jika parameter ini diaktifkan, dalam contoh dokumen BSON berikut, nama field col dalam skema adalah nested.col.

{"nested":{"col":true}}
Catatan

Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru.

scan.primitive-as-string

Menentukan apakah akan mengurai semua tipe primitif dalam dokumen BSON sebagai string.

Boolean

Tidak

false

Catatan

Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru.

scan.ignore-delete.enabled

Menentukan apakah akan mengabaikan pesan hapus (-D).

Boolean

Tidak

false

Saat mengarsipkan data dari sumber MongoDB, banyak event DELETE dapat dihasilkan di OpLog. Jika Anda tidak ingin menyinkronkan event-event ini ke sistem downstream, Anda dapat mengaktifkan parameter ini untuk mengabaikan event hapus.

Catatan
  • Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

  • Event DELETE lain yang tidak berasal dari operasi pengarsipan juga diabaikan.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewatkan proses backfill dari algoritma Snapshot inkremental.

Boolean

Tidak

false

Mengaktifkan sakelar ini hanya menyediakan semantik at-least-once.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

initial.snapshotting.pipeline

Operasi pipeline MongoDB. Selama fase pembacaan Snapshot, operasi ini didorong ke MongoDB untuk memfilter hanya data yang diperlukan, sehingga meningkatkan efisiensi pembacaan.

String

Tidak

Tidak ada.

  • Formatnya adalah array objek JSON. Misalnya, [{"$match": {"closed": "false"}}] menunjukkan bahwa hanya dokumen dengan field closed bernilai "false" yang disalin.

  • Opsi ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial. Opsi ini hanya dapat digunakan dalam mode Debezium dan tidak dalam mode Snapshot inkremental. Jika tidak, ketidakkonsistenan semantik dapat terjadi.

    Catatan

    Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

initial.snapshotting.max.threads

Jumlah thread yang digunakan untuk replikasi data.

Integer

Tidak

Tidak ada.

Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

initial.snapshotting.queue.size

Ukuran antrian untuk snapshot awal.

Integer

Tidak

16000

Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

scan.change-stream.reading.parallelism

Tingkat paralelisme untuk langganan Change Stream.

Integer

Tidak

1

Parameter ini hanya berlaku ketika parameter scan.incremental.snapshot.enabled diaktifkan.

Penting

Untuk berlangganan Change Streams dengan beberapa thread konkuren, Anda juga harus mengatur parameter heartbeat.interval.ms.

Catatan

Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru.

scan.change-stream.reading.queue-size

Ukuran antrian pesan untuk langganan Change Stream konkuren.

Integer

Tidak

16384

Parameter ini hanya berlaku ketika parameter scan.change-stream.reading.parallelism diaktifkan.

Catatan

Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru.

Khusus tabel dimensi

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

lookup.cache

Kebijakan cache.

String

Tidak

NONE

Dua kebijakan cache berikut didukung:

  • None: Tidak menggunakan cache.

  • Partial: Hanya menyimpan data di cache saat data tersebut dicari di database eksternal.

lookup.max-retries

Jumlah maksimum percobaan ulang jika kueri database gagal.

Integer

Tidak

3

Tidak ada.

lookup.retry.interval

Interval waktu antara percobaan ulang jika kueri database gagal.

Durasi

Tidak

1s

Tidak ada.

lookup.partial-cache.expire-after-access

Waktu maksimum catatan dapat tetap berada di cache setelah terakhir diakses.

Durasi

Tidak

Tidak ada

Unit waktu yang didukung meliputi ms, s, min, h, dan d.

Saat menggunakan parameter ini, lookup.cache harus diatur ke PARTIAL.

lookup.partial-cache.expire-after-write

Waktu maksimum catatan dapat tetap berada di cache setelah ditulis.

Durasi

Tidak

Tidak ada

Saat menggunakan parameter ini, lookup.cache harus diatur ke PARTIAL.

lookup.partial-cache.max-rows

Jumlah maksimum baris di cache. Saat nilai ini terlampaui, baris terlama akan kedaluwarsa.

Long

Tidak

Tidak ada

Saat menggunakan parameter ini, lookup.cache harus diatur ke PARTIAL.

lookup.partial-cache.cache-missing-key

Menentukan apakah akan menyimpan catatan kosong di cache saat tidak ditemukan data di tabel fisik.

Boolean

Tidak

True

Saat menggunakan parameter ini, lookup.cache harus diatur ke PARTIAL.

Khusus tabel sink

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

sink.buffer-flush.max-rows

Jumlah maksimum catatan untuk setiap penulisan batch.

Integer

Tidak

1000

Tidak ada.

sink.buffer-flush.interval

Interval flush untuk menulis data.

Durasi

Tidak

1s

Tidak ada.

sink.delivery-guarantee

Jaminan pengiriman untuk menulis data.

String

Tidak

at-least-once

Nilai valid:

  • none

  • at-least-once

Catatan

exactly-once tidak didukung.

sink.max-retries

Jumlah maksimum percobaan ulang jika penulisan database gagal.

Integer

Tidak

3

Tidak ada.

sink.retry.interval

Interval waktu antara percobaan ulang jika penulisan database gagal.

Durasi

Tidak

1s

Tidak ada.

sink.parallelism

Tingkat paralelisme khusus untuk sink.

Integer

Tidak

Kosong

Tidak ada.

Pemetaan tipe

Tabel sumber CDC

Tipe BSON

Tipe Flink SQL

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

Tabel dimensi dan sink

Tipe BSON

Tipe Flink SQL

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

Contoh

Tabel sumber CDC

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --harus dideklarasikan
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

Tabel dimensi

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Tabel sink

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

Penyerapan data

Konektor MongoDB dapat digunakan sebagai sumber data dalam pekerjaan YAML ingesti data.

Batasan

Hanya didukung di Ververica Runtime (VVR) 11.1 dan versi lebih baru.

Sintaksis

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

Parameter

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Keterangan

type

Tipe sumber data.

Ya

STRING

None

Diperbaiki pada MongoDB.

scheme

Protokol untuk menghubungkan ke server MongoDB.

Tidak

STRING

mongodb

Nilai yang valid:

  • mongodb

  • mongodb+srv

hosts

Alamat server MongoDB yang akan dihubungkan.

Ya

STRING

None

Anda dapat menentukan beberapa alamat yang dipisahkan koma (,).

username

Username untuk menghubungkan ke MongoDB.

Tidak

STRING

None

None.

password

Password untuk menghubungkan ke MongoDB.

Tidak

STRING

None

None.

database

Nama database MongoDB yang akan ditangkap.

Ya

STRING

None

Mendukung ekspresi reguler.

collection

Nama koleksi MongoDB yang akan ditangkap.

Ya

STRING

None

Mendukung ekspresi reguler. Anda harus mencocokkan namespace lengkap database.collection.

connection.options

Opsi koneksi tambahan yang akan ditambahkan saat menghubungkan ke server MongoDB.

Tidak

STRING

None

Pasangan key-value dalam format k=v, dipisahkan dengan ampersand (&). Contoh: replicaSet=test&connectTimeoutMS=300000

schema.inference.strategy

Strategi untuk inferensi tipe Dokumen.

Nilai yang valid adalah continuous dan static.

Tidak

STRING

continuous

Jika diatur ke continuous, sumber MongoDB terus-menerus melakukan inferensi tipe. Jika catatan berikutnya tidak cocok dengan skema saat ini, event evolusi skema dikirim untuk memperluas struktur agar dapat menampung data baru.

Jika diatur ke static, MongoDB hanya melakukan inferensi skema sekali selama fase inisialisasi.

scan.max.pre.fetch.records

Jumlah maksimum catatan yang diambil sampel dari setiap koleksi yang ditangkap selama inisialisasi dan inferensi tipe.

Tidak

INT

50

None.

scan.startup.mode

Menentukan mode startup untuk sumber data MongoDB.

Nilai yang valid adalah initial, latest-offset, timestamp, dan snapshot.

Tidak

STRING

initial

Nilai yang valid:

  • initial: Menarik semua data dari offset awal dan secara otomatis beralih ke mode inkremental.

  • latest-offset: Menarik data perubahan dari offset OpLog terbaru.

  • timestamp: Menarik data perubahan dari timestamp tertentu.

  • snapshot: Melakukan Snapshot satu kali terhadap status database saat ini.

scan.startup.timestamp-millis

Menangkap data perubahan dari timestamp tertentu saat mode startup diatur ke timestamp.

Tidak

LONG

None

None.

chunk-meta.group.size

Menetapkan batas ukuran chunk metadata.

Tidak

INT

1000

None.

scan.incremental.close-idle-reader.enabled

Menentukan apakah akan menutup reader sumber yang idle setelah beralih ke mode inkremental.

Tidak

BOOLEAN

false

None.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewati proses backfill dari algoritma Snapshot inkremental.

Tidak

BOOLEAN

false

Jika konektor sink yang Anda gunakan dapat secara otomatis menghapus duplikat berdasarkan primary key, mengaktifkan sakelar ini dapat mengurangi waktu yang diperlukan untuk transisi dari lengkap ke inkremental.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah akan membaca chunk tak terbatas terlebih dahulu saat menjalankan algoritma Snapshot inkremental.

Tidak

BOOLEAN

false

Jika koleksi yang Anda snapshot sering diperbarui, mengaktifkan fitur ini dapat mengurangi kemungkinan error kehabisan memori saat membaca chunk tak terbatas.

batch.size

Ukuran batch kursor untuk membaca data MongoDB.

Tidak

INT

1024

None.

poll.max.batch.size

Jumlah maksimum entri per permintaan saat menarik Change Stream.

Tidak

INT

1024

None.

poll.await.time.ms

Waktu tunggu minimum antara dua permintaan saat menarik Change Stream.

Tidak

INT

1000

Unitnya adalah milidetik.

heartbeat.interval.ms

Interval pengiriman paket heartbeat.

Tidak

INT

0

Unitnya adalah milidetik.

Konektor CDC MongoDB secara aktif mengirim paket heartbeat ke database untuk memastikan status backtrace tetap mutakhir. Nilai 0 menunjukkan bahwa tidak ada paket heartbeat yang dikirim.

Catatan

Untuk koleksi yang jarang diperbarui, kami sangat menyarankan Anda mengatur opsi ini.

scan.incremental.snapshot.chunk.size.mb

Ukuran chunk selama fase Snapshot.

Tidak

INT

64

Unitnya adalah MB.

scan.incremental.snapshot.chunk.samples

Jumlah sampel yang diambil saat menentukan ukuran koleksi selama fase Snapshot.

Tidak

INT

20

None.

scan.full-changelog

Menentukan apakah akan menghasilkan aliran event changelog lengkap berdasarkan catatan Mongo Pre- dan Post-Image.

Tidak

BOOLEAN

false

Database MongoDB harus versi 6.0 atau lebih baru, dan fitur pra-gambar dan post-image harus diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages.

scan.cursor.no-timeout

Menentukan apakah akan mengatur kursor untuk membaca data agar tidak pernah kedaluwarsa.

Tidak

BOOLEAN

false

Server MongoDB biasanya menutup kursor setelah idle selama periode waktu tertentu (10 menit) untuk mencegah penggunaan memori berlebihan. Mengatur opsi ini ke true mencegah hal tersebut terjadi.

scan.ignore-delete.enabled

Menentukan apakah akan mengabaikan catatan event hapus dari sumber MongoDB.

Tidak

BOOLEAN

false

None.

scan.flatten.nested-documents.enabled

Menentukan apakah akan meratakan struktur bersarang dalam dokumen BSON.

Tidak

BOOLEAN

false

Saat opsi ini diaktifkan, skema seperti {"doc": {"foo": 1, "bar": "two"}} diperluas menjadi doc.foo INT, doc.bar STRING.

scan.all.primitives.as-string.enabled

Menentukan apakah akan menginferensi semua tipe primitif sebagai STRING.

Tidak

BOOLEAN

false

Mengaktifkan opsi ini dapat mencegah banyak event evolusi skema saat data input bercampur.

Pemetaan tipe

Tipe BSON

Tipe CDC

Catatan

STRING

VARCHAR

None.

INT32

INT

INT64

BIGINT

DECIMAL128

DECIMAL

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

TIMESTAMP

TIMESTAMP

DATETIME

LOCALZONEDTIMESTAMP

BINARY

VARBINARY

DOCUMENT

MAP

Parameter tipe kunci dan nilai perlu diinferensi.

ARRAY

ARRAY

Parameter tipe elemen perlu diinferensi.

OBJECTID

VARCHAR

Direpresentasikan sebagai string heksadesimal.

SYMBOL

REGULAREXPRESSION

JAVASCRIPT

JAVASCRIPTWITHSCOPE

VARCHAR

Direpresentasikan sebagai string.

Metadata

Konektor SQL

Tabel sumber SQL CDC MongoDB mendukung kolom metadata. Anda dapat menggunakan kolom-kolom ini untuk mengakses metadata berikut.

Kunci metadata

Tipe metadata

Deskripsi

database_name

STRING NOT NULL

Nama database yang berisi dokumen.

collection_name

STRING NOT NULL

Nama koleksi yang berisi dokumen.

op_ts

TIMESTAMP_LTZ(3) NOT NULL

Waktu dokumen diubah di database. Jika dokumen berasal dari data historis tabel dan bukan dari Change Stream, nilai ini selalu 0.

row_kind

STRING NOT NULL

Menunjukkan jenis perubahan data. Nilai yang valid:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

Catatan

Hanya didukung di VVR 11.1 dan yang lebih baru.

YAML Ingesti Data

Konektor YAML ingesti data CDC MongoDB mendukung pembacaan kolom metadata berikut:

Kunci metadata

Tipe metadata

Deskripsi

ts_ms

BIGINT NOT NULL

Waktu dokumen diubah di database. Jika dokumen berasal dari data historis tabel dan bukan dari Change Stream, nilai ini selalu 0.

Selain itu, Anda dapat menggunakan kolom metadata umum yang disediakan oleh modul Transform untuk mengakses informasi nama database, nama koleksi, dan row_kind.

Tentang fitur perekaman pra-gambar dan post-image MongoDB

Secara default, versi MongoDB sebelum 6.0 tidak menyediakan pra-gambar dokumen yang diubah atau data dokumen yang dihapus. Jika fitur perekaman pra-gambar dan post-image tidak diaktifkan, hanya semantik upsert yang dapat dicapai karena entri data update-before tidak tersedia. Namun, banyak operator berguna di Flink bergantung pada aliran perubahan lengkap yang terdiri dari event insert, update-before, update-after, dan delete.

Untuk melengkapi event update-before yang hilang, Flink SQL Planner secara otomatis menghasilkan node ChangelogNormalize untuk sumber data upsert. Node ini menyimpan cache versi terkini semua dokumen di state Flink. Saat dokumen yang diperbarui atau dihapus ditemui, node tersebut dapat mengkueri statenya untuk mengambil pra-gambar. Namun, node operator ini memerlukan jumlah data state yang besar.

image.png

MongoDB 6.0 mendukung fitur perekaman pra-gambar dan post-image. Untuk informasi selengkapnya, lihat Gunakan Change Streams MongoDB untuk menangkap perubahan data real-time. Saat fitur ini diaktifkan, MongoDB mencatat status lengkap dokumen sebelum dan sesudah setiap perubahan di koleksi khusus. Jika Anda mengaktifkan parameter scan.full-changelog dalam pekerjaan Anda, konektor CDC MongoDB menghasilkan catatan update-before dari catatan dokumen perubahan. Hal ini menghasilkan aliran event lengkap dan menghilangkan ketergantungan pada node ChangelogNormalize.

Mongo CDC DataStream API

Penting

Saat membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya, lihat Gunakan konektor DataStream.

Untuk menggunakan MongoDBSource, buat program API DataStream. Kode berikut memberikan contoh:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Konektor MongoDB VVR tersedia di Maven Central Repository untuk digunakan langsung dalam pengembangan pekerjaan Anda.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
Catatan

Saat menggunakan API DataStream, jika Anda ingin mengaktifkan fitur Snapshot inkremental, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.source untuk membuat MongoDBSource. Jika tidak, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.

Saat membuat MongoDBSource, Anda dapat mengonfigurasi parameter berikut:

Parameter

Deskripsi

hosts

Hostname database MongoDB yang akan dihubungkan.

username

Nama pengguna untuk layanan database MongoDB.

Catatan

Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini.

password

Kata sandi untuk layanan database MongoDB.

Catatan

Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini.

databaseList

Nama database MongoDB yang akan dipantau.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan .* untuk mencocokkan semua database.

collectionList

Nama koleksi MongoDB yang akan dipantau.

Catatan

Nama koleksi mendukung ekspresi reguler untuk membaca data dari beberapa koleksi. Anda dapat menggunakan .* untuk mencocokkan semua koleksi.

startupOptions

Mode startup untuk CDC MongoDB.

Nilai valid:

  • StartupOptions.initial()

    • Menarik semua data dari offset awal.

  • StartupOptions.latest-offset()

    • Menarik data perubahan dari offset saat ini.

  • StartupOptions.timestamp()

    • Menarik data perubahan dari timestamp tertentu.

Untuk informasi lebih lanjut, lihat Properti Startup.

deserializer

Deserializer untuk mendeserialisasi catatan bertipe SourceRecord ke tipe tertentu. Nilai yang valid:

  • MongoDBConnectorDeserializationSchema: Mengonversi SourceRecord yang dihasilkan dalam mode upsert ke struktur data internal RowData API Tabel atau SQL Flink.

  • MongoDBConnectorFullChangelogDeserializationSchema: Mengonversi SourceRecord yang dihasilkan dalam mode changelog lengkap ke struktur data internal RowData Tabel atau SQL Flink.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord ke string berformat JSON.