全部产品
Search
文档中心

Realtime Compute for Apache Flink:MongoDB

更新时间:Dec 19, 2025

Topik ini menjelaskan cara menggunakan Konektor MongoDB.

Informasi latar belakang

MongoDB adalah database berorientasi dokumen untuk data tidak terstruktur yang menyederhanakan pengembangan dan penskalaan aplikasi. Konektor MongoDB mendukung fitur-fitur berikut:

Kategori

Rincian

Jenis yang didukung

Tabel sumber, tabel dimensi, tabel sink, dan Ingesti Data

Mode runtime

Hanya mode stream yang didukung.

Metrik pemantauan unik

Metrik pemantauan

  • Tabel sumber

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • Tabel dimensi dan tabel sink: Tidak ada.

Catatan

Untuk informasi selengkapnya, lihat Deskripsi metrik.

Jenis API

DataStream, SQL, dan YAML ingesti data

Dukungan untuk pembaruan atau penghapusan di tabel sink

Ya

Fitur

Tabel sumber Change Data Capture (CDC) MongoDB menggunakan API Change Stream untuk menangkap data lengkap maupun inkremental. Pertama-tama, tabel ini membaca Snapshot lengkap dari data historis, lalu beralih secara mulus ke pembacaan oplog inkremental. Proses ini memastikan bahwa data tidak mengalami duplikasi maupun kehilangan. Konektor ini juga mendukung semantik Exactly-Once untuk menjamin konsistensi data selama pemulihan kesalahan.

  • Berdasarkan API Change Stream

    Menggunakan API Change Stream dari MongoDB 3.6 untuk menangkap perubahan seperti insert, update, replacement, dan delete dari database atau koleksi secara efisien. Perubahan tersebut dikonversi menjadi aliran changelog yang dapat diproses oleh Flink.

  • Integrasi lengkap dan inkremental

    Membaca Snapshot awal secara otomatis dan beralih dengan lancar ke mode inkremental tanpa intervensi manual.

  • Pembacaan Snapshot paralel

    Mendukung pembacaan paralel data historis untuk meningkatkan performa. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.

  • Beberapa mode startup

    • initial: Melakukan Snapshot lengkap saat startup pertama kali, lalu terus-menerus membaca oplog.

    • latest-offset: Memulai dari akhir oplog saat ini tanpa membaca data historis.

    • timestamp: Mulai membaca oplog dari timestamp tertentu dan melewati Snapshot. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.

  • Dukungan changelog lengkap

    Mendukung keluaran changelog lengkap yang mencakup pra-gambar dan post-image (untuk MongoDB 6.0 atau versi lebih baru dengan fitur pencatatan pra-gambar/post-image diaktifkan).

Peningkatan integrasi Flink

Prasyarat

  • Persyaratan instans MongoDB

    • Hanya MongoDB Alibaba Cloud 3.6 atau versi lebih baru (ReplicaSet atau kluster sharded) dan MongoDB self-managed 3.6 atau versi lebih baru yang didukung.

    • Fitur ReplicaSet harus diaktifkan untuk database MongoDB yang ingin Anda pantau. Untuk informasi selengkapnya, lihat Replication.

  • Ketergantungan fitur MongoDB

    • Untuk menggunakan fitur aliran event Full Changelog, Anda harus mengaktifkan fitur pencatatan pra-gambar/post-image.

    • Jika autentikasi diaktifkan untuk MongoDB, Anda memerlukan izin database berikut.

      Daftar izin

      • izin splitVector

      • izin listDatabases

      • izin listCollections

      • izin collStats

      • Cari izin

      • izin changeStream

      • Izin akses untuk koleksi config.collections dan config.chunks

  • Persiapan jaringan dan lainnya untuk MongoDB

    • Daftar putih alamat IP telah dikonfigurasi untuk mengizinkan Flink mengakses MongoDB.

    • Koleksi dan data MongoDB target telah dibuat.

