全部产品
Search
文档中心

Realtime Compute for Apache Flink:MongoDB

更新时间:Feb 10, 2026

Topik ini menjelaskan cara menggunakan konektor MongoDB.

Informasi latar belakang

MongoDB adalah database berorientasi dokumen yang menyimpan data tidak terstruktur serta menyederhanakan pengembangan dan penskalaan aplikasi. Konektor MongoDB mendukung kemampuan berikut:

Kategori

Deskripsi

Tipe yang didukung

Tabel sumber, dimensi, sink, dan ingesti data

Mode eksekusi

Hanya mode streaming

Metrik pemantauan

Metrik

  • Tabel sumber

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • Tabel dimensi dan sink: tidak ada

Catatan

Untuk informasi selengkapnya tentang metrik tersebut, lihat Metrik pemantauan.

Tipe API

DataStream, SQL, dan YAML ingesti data

Dukungan untuk memperbarui atau menghapus data tabel sink

Ya

Fitur

Tabel sumber Change Data Capture (CDC) MongoDB menangkap data lengkap dan inkremental menggunakan API Change Stream. Fitur ini pertama-tama membaca data historis dalam snapshot lengkap, lalu secara mulus beralih ke pembacaan data inkremental dari oplog. Proses ini menyediakan semantik tepat-sekali, yang memastikan bahwa tidak ada catatan duplikat atau yang terlewat serta menjamin konsistensi data selama pemulihan kesalahan.

  • Berdasarkan API Change Stream

    Konektor menggunakan API Change Stream yang diperkenalkan di MongoDB 3.6 untuk secara efisien menangkap event insert, update, replace, dan delete dari database dan koleksi. Event-event tersebut dikonversi menjadi aliran changelog yang dapat diproses oleh Flink.

  • Penangkapan data lengkap dan inkremental yang mulus

    Konektor secara otomatis membaca snapshot awal, lalu beralih ke mode inkremental tanpa intervensi manual.

  • Parallel Snapshot Reads

    Konektor membaca data historis secara paralel untuk meningkatkan performa. Fitur ini memerlukan MongoDB 4.0 atau versi yang lebih baru.

  • Beberapa mode startup

    • initial: Melakukan snapshot lengkap saat pekerjaan pertama kali dijalankan, lalu membaca oplog untuk perubahan inkremental.

    • latest-offset: Mulai membaca dari posisi terbaru oplog dan tidak membaca data historis.

    • timestamp: Membaca event oplog mulai dari timestamp tertentu, melewati snapshot. Fitur ini memerlukan MongoDB 4.0 atau versi yang lebih baru.

  • Dukungan Changelog lengkap

    Menghasilkan event changelog lengkap yang mencakup status data sebelum dan sesudah perubahan. Fitur ini memerlukan MongoDB 6.0 atau versi yang lebih baru serta fitur perekaman preimage dan postimage.

Peningkatan integrasi Flink

Prasyarat

  • Persyaratan instans MongoDB

    • Konektor hanya mendukung Alibaba Cloud MongoDB (replica set atau kluster sharded) atau MongoDB self-managed versi 3.6 atau yang lebih baru.

    • Anda harus mengaktifkan fitur replica set untuk database MongoDB yang ingin dipantau. Untuk informasi selengkapnya, lihat Replication.

  • Ketergantungan fitur MongoDB

    • Untuk menggunakan fitur aliran event Full Changelog, Anda harus mengaktifkan fitur perekaman preimage dan postimage.

    • Jika autentikasi MongoDB diaktifkan, pengguna MongoDB Anda harus memiliki izin database berikut:

      Izin yang diperlukan

      • izin splitVector

      • izin listDatabases

      • izin listCollections

      • izin collStats

      • Cari izin

      • izin changeStream

      • Akses ke koleksi config.collections dan config.chunks

  • Persiapan jaringan dan lainnya untuk MongoDB

    • Daftar putih alamat IP telah dikonfigurasi untuk mengizinkan Flink mengakses MongoDB.

    • Anda telah membuat database dan tabel MongoDB tujuan.

