Realtime Compute for Apache Flink:MongoDB

Last Updated: Oct 15, 2025

This topic describes how to use the MongoDB connector.

Background information

MongoDB is a document-oriented database designed for unstructured data. It simplifies application development and scaling. The MongoDB connector supports the following features:

Category

Details

Supported types

Source table, dimension table, sink table, and data ingestion

Operating modes

Only stream mode is supported.

Metrics

  • Source table

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • Dimension and sink tables: None.

Note

For more information about the metrics, see Metric description.

API types

DataStream and SQL

Can sink table data be updated or deleted?

Yes

Features

The MongoDB Change Data Capture (CDC) source table uses the Change Stream API to capture both full and incremental data. It first reads all historical data (snapshot) and then seamlessly switches to reading incremental oplogs. This process ensures that no data is lost or duplicated. The connector also supports exactly-once semantics to guarantee data consistency during fault recovery.

  • Based on the Change Stream API

    The connector uses the Change Stream API, introduced in MongoDB 3.6, to efficiently capture change events, such as inserts, updates, replacements, and deletions, in a database or collection. These events are then converted into a changelog stream that Flink can process.

  • Combined full and incremental data capture

    The connector automatically reads the initial snapshot and smoothly transitions to incremental mode without manual intervention.

  • Parallel snapshot reading

    The connector supports parallel reading of historical data to improve performance. This feature requires MongoDB 4.0 or later.

  • Multiple startup modes (a configuration sketch follows this feature list)

    • initial: Starts by taking a full snapshot and then continuously reads the oplog.

    • latest-offset: Starts reading only from the end of the current oplog without reading historical data.

    • timestamp: Starts reading the oplog from a specified timestamp, skipping the snapshot. This mode requires MongoDB 4.0 or later.

  • Full changelog support

    The connector supports outputting a complete changelog that includes both the before-image and after-image of the data. This feature requires MongoDB 6.0 or later with the before-image and after-image recording feature enabled.
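For example, the startup modes above are selected through the scan.startup.mode option in the WITH clause. The following is a minimal sketch; the host, credentials, database, collection, and timestamp values are placeholders:

CREATE TABLE mongo_cdc_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- 'initial' (default) takes a snapshot first; 'latest-offset' skips history;
  -- 'timestamp' starts from a given point in the oplog (MongoDB 4.0 or later)
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
);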


Prerequisites

  • MongoDB instance requirements

    • Only Alibaba Cloud MongoDB (ReplicaSet or sharded cluster) or self-managed MongoDB 3.6 or later is supported.

    • You must enable the ReplicaSet feature for the MongoDB database that you want to monitor. For more information, see Replication.

  • MongoDB feature dependencies

    • To use the full changelog event stream feature, you must enable the before-image and after-image recording feature.

    • If authentication is enabled for MongoDB, you need the following database permissions:

      Permission list

      • splitVector

      • listDatabases

      • listCollections

      • collStats

      • find

      • changeStream

      • Access permissions for the `config.collections` and `config.chunks` collections

  • Network and other preparations

    • You must configure an IP whitelist to allow Flink to access MongoDB.

    • The destination MongoDB collections must already exist.

Limits

  • CDC source table

    • MongoDB 4.0 or later supports parallel reading during the initial snapshot phase. To enable parallel mode for the initial snapshot, you can set the scan.incremental.snapshot.enabled parameter to `true`.

    • Due to the limitations of MongoDB Change Stream subscriptions, you cannot read data from the `admin`, `local`, or `config` databases or from `system` collections. For more information, see the MongoDB documentation.

  • Sink table

    • In Ververica Runtime (VVR) versions earlier than 8.0.5, only data insertion is supported.

    • In VVR 8.0.5 or later, if a primary key is declared in the sink table, data insertion, update, and deletion are supported. If no primary key is declared, only data insertion is supported.

  • Dimension table

    • VVR 8.0.5 or later supports the use of MongoDB dimension tables.

SQL

Syntax

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Note

When you create a CDC source table, you must declare the _id STRING column and set it as the unique primary key.

WITH parameters

General parameters

Parameter

Description

Data type

Required

Default value

Notes

connector

The name of the connector.

String

Yes

None

  • As a source table:

    • For VVR 8.0.4 and earlier, set this to `mongodb-cdc`.

    • For VVR 8.0.5 and later, set this to `mongodb` or `mongodb-cdc`.

  • As a dimension or sink table, set this to `mongodb`.

uri

The connection URI for MongoDB.

String

No

None

Note

You must specify either the uri parameter or the hosts parameter. If you specify uri, you do not need to specify scheme, hosts, username, password, or connection.options. If both are specified, uri takes precedence for the connection.

hosts

The hostname of the MongoDB server.

String

No

None

You can provide multiple hostnames separated by commas (,).

scheme

The connection protocol used by MongoDB.

String

No

mongodb

Valid values:

  • mongodb: Connects using the default MongoDB protocol.

  • mongodb+srv: Connects using the DNS SRV record protocol.

username

The username used to connect to MongoDB.

String

No

None

This parameter is required if authentication is enabled.

password

The password used to connect to MongoDB.

String

No

None

This parameter is required if authentication is enabled.

Important

To prevent password leaks, we recommend that you use a variable for the password value. For more information, see Project variables.

database

The name of the MongoDB database.

String

No

None

  • As a source table, the database name supports regular expressions.

  • If you do not configure this parameter, all databases are monitored.

Important

You cannot monitor data in the `admin`, `local`, or `config` databases.

collection

The name of the MongoDB collection.

String

No

None

  • As a source table, the collection name supports regular expressions.

    Important

    If the name of the collection that you want to monitor contains special characters for regular expressions, you must provide the fully qualified namespace (`database.collection`). Otherwise, changes to the collection cannot be captured.

  • If you do not configure this parameter, all collections are monitored.

Important

You cannot monitor data in `system` collections.

connection.options

The connection parameters on the MongoDB side.

String

No

None

Additional connection parameters in the key=value format, separated by ampersands (&). For example, `connectTimeoutMS=12000&socketTimeoutMS=13000`.

Important

By default, the MongoDB CDC connector does not automatically set a socket connection timeout. This can cause long interruptions during network jitter.

We recommend that you always set `socketTimeoutMS` to a reasonable value to avoid this issue.
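As a sketch of the two mutually exclusive connection styles described above (all endpoints and credentials are placeholders), the following table connects through uri; the commented-out alternative uses hosts together with connection.options:

CREATE TABLE mongo_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  -- uri replaces scheme, hosts, username, password, and connection.options
  'uri' = 'mongodb://mongouser:mongopasswd@localhost:27017',
  -- alternative style:
  -- 'hosts' = 'localhost:27017',
  -- 'username' = 'mongouser',
  -- 'password' = '${secret_values.password}',
  -- 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',
  'database' = 'testdb',
  'collection' = 'testcoll'
);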

Source table-specific parameters

Parameter

Description

Data type

Required

Default value

Notes

scan.startup.mode

The startup mode for the MongoDB CDC connector.

String

No

initial

Valid values:

  • initial: Pulls all data starting from the initial offset.

  • latest-offset: Pulls change data starting from the current offset.

  • timestamp: Pulls change data starting from a specified timestamp.

For more information, see Startup Properties.

scan.startup.timestamp-millis

The start timestamp for consumption at a specific offset.

Long

Depends on the value of `scan.startup.mode`

  • initial: No

  • latest-offset: No

  • timestamp: Yes

None

This parameter is specified in milliseconds since the Unix epoch.

This parameter applies only to the timestamp startup mode.

initial.snapshotting.queue.size

The queue size limit for the initial snapshot.

Integer

No

10240

This parameter takes effect only when the scan.startup.mode parameter is set to initial.

batch.size

The batch size for the cursor.

Integer

No

1024

None.

poll.max.batch.size

The maximum number of change documents in a single batch.

Integer

No

1024

This parameter controls the maximum number of change documents that can be pulled at a time during stream processing. A larger value results in a larger buffer allocated within the connector.

poll.await.time.ms

The time interval between two data pulls.

Integer

No

1000

The unit is milliseconds.

heartbeat.interval.ms

The interval for sending heartbeat packets.

Integer

No

0

The unit is milliseconds.

The MongoDB CDC connector actively sends heartbeat packets to the database to keep the resume position up to date. A value of 0 means that heartbeat packets are never sent.

Important

For collections that are not frequently updated, we strongly recommend that you set this parameter.

scan.incremental.snapshot.enabled

Specifies whether to enable parallel mode for the initial snapshot.

Boolean

No

false

This is an experimental feature.

scan.incremental.snapshot.chunk.size.mb

The chunk size for reading snapshots in parallel mode.

Integer

No

64

This is an experimental feature.

The unit is MB.

This parameter takes effect only when parallel snapshot is enabled.

scan.full-changelog

Generates a full changelog event stream.

Boolean

No

false

This is an experimental feature.

Note

This feature requires MongoDB 6.0 or later, and the before-image and after-image feature must be enabled. For information about how to enable the feature, see Document Preimages.

scan.flatten-nested-columns.enabled

Specifies whether to parse field names that contain periods (.) as nested BSON documents.

Boolean

No

false

If this parameter is enabled, the `col` field in the following BSON document is read as a column named `nested.col` in the schema:

{"nested":{"col":true}}
Note

This parameter is supported only in VVR 8.0.5 and later.

scan.primitive-as-string

Specifies whether to parse all primitive types in a BSON document as strings.

Boolean

No

false

Note

This parameter is supported only in VVR 8.0.5 and later.

scan.ignore-delete.enabled

Specifies whether to ignore delete (-D) messages.

Boolean

No

false

When you archive data from a MongoDB source, many `DELETE` events may be generated in the oplog. If you do not want to sync these events to the downstream system, you can enable this parameter to ignore deletion events.

Note
  • This parameter is supported only in VVR 11.1 and later.

  • Other `DELETE` events that are not from archiving operations are also ignored.

scan.incremental.snapshot.backfill.skip

Specifies whether to skip the backfill process of the incremental snapshot algorithm.

Boolean

No

false

Enabling this parameter provides only at-least-once semantics.

Note

This parameter is supported only in VVR 11.1 and later.

initial.snapshotting.pipeline

A MongoDB pipeline operation. During the snapshot reading phase, this operation is pushed down to MongoDB to filter only the required data, which improves read efficiency.

String

No

None.

  • This parameter is specified as a JSON array of objects. For example, `[{"$match": {"closed": "false"}}]` indicates that only documents where the `closed` field is `"false"` are copied.

  • This parameter takes effect only when `scan.startup.mode` is set to `initial`. It can be used only in Debezium mode and not in incremental snapshot mode. Otherwise, semantic inconsistencies may occur.

    Note

    This parameter is supported only in VVR 11.1 and later.

initial.snapshotting.max.threads

The number of threads used for data replication.

Integer

No

None.

This parameter takes effect only when `scan.startup.mode` is set to `initial`.

Note

This parameter is supported only in VVR 11.1 and later.

initial.snapshotting.queue.size

The queue size for the initial snapshot.

Integer

No

16000

This parameter takes effect only when `scan.startup.mode` is set to `initial`.

Note

This parameter is supported only in VVR 11.1 and later.

scan.change-stream.reading.parallelism

The degree of parallelism for subscribing to the Change Stream.

Integer

No

1

This parameter takes effect only when `scan.incremental.snapshot.enabled` is enabled.

Important

To subscribe to the Change Stream with multiple concurrent threads, you must also set the `heartbeat.interval.ms` parameter.

Note

This parameter is supported only in VVR 11.2 and later.

scan.change-stream.reading.queue-size

The message queue size for concurrent subscriptions to the Change Stream.

Integer

No

16384

This parameter takes effect only when `scan.change-stream.reading.parallelism` is greater than 1.

Note

This parameter is supported only in VVR 11.2 and later.
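The following sketch combines several of the source-specific options above for a low-traffic collection (endpoint and credentials are placeholders); parallel snapshotting and heartbeats are both optional:

CREATE TABLE mongo_cdc_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- experimental: read the initial snapshot in parallel (MongoDB 4.0 or later)
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.incremental.snapshot.chunk.size.mb' = '64',
  -- keep the resume position fresh on collections that are rarely updated
  'heartbeat.interval.ms' = '30000'
);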

Dimension table-specific parameters

Parameter

Description

Data type

Required

Default value

Notes

lookup.cache

The cache policy.

String

No

NONE

The following cache policies are supported:

  • NONE: No cache.

  • PARTIAL: Caches data only when it is looked up in the external database.

lookup.max-retries

The maximum number of retries if a database query fails.

Integer

No

3

None.

lookup.retry.interval

The retry interval if a database query fails.

Duration

No

1s

None.

lookup.partial-cache.expire-after-access

The maximum retention period for records in the cache.

Duration

No

None

Supported time units include ms, s, min, h, and d.

When you use this parameter, you must set lookup.cache to PARTIAL.

lookup.partial-cache.expire-after-write

The maximum retention period for a record after it is written to the cache.

Duration

No

None

When you use this parameter, you must set lookup.cache to PARTIAL.

lookup.partial-cache.max-rows

The maximum number of rows in the cache. If this value is exceeded, the oldest rows expire.

Long

No

None

When you use this parameter, you must set lookup.cache to PARTIAL.

lookup.partial-cache.cache-missing-key

Specifies whether to cache empty results when a lookup key matches no rows in the physical table.

Boolean

No

true

When you use this parameter, you must set lookup.cache to PARTIAL.

Unique to sink tables

Parameter

Description

Data type

Required

Default value

Notes

sink.buffer-flush.max-rows

The maximum number of records for each batch write.

Integer

No

1000

None.

sink.buffer-flush.interval

The refresh interval for writing data.

Duration

No

1s

None.

sink.delivery-guarantee

The semantic guarantee for writing data.

String

No

at-least-once

Valid values:

  • none

  • at-least-once

Note

Exactly-once is not currently supported.

sink.max-retries

The maximum number of retries if a database write fails.

Integer

No

3

None.

sink.retry.interval

The retry interval if a database write fails.

Duration

No

1s

None.

sink.parallelism

A custom degree of parallelism for the sink.

Integer

No

None

None.
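A sketch of a tuned sink table using the parameters above (endpoint and credentials are placeholders); because a primary key is declared, VVR 8.0.5 or later applies updates and deletes in addition to inserts:

CREATE TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- flush a buffered batch after 2000 rows or 5 seconds, whichever comes first
  'sink.buffer-flush.max-rows' = '2000',
  'sink.buffer-flush.interval' = '5s',
  'sink.delivery-guarantee' = 'at-least-once',
  'sink.max-retries' = '3'
);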

Data type mapping

CDC source table

BSON type → Flink SQL type

Int32 → INT
Int64 → BIGINT
Double → DOUBLE
Decimal128 → DECIMAL(p, s)
Boolean → BOOLEAN
Date, Timestamp → DATE
Date, Timestamp → TIME
DateTime → TIMESTAMP(3) or TIMESTAMP_LTZ(3)
Timestamp → TIMESTAMP(0) or TIMESTAMP_LTZ(0)
String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex → STRING
Binary → BYTES
Object → ROW
Array → ARRAY
DBPointer → ROW<$ref STRING, $id STRING>
GeoJSON (Point) → ROW<type STRING, coordinates ARRAY<DOUBLE>>
GeoJSON (Line) → ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>

Dimension and sink tables

BSON type → Flink SQL type

Int32 → INT
Int64 → BIGINT
Double → DOUBLE
Decimal128 → DECIMAL
Boolean → BOOLEAN
DateTime → TIMESTAMP_LTZ(3)
Timestamp → TIMESTAMP_LTZ(0)
String, ObjectId → STRING
Binary → BYTES
Object → ROW
Array → ARRAY

Examples

CDC source table

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, -- must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

Dimension table

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Sink table

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

Metadata

The MongoDB CDC source table supports metadata columns. You can access the following metadata using metadata columns.

Metadata key

Metadata type

Description

database_name

STRING NOT NULL

The name of the database that contains the document.

collection_name

STRING NOT NULL

The name of the collection that contains the document.

op_ts

TIMESTAMP_LTZ(3) NOT NULL

The time when the document was changed in the database. If the document is from the historical data of a table instead of from the ChangeStream, the value is always 0.

row_kind

STRING NOT NULL

Indicates the type of data change. Valid values:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

Note

Supported only in VVR 11.1 and later.
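For example, row_kind can be declared alongside the other metadata columns. The following is a sketch; the connection options are placeholders, and VVR 11.1 or later is required for row_kind:

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING,
  name STRING,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  -- +I, -U, +U, or -D for each change record
  row_kind STRING METADATA VIRTUAL,
  PRIMARY KEY(`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
);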

About the before-image and after-image recording feature in MongoDB

By default, MongoDB versions earlier than 6.0 do not record the before-image of updated documents or the content of deleted documents. Without the before-image and after-image recording feature, the connector can provide only upsert semantics: the update-before data entries are missing. However, many useful operators in Flink depend on a complete change stream of insert, update-before, update-after, and delete events.

To supplement the missing before-image events, the Flink SQL Planner automatically generates a `ChangelogNormalize` node for upsert data sources. This node caches a snapshot of the current version of all documents in the Flink state. When an updated or deleted document is encountered, the node can look up the state to retrieve the before-image. However, this operator node requires a large amount of state data.


MongoDB 6.0 and later support the before-image and after-image recording feature. For more information, see Use MongoDB Change Streams to capture real-time data changes. After you enable this feature, MongoDB records the complete state of a document before and after each change in a special collection. If you then enable the scan.full-changelog parameter in your job, the MongoDB CDC connector generates update-before records from these change documents, producing a complete changelog event stream and eliminating the dependency on the `ChangelogNormalize` node.
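For reference, pre- and post-images are enabled per collection on the MongoDB side. A minimal mongosh sketch, with the collection name as a placeholder:

db.runCommand({
  collMod: "testcoll",
  changeStreamPreAndPostImages: { enabled: true }
})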

MongoDB CDC DataStream API

Important

When you read and write data using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For more information about how to set up a DataStream connector, see Use a DataStream connector.

You can create a DataStream API program and use `MongoDBSource`. The following code provides an example:

Java

// Assumes the incremental snapshot source from the
// com.ververica.cdc.connectors.mongodb.source package (see the note below).
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")      // database(s) to monitor; regex supported
  .collectionList("testcoll")  // collection(s) to monitor; regex supported
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();
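The built source can then be wired into a job. The following sketch assumes the incremental snapshot source shown above, which implements the new Source API and is consumed with env.fromSource; checkpointing must be enabled for the source to record and resume its offsets:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing is required so the source can commit its progress.
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
  .print();
env.execute("MongoDB CDC job");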

XML

The VVR MongoDB connector is available in the Maven central repository for you to use directly in your job development.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
Note

When you use the DataStream API, construct the data source with MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb.source package if you want to enable the incremental snapshot feature. Otherwise, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb package.

When you construct a `MongoDBSource`, you can configure the following parameters:

Parameter

Description

hosts

The hostname of the MongoDB database to connect to.

username

The username for the MongoDB database service.

Note

If authentication is not enabled on the MongoDB server, you do not need to configure this parameter.

password

The password for the MongoDB database service.

Note

If authentication is not enabled on the MongoDB server, you do not need to configure this parameter.

databaseList

The name of the MongoDB database to monitor.

Note

The database name supports regular expressions to read data from multiple databases. You can use .* to match all databases.

collectionList

The name of the MongoDB collection to monitor.

Note

The collection name supports regular expressions to read data from multiple collections. You can use .* to match all collections.

startupOptions

The startup mode for the MongoDB CDC connector.

Valid values:

  • StartupOptions.initial(): Pulls all data starting from the initial offset.

  • StartupOptions.latest(): Pulls change data starting from the current offset.

  • StartupOptions.timestamp(long): Pulls change data starting from a specified timestamp.

For more information, see Startup Properties.

deserializer

A deserializer that deserializes a `SourceRecord` to a specified type. Valid values:

  • MongoDBConnectorDeserializationSchema: Converts a `SourceRecord` generated in upsert mode to the `RowData` internal data structure of the Flink Table API or SQL API.

  • MongoDBConnectorFullChangelogDeserializationSchema: Converts a `SourceRecord` generated in full changelog mode to the `RowData` internal data structure of the Flink Table API or SQL API.

  • JsonDebeziumDeserializationSchema: Converts a `SourceRecord` to a JSON-formatted string.