Batasan

  • Tabel sumber CDC

    • MongoDB 4.0 dan versi lebih baru mendukung pembacaan paralel selama fase Snapshot awal. Untuk mengaktifkan mode paralel pada Snapshot awal, atur parameter scan.incremental.snapshot.enabled ke true.

    • Karena batasan langganan Change Stream MongoDB, Anda tidak dapat membaca data dari database admin, local, atau config, maupun dari koleksi sistem. Untuk informasi selengkapnya, lihat dokumentasi MongoDB.

  • Tabel sink

    • Ververica Runtime (VVR) versi sebelum 8.0.5 hanya mendukung operasi insert data.

    • Pada VVR 8.0.5 dan versi lebih baru, jika primary key dideklarasikan di tabel sink, Anda dapat melakukan insert, update, dan delete data. Jika tidak ada primary key yang dideklarasikan, Anda hanya dapat melakukan insert data.

  • Tabel dimensi

    • VVR 8.0.5 dan versi lebih baru mendukung penggunaan tabel dimensi MongoDB.

SQL

Sintaksis

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Catatan

Saat membuat tabel sumber CDC, Anda harus mendeklarasikan kolom _id STRING dan menetapkannya sebagai kunci primer unik.

Parameter WITH

Parameter umum

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Nama konektor.

String

Ya

Tidak ada

  • Sebagai tabel sumber:

    • Untuk VVR 8.0.4 dan versi sebelumnya, atur nilai ini ke mongodb-cdc.

    • Untuk VVR 8.0.5 dan versi lebih baru, atur nilai ini ke mongodb atau mongodb-cdc.

  • Sebagai tabel dimensi atau sink, nilainya harus mongodb.

uri

URI koneksi MongoDB.

String

Tidak

Tidak ada

Catatan

Anda harus menentukan salah satu dari parameter uri atau hosts. Jika Anda menentukan uri, Anda tidak perlu menentukan parameter scheme, hosts, username, password, atau connector.options. Jika kedua parameter ditentukan, uri akan digunakan untuk koneksi.

hosts

Hostname instans MongoDB.

String

Tidak

Tidak ada

Anda dapat memberikan beberapa hostname yang dipisahkan koma (,).

scheme

Protokol koneksi yang digunakan oleh MongoDB.

String

Tidak

mongodb

Nilai valid:

  • mongodb: Menghubungkan menggunakan protokol MongoDB default.

  • mongodb+srv: Menghubungkan menggunakan protokol rekaman SRV DNS.

username

Username untuk menghubungkan ke MongoDB.

String

Tidak

Tidak ada

Parameter ini wajib jika verifikasi identitas diaktifkan.

password

Password untuk menghubungkan ke MongoDB.

String

Tidak

Tidak ada

Parameter ini wajib jika verifikasi identitas diaktifkan.

Penting

Untuk mencegah kebocoran password, kami menyarankan agar Anda menggunakan variabel untuk mengatur nilai password. Untuk informasi selengkapnya, lihat Variabel Proyek.

database

Nama database MongoDB.

String

Tidak

Tidak ada

  • Untuk tabel sumber, nama database mendukung ekspresi reguler.

  • Jika parameter ini tidak dikonfigurasi, semua database akan dipantau.

Penting

Pemantauan data di database admin, local, atau config tidak didukung.

collection

Nama koleksi MongoDB.

String

Tidak

Tidak ada

  • Untuk tabel sumber, nama koleksi mendukung ekspresi reguler.

    Penting

    Jika nama koleksi yang ingin Anda pantau mengandung karakter khusus ekspresi reguler, Anda harus memberikan namespace lengkap (database_name.collection_name). Jika tidak, perubahan pada koleksi terkait tidak dapat ditangkap.

  • Jika parameter ini tidak dikonfigurasi, semua koleksi akan dipantau.

Penting

Pemantauan data di koleksi sistem tidak didukung.

connection.options

Parameter koneksi untuk sisi MongoDB.

String

Tidak

Tidak ada

Parameter koneksi tambahan dalam format key=value, dipisahkan dengan tanda ampersand (&). Contoh: connectTimeoutMS=12000&socketTimeoutMS=13000.

Penting

Secara default, MongoDB CDC tidak mengatur timeout koneksi socket secara otomatis. Hal ini dapat menyebabkan gangguan panjang selama fluktuasi jaringan.

Atur socketTimeoutMS ke nilai yang wajar untuk menghindari masalah ini.

Khusus tabel sumber

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

scan.startup.mode

Mode startup untuk MongoDB CDC.

String

Tidak

initial

Nilai valid:

  • initial: Menarik semua data dari offset awal.

  • latest-offset: Menarik data perubahan dari offset saat ini.

  • timestamp: Menarik data perubahan dari timestamp tertentu.

