Topik ini menjelaskan cara menggunakan Konektor MongoDB.
Informasi latar belakang
MongoDB adalah database berorientasi dokumen untuk data tidak terstruktur yang menyederhanakan pengembangan dan penskalaan aplikasi. Konektor MongoDB mendukung fitur-fitur berikut:
Kategori | Rincian |
Jenis yang didukung | Tabel sumber, tabel dimensi, tabel sink, dan Ingesti Data |
Mode runtime | Hanya mode stream yang didukung. |
Metrik pemantauan unik | |
Jenis API | DataStream, SQL, dan YAML ingesti data |
Dukungan untuk pembaruan atau penghapusan di tabel sink | Ya |
Fitur
Tabel sumber Change Data Capture (CDC) MongoDB menggunakan API Change Stream untuk menangkap data lengkap maupun inkremental. Pertama-tama, tabel ini membaca Snapshot lengkap dari data historis, lalu beralih secara mulus ke pembacaan oplog inkremental. Proses ini memastikan bahwa data tidak mengalami duplikasi maupun kehilangan. Konektor ini juga mendukung semantik Exactly-Once untuk menjamin konsistensi data selama pemulihan kesalahan.
Berdasarkan API Change Stream
Menggunakan API Change Stream dari MongoDB 3.6 untuk menangkap perubahan seperti insert, update, replacement, dan delete dari database atau koleksi secara efisien. Perubahan tersebut dikonversi menjadi aliran changelog yang dapat diproses oleh Flink.
Integrasi lengkap dan inkremental
Membaca Snapshot awal secara otomatis dan beralih dengan lancar ke mode inkremental tanpa intervensi manual.
Pembacaan Snapshot paralel
Mendukung pembacaan paralel data historis untuk meningkatkan performa. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.
Beberapa mode startup
initial: Melakukan Snapshot lengkap saat startup pertama kali, lalu terus-menerus membaca oplog.latest-offset: Memulai dari akhir oplog saat ini tanpa membaca data historis.timestamp: Mulai membaca oplog dari timestamp tertentu dan melewati Snapshot. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.
Dukungan changelog lengkap
Mendukung keluaran changelog lengkap yang mencakup pra-gambar dan post-image (untuk MongoDB 6.0 atau versi lebih baru dengan fitur pencatatan pra-gambar/post-image diaktifkan).
Peningkatan integrasi Flink
VVR 8.0.6 dan versi lebih baru
Mendukung penggunaan pernyataan CREATE TABLE AS (CTAS) atau pernyataan CREATE DATABASE AS (CDAS) untuk menyinkronkan perubahan data dan skema dari MongoDB ke sistem downstream serta mengaktifkan fitur pencatatan pra-gambar/post-image.
VVR 8.0.9 dan versi lebih baru
Memperluas kemampuan join tabel dimensi untuk mendukung pembacaan field bawaan
_idbertipe ObjectId.
Prasyarat
Persyaratan instans MongoDB
Hanya MongoDB Alibaba Cloud 3.6 atau versi lebih baru (ReplicaSet atau kluster sharded) dan MongoDB self-managed 3.6 atau versi lebih baru yang didukung.
Fitur ReplicaSet harus diaktifkan untuk database MongoDB yang ingin Anda pantau. Untuk informasi selengkapnya, lihat Replication.
Ketergantungan fitur MongoDB
Untuk menggunakan fitur aliran event Full Changelog, Anda harus mengaktifkan fitur pencatatan pra-gambar/post-image.
Jika autentikasi diaktifkan untuk MongoDB, Anda memerlukan izin database berikut.
Persiapan jaringan dan lainnya untuk MongoDB
Daftar putih alamat IP telah dikonfigurasi untuk mengizinkan Flink mengakses MongoDB.
Koleksi dan data MongoDB target telah dibuat.
Batasan
Tabel sumber CDC
MongoDB 4.0 dan versi lebih baru mendukung pembacaan paralel selama fase Snapshot awal. Untuk mengaktifkan mode paralel pada Snapshot awal, atur parameter
scan.incremental.snapshot.enabledke true.Karena batasan langganan Change Stream MongoDB, Anda tidak dapat membaca data dari database admin, local, atau config, maupun dari koleksi sistem. Untuk informasi selengkapnya, lihat dokumentasi MongoDB.
Tabel sink
Ververica Runtime (VVR) versi sebelum 8.0.5 hanya mendukung operasi insert data.
Pada VVR 8.0.5 dan versi lebih baru, jika primary key dideklarasikan di tabel sink, Anda dapat melakukan insert, update, dan delete data. Jika tidak ada primary key yang dideklarasikan, Anda hanya dapat melakukan insert data.
Tabel dimensi
VVR 8.0.5 dan versi lebih baru mendukung penggunaan tabel dimensi MongoDB.
SQL
Sintaksis
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)Saat membuat tabel sumber CDC, Anda harus mendeklarasikan kolom _id STRING dan menetapkannya sebagai kunci primer unik.
Parameter WITH
Parameter umum
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Nama konektor. | String | Ya | Tidak ada |
|
uri | URI koneksi MongoDB. | String | Tidak | Tidak ada | Catatan Anda harus menentukan salah satu dari parameter |
hosts | Hostname instans MongoDB. | String | Tidak | Tidak ada | Anda dapat memberikan beberapa hostname yang dipisahkan koma ( |
scheme | Protokol koneksi yang digunakan oleh MongoDB. | String | Tidak | mongodb | Nilai valid:
|
username | Username untuk menghubungkan ke MongoDB. | String | Tidak | Tidak ada | Parameter ini wajib jika verifikasi identitas diaktifkan. |
password | Password untuk menghubungkan ke MongoDB. | String | Tidak | Tidak ada | Parameter ini wajib jika verifikasi identitas diaktifkan. Penting Untuk mencegah kebocoran password, kami menyarankan agar Anda menggunakan variabel untuk mengatur nilai password. Untuk informasi selengkapnya, lihat Variabel Proyek. |
database | Nama database MongoDB. | String | Tidak | Tidak ada |
Penting Pemantauan data di database admin, local, atau config tidak didukung. |
collection | Nama koleksi MongoDB. | String | Tidak | Tidak ada |
Penting Pemantauan data di koleksi sistem tidak didukung. |
connection.options | Parameter koneksi untuk sisi MongoDB. | String | Tidak | Tidak ada | Parameter koneksi tambahan dalam format Penting Secara default, MongoDB CDC tidak mengatur timeout koneksi socket secara otomatis. Hal ini dapat menyebabkan gangguan panjang selama fluktuasi jaringan. Atur socketTimeoutMS ke nilai yang wajar untuk menghindari masalah ini. |
Khusus tabel sumber
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
scan.startup.mode | Mode startup untuk MongoDB CDC. | String | Tidak | initial | Nilai valid:
Untuk informasi lebih lanjut, lihat Properti Startup. |
scan.startup.timestamp-millis | Timestamp awal untuk konsumsi pada offset tertentu. | Long | Bergantung pada nilai scan.startup.mode:
| Tidak ada | Formatnya adalah jumlah milidetik sejak epoch Linux. Parameter ini hanya berlaku untuk mode startup |
initial.snapshotting.queue.size | Batas ukuran antrian untuk Snapshot awal. | Integer | Tidak | 10240 | Parameter ini hanya berlaku ketika opsi |
batch.size | Ukuran batch untuk kursor. | Integer | Tidak | 1024 | Tidak ada. |
poll.max.batch.size | Jumlah maksimum dokumen perubahan dalam satu batch. | Integer | Tidak | 1024 | Parameter ini mengontrol jumlah maksimum dokumen perubahan yang ditarik sekaligus selama pemrosesan aliran. Nilai yang lebih besar menghasilkan buffer yang lebih besar dialokasikan dalam konektor. |
poll.await.time.ms | Interval waktu antara dua penarikan data. | Integer | Tidak | 1000 | Unitnya adalah milidetik. |
heartbeat.interval.ms | Interval waktu untuk mengirim heartbeat. | Integer | Tidak | 0 | Unitnya adalah milidetik. Konektor MongoDB CDC secara aktif mengirim heartbeat ke database untuk memastikan token resume tetap mutakhir. Nilai 0 berarti heartbeat tidak pernah dikirim. Penting Untuk koleksi yang jarang diperbarui, kami sangat menyarankan mengatur opsi ini. |
scan.incremental.snapshot.enabled | Menentukan apakah akan mengaktifkan mode paralel untuk Snapshot awal. | Boolean | Tidak | false | Ini adalah fitur eksperimental. |
scan.incremental.snapshot.chunk.size.mb | Ukuran chunk untuk membaca Snapshot dalam mode paralel. | Integer | Tidak | 64 | Ini adalah fitur eksperimental. Unitnya adalah MB. Parameter ini hanya berlaku ketika Snapshot paralel diaktifkan. |
scan.full-changelog | Menghasilkan aliran event changelog lengkap. | Boolean | Tidak | false | Ini adalah fitur eksperimental. Catatan Memerlukan MongoDB 6.0 atau versi lebih baru dengan fitur pra-gambar dan post-image diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages. |
scan.flatten-nested-columns.enabled | Menentukan apakah akan mengurai nama field yang dipisahkan titik ( | Boolean | Tidak | false | Jika diaktifkan, pada contoh dokumen BSON berikut, field Catatan Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru. |
scan.primitive-as-string | Menentukan apakah akan mengurai semua tipe primitif dalam dokumen BSON sebagai string. | Boolean | Tidak | false | Catatan Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru. |
scan.ignore-delete.enabled | Menentukan apakah akan mengabaikan pesan hapus (-D). | Boolean | Tidak | false | Saat mengarsipkan data dari sumber MongoDB, banyak event DELETE dapat dihasilkan di OpLog. Jika Anda tidak ingin menyinkronkan event-event ini ke downstream, Anda dapat mengaktifkan parameter ini untuk mengabaikan event delete. Catatan
|
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewati proses backfill watermark dari algoritma Snapshot inkremental. | Boolean | Tidak | false | Mengaktifkan sakelar ini hanya menyediakan semantik at-least-once. Catatan Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru. |
initial.snapshotting.pipeline | Operasi pipeline MongoDB. Selama fase pembacaan Snapshot, operasi ini didorong ke MongoDB untuk memfilter hanya data yang diperlukan, sehingga meningkatkan efisiensi baca. | String | Tidak | Tidak ada. |
|
initial.snapshotting.max.threads | Jumlah thread yang digunakan untuk replikasi data. | Integer | Tidak | Tidak ada. | Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial. Catatan Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru. |
initial.snapshotting.queue.size | Ukuran antrian untuk snapshot awal. | Integer | Tidak | 16000 | Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial. Catatan Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru. |
scan.change-stream.reading.parallelism | Tingkat paralelisme untuk berlangganan Change Stream. | Integer | Tidak | 1 | Parameter ini hanya berlaku ketika parameter scan.incremental.snapshot.enabled diaktifkan. Penting Untuk berlangganan Change Stream dengan beberapa thread konkuren, Anda juga harus mengatur parameter heartbeat.interval.ms. Catatan Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru. |
scan.change-stream.reading.queue-size | Ukuran antrian pesan untuk langganan Change Stream konkuren. | Integer | Tidak | 16384 | Parameter ini hanya berlaku ketika parameter scan.change-stream.reading.parallelism diaktifkan. Catatan Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru. |
Khusus tabel dimensi
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
lookup.cache | Kebijakan cache. | String | Tidak | NONE | Kebijakan cache berikut didukung:
|
lookup.max-retries | Jumlah maksimum percobaan ulang jika kueri database gagal. | Integer | Tidak | 3 | Tidak ada. |
lookup.retry.interval | Interval waktu percobaan ulang jika kueri database gagal. | Durasi | Tidak | 1s | Tidak ada. |
lookup.partial-cache.expire-after-access | Periode retensi maksimum untuk catatan di cache. | Durasi | Tidak | Tidak ada | Unit waktu yang didukung adalah ms, s, min, h, dan d. Saat menggunakan konfigurasi ini, |
lookup.partial-cache.expire-after-write | Periode retensi maksimum untuk catatan setelah ditulis ke cache. | Durasi | Tidak | Tidak ada | Saat menggunakan konfigurasi ini, |
lookup.partial-cache.max-rows | Jumlah maksimum baris di cache. Jika nilai ini terlampaui, baris terlama akan kedaluwarsa. | Long | Tidak | Tidak ada | Saat menggunakan konfigurasi ini, |
lookup.partial-cache.cache-missing-key | Menentukan apakah akan menyimpan catatan kosong di cache saat tidak ditemukan data di tabel fisik. | Boolean | Tidak | True | Saat menggunakan konfigurasi ini, |
Khusus tabel sink
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
sink.buffer-flush.max-rows | Jumlah maksimum catatan untuk setiap penulisan batch. | Integer | Tidak | 1000 | Tidak ada. |
sink.buffer-flush.interval | Interval refresh untuk menulis data. | Durasi | Tidak | 1s | Tidak ada. |
sink.delivery-guarantee | Jaminan semantik untuk menulis data. | String | Tidak | at-least-once | Nilai valid:
Catatan Exactly-once saat ini tidak didukung. |
sink.max-retries | Jumlah maksimum percobaan ulang jika penulisan ke database gagal. | Integer | Tidak | 3 | Tidak ada. |
sink.retry.interval | Interval waktu percobaan ulang jika penulisan ke database gagal. | Durasi | Tidak | 1s | Tidak ada. |
sink.parallelism | Tingkat paralelisme sink kustom. | Integer | Tidak | Kosong | Tidak ada. |
sink.delete-strategy | Mengonfigurasi cara menangani tipe data -D atau -U. | String | Tidak | CHANGELOG_STANDARD | Nilai yang valid:
|
Pemetaan tipe
Tabel sumber CDC
Tipe BSON | Tipe Flink SQL |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
Tabel dimensi dan sink
Tipe BSON | Tipe Flink SQL |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
Contoh
Tabel sumber CDC
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --harus dideklarasikan
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;Tabel sink
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;Penyerapan data
Konektor MongoDB dapat digunakan sebagai sumber data dalam pekerjaan YAML ingesti data.
Batasan
Fitur ini hanya didukung di VVR 11.1 dan versi lebih baru.
Sintaksis
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
type: ...Item konfigurasi
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Catatan |
type | Tipe sumber data. | Ya | STRING | None | Nilainya harus mongodb. |
scheme | Protokol untuk menghubungkan ke server MongoDB. | Tidak | STRING | mongodb | Nilai yang valid:
|
hosts | Alamat server untuk menghubungkan ke MongoDB. | Ya | STRING | None | Anda dapat menentukan beberapa alamat yang dipisahkan koma (,). |
username | Username untuk menghubungkan ke MongoDB. | Tidak | STRING | None | None. |
password | Password untuk menghubungkan ke MongoDB. | Tidak | STRING | None | None. |
database | Nama database MongoDB yang akan ditangkap. | Ya | STRING | None | Mendukung ekspresi reguler. |
collection | Nama koleksi MongoDB yang akan ditangkap. | Ya | STRING | None | Mendukung ekspresi reguler. Anda harus mencocokkan namespace lengkap |
connection.options | Opsi koneksi tambahan yang akan ditambahkan saat menghubungkan ke server MongoDB. | Tidak | STRING | None | Pasangan key-value dalam format |
schema.inference.strategy | Strategi untuk inferensi tipe dokumen. Nilai yang valid adalah | Tidak | STRING |
| Jika diatur ke Jika diatur ke |
scan.max.pre.fetch.records | Jumlah maksimum catatan yang diambil sampel dari setiap koleksi yang ditangkap selama inferensi awal. | Tidak | INT | 50 | None. |
scan.startup.mode | Menentukan mode startup untuk sumber data MongoDB. Nilai yang valid adalah | Tidak | STRING | initial | Nilai yang valid:
|
scan.startup.timestamp-millis | Saat mode startup diatur ke | Tidak | LONG | None | None. |
chunk-meta.group.size | Menetapkan batas ukuran chunk metadata. | Tidak | INT | 1000 | None. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah akan menutup Pembaca Sumber yang menganggur setelah beralih ke mode inkremental. | Tidak | BOOLEAN | false | None. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewati proses backfill watermark dari algoritma Snapshot inkremental. | Tidak | BOOLEAN | false | Jika konektor sink yang Anda gunakan dapat secara otomatis menghapus duplikat berdasarkan primary key, mengaktifkan sakelar ini dapat mengurangi waktu yang dibutuhkan untuk transisi dari lengkap ke inkremental. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah akan membaca chunk tak terbatas terlebih dahulu saat menjalankan algoritma Snapshot inkremental. | Tidak | BOOLEAN | false | Jika koleksi yang Anda ambil Snapshot-nya sering diperbarui, mengaktifkan fitur ini dapat mengurangi kemungkinan terjadinya error kehabisan memori saat membaca chunk tak terbatas. |
batch.size | Ukuran batch kursor untuk membaca data MongoDB. | Tidak | INT | 1024 | None. |
poll.max.batch.size | Jumlah maksimum entri per permintaan saat menarik Change Stream. | Tidak | INT | 1024 | None. |
poll.await.time.ms | Waktu tunggu minimum antar permintaan saat menarik Change Stream. | Tidak | INT | 1000 | Unitnya adalah milidetik. |
heartbeat.interval.ms | Interval waktu untuk mengirim heartbeat. | Tidak | INT | 0 | Unitnya adalah milidetik. Konektor MongoDB CDC secara aktif mengirim heartbeat ke database untuk memastikan token resume tetap mutakhir. Nilai 0 berarti heartbeat tidak pernah dikirim. Catatan Untuk koleksi yang jarang diperbarui, kami sangat menyarankan mengatur opsi ini. |
scan.incremental.snapshot.chunk.size.mb | Ukuran chunk selama fase Snapshot. | Tidak | INT | 64 | Unitnya adalah MB. |
scan.incremental.snapshot.chunk.samples | Jumlah sampel yang digunakan saat menentukan ukuran koleksi selama fase Snapshot. | Tidak | INT | 20 | None. |
scan.full-changelog | Menentukan apakah akan menghasilkan aliran event changelog lengkap berdasarkan catatan Mongo Pre- dan Post-Image. | Tidak | BOOLEAN | false | Memerlukan MongoDB 6.0 atau versi lebih baru dengan fitur pra-gambar dan post-image diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages. |
scan.cursor.no-timeout | Menentukan apakah akan mengatur kursor untuk membaca data agar tidak pernah kedaluwarsa. | Tidak | BOOLEAN | false | Server MongoDB biasanya menutup kursor setelah menganggur selama periode tertentu (10 menit) untuk mencegah penggunaan memori tinggi. Mengatur opsi ini ke true mencegah hal tersebut terjadi. |
scan.ignore-delete.enabled | Menentukan apakah akan mengabaikan catatan event hapus dari sumber MongoDB. | Tidak | BOOLEAN | false | None. |
scan.flatten.nested-documents.enabled | Menentukan apakah akan meratakan struktur bersarang dalam dokumen BSON. | Tidak | BOOLEAN | false | Saat opsi ini diaktifkan, skema seperti |
scan.all.primitives.as-string.enabled | Menentukan apakah akan menginferensi semua tipe primitif sebagai STRING. | Tidak | BOOLEAN | false | Mengaktifkan opsi ini dapat menghindari pembuatan banyak event evolusi skema saat menangani tipe data input campuran. |
Pemetaan tipe
Tipe BSON | Tipe CDC | Catatan |
STRING | VARCHAR | None. |
INT32 | INT | |
INT64 | BIGINT | |
DECIMAL128 | DECIMAL | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
TIMESTAMP | TIMESTAMP | |
DATETIME | LOCALZONEDTIMESTAMP | |
BINARY | VARBINARY | |
DOCUMENT | MAP | Parameter tipe Key/Value perlu diinferensi. |
ARRAY | ARRAY | Parameter tipe elemen perlu diinferensi. |
OBJECTID | VARCHAR | Direpresentasikan sebagai HexString. |
SYMBOL REGULAREXPRESSION JAVASCRIPT JAVASCRIPTWITHSCOPE | VARCHAR | Direpresentasikan sebagai string. |
Metadata
Konektor SQL
Tabel sumber SQL CDC MongoDB mendukung sintaksis kolom metadata. Anda dapat mengakses metadata berikut melalui kolom metadata.
Kunci metadata | Tipe metadata | Deskripsi |
database_name | STRING NOT NULL | Nama database yang berisi dokumen. |
collection_name | STRING NOT NULL | Nama koleksi yang berisi dokumen. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Waktu dokumen diubah di database. Jika dokumen berasal dari data historis yang sudah ada di tabel, bukan dari ChangeStream, nilai ini selalu 0. |
row_kind | STRING NOT NULL | Menunjukkan jenis perubahan data. Nilai yang valid:
Catatan Fitur ini hanya didukung di VVR 11.1 dan versi lebih baru. |
YAML Ingesti Data
Konektor YAML ingesti data CDC MongoDB mendukung pembacaan kolom metadata berikut:
Kunci metadata | Tipe metadata | Deskripsi |
ts_ms | BIGINT NOT NULL | Waktu dokumen diubah di database. Jika dokumen berasal dari data historis yang sudah ada di tabel, bukan dari ChangeStream, nilai ini selalu 0. |
Anda juga dapat menggunakan kolom metadata umum yang disediakan oleh modul Transform untuk mengakses informasi nama database, nama koleksi, dan row_kind.
Tentang fitur pencatatan pra-gambar dan post-image di MongoDB
Secara default, versi MongoDB sebelum 6.0 tidak menyediakan data untuk dokumen sebelum perubahan atau dokumen yang dihapus. Tanpa fitur ini diaktifkan, informasi yang tersedia hanya dapat mencapai semantik Upsert, artinya entri data Update Before tidak tersedia. Namun, banyak operator berguna di Flink bergantung pada aliran perubahan lengkap yang mencakup event Insert, Update Before, Update After, dan Delete.
Untuk melengkapi event pra-perubahan yang hilang, Flink SQL Planner secara otomatis menghasilkan node ChangelogNormalize untuk sumber data tipe Upsert. Node ini menyimpan Snapshot versi terkini semua dokumen di state Flink. Saat menemukan dokumen yang diperbarui atau dihapus, node ini dapat mencari status pra-perubahan dari Snapshot yang disimpan. Namun, node operator ini memerlukan jumlah data state yang besar.

MongoDB 6.0 mendukung fitur pra- dan post-image untuk database. Untuk informasi selengkapnya, lihat Gunakan Change Stream MongoDB untuk Menangkap Perubahan Data Secara Real Time. Setelah Anda mengaktifkan fitur ini, MongoDB mencatat status lengkap dokumen sebelum dan sesudah setiap perubahan dalam koleksi khusus. Kemudian, saat Anda mengaktifkan item konfigurasi scan.full-changelog dalam pekerjaan, MongoDB CDC menghasilkan catatan Update Before dari catatan dokumen perubahan. Proses ini menciptakan aliran event lengkap dan menghilangkan ketergantungan pada node ChangelogNormalize.
Mongo CDC DataStream API
Saat membaca atau menulis data menggunakan API DataStream, Anda perlu menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk petunjuk penyiapan, lihat Gunakan konektor DataStream.
Anda dapat membuat program API DataStream dan menggunakan MongoDBSource. Kode berikut menunjukkan contohnya:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();XML
Konektor MongoDB VVR tersedia di Repositori Maven Central. Anda dapat menggunakannya langsung dalam pengembangan pekerjaan Anda.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>Saat menggunakan API DataStream, untuk mengaktifkan fitur Snapshot inkremental, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.source saat membuat MongoDBSource. Jika tidak, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.
Saat membuat MongoDBSource, Anda dapat mengonfigurasi parameter berikut:
Parameter | Deskripsi |
hosts | Hostname database MongoDB yang akan dihubungkan. |
username | Nama pengguna untuk layanan database MongoDB. Catatan Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini. |
password | Kata sandi untuk layanan database MongoDB. Catatan Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini. |
databaseList | Nama database MongoDB yang akan dipantau. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan |
collectionList | Nama koleksi MongoDB yang akan dipantau. Catatan Nama koleksi mendukung ekspresi reguler untuk membaca data dari beberapa koleksi. Anda dapat menggunakan |
startupOptions | Memilih mode startup untuk MongoDB CDC. Nilai valid:
Untuk informasi lebih lanjut, lihat Properti Startup. |
deserializer | Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe tertentu. Nilai yang valid:
|