Batasan

  • Tabel sumber CDC

    • Pembacaan paralel selama fase snapshot awal hanya didukung untuk MongoDB 4.0 dan versi yang lebih baru. Untuk mengaktifkan fitur ini, atur opsi konfigurasi scan.incremental.snapshot.enabled ke true.

    • Anda tidak dapat membaca data dari database admin, local, atau config, maupun dari koleksi sistem. Hal ini disebabkan oleh batasan langganan Change Streams MongoDB. Untuk informasi selengkapnya, lihat dokumentasi MongoDB.

  • Tabel sink

    • Pada Realtime Compute for Apache Flink yang menggunakan VVR versi sebelum 8.0.5, Anda hanya dapat memasukkan data ke tabel sink.

    • Pada Realtime Compute for Apache Flink yang menggunakan VVR 8.0.5 atau versi yang lebih baru, Anda dapat memasukkan, memperbarui, dan menghapus data jika mendeklarasikan primary key dalam pernyataan DDL yang digunakan untuk membuat tabel sink. Jika tidak ada primary key yang dideklarasikan, Anda hanya dapat memasukkan data.

  • Tabel dimensi

    • Tabel dimensi MongoDB didukung pada Realtime Compute for Apache Flink yang menggunakan VVR 8.0.5 atau versi yang lebih baru.

SQL

Sintaksis

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Catatan

Saat membuat tabel sumber CDC, Anda harus mendeklarasikan kolom _id STRING dan menentukannya sebagai primary key.

Parameter WITH

Umum

Parameter

Deskripsi

Tipe

Wajib?

Nilai default

Keterangan

connector

Nama konektor.

String

Ya

Tidak ada

  • Tabel sumber:

    • VVR 8.0.4 atau versi sebelumnya: Atur opsi ini ke mongodb-cdc.

    • VVR 8.0.5 atau versi yang lebih baru: Atur opsi ini ke mongodb atau mongodb-cdc.

  • Tabel dimensi atau sink: Atur opsi ini ke mongodb.

uri

Uniform Resource Identifier (URI) yang digunakan untuk menghubungkan ke database MongoDB.

String

Tidak

Tidak ada nilai default

Catatan

Anda harus menentukan salah satu opsi uri atau hosts. Jika Anda menentukan opsi uri, jangan tentukan scheme, hosts, username, password, atau connector.options. Jika Anda menentukan kedua opsi tersebut, URI yang ditentukan oleh uri akan digunakan.

hosts

Hostname server database MongoDB.

String

Tidak

Tidak ada

Pisahkan beberapa hostname dengan koma (,).

scheme

Protokol koneksi yang digunakan untuk mengakses database MongoDB.

String

Tidak

mongodb

Nilai yang valid:

  • mongodb: Gunakan protokol MongoDB default.

  • mongodb+srv: Gunakan protokol rekaman DNS SRV.

username

Username yang digunakan untuk menghubungkan ke MongoDB.

String

Tidak

Tidak ada nilai default

Parameter ini wajib jika fitur verifikasi identitas diaktifkan.

password

Password yang digunakan untuk menghubungkan ke MongoDB.

String

Tidak

Tidak ada nilai default

Anda harus mengonfigurasi parameter ini jika mengaktifkan fitur verifikasi identitas.

Penting

Untuk mencegah kebocoran password, gunakan variabel daripada hardcoding kredensial.

database

Nama database MongoDB.

String

Tidak

Tidak ada nilai default

  • Untuk tabel sumber, nama database mendukung pencocokan ekspresi reguler.

  • Jika Anda tidak menentukan opsi ini, semua database akan dipantau.

Penting

Anda tidak dapat memantau data di database admin, local, dan config.

collection

Nama koleksi MongoDB.

String

Tidak

Tidak ada nilai default

  • Untuk tabel sumber, nama koleksi mendukung pencocokan ekspresi reguler.

    Penting

    Jika nama koleksi berisi karakter khusus ekspresi reguler, berikan namespace lengkap (nama_database.nama_koleksi). Jika tidak, perubahan pada koleksi tidak dapat ditangkap.

  • Jika Anda tidak menentukan opsi ini, semua koleksi akan dipantau.