Untuk informasi lebih lanjut, lihat Properti Startup.

scan.startup.timestamp-millis

Timestamp awal untuk konsumsi pada offset tertentu.

Long

Bergantung pada nilai scan.startup.mode:

  • initial: Tidak

  • latest-offset: Tidak

  • timestamp: Ya

Tidak ada

Formatnya adalah jumlah milidetik sejak epoch Linux.

Parameter ini hanya berlaku untuk mode startup timestamp.

initial.snapshotting.queue.size

Batas ukuran antrian untuk Snapshot awal.

Integer

Tidak

10240

Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial.

batch.size

Ukuran batch untuk kursor.

Integer

Tidak

1024

Tidak ada.

poll.max.batch.size

Jumlah maksimum dokumen perubahan dalam satu batch.

Integer

Tidak

1024

Parameter ini mengontrol jumlah maksimum dokumen perubahan yang ditarik sekaligus selama pemrosesan aliran. Nilai yang lebih besar menghasilkan buffer yang lebih besar dialokasikan dalam konektor.

poll.await.time.ms

Interval waktu antara dua penarikan data.

Integer

Tidak

1000

Unitnya adalah milidetik.

heartbeat.interval.ms

Interval waktu untuk mengirim heartbeat.

Integer

Tidak

0

Unitnya adalah milidetik.

Konektor MongoDB CDC secara aktif mengirim heartbeat ke database untuk memastikan token resume tetap mutakhir. Nilai 0 berarti heartbeat tidak pernah dikirim.

Penting

Untuk koleksi yang jarang diperbarui, kami sangat menyarankan mengatur opsi ini.

scan.incremental.snapshot.enabled

Menentukan apakah akan mengaktifkan mode paralel untuk Snapshot awal.

Boolean

Tidak

false

Ini adalah fitur eksperimental.

scan.incremental.snapshot.chunk.size.mb

Ukuran chunk untuk membaca Snapshot dalam mode paralel.

Integer

Tidak

64

Ini adalah fitur eksperimental.

Unitnya adalah MB.

Parameter ini hanya berlaku ketika Snapshot paralel diaktifkan.

scan.full-changelog

Menghasilkan aliran event changelog lengkap.

Boolean

Tidak

false

Ini adalah fitur eksperimental.

Catatan

Memerlukan MongoDB 6.0 atau versi lebih baru dengan fitur pra-gambar dan post-image diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages.

scan.flatten-nested-columns.enabled

Menentukan apakah akan mengurai nama field yang dipisahkan titik (.) sebagai pembacaan dokumen BSON bersarang.

Boolean

Tidak

false

Jika diaktifkan, pada contoh dokumen BSON berikut, field col dinamai nested.col dalam skema.

{"nested":{"col":true}}
Catatan

Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru.

scan.primitive-as-string

Menentukan apakah akan mengurai semua tipe primitif dalam dokumen BSON sebagai string.

Boolean

Tidak

false

Catatan

Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru.

scan.ignore-delete.enabled

Menentukan apakah akan mengabaikan pesan hapus (-D).

Boolean

Tidak

false

Saat mengarsipkan data dari sumber MongoDB, banyak event DELETE dapat dihasilkan di OpLog. Jika Anda tidak ingin menyinkronkan event-event ini ke downstream, Anda dapat mengaktifkan parameter ini untuk mengabaikan event delete.

Catatan
  • Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

  • Event DELETE lain yang tidak berasal dari operasi pengarsipan juga akan diabaikan.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewati proses backfill watermark dari algoritma Snapshot inkremental.

Boolean

Tidak

false

Mengaktifkan sakelar ini hanya menyediakan semantik at-least-once.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

initial.snapshotting.pipeline

Operasi pipeline MongoDB. Selama fase pembacaan Snapshot, operasi ini didorong ke MongoDB untuk memfilter hanya data yang diperlukan, sehingga meningkatkan efisiensi baca.

String

Tidak

Tidak ada.

  • Dinyatakan sebagai array objek JSON. Contoh, [{"$match": {"closed": "false"}}] hanya menyalin dokumen di mana field closed bernilai "false".

  • Opsi ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial. Opsi ini hanya dapat digunakan dalam mode Debezium dan tidak dalam mode Snapshot inkremental, karena dapat menyebabkan inkonsistensi semantik.

    Catatan

    Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

