
Realtime Compute for Apache Flink:MongoDB

Last Updated: Mar 31, 2026

The MongoDB connector integrates ApsaraDB for MongoDB and self-managed MongoDB into Realtime Compute for Apache Flink as source, dimension, and sink tables. It uses the Change Stream API to capture insert, update, replace, and delete events in real time.

Capabilities

| Category | Description |
| --- | --- |
| Table types | SQL source, lookup (dimension), and sink tables; Flink CDC source; DataStream source |
| Running mode | Streaming |
| API types | DataStream API, SQL, and Flink CDC |
| Sink write semantics | Insert, update, and delete (with a primary key declared) |
| Monitoring metrics | Source tables: numBytesIn, numBytesInPerSecond, numRecordsIn, numRecordsInPerSecond, numRecordsInErrors, currentFetchEventTimeLag, currentEmitEventTimeLag, watermarkLag, sourceIdleTime. Dimension and sink tables expose no monitoring metrics. For metric definitions, see Monitoring metrics. |

How it works

The MongoDB connector reads data in two phases:

  1. Full snapshot — reads all existing documents from the targeted collections in parallel.

  2. Incremental reading — automatically switches to consuming the oplog via the Change Stream API after the snapshot completes.

This process provides exactly-once semantics, ensuring no duplicate or missing records during fault recovery.

Key concepts

Startup modes

Choose a startup mode based on when your pipeline needs to start consuming data:

| Mode | Behavior | Use when |
| --- | --- | --- |
| initial (default) | Reads a full snapshot on first start, then switches to incremental reading | You need a complete copy of existing data |
| latest-offset | Starts from the current oplog position; reads no historical data | You only need changes from now on |
| timestamp | Reads oplog events from a specified timestamp; skips the snapshot | You need changes starting from a known point in time (requires MongoDB 4.0+) |
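
As a minimal sketch, a source that starts from a known point in time sets the startup mode and the timestamp together. Connection values and column names below are placeholders:

```sql
CREATE TEMPORARY TABLE mongo_source_from_ts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  'scan.startup.mode' = 'timestamp',
  -- Milliseconds since the UNIX epoch; required for timestamp mode
  'scan.startup.timestamp-millis' = '1700000000000'
);
```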

Full changelog support

MongoDB versions earlier than 6.0 do not store the pre-change state of documents. Without this information, the connector can only produce UPSERT events; UPDATE_BEFORE records are missing.

To work around this, the Flink SQL planner inserts a ChangelogNormalize operator that caches document state in Flink's state backend. While functional, this approach consumes significant state storage.

MongoDB 6.0 and later support preimage and postimage recording. When enabled, MongoDB records the complete document state before and after each change. Setting scan.full-changelog to true tells the connector to use these records to produce a full changelog stream, eliminating the ChangelogNormalize operator and its state overhead.
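
A sketch of the corresponding table declaration (connection values are placeholders; the monitored collection must also have preimage and postimage recording enabled on the MongoDB side):

```sql
CREATE TEMPORARY TABLE mongo_full_changelog (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Requires MongoDB 6.0+ with preimage/postimage recording enabled
  'scan.full-changelog' = 'true'
);
```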

Prerequisites

Before you begin, ensure that you have:

  • An ApsaraDB for MongoDB instance (replica set or sharded cluster), or a self-managed MongoDB 3.6+ cluster, with replica set mode enabled. See Replication.

  • If authentication is enabled, a MongoDB user with the following permissions: splitVector, listDatabases, listCollections, collStats, find, changeStream, and read access to config.collections and config.chunks.

  • Flink cluster IP addresses added to the MongoDB IP allowlist.

  • The target database and collection created before running the job.

Limitations

SQL source

  • Parallel snapshot reading requires MongoDB 4.0 or later. Enable it by setting scan.incremental.snapshot.enabled to true.

  • The admin, local, and config databases and all system collections cannot be monitored. This is a MongoDB Change Stream restriction. See the Change Streams in MongoDB documentation.

  • When creating a SQL source table, declare the _id STRING column and set it as the primary key.

SQL sink

  • VVR 8.0.4 and earlier: insert only.

  • VVR 8.0.5 and later with a primary key declared: insert, update, and delete.

  • VVR 8.0.5 and later without a primary key: insert only.

  • Exactly-once delivery is not supported. The sink.delivery-guarantee option accepts none or at-least-once.

SQL lookup (dimension)

  • Supported in VVR 8.0.5 and later.

  • VVR 8.0.9 and later: lookup joins support reading the built-in _id field of the ObjectId type.

SQL

Syntax

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Declare the _id STRING column and specify it as the primary key when creating a CDC source table.

Connector options

General

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | String | Yes | | Connector identifier. Source tables: mongodb-cdc (VVR 8.0.4 and earlier) or mongodb / mongodb-cdc (VVR 8.0.5 and later). Dimension and sink tables: mongodb. |
| uri | String | No | | MongoDB connection URI. Specify either uri or hosts. If you specify uri, omit scheme, hosts, username, password, and connection.options. If both are set, uri takes precedence. |
| hosts | String | No | | Hostname of the MongoDB server. Separate multiple hosts with commas (,). |
| scheme | String | No | mongodb | Connection protocol. Valid values: mongodb (default), mongodb+srv (DNS SRV). |
| username | String | No | | MongoDB username. Required when authentication is enabled. |
| password | String | No | | MongoDB password. Required when authentication is enabled. Use variables instead of hardcoding credentials. |
| database | String | No | | MongoDB database name. Supports regular expressions for source tables. If not set, all databases are monitored. Cannot monitor the admin, local, or config databases. |
| collection | String | No | | MongoDB collection name. Supports regular expressions for source tables. If not set, all collections are monitored. Cannot monitor system collections. If the collection name contains regular expression special characters, use the fully qualified namespace (database.collection). |
| connection.options | String | No | | Additional connection options as &-separated key=value pairs (for example, connectTimeoutMS=12000&socketTimeoutMS=13000). By default, the connector does not set a socket connection timeout, which can cause long interruptions during network jitter; set socketTimeoutMS to a reasonable value to avoid this. |
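
For example, the whole connection can be expressed as a single uri instead of separate hosts and credential options. A sketch with placeholder values (the password is elided):

```sql
CREATE TEMPORARY TABLE mongo_source_uri (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  -- uri replaces scheme, hosts, username, password, and connection.options,
  -- and takes precedence over them if both are set
  'uri' = 'mongodb://mongouser:****@host1:27017,host2:27017/?connectTimeoutMS=12000&socketTimeoutMS=13000',
  'database' = 'testdb',
  'collection' = 'testcoll'
);
```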

Source

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| scan.startup.mode | String | No | initial | Startup mode. Valid values: initial, latest-offset, timestamp. See Startup modes and Startup Properties. |
| scan.startup.timestamp-millis | Long | Conditional | | Start timestamp in milliseconds since the UNIX epoch. Required when scan.startup.mode is timestamp. |
| initial.snapshotting.queue.size | Integer | No | 10240 | Maximum queue size during the initial snapshot phase. Takes effect only when scan.startup.mode is initial. |
| batch.size | Integer | No | 1024 | Cursor batch size. |
| poll.max.batch.size | Integer | No | 1024 | Maximum number of change documents pulled per batch during stream reading. Larger values allocate a larger internal buffer. |
| poll.await.time.ms | Integer | No | 1000 | Interval between data pull requests, in milliseconds. |
| heartbeat.interval.ms | Integer | No | 0 | Heartbeat interval in milliseconds. The connector sends heartbeats to track the latest oplog position. 0 disables heartbeats. Set this for infrequently updated collections. |
| scan.incremental.snapshot.enabled | Boolean | No | false | Enables parallel snapshot reading. Experimental feature. Requires MongoDB 4.0 or later. |
| scan.incremental.snapshot.chunk.size.mb | Integer | No | 64 | Chunk size for parallel snapshot reading, in MB. Experimental feature. Takes effect only when parallel snapshot reading is enabled. |
| scan.full-changelog | Boolean | No | false | Generates a full changelog stream using MongoDB preimage and postimage records. Experimental feature. Requires MongoDB 6.0 or later with preimage and postimage features enabled. |
| scan.flatten-nested-columns.enabled | Boolean | No | false | Parses fields separated by . as nested BSON document fields. For example, {"nested":{"col":true}} maps to a field named nested.col. Supported in VVR 8.0.5 and later. |
| scan.primitive-as-string | Boolean | No | false | Parses all primitive BSON types as STRING. Supported in VVR 8.0.5 and later. |
| scan.ignore-delete.enabled | Boolean | No | false | Ignores all DELETE (-D) events, including those generated during MongoDB data archiving. Supported in VVR 11.1 and later. |
| scan.incremental.snapshot.backfill.skip | Boolean | No | false | Skips backfill during incremental snapshot reading. Enabling this option provides only at-least-once semantics. Supported in VVR 11.1 and later. |
| initial.snapshotting.pipeline | String | No | | MongoDB aggregation pipeline operations applied during snapshot reading to filter data. Specify as a JSON array, for example: [{"$match": {"closed": "false"}}]. Takes effect only when scan.startup.mode is initial and the connector runs in Debezium mode. Supported in VVR 11.1 and later. |
| initial.snapshotting.max.threads | Integer | No | | Number of threads for snapshot replication. Takes effect only when scan.startup.mode is initial. Supported in VVR 11.1 and later. |
| initial.snapshotting.queue.size | Integer | No | 16000 | Queue size for the initial snapshot. Takes effect only when scan.startup.mode is initial. Supported in VVR 11.1 and later. |
| scan.change-stream.reading.parallelism | Integer | No | 1 | Number of concurrent readers for the Change Stream. Takes effect only when scan.incremental.snapshot.enabled is true. Also set heartbeat.interval.ms when using this option. Supported in VVR 11.2 and later. |
| scan.change-stream.reading.queue-size | Integer | No | 16384 | Message queue size for concurrent Change Stream subscriptions. Takes effect only when scan.change-stream.reading.parallelism is greater than 1. Supported in VVR 11.2 and later. |
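
As a sketch of scan.flatten-nested-columns.enabled, a document such as {"nested": {"col": true}} can be exposed as a top-level column by quoting the dotted name. Connection values are placeholders:

```sql
CREATE TEMPORARY TABLE mongo_nested_source (
  `_id` STRING,
  -- Maps to the nested BSON field {"nested": {"col": ...}}
  `nested.col` BOOLEAN,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  'scan.flatten-nested-columns.enabled' = 'true' -- VVR 8.0.5+
);
```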

Lookup (dimension)

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| lookup.cache | String | No | NONE | Cache policy. Valid values: NONE (no caching), PARTIAL (cache lookup results from the external database). |
| lookup.max-retries | Integer | No | 3 | Maximum retries on lookup failure. |
| lookup.retry.interval | Duration | No | 1s | Interval between retries on lookup failure. |
| lookup.partial-cache.expire-after-access | Duration | No | | Maximum time a cached entry lives after last access. Supported units: ms, s, min, h, d. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.expire-after-write | Duration | No | | Maximum time a cached entry lives after being written. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.max-rows | Long | No | | Maximum number of rows in the cache. Oldest entries are evicted when the limit is reached. Requires lookup.cache = PARTIAL. |
| lookup.partial-cache.cache-missing-key | Boolean | No | true | Caches a null entry when a lookup key has no matching record. Requires lookup.cache = PARTIAL. |

Sink

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| sink.buffer-flush.max-rows | Integer | No | 1000 | Maximum number of records written per batch. |
| sink.buffer-flush.interval | Duration | No | 1s | Flush interval. |
| sink.delivery-guarantee | String | No | at-least-once | Write delivery semantics. Valid values: none, at-least-once. Exactly-once is not supported. |
| sink.max-retries | Integer | No | 3 | Maximum retries on write failure. |
| sink.retry.interval | Duration | No | 1s | Interval between retries on write failure. |
| sink.parallelism | Integer | No | | Custom sink parallelism. |
| sink.delete-strategy | String | No | CHANGELOG_STANDARD | Strategy for handling -D and -U events. Valid values: CHANGELOG_STANDARD (apply updates and deletes normally), IGNORE_DELETE (ignore -D events; overwrite full rows on -U), PARTIAL_UPDATE (ignore -U events to support partial column updates; delete rows on -D), IGNORE_ALL (ignore both -U and -D events). |
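
For example, a sink that must never delete downstream rows can drop -D events while still applying updates. A sketch with placeholder connection values:

```sql
CREATE TEMPORARY TABLE mongo_sink_no_delete (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Drop -D events; -U events overwrite the full row
  'sink.delete-strategy' = 'IGNORE_DELETE'
);
```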

Data type mappings

Source

| BSON type | Flink SQL type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date, Timestamp | DATE |
| Date, Timestamp | TIME |
| DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON Point | ROW<type STRING, coordinates ARRAY<DOUBLE>> |
| GeoJSON Line | ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |

Lookup (dimension) and sink

| BSON type | Flink SQL type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Boolean | BOOLEAN |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| String, ObjectId | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |

Metadata columns

SQL source supports the following metadata columns:

| Metadata column | Type | Description |
| --- | --- | --- |
| database_name | STRING NOT NULL | The database containing the document. |
| collection_name | STRING NOT NULL | The collection containing the document. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | Time when the document changed. Returns 0 for documents from the initial snapshot. |
| row_kind | STRING NOT NULL | Change event type: +I (INSERT), -D (DELETE), -U (UPDATE_BEFORE), +U (UPDATE_AFTER). Supported in VVR 11.1 and later. |
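
A sketch of declaring these metadata columns in a source table. Connection values are placeholders, and the row_kind column requires VVR 11.1 or later:

```sql
CREATE TEMPORARY TABLE mongo_source_meta (
  `_id` STRING,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  coll_name STRING METADATA FROM 'collection_name' VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  op_kind STRING METADATA FROM 'row_kind' VIRTUAL, -- VVR 11.1+
  PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'database' = 'testdb',
  'collection' = 'testcoll'
);
```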

Examples

Source

The following example reads from a MongoDB source table with parallel snapshot reading and full changelog enabled, then writes selected fields to a print sink.

-- CDC source table: reads product data from MongoDB
-- _id must be declared and set as the primary key
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true', -- Enable parallel snapshot reading (requires MongoDB 4.0+)
  'scan.full-changelog' = 'true'                -- Enable full changelog (requires MongoDB 6.0+ with preimage/postimage)
);

CREATE TEMPORARY TABLE productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO productssink
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM mongo_source;

Lookup (dimension)

The following example joins a data generator stream with a MongoDB dimension table using a temporal join.

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',                           -- Cache lookup results for better performance
  'lookup.partial-cache.expire-after-access' = '10min', -- Evict cached entries after 10 minutes of inactivity
  'lookup.partial-cache.expire-after-write' = '10min',  -- Evict cached entries 10 minutes after write
  'lookup.partial-cache.max-rows' = '100'               -- Maximum 100 rows in the cache
);

CREATE TEMPORARY TABLE print_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM datagen_source AS T
JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Sink

The following example writes data from a data generator to a MongoDB sink table. A primary key is declared to support insert, update, and delete operations.

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED -- Declare primary key to enable update and delete
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);

INSERT INTO mongo_sink SELECT * FROM datagen_source;

Flink CDC (public preview)

Flink CDC lets you synchronize MongoDB data to a downstream store using a YAML script-based pipeline, without writing SQL DDL. This feature requires VVR 11.1 and later.

Syntax

source:
  type: mongodb
  name: MongoDB Source
  hosts: localhost:33076
  username: ${mongo.username}
  password: ${mongo.password}
  database: foo_db
  collection: foo_col_.*

sink:
  type: ...

Configuration options

| Option | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| type | Yes | STRING | | Connector type. Set to mongodb. |
| scheme | No | STRING | mongodb | Connection protocol. Valid values: mongodb, mongodb+srv. |
| hosts | Yes | STRING | | MongoDB server hostname(s). Separate multiple hosts with commas. |
| username | No | STRING | | MongoDB username. |
| password | No | STRING | | MongoDB password. |
| database | Yes | STRING | | MongoDB database name to capture. Regular expressions are supported. |
| collection | Yes | STRING | | MongoDB collection name to capture. Regular expressions are supported. Use the fully qualified database.collection namespace. |
| connection.options | No | STRING | | Additional connection options as &-separated key=value pairs, for example: replicaSet=test&connectTimeoutMS=300000. |
| schema.inference.strategy | No | STRING | continuous | Schema inference strategy. continuous: infers types continuously and emits schema change events when the schema widens. static: infers the schema once at startup. |
| scan.max.pre.fetch.records | No | INT | 50 | Maximum records to sample per collection during initial schema inference. |
| scan.startup.mode | No | STRING | initial | Startup mode. Valid values: initial, latest-offset, timestamp, snapshot. |
| scan.startup.timestamp-millis | No | LONG | | Start timestamp in milliseconds. Required when scan.startup.mode is timestamp. |
| chunk-meta.group.size | No | INT | 1000 | Maximum metadata chunk size. |
| scan.incremental.close-idle-reader.enabled | No | BOOLEAN | false | Closes idle source readers after switching to incremental reading. |
| scan.incremental.snapshot.backfill.skip | No | BOOLEAN | false | Skips the backfill stage. Enable if the sink supports automatic primary-key deduplication, to reduce snapshot-to-incremental transition time. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | No | BOOLEAN | false | Reads unbounded chunks first. Reduces out-of-memory risk for frequently updated collections. |
| batch.size | No | INT | 1024 | Cursor batch size. |
| poll.max.batch.size | No | INT | 1024 | Maximum entries per Change Stream pull request. |
| poll.await.time.ms | No | INT | 1000 | Minimum wait time between Change Stream pull requests, in milliseconds. |
| heartbeat.interval.ms | No | INT | 0 | Heartbeat interval in milliseconds. Set this for infrequently updated collections. 0 disables heartbeats. |
| scan.incremental.snapshot.chunk.size.mb | No | INT | 64 | Chunk size during snapshotting, in MB. |
| scan.incremental.snapshot.chunk.samples | No | INT | 20 | Number of samples used to estimate collection size during snapshotting. |
| scan.full-changelog | No | BOOLEAN | false | Generates full changelog events using preimage and postimage records. Requires MongoDB 6.0 or later with preimage and postimage enabled. |
| scan.cursor.no-timeout | No | BOOLEAN | false | Disables the cursor timeout. By default, MongoDB closes idle cursors after 10 minutes. |
| scan.ignore-delete.enabled | No | BOOLEAN | false | Ignores delete events from MongoDB. |
| scan.flatten.nested-documents.enabled | No | BOOLEAN | false | Flattens nested BSON documents. For example, {"doc": {"foo": 1, "bar": "two"}} becomes doc.foo INT and doc.bar STRING. |
| scan.all.primitives.as-string.enabled | No | BOOLEAN | false | Infers all primitive types as STRING. Reduces schema change events when upstream types are inconsistent. |
| metadata.list | No | STRING | | Comma-separated list of metadata fields to pass downstream. Supported values: ts_ms (oplog event timestamp), op_ts (alias for ts_ms; use when writing metadata to Kafka JSON). |

Data type mappings

| MongoDB BSON type | Flink CDC type | Notes |
| --- | --- | --- |
| STRING | VARCHAR | |
| INT32 | INT | |
| INT64 | BIGINT | |
| DECIMAL128 | DECIMAL | |
| DOUBLE | DOUBLE | |
| BOOLEAN | BOOLEAN | |
| TIMESTAMP | TIMESTAMP | |
| DATETIME | LOCALZONEDTIMESTAMP | |
| BINARY | VARBINARY | |
| DOCUMENT | MAP | Key and value types are inferred. |
| ARRAY | ARRAY | Element types are inferred. |
| OBJECTID | VARCHAR | Represented as a hex string. |
| SYMBOL, REGULAREXPRESSION, JAVASCRIPT, JAVASCRIPTWITHSCOPE | VARCHAR | Represented as a string. |

Metadata columns

Flink CDC supports the following metadata column for the MongoDB connector:

| Metadata column | Type | Description |
| --- | --- | --- |
| ts_ms | BIGINT NOT NULL | Time when the document changed (oplog timestamp). Returns 0 for documents from the initial snapshot. |

Use the Transform module's generic metadata columns to access database_name, collection_name, and row_kind.

DataStream API

Important

To use the DataStream API, set up the DataStream connector for your job. See DataStream connector usage.

Add the Maven dependency

The VVR MongoDB connector is published to the Maven Central Repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>

Build a MongoDBSource

Use MongoDBSource.builder() to construct a source:

  • To enable incremental snapshot reading, use the builder from com.ververica.cdc.connectors.mongodb.source.

  • Otherwise, use the builder from com.ververica.cdc.connectors.mongodb.

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")          // Supports regular expressions; use .* to match all databases
  .collectionList("testcoll")      // Supports regular expressions; use .* to match all collections
  .startupOptions(StartupOptions.initial()) // Or StartupOptions.latest(), StartupOptions.timestamp(...)
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

MongoDBSource parameters

| Parameter | Description |
| --- | --- |
| hosts | Hostname of the MongoDB server. |
| username | MongoDB username. Omit if authentication is not enabled. |
| password | MongoDB password. Omit if authentication is not enabled. |
| databaseList | Database names to monitor. Supports regular expressions. Use .* to match all databases. |
| collectionList | Collection names to monitor. Supports regular expressions. Use .* to match all collections. |
| startupOptions | Startup mode. Valid values: StartupOptions.initial(), StartupOptions.latest(), StartupOptions.timestamp(). |
| deserializer | Deserializer for converting SourceRecord objects. Valid values: MongoDBConnectorDeserializationSchema (upsert mode, produces Flink RowData), MongoDBConnectorFullChangelogDeserializationSchema (full changelog mode, produces Flink RowData), JsonDebeziumDeserializationSchema (produces JSON strings). |

References

  • CREATE TABLE AS (CTAS) — Synchronize MongoDB data and schema changes to downstream tables (VVR 8.0.6 and later, requires preimage and postimage).

  • CREATE DATABASE AS (CDAS) — Synchronize an entire MongoDB database to downstream tables (VVR 8.0.6 and later, requires preimage and postimage).

  • Monitoring metrics — Monitor source table performance.