Penting

Anda tidak dapat memantau data di koleksi sistem.

connection.options

Opsi koneksi tambahan untuk MongoDB.

String

Tidak

Tidak ada nilai default

Tentukan opsi tambahan sebagai pasangan kunci-nilai (&-dipisahkan key=value). Contoh: connectTimeoutMS=12000&socketTimeoutMS=13000.

Penting

Secara default, MongoDB CDC tidak mengatur timeout koneksi soket. Hal ini dapat menyebabkan gangguan lama selama fluktuasi jaringan.

Kami merekomendasikan menyetel socketTimeoutMS ke nilai yang wajar di sini untuk menghindari masalah ini.

Khusus sumber

Parameter

Deskripsi

Tipe

Wajib?

Nilai default

Keterangan

scan.startup.mode

Mode startup konektor CDC MongoDB.

String

Tidak

initial

Nilai yang valid:

  • initial: Menarik semua data dari offset awal.

  • latest-offset: Menarik data perubahan dari offset saat ini.

  • timestamp: Menarik data perubahan dari timestamp tertentu.

Untuk informasi selengkapnya, lihat Startup Properties.

scan.startup.timestamp-millis

Timestamp awal untuk konsumsi pada offset yang ditentukan.

Long

Bergantung pada nilai scan.startup.mode:

  • initial: Tidak

  • latest-offset: Tidak

  • timestamp: Ya

Tidak ada

Nilainya adalah jumlah milidetik sejak epoch UNIX (00:00:00 UTC pada 1 Januari 1970).

Hanya berlaku untuk mode startup timestamp.

initial.snapshotting.queue.size

Ukuran antrian maksimum untuk fase snapshot awal.

Integer

Tidak

10240

Hanya berlaku ketika scan.startup.mode diatur ke initial.

batch.size

Ukuran pemrosesan batch kursor.

Integer

Tidak

1024

Tidak berlaku.

poll.max.batch.size

Jumlah maksimum dokumen perubahan yang diproses dalam satu batch.

Integer

Tidak

1024

Opsi ini mengontrol jumlah maksimum dokumen perubahan yang ditarik sekaligus selama pemrosesan aliran. Nilai yang lebih besar mengalokasikan buffer internal yang lebih besar di konektor.

poll.await.time.ms

Interval antara penarikan data.

Integer

Tidak

1000

Unit: milidetik.

heartbeat.interval.ms

Interval pengiriman paket heartbeat.

Integer

Tidak

0

Unit: milidetik.

Konektor CDC MongoDB mengirim paket heartbeat ke database untuk memastikan status backtracking terbaru. Mengatur nilai ini ke 0 akan menonaktifkan paket heartbeat.

Penting

Kami sangat merekomendasikan menyetel opsi ini untuk koleksi yang jarang diperbarui.

scan.incremental.snapshot.enabled

Mengaktifkan pembacaan paralel selama fase snapshot awal.

Boolean

Tidak

false

Ini adalah fitur eksperimental.

scan.incremental.snapshot.chunk.size.mb

Ukuran shard saat pembacaan snapshot paralel diaktifkan.

Integer

Tidak

64

Ini adalah fitur eksperimental.

Unit: MB.

Hanya berlaku ketika pembacaan snapshot paralel diaktifkan.

scan.full-changelog

Menghasilkan aliran event changelog lengkap.

Boolean

Tidak

false

Ini adalah fitur eksperimental.

Catatan

Memerlukan MongoDB 6.0 atau versi yang lebih baru dengan fitur preimage dan postimage diaktifkan. Untuk petunjuknya, lihat Document Preimages.

scan.flatten-nested-columns.enabled

Mengurai field yang dipisahkan oleh . sebagai field dokumen BSON bersarang.

Boolean

Tidak

false

Jika diaktifkan, field col dalam dokumen BSON berikut dinamai nested.col dalam skema.

{"nested":{"col":true}}
Catatan

Hanya didukung di VVR 8.0.5 atau versi yang lebih baru.