initial.snapshotting.max.threads

Jumlah thread yang digunakan untuk replikasi data.

Integer

Tidak

Tidak ada.

Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

initial.snapshotting.queue.size

Ukuran antrian untuk snapshot awal.

Integer

Tidak

16000

Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru.

scan.change-stream.reading.parallelism

Tingkat paralelisme untuk berlangganan Change Stream.

Integer

Tidak

1

Parameter ini hanya berlaku ketika parameter scan.incremental.snapshot.enabled diaktifkan.

Penting

Untuk berlangganan Change Stream dengan beberapa thread konkuren, Anda juga harus mengatur parameter heartbeat.interval.ms.

Catatan

Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru.

scan.change-stream.reading.queue-size

Ukuran antrian pesan untuk langganan Change Stream konkuren.

Integer

Tidak

16384

Parameter ini hanya berlaku ketika parameter scan.change-stream.reading.parallelism diaktifkan.

Catatan

Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru.

Khusus tabel dimensi

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

lookup.cache

Kebijakan cache.

String

Tidak

NONE

Kebijakan cache berikut didukung:

  • None: Tanpa cache.

  • Partial: Hanya menyimpan data di cache saat mencarinya di database eksternal.

lookup.max-retries

Jumlah maksimum percobaan ulang jika kueri database gagal.

Integer

Tidak

3

Tidak ada.

lookup.retry.interval

Interval waktu percobaan ulang jika kueri database gagal.

Durasi

Tidak

1s

Tidak ada.

lookup.partial-cache.expire-after-access

Periode retensi maksimum untuk catatan di cache.

Durasi

Tidak

Tidak ada

Unit waktu yang didukung adalah ms, s, min, h, dan d.

Saat menggunakan konfigurasi ini, lookup.cache harus diatur ke PARTIAL.

lookup.partial-cache.expire-after-write

Periode retensi maksimum untuk catatan setelah ditulis ke cache.

Durasi

Tidak

Tidak ada

Saat menggunakan konfigurasi ini, lookup.cache harus diatur ke PARTIAL.

lookup.partial-cache.max-rows

Jumlah maksimum baris di cache. Jika nilai ini terlampaui, baris terlama akan kedaluwarsa.

Long

Tidak

Tidak ada

Saat menggunakan konfigurasi ini, lookup.cache harus diatur ke PARTIAL.

lookup.partial-cache.cache-missing-key

Menentukan apakah akan menyimpan catatan kosong di cache saat tidak ditemukan data di tabel fisik.

Boolean

Tidak

True

Saat menggunakan konfigurasi ini, lookup.cache harus diatur ke PARTIAL.

Khusus tabel sink

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

sink.buffer-flush.max-rows

Jumlah maksimum catatan untuk setiap penulisan batch.

Integer

Tidak

1000

Tidak ada.

sink.buffer-flush.interval

Interval refresh untuk menulis data.

Durasi

Tidak

1s

Tidak ada.

sink.delivery-guarantee

Jaminan semantik untuk menulis data.

String

Tidak

at-least-once

Nilai valid:

  • none

  • at-least-once

Catatan

Exactly-once saat ini tidak didukung.

sink.max-retries

Jumlah maksimum percobaan ulang jika penulisan ke database gagal.

Integer

Tidak

3

Tidak ada.

sink.retry.interval

Interval waktu percobaan ulang jika penulisan ke database gagal.

Durasi

Tidak

1s

Tidak ada.

sink.parallelism

Tingkat paralelisme sink kustom.

Integer

Tidak

Kosong

Tidak ada.

sink.delete-strategy

Mengonfigurasi cara menangani tipe data -D atau -U.

String

Tidak

CHANGELOG_STANDARD

Nilai yang valid:

  • CHANGELOG_STANDARD: Dalam mode standar, event -U dan -D diterapkan ke sistem downstream seperti biasa.

  • IGNORE_DELETE: Mengabaikan hanya event -D, tetapi tetap menimpa seluruh baris saat update.

  • PARTIAL_UPDATE: Mengabaikan event -U untuk memungkinkan pembaruan kolom parsial. Namun, saat menerima event -D, seluruh baris tetap dihapus.

  • IGNORE_ALL: Mengabaikan event -U dan -D.

Pemetaan tipe

Tabel sumber CDC

Tipe BSON

