Topik ini menjelaskan cara menggunakan Konektor MongoDB.
Informasi latar belakang
MongoDB adalah database berorientasi dokumen untuk data tidak terstruktur yang menyederhanakan pengembangan dan penskalaan aplikasi. Konektor MongoDB mendukung fitur-fitur berikut:
Kategori | Rincian |
Jenis yang didukung | Tabel sumber, tabel dimensi, tabel sink, dan Ingesti Data |
Mode eksekusi | Hanya mode stream |
Metrik spesifik | |
Jenis API | DataStream, SQL, dan YAML ingesti data |
Mendukung pembaruan atau penghapusan di tabel sink | Ya |
Fitur khusus
Tabel sumber Change Data Capture (CDC) MongoDB menggunakan API Change Stream untuk menangkap data lengkap dan inkremental secara terintegrasi. Pertama-tama, semua data historis dibaca sebagai Snapshot, lalu beralih secara mulus ke pembacaan oplog inkremental. Proses ini memastikan tidak ada data yang hilang atau duplikat. Fitur ini juga mendukung semantik tepat-sekali untuk menjamin konsistensi data selama pemulihan kesalahan.
Berdasarkan API Change Stream
Menggunakan API Change Stream MongoDB 3.6 untuk menangkap event perubahan secara efisien, seperti insert, update, replacement, dan delete, dari database atau koleksi. Event-event tersebut kemudian dikonversi menjadi aliran changelog yang dapat diproses oleh Flink.
Pembacaan lengkap dan inkremental terintegrasi
Membaca Snapshot awal secara otomatis dan beralih dengan lancar ke mode inkremental tanpa intervensi manual.
Pembacaan Snapshot paralel
Mendukung pembacaan paralel data historis untuk meningkatkan performa. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.
Beberapa mode startup
initial: Melakukan Snapshot lengkap saat startup awal, lalu terus-menerus membaca oplog.latest-offset: Mulai membaca dari akhir oplog saat ini tanpa membaca data historis.timestamp: Mulai membaca oplog dari timestamp tertentu dan melewati Snapshot. Fitur ini memerlukan MongoDB 4.0 atau versi lebih baru.
Dukungan changelog lengkap
Mendukung output changelog lengkap yang mencakup pra-gambar dan post-image dari perubahan. Fitur ini memerlukan MongoDB 6.0 atau versi lebih baru serta fitur perekaman pra-gambar/post-image harus diaktifkan.
Peningkatan integrasi Flink
VVR 8.0.6+
Mendukung sinkronisasi data dan perubahan skema MongoDB ke sistem downstream menggunakan pernyataan CREATE TABLE AS (CTAS) atau CREATE DATABASE AS (CDAS). Mengaktifkan fitur perekaman pra-gambar/post-image.
VVR 8.0.9+
Versi ini memperluas kemampuan join tabel dimensi untuk mendukung pembacaan field
_iddari tipe ObjectId bawaan.
Prasyarat
Persyaratan instans MongoDB
Hanya MongoDB Alibaba Cloud 3.6 atau versi lebih baru (ReplicaSet atau kluster sharded) dan instans MongoDB yang dikelola sendiri yang didukung.
Anda harus mengaktifkan fitur Replica Set untuk database MongoDB yang ingin dipantau. Untuk informasi selengkapnya, lihat Replication.
Ketergantungan fitur MongoDB
Untuk menggunakan fitur aliran event changelog lengkap, Anda harus mengaktifkan fitur perekaman pra-gambar/post-image.
Jika autentikasi diaktifkan untuk MongoDB, pengguna memerlukan izin database berikut.
Persiapan jaringan dan lainnya untuk MongoDB
Konfigurasikan Daftar putih alamat IP agar Flink dapat mengakses MongoDB.
Koleksi MongoDB target telah dibuat.
Batasan
Tabel sumber CDC
MongoDB 4.0 dan versi lebih baru mendukung pembacaan paralel selama fase Snapshot awal. Untuk mengaktifkan mode paralel untuk Snapshot awal, atur parameter
scan.incremental.snapshot.enabledke true.Karena keterbatasan langganan Change Stream MongoDB, Anda tidak dapat membaca data dari database admin, local, atau config, maupun dari koleksi sistem. Untuk informasi selengkapnya, lihat dokumentasi MongoDB.
Tabel sink
Ververica Runtime (VVR) versi sebelum 8.0.5 hanya mendukung penyisipan data.
Pada VVR 8.0.5 dan versi lebih baru, jika primary key dideklarasikan di tabel sink, operasi insert, update, dan delete didukung. Jika tidak ada primary key yang dideklarasikan, hanya operasi insert yang didukung.
Tabel dimensi
VVR 8.0.5 dan versi lebih baru mendukung penggunaan tabel dimensi MongoDB.
SQL
Sintaksis
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)Saat membuat tabel sumber CDC, Anda harus mendeklarasikan kolom _id STRING dan menetapkannya sebagai kunci primer unik.
Parameter WITH
Umum
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Keterangan |
connector | Nama konektor. | String | Ya | Tidak ada |
|
uri | URI koneksi MongoDB. | String | Tidak | Tidak ada | Catatan Anda harus menentukan salah satu antara parameter uri atau hosts. Jika Anda menentukan |
hosts | Hostname server MongoDB. | String | Tidak | Tidak ada | Anda dapat menentukan beberapa hostname yang dipisahkan koma ( |
scheme | Protokol koneksi yang digunakan oleh MongoDB. | String | Tidak | mongodb | Nilai valid:
|
username | Nama pengguna yang digunakan untuk menghubungkan ke MongoDB. | String | Tidak | Tidak ada | Parameter ini diperlukan jika autentikasi diaktifkan. |
password | Kata sandi yang digunakan untuk menghubungkan ke MongoDB. | String | Tidak | Tidak ada | Parameter ini diperlukan jika autentikasi diaktifkan. Penting Untuk mencegah kebocoran kata sandi, kami menyarankan Anda menggunakan variabel untuk nilai kata sandi. Untuk informasi lebih lanjut, lihat Variabel proyek. |
database | Nama database MongoDB. | String | Tidak | Tidak ada |
Penting Anda tidak dapat memantau data di database admin, local, atau config. |
collection | Nama koleksi MongoDB. | String | Tidak | Tidak ada |
Penting Anda tidak dapat memantau data di koleksi sistem. |
connection.options | Parameter koneksi tambahan untuk MongoDB. | String | Tidak | Tidak ada | Parameter koneksi tambahan dalam format Penting Secara default, MongoDB CDC tidak secara otomatis menetapkan timeout koneksi socket. Hal ini dapat menyebabkan gangguan panjang selama fluktuasi jaringan. Kami menyarankan Anda selalu menetapkan socketTimeoutMS ke nilai yang wajar untuk menghindari masalah ini. |
Khusus tabel sumber
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Keterangan |
scan.startup.mode | Mode startup untuk sumber CDC MongoDB. | String | Tidak | initial | Nilai valid:
Untuk informasi lebih lanjut, lihat Properti Startup. |
scan.startup.timestamp-millis | Timestamp awal untuk konsumsi. | Long | Bergantung pada nilai scan.startup.mode
| Tidak ada | Nilainya adalah jumlah milidetik sejak epoch Linux. Parameter ini hanya berlaku untuk mode startup |
initial.snapshotting.queue.size | Batas ukuran antrian untuk Snapshot awal. | Integer | Tidak | 10240 | Parameter ini hanya berlaku ketika opsi |
batch.size | Ukuran batch untuk kursor. | Integer | Tidak | 1024 | Tidak ada. |
poll.max.batch.size | Jumlah maksimum dokumen perubahan yang diproses dalam satu batch. | Integer | Tidak | 1024 | Parameter ini mengontrol jumlah maksimum dokumen perubahan yang ditarik sekaligus selama pemrosesan aliran. Nilai yang lebih besar mengalokasikan buffer yang lebih besar di dalam konektor. |
poll.await.time.ms | Interval waktu antara dua penarikan data. | Integer | Tidak | 1000 | Unitnya adalah milidetik. |
heartbeat.interval.ms | Interval untuk mengirim paket heartbeat. | Integer | Tidak | 0 | Unitnya adalah milidetik. Konektor CDC MongoDB secara aktif mengirim paket heartbeat ke database untuk memastikan status backtrace tetap mutakhir. Nilai 0 menunjukkan bahwa tidak ada paket heartbeat yang dikirim. Penting Untuk koleksi yang jarang diperbarui, kami sangat menyarankan Anda mengatur opsi ini. |
scan.incremental.snapshot.enabled | Menentukan apakah akan mengaktifkan mode paralel untuk Snapshot awal. | Boolean | Tidak | false | Ini adalah fitur eksperimental. |
scan.incremental.snapshot.chunk.size.mb | Ukuran chunk untuk membaca Snapshot dalam mode paralel. | Integer | Tidak | 64 | Ini adalah fitur eksperimental. Unitnya adalah MB. Parameter ini hanya berlaku ketika Snapshot paralel diaktifkan. |
scan.full-changelog | Menghasilkan aliran acara changelog lengkap. | Boolean | Tidak | false | Ini adalah fitur eksperimental. Catatan Database MongoDB harus versi 6.0 atau lebih baru, dan fitur pra-gambar dan post-image harus diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages. |
scan.flatten-nested-columns.enabled | Menentukan apakah akan mengurai nama field yang dipisahkan titik ( | Boolean | Tidak | false | Jika parameter ini diaktifkan, dalam contoh dokumen BSON berikut, nama field Catatan Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru. |
scan.primitive-as-string | Menentukan apakah akan mengurai semua tipe primitif dalam dokumen BSON sebagai string. | Boolean | Tidak | false | Catatan Parameter ini hanya didukung di VVR 8.0.5 dan yang lebih baru. |
scan.ignore-delete.enabled | Menentukan apakah akan mengabaikan pesan hapus (-D). | Boolean | Tidak | false | Saat mengarsipkan data dari sumber MongoDB, banyak event DELETE dapat dihasilkan di OpLog. Jika Anda tidak ingin menyinkronkan event-event ini ke sistem downstream, Anda dapat mengaktifkan parameter ini untuk mengabaikan event hapus. Catatan
|
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewatkan proses backfill dari algoritma Snapshot inkremental. | Boolean | Tidak | false | Mengaktifkan sakelar ini hanya menyediakan semantik at-least-once. Catatan Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru. |
initial.snapshotting.pipeline | Operasi pipeline MongoDB. Selama fase pembacaan Snapshot, operasi ini didorong ke MongoDB untuk memfilter hanya data yang diperlukan, sehingga meningkatkan efisiensi pembacaan. | String | Tidak | Tidak ada. |
|
initial.snapshotting.max.threads | Jumlah thread yang digunakan untuk replikasi data. | Integer | Tidak | Tidak ada. | Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial. Catatan Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru. |
initial.snapshotting.queue.size | Ukuran antrian untuk snapshot awal. | Integer | Tidak | 16000 | Parameter ini hanya berlaku ketika opsi scan.startup.mode diatur ke initial. Catatan Parameter ini hanya didukung di VVR 11.1 dan yang lebih baru. |
scan.change-stream.reading.parallelism | Tingkat paralelisme untuk langganan Change Stream. | Integer | Tidak | 1 | Parameter ini hanya berlaku ketika parameter scan.incremental.snapshot.enabled diaktifkan. Penting Untuk berlangganan Change Streams dengan beberapa thread konkuren, Anda juga harus mengatur parameter heartbeat.interval.ms. Catatan Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru. |
scan.change-stream.reading.queue-size | Ukuran antrian pesan untuk langganan Change Stream konkuren. | Integer | Tidak | 16384 | Parameter ini hanya berlaku ketika parameter scan.change-stream.reading.parallelism diaktifkan. Catatan Parameter ini hanya didukung di VVR 11.2 dan yang lebih baru. |
Khusus tabel dimensi
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Keterangan |
lookup.cache | Kebijakan cache. | String | Tidak | NONE | Dua kebijakan cache berikut didukung:
|
lookup.max-retries | Jumlah maksimum percobaan ulang jika kueri database gagal. | Integer | Tidak | 3 | Tidak ada. |
lookup.retry.interval | Interval waktu antara percobaan ulang jika kueri database gagal. | Durasi | Tidak | 1s | Tidak ada. |
lookup.partial-cache.expire-after-access | Waktu maksimum catatan dapat tetap berada di cache setelah terakhir diakses. | Durasi | Tidak | Tidak ada | Unit waktu yang didukung meliputi ms, s, min, h, dan d. Saat menggunakan parameter ini, |
lookup.partial-cache.expire-after-write | Waktu maksimum catatan dapat tetap berada di cache setelah ditulis. | Durasi | Tidak | Tidak ada | Saat menggunakan parameter ini, |
lookup.partial-cache.max-rows | Jumlah maksimum baris di cache. Saat nilai ini terlampaui, baris terlama akan kedaluwarsa. | Long | Tidak | Tidak ada | Saat menggunakan parameter ini, |
lookup.partial-cache.cache-missing-key | Menentukan apakah akan menyimpan catatan kosong di cache saat tidak ditemukan data di tabel fisik. | Boolean | Tidak | True | Saat menggunakan parameter ini, |
Khusus tabel sink
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Keterangan |
sink.buffer-flush.max-rows | Jumlah maksimum catatan untuk setiap penulisan batch. | Integer | Tidak | 1000 | Tidak ada. |
sink.buffer-flush.interval | Interval flush untuk menulis data. | Durasi | Tidak | 1s | Tidak ada. |
sink.delivery-guarantee | Jaminan pengiriman untuk menulis data. | String | Tidak | at-least-once | Nilai valid:
Catatan exactly-once tidak didukung. |
sink.max-retries | Jumlah maksimum percobaan ulang jika penulisan database gagal. | Integer | Tidak | 3 | Tidak ada. |
sink.retry.interval | Interval waktu antara percobaan ulang jika penulisan database gagal. | Durasi | Tidak | 1s | Tidak ada. |
sink.parallelism | Tingkat paralelisme khusus untuk sink. | Integer | Tidak | Kosong | Tidak ada. |
Pemetaan tipe
Tabel sumber CDC
Tipe BSON | Tipe Flink SQL |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
Tabel dimensi dan sink
Tipe BSON | Tipe Flink SQL |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
Contoh
Tabel sumber CDC
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --harus dideklarasikan
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;Tabel sink
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;Penyerapan data
Konektor MongoDB dapat digunakan sebagai sumber data dalam pekerjaan YAML ingesti data.
Batasan
Hanya didukung di Ververica Runtime (VVR) 11.1 dan versi lebih baru.
Sintaksis
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
type: ...Parameter
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Keterangan |
type | Tipe sumber data. | Ya | STRING | None | Diperbaiki pada MongoDB. |
scheme | Protokol untuk menghubungkan ke server MongoDB. | Tidak | STRING | mongodb | Nilai yang valid:
|
hosts | Alamat server MongoDB yang akan dihubungkan. | Ya | STRING | None | Anda dapat menentukan beberapa alamat yang dipisahkan koma (,). |
username | Username untuk menghubungkan ke MongoDB. | Tidak | STRING | None | None. |
password | Password untuk menghubungkan ke MongoDB. | Tidak | STRING | None | None. |
database | Nama database MongoDB yang akan ditangkap. | Ya | STRING | None | Mendukung ekspresi reguler. |
collection | Nama koleksi MongoDB yang akan ditangkap. | Ya | STRING | None | Mendukung ekspresi reguler. Anda harus mencocokkan namespace lengkap |
connection.options | Opsi koneksi tambahan yang akan ditambahkan saat menghubungkan ke server MongoDB. | Tidak | STRING | None | Pasangan key-value dalam format |
schema.inference.strategy | Strategi untuk inferensi tipe Dokumen. Nilai yang valid adalah | Tidak | STRING |
| Jika diatur ke Jika diatur ke |
scan.max.pre.fetch.records | Jumlah maksimum catatan yang diambil sampel dari setiap koleksi yang ditangkap selama inisialisasi dan inferensi tipe. | Tidak | INT | 50 | None. |
scan.startup.mode | Menentukan mode startup untuk sumber data MongoDB. Nilai yang valid adalah | Tidak | STRING | initial | Nilai yang valid:
|
scan.startup.timestamp-millis | Menangkap data perubahan dari timestamp tertentu saat mode startup diatur ke | Tidak | LONG | None | None. |
chunk-meta.group.size | Menetapkan batas ukuran chunk metadata. | Tidak | INT | 1000 | None. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah akan menutup reader sumber yang idle setelah beralih ke mode inkremental. | Tidak | BOOLEAN | false | None. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewati proses backfill dari algoritma Snapshot inkremental. | Tidak | BOOLEAN | false | Jika konektor sink yang Anda gunakan dapat secara otomatis menghapus duplikat berdasarkan primary key, mengaktifkan sakelar ini dapat mengurangi waktu yang diperlukan untuk transisi dari lengkap ke inkremental. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah akan membaca chunk tak terbatas terlebih dahulu saat menjalankan algoritma Snapshot inkremental. | Tidak | BOOLEAN | false | Jika koleksi yang Anda snapshot sering diperbarui, mengaktifkan fitur ini dapat mengurangi kemungkinan error kehabisan memori saat membaca chunk tak terbatas. |
batch.size | Ukuran batch kursor untuk membaca data MongoDB. | Tidak | INT | 1024 | None. |
poll.max.batch.size | Jumlah maksimum entri per permintaan saat menarik Change Stream. | Tidak | INT | 1024 | None. |
poll.await.time.ms | Waktu tunggu minimum antara dua permintaan saat menarik Change Stream. | Tidak | INT | 1000 | Unitnya adalah milidetik. |
heartbeat.interval.ms | Interval pengiriman paket heartbeat. | Tidak | INT | 0 | Unitnya adalah milidetik. Konektor CDC MongoDB secara aktif mengirim paket heartbeat ke database untuk memastikan status backtrace tetap mutakhir. Nilai 0 menunjukkan bahwa tidak ada paket heartbeat yang dikirim. Catatan Untuk koleksi yang jarang diperbarui, kami sangat menyarankan Anda mengatur opsi ini. |
scan.incremental.snapshot.chunk.size.mb | Ukuran chunk selama fase Snapshot. | Tidak | INT | 64 | Unitnya adalah MB. |
scan.incremental.snapshot.chunk.samples | Jumlah sampel yang diambil saat menentukan ukuran koleksi selama fase Snapshot. | Tidak | INT | 20 | None. |
scan.full-changelog | Menentukan apakah akan menghasilkan aliran event changelog lengkap berdasarkan catatan Mongo Pre- dan Post-Image. | Tidak | BOOLEAN | false | Database MongoDB harus versi 6.0 atau lebih baru, dan fitur pra-gambar dan post-image harus diaktifkan. Untuk informasi selengkapnya tentang cara mengaktifkan fitur ini, lihat Document Preimages. |
scan.cursor.no-timeout | Menentukan apakah akan mengatur kursor untuk membaca data agar tidak pernah kedaluwarsa. | Tidak | BOOLEAN | false | Server MongoDB biasanya menutup kursor setelah idle selama periode waktu tertentu (10 menit) untuk mencegah penggunaan memori berlebihan. Mengatur opsi ini ke true mencegah hal tersebut terjadi. |
scan.ignore-delete.enabled | Menentukan apakah akan mengabaikan catatan event hapus dari sumber MongoDB. | Tidak | BOOLEAN | false | None. |
scan.flatten.nested-documents.enabled | Menentukan apakah akan meratakan struktur bersarang dalam dokumen BSON. | Tidak | BOOLEAN | false | Saat opsi ini diaktifkan, skema seperti |
scan.all.primitives.as-string.enabled | Menentukan apakah akan menginferensi semua tipe primitif sebagai STRING. | Tidak | BOOLEAN | false | Mengaktifkan opsi ini dapat mencegah banyak event evolusi skema saat data input bercampur. |
Pemetaan tipe
Tipe BSON | Tipe CDC | Catatan |
STRING | VARCHAR | None. |
INT32 | INT | |
INT64 | BIGINT | |
DECIMAL128 | DECIMAL | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
TIMESTAMP | TIMESTAMP | |
DATETIME | LOCALZONEDTIMESTAMP | |
BINARY | VARBINARY | |
DOCUMENT | MAP | Parameter tipe kunci dan nilai perlu diinferensi. |
ARRAY | ARRAY | Parameter tipe elemen perlu diinferensi. |
OBJECTID | VARCHAR | Direpresentasikan sebagai string heksadesimal. |
SYMBOL REGULAREXPRESSION JAVASCRIPT JAVASCRIPTWITHSCOPE | VARCHAR | Direpresentasikan sebagai string. |
Metadata
Konektor SQL
Tabel sumber SQL CDC MongoDB mendukung kolom metadata. Anda dapat menggunakan kolom-kolom ini untuk mengakses metadata berikut.
Kunci metadata | Tipe metadata | Deskripsi |
database_name | STRING NOT NULL | Nama database yang berisi dokumen. |
collection_name | STRING NOT NULL | Nama koleksi yang berisi dokumen. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Waktu dokumen diubah di database. Jika dokumen berasal dari data historis tabel dan bukan dari Change Stream, nilai ini selalu 0. |
row_kind | STRING NOT NULL | Menunjukkan jenis perubahan data. Nilai yang valid:
Catatan Hanya didukung di VVR 11.1 dan yang lebih baru. |
YAML Ingesti Data
Konektor YAML ingesti data CDC MongoDB mendukung pembacaan kolom metadata berikut:
Kunci metadata | Tipe metadata | Deskripsi |
ts_ms | BIGINT NOT NULL | Waktu dokumen diubah di database. Jika dokumen berasal dari data historis tabel dan bukan dari Change Stream, nilai ini selalu 0. |
Selain itu, Anda dapat menggunakan kolom metadata umum yang disediakan oleh modul Transform untuk mengakses informasi nama database, nama koleksi, dan row_kind.
Tentang fitur perekaman pra-gambar dan post-image MongoDB
Secara default, versi MongoDB sebelum 6.0 tidak menyediakan pra-gambar dokumen yang diubah atau data dokumen yang dihapus. Jika fitur perekaman pra-gambar dan post-image tidak diaktifkan, hanya semantik upsert yang dapat dicapai karena entri data update-before tidak tersedia. Namun, banyak operator berguna di Flink bergantung pada aliran perubahan lengkap yang terdiri dari event insert, update-before, update-after, dan delete.
Untuk melengkapi event update-before yang hilang, Flink SQL Planner secara otomatis menghasilkan node ChangelogNormalize untuk sumber data upsert. Node ini menyimpan cache versi terkini semua dokumen di state Flink. Saat dokumen yang diperbarui atau dihapus ditemui, node tersebut dapat mengkueri statenya untuk mengambil pra-gambar. Namun, node operator ini memerlukan jumlah data state yang besar.

MongoDB 6.0 mendukung fitur perekaman pra-gambar dan post-image. Untuk informasi selengkapnya, lihat Gunakan Change Streams MongoDB untuk menangkap perubahan data real-time. Saat fitur ini diaktifkan, MongoDB mencatat status lengkap dokumen sebelum dan sesudah setiap perubahan di koleksi khusus. Jika Anda mengaktifkan parameter scan.full-changelog dalam pekerjaan Anda, konektor CDC MongoDB menghasilkan catatan update-before dari catatan dokumen perubahan. Hal ini menghasilkan aliran event lengkap dan menghilangkan ketergantungan pada node ChangelogNormalize.
Mongo CDC DataStream API
Saat membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink. Untuk informasi selengkapnya, lihat Gunakan konektor DataStream.
Untuk menggunakan MongoDBSource, buat program API DataStream. Kode berikut memberikan contoh:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();XML
Konektor MongoDB VVR tersedia di Maven Central Repository untuk digunakan langsung dalam pengembangan pekerjaan Anda.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>Saat menggunakan API DataStream, jika Anda ingin mengaktifkan fitur Snapshot inkremental, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.source untuk membuat MongoDBSource. Jika tidak, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.
Saat membuat MongoDBSource, Anda dapat mengonfigurasi parameter berikut:
Parameter | Deskripsi |
hosts | Hostname database MongoDB yang akan dihubungkan. |
username | Nama pengguna untuk layanan database MongoDB. Catatan Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini. |
password | Kata sandi untuk layanan database MongoDB. Catatan Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini. |
databaseList | Nama database MongoDB yang akan dipantau. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Anda dapat menggunakan |
collectionList | Nama koleksi MongoDB yang akan dipantau. Catatan Nama koleksi mendukung ekspresi reguler untuk membaca data dari beberapa koleksi. Anda dapat menggunakan |
startupOptions | Mode startup untuk CDC MongoDB. Nilai valid:
Untuk informasi lebih lanjut, lihat Properti Startup. |
deserializer | Deserializer untuk mendeserialisasi catatan bertipe SourceRecord ke tipe tertentu. Nilai yang valid:
|