scan.primitive-as-string

Mengurai semua tipe primitif dalam dokumen BSON sebagai STRING.

Boolean

Tidak

false

Catatan

Hanya didukung di VVR 8.0.5 atau versi yang lebih baru.

scan.ignore-delete.enabled

Apakah akan mengabaikan pesan hapus (-D).

Boolean

Tidak

false

Saat mengarsipkan data sumber MongoDB, banyak event DELETE mungkin muncul di OpLog. Aktifkan opsi ini untuk mengabaikan event tersebut dan mencegahnya disinkronkan ke downstream.

Catatan
  • Hanya didukung di VVR 11.1 atau versi yang lebih baru.

  • Semua event DELETE—bukan hanya yang berasal dari pengarsipan—akan diabaikan.

scan.incremental.snapshot.backfill.skip

Melewati proses backfill watermark selama pembacaan snapshot inkremental.

Boolean

Tidak

false

Mengaktifkan opsi ini hanya menyediakan semantik at-least-once.

Catatan

Hanya didukung di VVR 11.1 atau versi yang lebih baru.

initial.snapshotting.pipeline

Operasi pipeline MongoDB yang didorong ke MongoDB selama pembacaan snapshot untuk memfilter hanya data yang diperlukan dan meningkatkan efisiensi.

String

Tidak

Tidak ada.

  • Tentukan sebagai array objek JSON. Contoh: [{"$match": {"closed": "false"}}] hanya menyalin dokumen di mana field closed bernilai "false".

  • Opsi ini hanya berlaku ketika scan.startup.mode diatur ke initial, dan hanya berfungsi dalam mode Debezium untuk mencegah inkonsistensi semantik.

    Catatan

    Hanya didukung di VVR 11.1 atau versi yang lebih baru.

initial.snapshotting.max.threads

Jumlah thread yang digunakan untuk replikasi data.

Integer

Tidak

Tidak ada.

Hanya berlaku ketika scan.startup.mode diatur ke initial.

Catatan

Hanya didukung di VVR 11.1 atau versi yang lebih baru.

initial.snapshotting.queue.size

Ukuran antrian untuk snapshot awal.

Integer

Tidak

16000

Hanya berlaku ketika scan.startup.mode diatur ke initial.

Catatan

Hanya didukung di VVR 11.1 atau versi yang lebih baru.

scan.change-stream.reading.parallelism

Tingkat paralelisme untuk berlangganan Change Stream.

Integer

Tidak

1

Hanya berlaku ketika scan.incremental.snapshot.enabled diaktifkan.

Penting

Untuk berlangganan Change Stream dengan beberapa pembaca konkuren, atur juga heartbeat.interval.ms.

Catatan

Hanya didukung di VVR 11.2 atau versi yang lebih baru.

scan.change-stream.reading.queue-size

Ukuran antrian pesan untuk langganan Change Stream konkuren.

Integer

Tidak

16384

Hanya berlaku ketika scan.change-stream.reading.parallelism diaktifkan.

Catatan

Hanya didukung di VVR 11.2 atau versi yang lebih baru.

Khusus tabel dimensi

Parameter

Deskripsi

Tipe data

Wajib?

Nilai default

Keterangan

lookup.cache

Kebijakan cache.

String

Tidak

NONE

Kebijakan yang didukung:

  • None: Tanpa caching.

  • Partial: Hanya cache data yang dicari dari database eksternal.

lookup.max-retries

Jumlah maksimum percobaan ulang yang diizinkan saat kueri ke database gagal.

Integer

Tidak

3

Tidak berlaku.

lookup.retry.interval

Interval antara percobaan ulang saat kueri ke database gagal.

Duration

Tidak

1s

Tidak berlaku.

lookup.partial-cache.expire-after-access

Waktu maksimum catatan tetap dalam cache setelah diakses.

Duration

Tidak

Tidak ada

Unit yang didukung: ms, s, min, h, d.

Memerlukan lookup.cache diatur ke PARTIAL.

lookup.partial-cache.expire-after-write