Tipe Flink SQL

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

Tabel dimensi dan sink

Tipe BSON

Tipe Flink SQL

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

Contoh

Tabel sumber CDC

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --harus dideklarasikan
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

Tabel dimensi

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Tabel sink

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

Penyerapan data

Konektor MongoDB dapat digunakan sebagai sumber data dalam pekerjaan YAML ingesti data.

Batasan

Fitur ini hanya didukung di VVR 11.1 dan versi lebih baru.

Sintaksis

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

Item konfigurasi

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Catatan

type

Tipe sumber data.

Ya

STRING

None

Nilainya harus mongodb.

scheme

Protokol untuk menghubungkan ke server MongoDB.

Tidak

STRING

mongodb

Nilai yang valid:

  • mongodb

  • mongodb+srv

hosts

Alamat server untuk menghubungkan ke MongoDB.

Ya

STRING

None

Anda dapat menentukan beberapa alamat yang dipisahkan koma (,).

username

Username untuk menghubungkan ke MongoDB.

Tidak

STRING

None

None.

password

Password untuk menghubungkan ke MongoDB.

Tidak

STRING

None

None.

database

Nama database MongoDB yang akan ditangkap.

Ya

STRING

None

Mendukung ekspresi reguler.

collection

Nama koleksi MongoDB yang akan ditangkap.

Ya

STRING

None

Mendukung ekspresi reguler. Anda harus mencocokkan namespace lengkap database.collection.

connection.options

Opsi koneksi tambahan yang akan ditambahkan saat menghubungkan ke server MongoDB.

Tidak

STRING

None

Pasangan key-value dalam format k=v, dipisahkan dengan ampersand (&). Contoh: replicaSet=test&connectTimeoutMS=300000.

schema.inference.strategy

Strategi untuk inferensi tipe dokumen.

Nilai yang valid adalah continuous dan static.

Tidak

STRING

continuous

Jika diatur ke continuous, Sumber MongoDB terus-menerus melakukan inferensi tipe. Jika catatan berikutnya tidak konsisten dengan skema saat ini, sistem akan mengeluarkan event evolusi skema untuk memperluas struktur agar sesuai dengan data baru.

Jika diatur ke static, MongoDB hanya melakukan inferensi skema sekali selama fase inisialisasi.

scan.max.pre.fetch.records

Jumlah maksimum catatan yang diambil sampel dari setiap koleksi yang ditangkap selama inferensi awal.

Tidak

INT

50

None.

scan.startup.mode

Menentukan mode startup untuk sumber data MongoDB.

Nilai yang valid adalah initial, latest-offset, timestamp, dan snapshot.

Tidak

STRING

initial

Nilai yang valid:

  • initial: Menarik semua data dari offset awal dan secara otomatis beralih ke mode inkremental.

  • latest-offset: Menarik data perubahan dari offset OpLog terbaru.

  • timestamp: Menarik data perubahan dari timestamp tertentu.

  • snapshot: Melakukan Snapshot satu kali terhadap status database saat ini.

scan.startup.timestamp-millis

Saat mode startup diatur ke timestamp, parameter ini menentukan timestamp untuk mulai menangkap data perubahan.

Tidak

LONG

None

None.

chunk-meta.group.size

Menetapkan batas ukuran chunk metadata.

Tidak

INT

1000

None.

scan.incremental.close-idle-reader.enabled

Menentukan apakah akan menutup Pembaca Sumber yang menganggur setelah beralih ke mode inkremental.

Tidak

BOOLEAN

false

None.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewati proses backfill watermark dari algoritma Snapshot inkremental.

Tidak

BOOLEAN

false

Jika konektor sink yang Anda gunakan dapat secara otomatis menghapus duplikat berdasarkan primary key, mengaktifkan sakelar ini dapat mengurangi waktu yang dibutuhkan untuk transisi dari lengkap ke inkremental.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah akan membaca chunk tak terbatas terlebih dahulu saat menjalankan algoritma Snapshot inkremental.

Tidak

BOOLEAN

false

Jika koleksi yang Anda ambil Snapshot-nya sering diperbarui, mengaktifkan fitur ini dapat mengurangi kemungkinan terjadinya error kehabisan memori saat membaca chunk tak terbatas.

batch.size

Ukuran batch kursor untuk membaca data MongoDB.

Tidak

INT

1024

