Topik ini menjelaskan cara menggunakan konektor MongoDB.
Informasi latar belakang
MongoDB adalah database berorientasi dokumen yang menyimpan data tidak terstruktur serta menyederhanakan pengembangan dan penskalaan aplikasi. Konektor MongoDB mendukung kemampuan berikut:
Kategori | Deskripsi |
Tipe yang didukung | Tabel sumber, dimensi, sink, dan ingesti data |
Mode eksekusi | Hanya mode streaming |
Metrik pemantauan | |
Tipe API | DataStream, SQL, dan YAML ingesti data |
Dukungan untuk memperbarui atau menghapus data tabel sink | Ya |
Fitur
Tabel sumber Change Data Capture (CDC) MongoDB menangkap data lengkap dan inkremental menggunakan API Change Stream. Fitur ini pertama-tama membaca data historis dalam snapshot lengkap, lalu secara mulus beralih ke pembacaan data inkremental dari oplog. Proses ini menyediakan semantik tepat-sekali, yang memastikan bahwa tidak ada catatan duplikat atau yang terlewat serta menjamin konsistensi data selama pemulihan kesalahan.
Berdasarkan API Change Stream
Konektor menggunakan API Change Stream yang diperkenalkan di MongoDB 3.6 untuk secara efisien menangkap event insert, update, replace, dan delete dari database dan koleksi. Event-event tersebut dikonversi menjadi aliran changelog yang dapat diproses oleh Flink.
Penangkapan data lengkap dan inkremental yang mulus
Konektor secara otomatis membaca snapshot awal, lalu beralih ke mode inkremental tanpa intervensi manual.
Parallel Snapshot Reads
Konektor membaca data historis secara paralel untuk meningkatkan performa. Fitur ini memerlukan MongoDB 4.0 atau versi yang lebih baru.
Beberapa mode startup
initial: Melakukan snapshot lengkap saat pekerjaan pertama kali dijalankan, lalu membaca oplog untuk perubahan inkremental.latest-offset: Mulai membaca dari posisi terbaru oplog dan tidak membaca data historis.timestamp: Membaca event oplog mulai dari timestamp tertentu, melewati snapshot. Fitur ini memerlukan MongoDB 4.0 atau versi yang lebih baru.
Dukungan Changelog lengkap
Menghasilkan event changelog lengkap yang mencakup status data sebelum dan sesudah perubahan. Fitur ini memerlukan MongoDB 6.0 atau versi yang lebih baru serta fitur perekaman preimage dan postimage.
Peningkatan integrasi Flink
VVR 8.0.6 dan versi yang lebih baru
Anda dapat menyinkronkan data dan perubahan skema MongoDB ke tabel downstream menggunakan pernyataan CREATE TABLE AS (CTAS) atau CREATE DATABASE AS (CDAS). Fitur ini memerlukan fitur perekaman preimage dan postimage.
VVR 8.0.9 dan versi yang lebih baru
Kemampuan join tabel dimensi diperluas untuk mendukung pembacaan field bawaan
_idbertipe ObjectId.
Prasyarat
Persyaratan instans MongoDB
Konektor hanya mendukung Alibaba Cloud MongoDB (replica set atau kluster sharded) atau MongoDB self-managed versi 3.6 atau yang lebih baru.
Anda harus mengaktifkan fitur replica set untuk database MongoDB yang ingin dipantau. Untuk informasi selengkapnya, lihat Replication.
Ketergantungan fitur MongoDB
Untuk menggunakan fitur aliran event Full Changelog, Anda harus mengaktifkan fitur perekaman preimage dan postimage.
Jika autentikasi MongoDB diaktifkan, pengguna MongoDB Anda harus memiliki izin database berikut:
Persiapan jaringan dan lainnya untuk MongoDB
Daftar putih alamat IP telah dikonfigurasi untuk mengizinkan Flink mengakses MongoDB.
Anda telah membuat database dan tabel MongoDB tujuan.
Batasan
Tabel sumber CDC
Pembacaan paralel selama fase snapshot awal hanya didukung untuk MongoDB 4.0 dan versi yang lebih baru. Untuk mengaktifkan fitur ini, atur opsi konfigurasi
scan.incremental.snapshot.enabledketrue.Anda tidak dapat membaca data dari database admin, local, atau config, maupun dari koleksi sistem. Hal ini disebabkan oleh batasan langganan Change Streams MongoDB. Untuk informasi selengkapnya, lihat dokumentasi MongoDB.
Tabel sink
Pada Realtime Compute for Apache Flink yang menggunakan VVR versi sebelum 8.0.5, Anda hanya dapat memasukkan data ke tabel sink.
Pada Realtime Compute for Apache Flink yang menggunakan VVR 8.0.5 atau versi yang lebih baru, Anda dapat memasukkan, memperbarui, dan menghapus data jika mendeklarasikan primary key dalam pernyataan DDL yang digunakan untuk membuat tabel sink. Jika tidak ada primary key yang dideklarasikan, Anda hanya dapat memasukkan data.
Tabel dimensi
Tabel dimensi MongoDB didukung pada Realtime Compute for Apache Flink yang menggunakan VVR 8.0.5 atau versi yang lebih baru.
SQL
Sintaksis
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)Saat membuat tabel sumber CDC, Anda harus mendeklarasikan kolom _id STRING dan menentukannya sebagai primary key.
Parameter WITH
Umum
Parameter | Deskripsi | Tipe | Wajib? | Nilai default | Keterangan |
connector | Nama konektor. | String | Ya | Tidak ada |
|
uri | Uniform Resource Identifier (URI) yang digunakan untuk menghubungkan ke database MongoDB. | String | Tidak | Tidak ada nilai default | Catatan Anda harus menentukan salah satu opsi |
hosts | Hostname server database MongoDB. | String | Tidak | Tidak ada | Pisahkan beberapa hostname dengan koma ( |
scheme | Protokol koneksi yang digunakan untuk mengakses database MongoDB. | String | Tidak | mongodb | Nilai yang valid:
|
username | Username yang digunakan untuk menghubungkan ke MongoDB. | String | Tidak | Tidak ada nilai default | Parameter ini wajib jika fitur verifikasi identitas diaktifkan. |
password | Password yang digunakan untuk menghubungkan ke MongoDB. | String | Tidak | Tidak ada nilai default | Anda harus mengonfigurasi parameter ini jika mengaktifkan fitur verifikasi identitas. Penting Untuk mencegah kebocoran password, gunakan variabel daripada hardcoding kredensial. |
database | Nama database MongoDB. | String | Tidak | Tidak ada nilai default |
Penting Anda tidak dapat memantau data di database admin, local, dan config. |
collection | Nama koleksi MongoDB. | String | Tidak | Tidak ada nilai default |
Penting Anda tidak dapat memantau data di koleksi sistem. |
connection.options | Opsi koneksi tambahan untuk MongoDB. | String | Tidak | Tidak ada nilai default | Tentukan opsi tambahan sebagai pasangan kunci-nilai ( Penting Secara default, MongoDB CDC tidak mengatur timeout koneksi soket. Hal ini dapat menyebabkan gangguan lama selama fluktuasi jaringan. Kami merekomendasikan menyetel socketTimeoutMS ke nilai yang wajar di sini untuk menghindari masalah ini. |
Khusus sumber
Parameter | Deskripsi | Tipe | Wajib? | Nilai default | Keterangan |
scan.startup.mode | Mode startup konektor CDC MongoDB. | String | Tidak | initial | Nilai yang valid:
Untuk informasi selengkapnya, lihat Startup Properties. |
scan.startup.timestamp-millis | Timestamp awal untuk konsumsi pada offset yang ditentukan. | Long | Bergantung pada nilai scan.startup.mode:
| Tidak ada | Nilainya adalah jumlah milidetik sejak epoch UNIX (00:00:00 UTC pada 1 Januari 1970). Hanya berlaku untuk mode startup |
initial.snapshotting.queue.size | Ukuran antrian maksimum untuk fase snapshot awal. | Integer | Tidak | 10240 | Hanya berlaku ketika |
batch.size | Ukuran pemrosesan batch kursor. | Integer | Tidak | 1024 | Tidak berlaku. |
poll.max.batch.size | Jumlah maksimum dokumen perubahan yang diproses dalam satu batch. | Integer | Tidak | 1024 | Opsi ini mengontrol jumlah maksimum dokumen perubahan yang ditarik sekaligus selama pemrosesan aliran. Nilai yang lebih besar mengalokasikan buffer internal yang lebih besar di konektor. |
poll.await.time.ms | Interval antara penarikan data. | Integer | Tidak | 1000 | Unit: milidetik. |
heartbeat.interval.ms | Interval pengiriman paket heartbeat. | Integer | Tidak | 0 | Unit: milidetik. Konektor CDC MongoDB mengirim paket heartbeat ke database untuk memastikan status backtracking terbaru. Mengatur nilai ini ke 0 akan menonaktifkan paket heartbeat. Penting Kami sangat merekomendasikan menyetel opsi ini untuk koleksi yang jarang diperbarui. |
scan.incremental.snapshot.enabled | Mengaktifkan pembacaan paralel selama fase snapshot awal. | Boolean | Tidak | false | Ini adalah fitur eksperimental. |
scan.incremental.snapshot.chunk.size.mb | Ukuran shard saat pembacaan snapshot paralel diaktifkan. | Integer | Tidak | 64 | Ini adalah fitur eksperimental. Unit: MB. Hanya berlaku ketika pembacaan snapshot paralel diaktifkan. |
scan.full-changelog | Menghasilkan aliran event changelog lengkap. | Boolean | Tidak | false | Ini adalah fitur eksperimental. Catatan Memerlukan MongoDB 6.0 atau versi yang lebih baru dengan fitur preimage dan postimage diaktifkan. Untuk petunjuknya, lihat Document Preimages. |
scan.flatten-nested-columns.enabled | Mengurai field yang dipisahkan oleh | Boolean | Tidak | false | Jika diaktifkan, field Catatan Hanya didukung di VVR 8.0.5 atau versi yang lebih baru. |
scan.primitive-as-string | Mengurai semua tipe primitif dalam dokumen BSON sebagai STRING. | Boolean | Tidak | false | Catatan Hanya didukung di VVR 8.0.5 atau versi yang lebih baru. |
scan.ignore-delete.enabled | Apakah akan mengabaikan pesan hapus (-D). | Boolean | Tidak | false | Saat mengarsipkan data sumber MongoDB, banyak event DELETE mungkin muncul di OpLog. Aktifkan opsi ini untuk mengabaikan event tersebut dan mencegahnya disinkronkan ke downstream. Catatan
|
scan.incremental.snapshot.backfill.skip | Melewati proses backfill watermark selama pembacaan snapshot inkremental. | Boolean | Tidak | false | Mengaktifkan opsi ini hanya menyediakan semantik at-least-once. Catatan Hanya didukung di VVR 11.1 atau versi yang lebih baru. |
initial.snapshotting.pipeline | Operasi pipeline MongoDB yang didorong ke MongoDB selama pembacaan snapshot untuk memfilter hanya data yang diperlukan dan meningkatkan efisiensi. | String | Tidak | Tidak ada. |
|
initial.snapshotting.max.threads | Jumlah thread yang digunakan untuk replikasi data. | Integer | Tidak | Tidak ada. | Hanya berlaku ketika scan.startup.mode diatur ke initial. Catatan Hanya didukung di VVR 11.1 atau versi yang lebih baru. |
initial.snapshotting.queue.size | Ukuran antrian untuk snapshot awal. | Integer | Tidak | 16000 | Hanya berlaku ketika scan.startup.mode diatur ke initial. Catatan Hanya didukung di VVR 11.1 atau versi yang lebih baru. |
scan.change-stream.reading.parallelism | Tingkat paralelisme untuk berlangganan Change Stream. | Integer | Tidak | 1 | Hanya berlaku ketika scan.incremental.snapshot.enabled diaktifkan. Penting Untuk berlangganan Change Stream dengan beberapa pembaca konkuren, atur juga heartbeat.interval.ms. Catatan Hanya didukung di VVR 11.2 atau versi yang lebih baru. |
scan.change-stream.reading.queue-size | Ukuran antrian pesan untuk langganan Change Stream konkuren. | Integer | Tidak | 16384 | Hanya berlaku ketika scan.change-stream.reading.parallelism diaktifkan. Catatan Hanya didukung di VVR 11.2 atau versi yang lebih baru. |
Khusus tabel dimensi
Parameter | Deskripsi | Tipe data | Wajib? | Nilai default | Keterangan |
lookup.cache | Kebijakan cache. | String | Tidak | NONE | Kebijakan yang didukung:
|
lookup.max-retries | Jumlah maksimum percobaan ulang yang diizinkan saat kueri ke database gagal. | Integer | Tidak | 3 | Tidak berlaku. |
lookup.retry.interval | Interval antara percobaan ulang saat kueri ke database gagal. | Duration | Tidak | 1s | Tidak berlaku. |
lookup.partial-cache.expire-after-access | Waktu maksimum catatan tetap dalam cache setelah diakses. | Duration | Tidak | Tidak ada | Unit yang didukung: ms, s, min, h, d. Memerlukan |
lookup.partial-cache.expire-after-write | Waktu maksimum catatan tetap dalam cache setelah ditulis. | Duration | Tidak | Tidak ada | Memerlukan |
lookup.partial-cache.max-rows | Jumlah maksimum baris yang di-cache. Saat melebihi batas, baris terlama akan kedaluwarsa. | Long | Tidak | Tidak ada | Memerlukan |
lookup.partial-cache.cache-missing-key | Menyimpan catatan kosong saat tidak ada data yang terkait dengan tabel fisik. | Boolean | Tidak | True | Memerlukan |
Khusus sink
Parameter | Deskripsi | Tipe | Wajib? | Nilai default | Keterangan |
sink.buffer-flush.max-rows | Jumlah maksimum catatan yang ditulis per batch. | Integer | Tidak | 1000 | Tidak berlaku. |
sink.buffer-flush.interval | Interval pengosongan data. | Duration | Tidak | 1s | Tidak berlaku. |
sink.delivery-guarantee | Jaminan semantik untuk penulisan data. | String | Tidak | at-least-once | Nilai yang valid:
Catatan Exactly-once tidak didukung. |
sink.max-retries | Jumlah maksimum percobaan ulang yang diizinkan saat penulisan data ke database gagal. | Integer | Tidak | 3 | Tidak berlaku. |
sink.retry.interval | Interval antara percobaan ulang saat penulisan data ke database gagal. | Duration | Tidak | 1s | Tidak berlaku. |
sink.parallelism | Tingkat paralelisme kustom untuk sink. | Integer | Tidak | kosong | Tidak berlaku. |
sink.delete-strategy | Menentukan cara menangani event data -D dan -U. | String | Tidak | CHANGELOG_STANDARD | Nilai yang valid:
|
Pemetaan tipe data
Tabel sumber CDC
Tipe BSON | Tipe Flink SQL |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
Tabel dimensi dan sink
Tipe BSON | Tipe Flink SQL |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
Contoh Penggunaan
Tabel sumber CDC
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --harus dideklarasikan
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;Tabel sink
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;Ingesti Data (pratinjau publik)
Anda dapat menggunakan konektor MongoDB sebagai sumber ingesti data.
Batasan
Fitur ini hanya didukung di VVR 11.1 dan versi yang lebih baru.
Sintaksis
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
type: ...Opsi konfigurasi
Parameter | Deskripsi | Wajib? | Tipe Data | Nilai default | Keterangan |
type | Tipe sumber data. | Ya | STRING | Tidak ada | Atur opsi ini ke mongodb. |
scheme | Protokol yang digunakan untuk menghubungkan ke server MongoDB. | Tidak | STRING | mongodb | Nilai yang valid:
|
hosts | Hostname server MongoDB. | Ya | STRING | Tidak ada nilai default | Pisahkan beberapa hostname dengan koma (,). |
username | Username yang digunakan untuk menghubungkan ke MongoDB. | Tidak | STRING | Tidak ada nilai default | Tidak berlaku. |
password | Password yang digunakan untuk menghubungkan ke MongoDB. | Tidak | STRING | Tidak ada | Tidak berlaku. |
database | Nama database MongoDB yang akan ditangkap. | Ya | STRING | Tidak ada nilai default | Ekspresi reguler didukung. |
collection | Nama koleksi MongoDB yang akan ditangkap. | Ya | STRING | Tidak ada nilai default | Ekspresi reguler didukung. Anda harus mencocokkan namespace lengkap |
connection.options | Opsi koneksi tambahan saat menghubungkan ke server MongoDB. | Tidak | STRING | Tidak ada | Tentukan pasangan kunci-nilai ( |
schema.inference.strategy | Strategi untuk inferensi tipe dokumen. Nilai yang valid: | Tidak | STRING |
| Saat diatur ke Saat diatur ke |
scan.max.pre.fetch.records | Jumlah maksimum catatan yang diambil sampelnya di setiap koleksi yang ditangkap selama inferensi skema awal. | Tidak | INT | 50 | Tidak berlaku. |
scan.startup.mode | Mode startup sumber data MongoDB. Nilai yang valid: | Tidak | STRING | initial | Nilai yang valid:
|
scan.startup.timestamp-millis | Saat mode startup diatur ke | Tidak | LONG | Tidak ada | Tidak berlaku. |
chunk-meta.group.size | Ukuran chunk metadata maksimum. | Tidak | INT | 1000 | Tidak berlaku. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah akan menutup pembaca sumber yang tidak aktif setelah beralih ke pembacaan inkremental. | Tidak | BOOLEAN | false | Tidak berlaku. |
scan.incremental.snapshot.backfill.skip | Menentukan apakah akan melewati proses backfill watermark dalam algoritma snapshot inkremental. | Tidak | BOOLEAN | false | Jika konektor sink Anda mendukung deduplikasi primary-key otomatis, mengaktifkan sakelar ini mengurangi waktu yang dibutuhkan untuk beralih dari pembacaan snapshot ke pembacaan inkremental. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah akan membaca chunk tak terbatas terlebih dahulu dalam kerangka kerja snapshot inkremental. | Tidak | BOOLEAN | false | Jika koleksi yang akan di-snapshot sering diperbarui, mengaktifkan fitur ini mengurangi risiko error kehabisan memori saat membaca chunk tak terbatas. |
batch.size | Ukuran batch untuk kursor saat membaca data dari MongoDB. | Tidak | INT | 1024 | Tidak berlaku. |
poll.max.batch.size | Jumlah maksimum entri yang diminta saat menarik aliran perubahan. | Tidak | INT | 1024 | Tidak berlaku. |
poll.await.time.ms | Waktu tunggu minimum antara dua permintaan saat menarik perubahan Change Stream. | Tidak | INT | 1000 | Unit: milidetik. |
heartbeat.interval.ms | Interval pengiriman paket heartbeat. | Tidak | INT | 0 | Unit: milidetik. Konektor CDC MongoDB mengirim paket heartbeat ke database MongoDB untuk memastikan status backtracking terbaru. Mengatur nilai ini ke 0 akan menonaktifkan paket heartbeat. Catatan Konfigurasikan opsi ini untuk koleksi yang jarang diperbarui. |
scan.incremental.snapshot.chunk.size.mb | Ukuran shard selama fase snapshot. | Tidak | INT | 64 | Unit: MB. |
scan.incremental.snapshot.chunk.samples | Jumlah sampel untuk menentukan ukuran koleksi selama snapshot. | Tidak | INT | 20 | Tidak berlaku. |
scan.full-changelog | Menentukan apakah akan menghasilkan aliran event changelog lengkap berdasarkan catatan preimage dan postimage MongoDB. | Tidak | BOOLEAN | false | Memerlukan MongoDB 6.0 atau versi yang lebih baru dengan fitur preimage dan postimage diaktifkan. Untuk petunjuknya, lihat Document Preimages. |
scan.cursor.no-timeout | Menentukan apakah akan menonaktifkan timeout kursor. | Tidak | BOOLEAN | false | Server MongoDB biasanya menutup kursor yang tidak aktif setelah 10 menit untuk mencegah masalah penggunaan memori. Menyetel opsi ini ke true mencegah perilaku tersebut. |
scan.ignore-delete.enabled | Menentukan apakah akan mengabaikan event hapus dari MongoDB. | Tidak | BOOLEAN | false | Tidak berlaku. |
scan.flatten.nested-documents.enabled | Apakah akan meratakan struktur bersarang dalam dokumen BSON. | Tidak | BOOLEAN | false | Saat diaktifkan, skema seperti |
scan.all.primitives.as-string.enabled | Menyimpulkan semua tipe primitif sebagai STRING. | Tidak | BOOLEAN | false | Mengaktifkan ini menghindari event perubahan skema yang sering terjadi saat tipe data upstream tidak konsisten. |
metadata.list | Daftar metadata yang diteruskan ke downstream. | Tidak | STRING | Tidak ada. | Pisahkan beberapa item metadata dengan koma. Metadata yang didukung:
|
Pemetaan tipe data
Tipe BSON | Tipe CDC | Catatan |
STRING | VARCHAR | Tidak berlaku. |
INT32 | INT | |
INT64 | BIGINT | |
DECIMAL128 | DECIMAL | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
TIMESTAMP | TIMESTAMP | |
DATETIME | LOCALZONEDTIMESTAMP | |
BINARY | VARBINARY | |
DOCUMENT | MAP | Tipe kunci dan nilai disimpulkan. |
ARRAY | ARRAY | Tipe elemen disimpulkan. |
OBJECTID | VARCHAR | Dinyatakan sebagai HexString. |
SYMBOL REGULAREXPRESSION JAVASCRIPT JAVASCRIPTWITHSCOPE | VARCHAR | Dinyatakan sebagai string. |
Metadata
Konektor SQL
Tabel sumber SQL CDC MongoDB mendukung sintaksis kolom metadata. Anda dapat mengakses kolom metadata berikut:
kunci metadata | Tipe metadata | Deskripsi |
database_name | STRING NOT NULL | Nama database yang berisi dokumen. |
collection_name | STRING NOT NULL | Nama koleksi yang berisi dokumen. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | Waktu dokumen berubah di database. Jika dokumen berasal dari data tabel historis dan bukan dari ChangeStream, nilai ini selalu 0. |
row_kind | STRING NOT NULL | Tipe event perubahan data. Nilai yang valid:
Catatan Hanya didukung di VVR 11.1 atau versi yang lebih baru. |
YAML Ingesti Data
Konektor YAML ingesti data CDC MongoDB mendukung kolom metadata berikut:
Kolom metadata | Tipe metadata | Deskripsi |
ts_ms | BIGINT NOT NULL | Waktu dokumen berubah di database. Jika dokumen berasal dari data tabel historis dan bukan dari ChangeStream, nilai ini selalu 0. |
Anda juga dapat menggunakan kolom metadata generik yang disediakan oleh modul Transform untuk mengakses informasi nama database, nama koleksi, dan row_kind.
Fitur preimage dan postimage
Secara default, versi MongoDB sebelum 6.0 tidak menyimpan dokumen sebelum perubahan atau dokumen yang dihapus. Tanpa fitur preimage dan postimage diaktifkan, MongoDB hanya mendukung semantik UPSERT, yang berarti event UPDATE_BEFORE tidak tersedia. Namun, banyak operator Flink yang berguna memerlukan aliran changelog lengkap yang mencakup event INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE.
Untuk melengkapi event UPDATE_BEFORE yang hilang, perencana Flink SQL secara otomatis menghasilkan operator ChangelogNormalize untuk sumber data tipe UPSERT. Operator ini menyimpan snapshot versi terkini semua dokumen dalam data state deployment. Saat dokumen diperbarui atau dihapus, Anda dapat mengkueri data state di ChangelogNormalize untuk mendapatkan status sebelum pembaruan. Namun, hal ini memerlukan penyimpanan data state dalam jumlah besar.

MongoDB 6.0 mendukung fitur preimage dan postimage. Untuk informasi selengkapnya, lihat Gunakan Change Streams MongoDB untuk menangkap perubahan data secara real time. Saat Anda mengaktifkan fitur ini, MongoDB mencatat status lengkap dokumen sebelum dan sesudah setiap perubahan dalam koleksi khusus. Jika Anda mengaktifkan opsi scan.full-changelog dalam pekerjaan Anda, konektor CDC MongoDB menggunakan catatan dokumen perubahan ini untuk menghasilkan catatan UPDATE_BEFORE. Hal ini memungkinkan konektor menghasilkan aliran event lengkap dan menghilangkan ketergantungan pada operator ChangelogNormalize.
API DataStream CDC MongoDB
Saat membaca atau menulis data menggunakan DataStream, gunakan konektor DataStream yang sesuai untuk menghubungkan ke Flink. Untuk informasi selengkapnya tentang cara menyiapkan konektor DataStream, lihat Penggunaan konektor DataStream.
Anda dapat membuat program API DataStream dan menggunakan MongoDBSource. Kode berikut memberikan contoh:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();XML
Repositori Maven Central menyediakan konektor MongoDB VVR yang dapat Anda gunakan langsung dalam pengembangan pekerjaan.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>Saat menggunakan API DataStream, paket yang Anda gunakan untuk membangun sumber data MongoDBSource bergantung pada apakah Anda ingin mengaktifkan fitur snapshot inkremental. Untuk mengaktifkan fitur snapshot inkremental, gunakan MongoDBSource#builder() dari paket com.ververica.cdc.connectors.mongodb.source. Jika tidak, gunakan MongoDBSource#builder() dari com.ververica.cdc.connectors.mongodb.
Anda dapat mengonfigurasi parameter berikut saat membangun MongoDBSource:
Parameter | Deskripsi |
hosts | Hostname database MongoDB yang akan dihubungkan. |
username | Username untuk layanan database MongoDB. Catatan Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini. |
password | Password untuk layanan database MongoDB. Catatan Jika autentikasi tidak diaktifkan di server MongoDB, Anda tidak perlu mengonfigurasi parameter ini. |
databaseList | Nama database MongoDB yang akan dipantau. Catatan Nama database mendukung ekspresi reguler untuk membaca data dari beberapa database. Gunakan |
collectionList | Nama koleksi MongoDB yang akan dipantau. Catatan Nama koleksi mendukung ekspresi reguler untuk membaca data dari beberapa koleksi. Gunakan |
startupOptions | Pilih mode startup untuk CDC MongoDB. Nilai yang valid:
Untuk informasi selengkapnya, lihat Startup Properties. |
deserializer | Deserializer yang mendeserialisasi catatan tipe SourceRecord ke tipe tertentu. Nilai yang valid:
|