Waktu maksimum catatan tetap dalam cache setelah ditulis.

Duration

Tidak

Tidak ada

Memerlukan lookup.cache diatur ke PARTIAL.

lookup.partial-cache.max-rows

Jumlah maksimum baris yang di-cache. Saat melebihi batas, baris terlama akan kedaluwarsa.

Long

Tidak

Tidak ada

Memerlukan lookup.cache diatur ke PARTIAL.

lookup.partial-cache.cache-missing-key

Menyimpan catatan kosong saat tidak ada data yang terkait dengan tabel fisik.

Boolean

Tidak

True

Memerlukan lookup.cache diatur ke PARTIAL.

Khusus sink

Parameter

Deskripsi

Tipe

Wajib?

Nilai default

Keterangan

sink.buffer-flush.max-rows

Jumlah maksimum catatan yang ditulis per batch.

Integer

Tidak

1000

Tidak berlaku.

sink.buffer-flush.interval

Interval pengosongan data.

Duration

Tidak

1s

Tidak berlaku.

sink.delivery-guarantee

Jaminan semantik untuk penulisan data.

String

Tidak

at-least-once

Nilai yang valid:

  • none

  • at-least-once

Catatan

Exactly-once tidak didukung.

sink.max-retries

Jumlah maksimum percobaan ulang yang diizinkan saat penulisan data ke database gagal.

Integer

Tidak

3

Tidak berlaku.

sink.retry.interval

Interval antara percobaan ulang saat penulisan data ke database gagal.

Duration

Tidak

1s

Tidak berlaku.

sink.parallelism

Tingkat paralelisme kustom untuk sink.

Integer

Tidak

kosong

Tidak berlaku.

sink.delete-strategy

Menentukan cara menangani event data -D dan -U.

String

Tidak

CHANGELOG_STANDARD

Nilai yang valid:

  • CHANGELOG_STANDARD: Mode standar. Menerapkan event -U dan -D secara normal ke downstream.

  • IGNORE_DELETE: Mengabaikan hanya event -D tetapi menimpa seluruh baris saat pembaruan.

  • PARTIAL_UPDATE: Mengabaikan event -U untuk mendukung pembaruan kolom parsial. Menghapus seluruh baris pada event -D.

  • IGNORE_ALL: Mengabaikan event -U dan -D.

Pemetaan tipe data

Tabel sumber CDC

Tipe BSON

Tipe Flink SQL

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

Tabel dimensi dan sink

Tipe BSON

Tipe Flink SQL

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

Contoh Penggunaan

Tabel sumber CDC

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --harus dideklarasikan
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

Tabel dimensi

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Tabel sink

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

Ingesti Data (pratinjau publik)

Anda dapat menggunakan konektor MongoDB sebagai sumber ingesti data.

Batasan

Fitur ini hanya didukung di VVR 11.1 dan versi yang lebih baru.

Sintaksis

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

Opsi konfigurasi

Parameter

Deskripsi

Wajib?

Tipe Data

Nilai default

Keterangan

type

Tipe sumber data.

Ya

STRING

Tidak ada

Atur opsi ini ke mongodb.

scheme

Protokol yang digunakan untuk menghubungkan ke server MongoDB.

Tidak

STRING

mongodb

Nilai yang valid:

  • mongodb

  • mongodb+srv

hosts

Hostname server MongoDB.

Ya

STRING

Tidak ada nilai default

Pisahkan beberapa hostname dengan koma (,).

username

Username yang digunakan untuk menghubungkan ke MongoDB.

Tidak

STRING

Tidak ada nilai default

Tidak berlaku.

password

Password yang digunakan untuk menghubungkan ke MongoDB.

Tidak

STRING

Tidak ada

Tidak berlaku.

database

Nama database MongoDB yang akan ditangkap.

Ya

STRING

Tidak ada nilai default

Ekspresi reguler didukung.

collection

Nama koleksi MongoDB yang akan ditangkap.

Ya

STRING

Tidak ada nilai default

Ekspresi reguler didukung. Anda harus mencocokkan namespace lengkap database.collection.

connection.options