None.

poll.max.batch.size

Jumlah maksimum entri per permintaan saat menarik Change Stream.

Tidak

INT

1024

None.

poll.await.time.ms

Waktu tunggu minimum antar permintaan saat menarik Change Stream.

Tidak

INT

1000

Unitnya adalah milidetik.

heartbeat.interval.ms

Interval waktu untuk mengirim heartbeat.

Tidak

INT

0

Unitnya adalah milidetik.

Konektor MongoDB CDC secara aktif mengirim heartbeat ke database untuk memastikan token resume tetap mutakhir. Nilai 0 berarti heartbeat tidak pernah dikirim.

Catatan

Untuk koleksi yang jarang diperbarui, kami sangat menyarankan mengatur opsi ini.

scan.incremental.snapshot.chunk.size.mb

Ukuran chunk selama fase Snapshot.

Tidak

INT

64

Unitnya adalah MB.

scan.incremental.snapshot.chunk.samples

Jumlah sampel yang digunakan saat menentukan ukuran koleksi selama fase Snapshot.

Tidak

INT

20

None.

scan.full-changelog

Menentukan apakah akan menghasilkan aliran event changelog lengkap berdasarkan catatan Mongo Pre- dan Post-Image.

Tidak

BOOLEAN

false

Memerlukan MongoDB 6.0 atau versi lebih baru dengan fitur pra-gambar dan post-image diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages.

scan.cursor.no-timeout

Menentukan apakah akan mengatur kursor untuk membaca data agar tidak pernah kedaluwarsa.

Tidak

BOOLEAN

false

Server MongoDB biasanya menutup kursor setelah menganggur selama periode tertentu (10 menit) untuk mencegah penggunaan memori tinggi. Mengatur opsi ini ke true mencegah hal tersebut terjadi.

scan.ignore-delete.enabled

Menentukan apakah akan mengabaikan catatan event hapus dari sumber MongoDB.

Tidak

BOOLEAN

false

None.

scan.flatten.nested-documents.enabled

Menentukan apakah akan meratakan struktur bersarang dalam dokumen BSON.

Tidak

BOOLEAN

false

Saat opsi ini diaktifkan, skema seperti {"doc": {"foo": 1, "bar": "two"}} diperluas menjadi doc.foo INT, doc.bar STRING.

scan.all.primitives.as-string.enabled

Menentukan apakah akan menginferensi semua tipe primitif sebagai STRING.

Tidak

BOOLEAN

false

Mengaktifkan opsi ini dapat menghindari pembuatan banyak event evolusi skema saat menangani tipe data input campuran.

Pemetaan tipe

Tipe BSON

Tipe CDC

Catatan

STRING

VARCHAR

None.

INT32

INT

INT64

BIGINT

DECIMAL128

DECIMAL

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

TIMESTAMP

TIMESTAMP

DATETIME

LOCALZONEDTIMESTAMP

BINARY

VARBINARY

DOCUMENT

MAP

Parameter tipe Key/Value perlu diinferensi.

ARRAY

ARRAY

Parameter tipe elemen perlu diinferensi.

OBJECTID

VARCHAR

Direpresentasikan sebagai HexString.

SYMBOL

REGULAREXPRESSION

JAVASCRIPT

JAVASCRIPTWITHSCOPE

VARCHAR

Direpresentasikan sebagai string.

Metadata

Konektor SQL

Tabel sumber SQL CDC MongoDB mendukung sintaksis kolom metadata. Anda dapat mengakses metadata berikut melalui kolom metadata.

Kunci metadata

Tipe metadata

Deskripsi

database_name

STRING NOT NULL

Nama database yang berisi dokumen.

collection_name

STRING NOT NULL

Nama koleksi yang berisi dokumen.

op_ts

TIMESTAMP_LTZ(3) NOT NULL

Waktu dokumen diubah di database. Jika dokumen berasal dari data historis yang sudah ada di tabel, bukan dari ChangeStream, nilai ini selalu 0.

row_kind

STRING NOT NULL

Menunjukkan jenis perubahan data. Nilai yang valid:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

Catatan

Fitur ini hanya didukung di VVR 11.1 dan versi lebih baru.

YAML Ingesti Data

Konektor YAML ingesti data CDC MongoDB mendukung pembacaan kolom metadata berikut:

Kunci metadata

Tipe metadata

