This topic describes how to use the MongoDB connector.
Background information
MongoDB is a document-oriented database designed for unstructured data. It simplifies application development and scaling. The MongoDB connector supports the following features:
| Category | Details |
| --- | --- |
| Supported types | Source table, dimension table, sink table, and data ingestion |
| Operating modes | Only stream mode is supported. |
| Specific metrics |  |
| API types | DataStream and SQL |
| Can sink table data be updated or deleted? | Yes |
Features
The MongoDB Change Data Capture (CDC) source table uses the Change Stream API to capture both full and incremental data. It first reads all historical data (snapshot) and then seamlessly switches to reading incremental oplogs. This process ensures that no data is lost or duplicated. The connector also supports exactly-once semantics to guarantee data consistency during fault recovery.
Based on the Change Stream API
The connector uses the Change Stream API, introduced in MongoDB 3.6, to efficiently capture change events, such as inserts, updates, replacements, and deletions, from a database or collection. These events are then converted into a changelog stream that Flink can process.
Combined full and incremental data capture
The connector automatically reads the initial snapshot and smoothly transitions to incremental mode without manual intervention.
Parallel snapshot reading
The connector supports parallel reading of historical data to improve performance. This feature requires MongoDB 4.0 or later.
Multiple startup modes
- initial: Starts by taking a full snapshot and then continuously reads the oplog.
- latest-offset: Starts reading only from the end of the current oplog without reading historical data.
- timestamp: Starts reading the oplog from a specified timestamp, skipping the snapshot. This mode requires MongoDB 4.0 or later.
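For example, the following is a minimal sketch of a source table that starts from a specific timestamp. The table name, columns, and connection values are placeholders, not values from this topic:

```sql
CREATE TEMPORARY TABLE mongo_source_ts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Start from a specific point in the oplog instead of taking a snapshot.
  'scan.startup.mode' = 'timestamp',
  -- Milliseconds since the Unix epoch; requires MongoDB 4.0 or later.
  'scan.startup.timestamp-millis' = '1700000000000'
);
```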
Full changelog support
The connector supports outputting a complete changelog that includes both the before-image and after-image of the data. This feature requires MongoDB 6.0 or later with the before-image and after-image recording feature enabled.
Flink integration enhancements
VVR 8.0.6 or later
You can use CREATE TABLE AS (CTAS) or CREATE DATABASE AS (CDAS) statements to sync data and schema changes from MongoDB to downstream systems. You can also enable the before-image and after-image recording feature.
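The following is a hedged CTAS-style sketch; the catalog, database, and table names are hypothetical, and the exact syntax depends on your VVR version and target system:

```sql
-- Sync the schema and data of one MongoDB collection to a hypothetical
-- downstream catalog. All names here are placeholders.
CREATE TABLE IF NOT EXISTS `target_catalog`.`target_db`.`orders`
AS TABLE `mongodb_catalog`.`testdb`.`orders`
/*+ OPTIONS('scan.incremental.snapshot.enabled' = 'true') */;
```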
VVR 8.0.9 or later
The dimension table join capability is extended to support reading the `_id` field of the built-in ObjectId type.
Prerequisites
MongoDB instance requirements
Only Alibaba Cloud MongoDB (ReplicaSet or sharded cluster) or self-managed MongoDB 3.6 or later is supported.
You must enable the ReplicaSet feature for the MongoDB database that you want to monitor. For more information, see Replication.
MongoDB feature dependencies
To use the full changelog event stream feature, you must enable the before-image and after-image recording feature.
If authentication is enabled for MongoDB, the account that you use must have the required permissions on the databases that you want to access.
Network and other preparations
You must configure an IP whitelist to allow Flink to access MongoDB.
The target MongoDB collections must be created before data is written to them.
Limits
CDC source table
- MongoDB 4.0 or later supports parallel reading during the initial snapshot phase. To enable parallel mode for the initial snapshot, set the `scan.incremental.snapshot.enabled` parameter to `true`.
- Due to the limitations of MongoDB Change Stream subscriptions, you cannot read data from the `admin`, `local`, or `config` databases or from `system` collections. For more information, see the MongoDB documentation.
Sink table
In Ververica Runtime (VVR) versions earlier than 8.0.5, only data insertion is supported.
In VVR 8.0.5 or later, if a primary key is declared in the sink table, data insertion, update, and deletion are supported. If no primary key is declared, only data insertion is supported.
Dimension table
VVR 8.0.5 or later supports the use of MongoDB dimension tables.
SQL
Syntax
```sql
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
);
```

When you create a CDC source table, you must declare the `_id STRING` column and set it as the unique primary key.
WITH parameters
General parameters
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| connector | The name of the connector. | String | Yes | None | Set the value to `mongodb`. |
| uri | The connection URI for MongoDB. | String | No | None | Note: You must specify either the `uri` or the `hosts` parameter. If you specify `uri`, do not specify `scheme`, `hosts`, `username`, `password`, or `connection.options`. |
| hosts | The hostname of the MongoDB server. | String | No | None | You can provide multiple hostnames separated by commas (,). |
| scheme | The connection protocol used by MongoDB. | String | No | mongodb | Valid values: `mongodb` and `mongodb+srv`. |
| username | The username used to connect to MongoDB. | String | No | None | This parameter is required if authentication is enabled. |
| password | The password used to connect to MongoDB. | String | No | None | This parameter is required if authentication is enabled. Important: To prevent password leaks, we recommend that you use a variable for the password value. For more information, see Project variables. |
| database | The name of the MongoDB database. | String | No | None | Important: You cannot monitor data in the `admin`, `local`, or `config` databases. |
| collection | The name of the MongoDB collection. | String | No | None | Important: You cannot monitor data in `system` collections. |
| connection.options | The connection parameters on the MongoDB side. | String | No | None | Additional connection parameters in the `key=value` format, separated by ampersands (&). Important: By default, the MongoDB CDC connector does not automatically set a socket connection timeout, which can cause long interruptions during network jitter. We recommend that you always set `socketTimeoutMS` to a reasonable value. |
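As a hedged sketch, the WITH clause below connects through `hosts` and passes extra driver options, including a socket timeout, through `connection.options`. The option values (including `replicaSet=rs0`) are illustrative assumptions, not recommended production settings:

```sql
CREATE TEMPORARY TABLE mongo_source_opts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- key=value pairs joined with ampersands; socketTimeoutMS guards
  -- against long stalls during network jitter.
  'connection.options' = 'replicaSet=rs0&socketTimeoutMS=30000'
);
```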
Source table-specific parameters
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| scan.startup.mode | The startup mode for the MongoDB CDC connector. | String | No | initial | Valid values: `initial`, `latest-offset`, and `timestamp`. For more information, see Startup Properties. |
| scan.startup.timestamp-millis | The start timestamp for consumption at a specific offset. | Long | Required only when `scan.startup.mode` is set to `timestamp`. | None | The value is in milliseconds since the Unix epoch. This parameter applies only to the `timestamp` startup mode. |
| initial.snapshotting.queue.size | The queue size limit for the initial snapshot. | Integer | No | 10240 | This parameter takes effect only when `scan.startup.mode` is set to `initial`. |
| batch.size | The batch size for the cursor. | Integer | No | 1024 | None. |
| poll.max.batch.size | The maximum number of change documents in a single batch. | Integer | No | 1024 | This parameter controls the maximum number of change documents that can be pulled at a time during stream processing. A larger value results in a larger buffer allocated within the connector. |
| poll.await.time.ms | The time interval between two data pulls. | Integer | No | 1000 | The unit is milliseconds. |
| heartbeat.interval.ms | The interval for sending heartbeat packets. | Integer | No | 0 | The unit is milliseconds. The MongoDB CDC connector actively sends heartbeat packets to the database to keep the resume position up to date. A value of 0 means that heartbeat packets are never sent. Important: For collections that are not frequently updated, we strongly recommend that you set this parameter. |
| scan.incremental.snapshot.enabled | Specifies whether to enable parallel mode for the initial snapshot. | Boolean | No | false | This is an experimental feature. |
| scan.incremental.snapshot.chunk.size.mb | The chunk size for reading snapshots in parallel mode. | Integer | No | 64 | This is an experimental feature. The unit is MB. This parameter takes effect only when parallel snapshot reading is enabled. |
| scan.full-changelog | Generates a full changelog event stream. | Boolean | No | false | This is an experimental feature. Note: This feature requires MongoDB 6.0 or later, and the before-image and after-image recording feature must be enabled. For information about how to enable the feature, see Document Preimages. |
| scan.flatten-nested-columns.enabled | Specifies whether to flatten nested fields in a BSON document and read them as columns whose names are paths joined by periods (.). | Boolean | No | false | If this parameter is enabled, the field `col` in `{"nested": {"col": true}}` is read as the column `nested.col`. Note: This parameter is supported only in VVR 8.0.5 and later. |
| scan.primitive-as-string | Specifies whether to parse all primitive types in a BSON document as strings. | Boolean | No | false | Note: This parameter is supported only in VVR 8.0.5 and later. |
| scan.ignore-delete.enabled | Specifies whether to ignore delete (-D) messages. | Boolean | No | false | When you archive data from a MongoDB source, many `DELETE` events may be generated in the oplog. If you do not want to sync these events to the downstream system, you can enable this parameter to ignore deletion events. |
| scan.incremental.snapshot.backfill.skip | Specifies whether to skip the backfill process of the incremental snapshot algorithm. | Boolean | No | false | Enabling this parameter provides only at-least-once semantics. Note: This parameter is supported only in VVR 11.1 and later. |
| initial.snapshotting.pipeline | A MongoDB pipeline operation. During the snapshot reading phase, this operation is pushed down to MongoDB to filter only the required data, which improves read efficiency. | String | No | None | None. |
| initial.snapshotting.max.threads | The number of threads used for data replication. | Integer | No | None | This parameter takes effect only when `scan.startup.mode` is set to `initial`. Note: This parameter is supported only in VVR 11.1 and later. |
| initial.snapshotting.queue.size | The queue size for the initial snapshot. | Integer | No | 16000 | This parameter takes effect only when `scan.startup.mode` is set to `initial`. Note: This parameter is supported only in VVR 11.1 and later. |
| scan.change-stream.reading.parallelism | The degree of parallelism for subscribing to the Change Stream. | Integer | No | 1 | This parameter takes effect only when `scan.incremental.snapshot.enabled` is set to `true`. Important: To subscribe to the Change Stream with multiple concurrent threads, you must also set the `heartbeat.interval.ms` parameter. Note: This parameter is supported only in VVR 11.2 and later. |
| scan.change-stream.reading.queue-size | The message queue size for concurrent subscriptions to the Change Stream. | Integer | No | 16384 | This parameter takes effect only when `scan.change-stream.reading.parallelism` is greater than 1. Note: This parameter is supported only in VVR 11.2 and later. |
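The following hedged sketch (VVR 11.2 or later, per the notes above) combines parallel Change Stream reading with the heartbeat setting it requires. The parallelism and interval values are illustrative assumptions:

```sql
CREATE TEMPORARY TABLE mongo_source_parallel (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Parallel snapshot mode is required for parallel Change Stream reading.
  'scan.incremental.snapshot.enabled' = 'true',
  -- Subscribe to the Change Stream with 4 concurrent threads (VVR 11.2+).
  'scan.change-stream.reading.parallelism' = '4',
  -- Required when reading the Change Stream concurrently; in milliseconds.
  'heartbeat.interval.ms' = '30000'
);
```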
Dimension table-specific parameters
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| lookup.cache | The cache policy. | String | No | NONE | Valid values: `NONE` (no cache) and `PARTIAL` (cache only the records that are looked up from the database). |
| lookup.max-retries | The maximum number of retries if a database query fails. | Integer | No | 3 | None. |
| lookup.retry.interval | The retry interval if a database query fails. | Duration | No | 1s | None. |
| lookup.partial-cache.expire-after-access | The maximum retention period for a record in the cache after it was last accessed. | Duration | No | None | Supported time units include ms, s, min, h, and d. When you use this parameter, you must set `lookup.cache` to `PARTIAL`. |
| lookup.partial-cache.expire-after-write | The maximum retention period for a record after it is written to the cache. | Duration | No | None | When you use this parameter, you must set `lookup.cache` to `PARTIAL`. |
| lookup.partial-cache.max-rows | The maximum number of rows in the cache. If this value is exceeded, the oldest rows expire. | Long | No | None | When you use this parameter, you must set `lookup.cache` to `PARTIAL`. |
| lookup.partial-cache.cache-missing-key | Specifies whether to cache empty records when no data is associated in the physical table. | Boolean | No | true | When you use this parameter, you must set `lookup.cache` to `PARTIAL`. |
Sink table-specific parameters
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| sink.buffer-flush.max-rows | The maximum number of records for each batch write. | Integer | No | 1000 | None. |
| sink.buffer-flush.interval | The flush interval for writing data. | Duration | No | 1s | None. |
| sink.delivery-guarantee | The semantic guarantee for writing data. | String | No | at-least-once | Valid values: `none` and `at-least-once`. Note: `exactly-once` is not currently supported. |
| sink.max-retries | The maximum number of retries if a database write fails. | Integer | No | 3 | None. |
| sink.retry.interval | The retry interval if a database write fails. | Duration | No | 1s | None. |
| sink.parallelism | A custom degree of parallelism for the sink. | Integer | No | None | None. |
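As a hedged sketch, a sink table might tune batching and delivery as follows. The flush values are illustrative assumptions, not recommendations:

```sql
CREATE TEMPORARY TABLE mongo_sink_tuned (
  `_id` STRING,
  name STRING,
  -- With a primary key, updates and deletes are supported (VVR 8.0.5+).
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Flush a batch when 500 records accumulate or every 2 seconds,
  -- whichever comes first.
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  'sink.delivery-guarantee' = 'at-least-once'
);
```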
Data type mapping
CDC source table
| BSON type | Flink SQL type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date, Timestamp | DATE |
| Date, Timestamp | TIME |
| DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |
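For example, the following is a hedged sketch of a source table whose columns follow this mapping. The collection and field names are hypothetical:

```sql
CREATE TEMPORARY TABLE typed_source (
  `_id` STRING,                 -- ObjectId maps to STRING
  qty INT,                      -- Int32
  total DECIMAL(10, 2),         -- Decimal128
  updated_at TIMESTAMP_LTZ(3),  -- DateTime
  location ROW<`type` STRING, coordinates ARRAY<DOUBLE>>,  -- GeoJSON Point
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'typedcoll'
);
```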
Dimension and sink tables
| BSON type | Flink SQL type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Boolean | BOOLEAN |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| String, ObjectId | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
Examples
CDC source table
```sql
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, -- must be declared
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;
```

Dimension table

```sql
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a INT,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
id STRING,
a INT,
b BIGINT,
name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
```

Sink table

```sql
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;
```

Metadata
The MongoDB CDC source table supports metadata columns. You can access the following metadata using metadata columns.
| Metadata key | Metadata type | Description |
| --- | --- | --- |
| database_name | STRING NOT NULL | The name of the database that contains the document. |
| collection_name | STRING NOT NULL | The name of the collection that contains the document. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time when the document was changed in the database. If the document is from the historical (snapshot) data of a table instead of from the Change Stream, the value is always 0. |
| row_kind | STRING NOT NULL | The type of the data change. Valid values: `+I` (insert), `-U` (update-before), `+U` (update-after), and `-D` (delete). Note: Supported only in VVR 11.1 and later. |
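For example, the following hedged sketch exposes `row_kind` alongside the other metadata columns (VVR 11.1 or later is required for `row_kind`):

```sql
CREATE TEMPORARY TABLE mongo_source_meta (
  `_id` STRING,
  name STRING,
  -- Metadata columns are declared VIRTUAL and read from the Change Stream.
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  row_kind STRING METADATA VIRTUAL,  -- +I, -U, +U, or -D
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll'
);
```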
About the before-image and after-image recording feature in MongoDB
By default, MongoDB versions earlier than 6.0 do not provide the before-image of an updated document or the content of a deleted document. Without the before-image and after-image recording feature, the connector can only achieve upsert semantics, which means that update-before data entries are missing. However, many useful Flink operators depend on a complete change stream of insert, update-before, update-after, and delete events.
To supplement the missing before-image events, the Flink SQL Planner automatically generates a `ChangelogNormalize` node for upsert data sources. This node caches a snapshot of the current version of all documents in the Flink state. When an updated or deleted document is encountered, the node can look up the state to retrieve the before-image. However, this operator node requires a large amount of state data.

MongoDB 6.0 and later support the before-image and after-image recording feature. For more information, see Use MongoDB Change Streams to capture real-time data changes. After you enable this feature, MongoDB records the complete state of a document before and after each change in a special collection. If you then set the `scan.full-changelog` parameter to `true` in your job, the MongoDB CDC connector generates update-before records from the recorded change documents. This produces a complete event stream and eliminates the dependency on the `ChangelogNormalize` node.
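Assuming MongoDB 6.0 or later with the before-image and after-image recording feature enabled on the source collection, the job side only needs the connector option. A minimal hedged fragment:

```sql
CREATE TEMPORARY TABLE mongo_source_full (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Emit update-before records directly from the change stream, removing
  -- the need for ChangelogNormalize state (MongoDB 6.0+ required).
  'scan.full-changelog' = 'true'
);
```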
MongoDB CDC DataStream API
When you read and write data using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For more information about how to set up a DataStream connector, see Use a DataStream connector.
You can create a DataStream API program and use `MongoDBSource`. The following code provides an example:
Java
```java
// The imports below assume the incremental snapshot source package
// (com.ververica.cdc.connectors.mongodb.source); adjust them if you use
// the legacy com.ververica.cdc.connectors.mongodb package instead.
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

MongoDBSource<String> source = MongoDBSource.<String>builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
```

XML
The VVR MongoDB connector is available in the Maven central repository for you to use directly in your job development.
```xml
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>
```

When you use the DataStream API, use `MongoDBSource#builder()` from the `com.ververica.cdc.connectors.mongodb.source` package to enable the incremental snapshot feature. Otherwise, use `MongoDBSource#builder()` from the `com.ververica.cdc.connectors.mongodb` package.
When you construct a `MongoDBSource`, you can configure the following parameters:
| Parameter | Description |
| --- | --- |
| hosts | The hostname of the MongoDB database to connect to. |
| username | The username for the MongoDB database service. Note: If authentication is not enabled on the MongoDB server, you do not need to configure this parameter. |
| password | The password for the MongoDB database service. Note: If authentication is not enabled on the MongoDB server, you do not need to configure this parameter. |
| databaseList | The name of the MongoDB database to monitor. Note: The database name supports regular expressions, so you can read data from multiple databases. You can use `.*` to match all databases. |
| collectionList | The name of the MongoDB collection to monitor. Note: The collection name supports regular expressions, so you can read data from multiple collections. You can use `.*` to match all collections. |
| startupOptions | The startup mode for the MongoDB CDC connector. Valid values: `StartupOptions.initial()`, `StartupOptions.latestOffset()`, and `StartupOptions.timestamp(...)`, which correspond to the `initial`, `latest-offset`, and `timestamp` modes. For more information, see Startup Properties. |
| deserializer | A deserializer that deserializes a `SourceRecord` to a specified type. A common choice is `JsonDebeziumDeserializationSchema`, which deserializes records into JSON strings; you can also provide a custom implementation of the deserialization schema interface. |