Opsi koneksi tambahan saat menghubungkan ke server MongoDB.

Tidak

STRING

Tidak ada

Tentukan pasangan kunci-nilai (&-dipisahkan k=v) seperti replicaSet=test&connectTimeoutMS=300000.

schema.inference.strategy

Strategi untuk inferensi tipe dokumen.

Nilai yang valid: continuous dan static.

Tidak

STRING

continuous

Saat diatur ke continuous, MongoDB Source terus-menerus melakukan inferensi tipe. Saat skema data masuk berbeda dari skema saat ini, MongoDB Source mengeluarkan event perubahan skema untuk memperluas struktur dan mengakomodasi data baru.

Saat diatur ke static, MongoDB Source hanya melakukan inferensi skema sekali selama inisialisasi.

scan.max.pre.fetch.records

Jumlah maksimum catatan yang diambil sampelnya di setiap koleksi yang ditangkap selama inferensi skema awal.

Tidak

INT

50

Tidak berlaku.

scan.startup.mode

Mode startup sumber data MongoDB.

Nilai yang valid: initial, latest-offset, timestamp, dan snapshot.

Tidak

STRING

initial

Nilai yang valid:

  • initial: Menarik semua data dari offset awal dan secara otomatis beralih ke pembacaan inkremental.

  • latest-offset: Menarik data perubahan dari offset OpLog terbaru.

  • timestamp: Menarik data perubahan dari timestamp tertentu.

  • snapshot: Melakukan snapshot satu kali terhadap status database saat ini.

scan.startup.timestamp-millis

Saat mode startup diatur ke timestamp, menangkap data perubahan dari timestamp yang ditentukan.

Tidak

LONG

Tidak ada

Tidak berlaku.

chunk-meta.group.size

Ukuran chunk metadata maksimum.

Tidak

INT

1000

Tidak berlaku.

scan.incremental.close-idle-reader.enabled

Menentukan apakah akan menutup pembaca sumber yang tidak aktif setelah beralih ke pembacaan inkremental.

Tidak

BOOLEAN

false

Tidak berlaku.

scan.incremental.snapshot.backfill.skip

Menentukan apakah akan melewati proses backfill watermark dalam algoritma snapshot inkremental.

Tidak

BOOLEAN

false

Jika konektor sink Anda mendukung deduplikasi primary-key otomatis, mengaktifkan sakelar ini mengurangi waktu yang dibutuhkan untuk beralih dari pembacaan snapshot ke pembacaan inkremental.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah akan membaca chunk tak terbatas terlebih dahulu dalam kerangka kerja snapshot inkremental.

Tidak

BOOLEAN

false

Jika koleksi yang akan di-snapshot sering diperbarui, mengaktifkan fitur ini mengurangi risiko error kehabisan memori saat membaca chunk tak terbatas.

batch.size

Ukuran batch untuk kursor saat membaca data dari MongoDB.

Tidak

INT

1024

Tidak berlaku.

poll.max.batch.size

Jumlah maksimum entri yang diminta saat menarik aliran perubahan.

Tidak

INT

1024

Tidak berlaku.

poll.await.time.ms

Waktu tunggu minimum antara dua permintaan saat menarik perubahan Change Stream.

Tidak

INT

1000

Unit: milidetik.

heartbeat.interval.ms

Interval pengiriman paket heartbeat.

Tidak

INT

0

Unit: milidetik.

Konektor CDC MongoDB mengirim paket heartbeat ke database MongoDB untuk memastikan status backtracking terbaru. Mengatur nilai ini ke 0 akan menonaktifkan paket heartbeat.

Catatan

Konfigurasikan opsi ini untuk koleksi yang jarang diperbarui.

scan.incremental.snapshot.chunk.size.mb

Ukuran shard selama fase snapshot.

Tidak

INT

64

Unit: MB.

scan.incremental.snapshot.chunk.samples

Jumlah sampel untuk menentukan ukuran koleksi selama snapshot.

Tidak

INT

20

Tidak berlaku.

scan.full-changelog