Deskripsi

ts_ms

BIGINT NOT NULL

Waktu dokumen diubah di database. Jika dokumen berasal dari data historis yang sudah ada di tabel, bukan dari ChangeStream, nilai ini selalu 0.

Anda juga dapat menggunakan kolom metadata umum yang disediakan oleh modul Transform untuk mengakses informasi nama database, nama koleksi, dan row_kind.

Tentang fitur pencatatan pra-gambar dan post-image di MongoDB

Secara default, versi MongoDB sebelum 6.0 tidak menyediakan data untuk dokumen sebelum perubahan atau dokumen yang dihapus. Tanpa fitur ini diaktifkan, informasi yang tersedia hanya dapat mencapai semantik Upsert, artinya entri data Update Before tidak tersedia. Namun, banyak operator berguna di Flink bergantung pada aliran perubahan lengkap yang mencakup event Insert, Update Before, Update After, dan Delete.

Untuk melengkapi event pra-perubahan yang hilang, Flink SQL Planner secara otomatis menghasilkan node ChangelogNormalize untuk sumber data tipe Upsert. Node ini menyimpan Snapshot versi terkini semua dokumen di state Flink. Saat menemukan dokumen yang diperbarui atau dihapus, node ini dapat mencari status pra-perubahan dari Snapshot yang disimpan. Namun, node operator ini memerlukan jumlah data state yang besar.

image.png

MongoDB 6.0 mendukung fitur pra- dan post-image untuk database. Untuk informasi selengkapnya, lihat Gunakan Change Stream MongoDB untuk Menangkap Perubahan Data Secara Real Time. Setelah Anda mengaktifkan fitur ini, MongoDB mencatat status lengkap dokumen sebelum dan sesudah setiap perubahan dalam koleksi khusus. Kemudian, saat Anda mengaktifkan item konfigurasi scan.full-changelog dalam pekerjaan, MongoDB CDC menghasilkan catatan Update Before dari catatan dokumen perubahan. Proses ini menciptakan aliran event lengkap dan menghilangkan ketergantungan pada node ChangelogNormalize.

Mongo CDC DataStream API

Penting

Saat membaca atau menulis data menggunakan API DataStream, Anda perlu menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk petunjuk penyiapan, lihat Gunakan konektor DataStream.

Anda dapat membuat program API DataStream dan menggunakan MongoDBSource. Kode berikut menunjukkan contohnya:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Konektor MongoDB VVR tersedia di Repositori Maven Central. Anda dapat menggunakannya langsung dalam pengembangan pekerjaan Anda.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
Catatan

Saat menggunakan API DataStream, untuk mengaktifkan fitur Snapshot inkremental, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.source saat membuat MongoDBSource. Jika tidak, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.

Saat membuat MongoDBSource, Anda dapat mengonfigurasi parameter berikut:

Parameter

Deskripsi

hosts

Hostname database MongoDB yang akan dihubungkan.

username

Nama pengguna untuk layanan database MongoDB.

Catatan

Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini.

password

Kata sandi untuk layanan database MongoDB.

Catatan

Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini.

databaseList

Nama database MongoDB yang akan dipantau.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan .* untuk mencocokkan semua database.

collectionList

Nama koleksi MongoDB yang akan dipantau.

Catatan

Nama koleksi mendukung ekspresi reguler untuk membaca data dari beberapa koleksi. Anda dapat menggunakan .* untuk mencocokkan semua koleksi.

startupOptions

Memilih mode startup untuk MongoDB CDC.

Nilai valid:

  • StartupOptions.initial()

    • Menarik semua data dari offset awal.

  • StartupOptions.latest-offset()

    • Menarik data perubahan dari offset saat ini.

  • StartupOptions.timestamp()

    • Menarik data perubahan dari timestamp tertentu.

Untuk informasi lebih lanjut, lihat Properti Startup.

deserializer

Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe tertentu. Nilai yang valid:

  • MongoDBConnectorDeserializationSchema: Mengonversi SourceRecord yang dihasilkan dalam mode Upsert ke struktur data internal RowData untuk Flink Table API atau SQL API.

  • MongoDBConnectorFullChangelogDeserializationSchema: Mengonversi SourceRecord yang dihasilkan dalam mode Full Changelog ke struktur data internal RowData untuk Flink Table atau SQL.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord ke String berformat JSON.