Menentukan apakah akan menghasilkan aliran event changelog lengkap berdasarkan catatan preimage dan postimage MongoDB.

Tidak

BOOLEAN

false

Memerlukan MongoDB 6.0 atau versi yang lebih baru dengan fitur preimage dan postimage diaktifkan. Untuk petunjuknya, lihat Document Preimages.

scan.cursor.no-timeout

Menentukan apakah akan menonaktifkan timeout kursor.

Tidak

BOOLEAN

false

Server MongoDB biasanya menutup kursor yang tidak aktif setelah 10 menit untuk mencegah masalah penggunaan memori. Menyetel opsi ini ke true mencegah perilaku tersebut.

scan.ignore-delete.enabled

Menentukan apakah akan mengabaikan event hapus dari MongoDB.

Tidak

BOOLEAN

false

Tidak berlaku.

scan.flatten.nested-documents.enabled

Apakah akan meratakan struktur bersarang dalam dokumen BSON.

Tidak

BOOLEAN

false

Saat diaktifkan, skema seperti {"doc": {"foo": 1, "bar": "two"}} menjadi doc.foo INT, doc.bar STRING.

scan.all.primitives.as-string.enabled

Menyimpulkan semua tipe primitif sebagai STRING.

Tidak

BOOLEAN

false

Mengaktifkan ini menghindari event perubahan skema yang sering terjadi saat tipe data upstream tidak konsisten.

metadata.list

Daftar metadata yang diteruskan ke downstream.

Tidak

STRING

Tidak ada.

Pisahkan beberapa item metadata dengan koma.

Metadata yang didukung:

  • ts_ms: Timestamp event yang dicatat di OpLog MongoDB.

  • op_ts: Alias untuk ts_ms. Gunakan op_ts saat menulis metadata ke Kafka JSON.

Pemetaan tipe data

Tipe BSON

Tipe CDC

Catatan

STRING

VARCHAR

Tidak berlaku.

INT32

INT

INT64

BIGINT

DECIMAL128

DECIMAL

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

TIMESTAMP

TIMESTAMP

DATETIME

LOCALZONEDTIMESTAMP

BINARY

VARBINARY

DOCUMENT

MAP

Tipe kunci dan nilai disimpulkan.

ARRAY

ARRAY

Tipe elemen disimpulkan.

OBJECTID

VARCHAR

Dinyatakan sebagai HexString.

SYMBOL

REGULAREXPRESSION

JAVASCRIPT

JAVASCRIPTWITHSCOPE

VARCHAR

Dinyatakan sebagai string.

Metadata

Konektor SQL

Tabel sumber SQL CDC MongoDB mendukung sintaksis kolom metadata. Anda dapat mengakses kolom metadata berikut:

kunci metadata

Tipe metadata

Deskripsi

database_name

STRING NOT NULL

Nama database yang berisi dokumen.

collection_name

STRING NOT NULL

Nama koleksi yang berisi dokumen.

op_ts

TIMESTAMP_LTZ(3) NOT NULL

Waktu dokumen berubah di database. Jika dokumen berasal dari data tabel historis dan bukan dari ChangeStream, nilai ini selalu 0.

row_kind

STRING NOT NULL

Tipe event perubahan data. Nilai yang valid:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

Catatan

Hanya didukung di VVR 11.1 atau versi yang lebih baru.

YAML Ingesti Data

Konektor YAML ingesti data CDC MongoDB mendukung kolom metadata berikut:

Kolom metadata

Tipe metadata

Deskripsi

ts_ms

BIGINT NOT NULL

Waktu dokumen berubah di database. Jika dokumen berasal dari data tabel historis dan bukan dari ChangeStream, nilai ini selalu 0.

Anda juga dapat menggunakan kolom metadata generik yang disediakan oleh modul Transform untuk mengakses informasi nama database, nama koleksi, dan row_kind.

Fitur preimage dan postimage

Secara default, versi MongoDB sebelum 6.0 tidak menyimpan dokumen sebelum perubahan atau dokumen yang dihapus. Tanpa fitur preimage dan postimage diaktifkan, MongoDB hanya mendukung semantik UPSERT, yang berarti event UPDATE_BEFORE tidak tersedia. Namun, banyak operator Flink yang berguna memerlukan aliran changelog lengkap yang mencakup event INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE.

Untuk melengkapi event UPDATE_BEFORE yang hilang, perencana Flink SQL secara otomatis menghasilkan operator ChangelogNormalize untuk sumber data tipe UPSERT. Operator ini menyimpan snapshot versi terkini semua dokumen dalam data state deployment. Saat dokumen diperbarui atau dihapus, Anda dapat mengkueri data state di ChangelogNormalize untuk mendapatkan status sebelum pembaruan. Namun, hal ini memerlukan penyimpanan data state dalam jumlah besar.

image.png

MongoDB 6.0 mendukung fitur preimage dan postimage. Untuk informasi selengkapnya, lihat Gunakan Change Streams MongoDB untuk menangkap perubahan data secara real time. Saat Anda mengaktifkan fitur ini, MongoDB mencatat status lengkap dokumen sebelum dan sesudah setiap perubahan dalam koleksi khusus. Jika Anda mengaktifkan opsi scan.full-changelog dalam pekerjaan Anda, konektor CDC MongoDB menggunakan catatan dokumen perubahan ini untuk menghasilkan catatan UPDATE_BEFORE. Hal ini memungkinkan konektor menghasilkan aliran event lengkap dan menghilangkan ketergantungan pada operator ChangelogNormalize.

API DataStream CDC MongoDB

Penting

Saat membaca atau menulis data menggunakan DataStream, gunakan konektor DataStream yang sesuai untuk menghubungkan ke Flink. Untuk informasi selengkapnya tentang cara menyiapkan konektor DataStream, lihat Penggunaan konektor DataStream.

Anda dapat membuat program API DataStream dan menggunakan MongoDBSource. Kode berikut memberikan contoh:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Repositori Maven Central menyediakan konektor MongoDB VVR yang dapat Anda gunakan langsung dalam pengembangan pekerjaan.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
Catatan

Saat menggunakan API DataStream, paket yang Anda gunakan untuk membangun sumber data MongoDBSource bergantung pada apakah Anda ingin mengaktifkan fitur snapshot inkremental. Untuk mengaktifkan fitur snapshot inkremental, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.source. Jika tidak, gunakan MongoDBSource#builder() dari com.ververica.cdc.connectors.mongodb.

Anda dapat mengonfigurasi parameter berikut saat membangun MongoDBSource:

Parameter

Deskripsi

hosts

Hostname database MongoDB yang akan dihubungkan.

username

Username untuk layanan database MongoDB.

Catatan

Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini.

password

Password untuk layanan database MongoDB.

Catatan

Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini.

databaseList

Nama database MongoDB yang akan dipantau.

Catatan

Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Gunakan .* untuk mencocokkan semua database.

collectionList

Nama koleksi MongoDB yang akan dipantau.

Catatan

Nama koleksi mendukung ekspresi reguler untuk membaca data dari beberapa koleksi. Gunakan .* untuk mencocokkan semua koleksi.

startupOptions

Pilih mode startup untuk CDC MongoDB.

Nilai yang valid:

  • StartupOptions.initial()

    • Menarik semua data dari offset awal.

  • StartupOptions.latest-offset()

    • Menarik data perubahan dari offset saat ini.

  • StartupOptions.timestamp()

    • Menarik data perubahan dari timestamp tertentu.

Untuk informasi selengkapnya, lihat Startup Properties.

deserializer

Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe tertentu. Nilai yang valid:

  • MongoDBConnectorDeserializationSchema: Mengonversi SourceRecord yang dihasilkan dalam mode Upsert menjadi struktur data RowData internal Flink Table API atau SQL API.

  • MongoDBConnectorFullChangelogDeserializationSchema: Mengonversi SourceRecord yang dihasilkan dalam mode Full Changelog menjadi struktur data RowData internal Flink Table atau SQL.

  • JsonDebeziumDeserializationSchema: Mengonversi SourceRecord menjadi string berformat JSON.