
Realtime Compute for Apache Flink:MongoDB

Last Updated:Dec 18, 2025

This topic describes how to use the MongoDB connector.

Background information

MongoDB is a document-oriented database that stores data as flexible, schema-less documents, which simplifies application development and scaling. The MongoDB connector supports the following features:

Category

Details

Supported types

Source table, dimension table, sink table, and data ingestion

Runtime mode

Only stream mode is supported.

Unique monitoring metrics

Monitoring metrics

  • Source table

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • Dimension table and sink table: None.

Note

For more information, see Metric descriptions.

API types

DataStream, SQL, and data ingestion YAML

Support for updates or deletions in sink tables

Yes

Features

The MongoDB Change Data Capture (CDC) source table captures both full and incremental data. It first reads a full snapshot of the existing data and then seamlessly switches to reading incremental changes from the oplog through the Change Stream API, so that no data is duplicated or lost during the switch. The connector also supports exactly-once semantics to guarantee data consistency during fault recovery.

  • Based on the Change Stream API

    Uses the Change Stream API, available since MongoDB 3.6, to efficiently capture change events such as inserts, updates, replacements, and deletions from a database or collection, and converts these events into a changelog stream that Flink can process.

  • Full and incremental integration

    Automatically reads the initial snapshot and smoothly transitions to incremental mode without manual intervention.

  • Parallel snapshot reading

    Supports parallel reading of historical data to improve performance. This requires MongoDB 4.0 or later.

  • Multiple startup modes

    • initial: Performs a full snapshot on the first startup and then continuously reads the oplog.

    • latest-offset: Starts from the end of the current oplog without reading historical data.

    • timestamp: Starts reading the oplog from a specified timestamp and skips the snapshot. This requires MongoDB 4.0 or later.

  • Full changelog support

    Supports outputting a complete changelog that includes both pre-images and post-images (requires MongoDB 6.0 or later with pre-image and post-image recording enabled).
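The timestamp startup mode takes its start position as milliseconds since the Unix epoch (set through scan.startup.timestamp-millis in SQL). A minimal sketch, using only java.time, of how you might derive that value from a readable date and time; the StartupTimestamp class and its methods are hypothetical helpers, not part of the connector.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: converts a readable time into the epoch-millisecond value
// used by scan.startup.timestamp-millis (timestamp startup mode).
class StartupTimestamp {

    // Parses an ISO-8601 instant in UTC, for example "2025-01-01T00:00:00Z".
    static long fromIsoInstant(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Interprets a local date-time such as "2025-01-01T08:00:00" in the given time zone.
    static long fromLocal(String localDateTime, String zoneId) {
        return LocalDateTime.parse(localDateTime)
                .atZone(ZoneId.of(zoneId))
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // Prints the value to use as scan.startup.timestamp-millis.
        System.out.println(fromIsoInstant("2025-01-01T00:00:00Z"));
    }
}
```

For example, 2025-01-01T00:00:00Z corresponds to 1735689600000, which you would pass as 'scan.startup.timestamp-millis' = '1735689600000' together with 'scan.startup.mode' = 'timestamp'.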

Flink integration enhancements

Prerequisites

  • MongoDB instance requirements

    • Only Alibaba Cloud MongoDB 3.6 or later (ReplicaSet or sharded cluster) and self-managed MongoDB 3.6 or later are supported.

    • The MongoDB deployment that you want to monitor must be a replica set or a sharded cluster, because Change Streams are not available on standalone instances. For more information, see Replication.

  • MongoDB feature dependencies

    • To use the full changelog event stream feature, you must enable the pre-image and post-image recording feature (MongoDB 6.0 or later).

    • If authentication is enabled for MongoDB, you need the following database permissions.

      Permission list

      • splitVector permission

      • listDatabases permission

      • listCollections permission

      • collStats permission

      • find permission

      • changeStream permission

      • Access permissions for the config.collections and config.chunks collections

  • MongoDB network and other preparations

    • An IP address whitelist is configured to allow Flink to access MongoDB.

    • The target MongoDB collections and data have been created.

Limits

  • CDC source table

    • MongoDB 4.0 and later support parallel reading during the initial snapshot phase. To enable parallel mode for the initial snapshot, set the scan.incremental.snapshot.enabled parameter to true.

    • Due to MongoDB Change Stream subscription limits, you cannot read data from the admin, local, or config databases, or from system collections. For more information, see the MongoDB documentation.

  • Sink table

    • Ververica Runtime (VVR) versions earlier than 8.0.5 only support inserting data.

    • In VVR 8.0.5 and later, if a primary key is declared in the sink table, you can insert, update, and delete data. If no primary key is declared, you can only insert data.

  • Dimension table

    • VVR 8.0.5 and later support using MongoDB dimension tables.

SQL

Syntax

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Note

When you create a CDC source table, you must declare the _id STRING column and set it as the unique primary key.

WITH parameters

General parameters

Parameter

Description

Data type

Required

Default value

Notes

connector

The name of the connector.

String

Yes

None

  • As a source table:

    • For VVR 8.0.4 and earlier, set this to mongodb-cdc.

    • For VVR 8.0.5 and later, set this to mongodb or mongodb-cdc.

  • As a dimension or sink table, the value must be mongodb.

uri

The MongoDB connection URI.

String

No

None

Note

You must specify either uri or hosts. If you specify uri, you do not need to specify scheme, hosts, username, password, or connection.options. If both uri and hosts are specified, uri is used for the connection.

hosts

The hostname and port of the MongoDB instance, in host:port format.

String

No

None

You can provide multiple hostnames separated by commas (,).

scheme

The connection protocol used by MongoDB.

String

No

mongodb

Valid values:

  • mongodb: Connects using the default MongoDB protocol.

  • mongodb+srv: Connects using the DNS SRV record protocol.

username

The username for connecting to MongoDB.

String

No

None

This parameter is required if authentication is enabled.

password

The password for connecting to MongoDB.

String

No

None

This parameter is required if authentication is enabled.

Important

To prevent your password from being leaked, we recommend that you use a variable to set the password value. For more information, see Project Variables.

database

The name of the MongoDB database.

String

No

None

  • For a source table, the database name supports regular expressions.

  • If this parameter is not configured, all databases are monitored.

Important

Monitoring data in the admin, local, or config databases is not supported.

collection

The name of the MongoDB collection.

String

No

None

  • For a source table, the collection name supports regular expressions.

    Important

    If the collection name that you want to monitor contains regular-expression special characters, you must provide the fully qualified namespace (database_name.collection_name). Otherwise, changes to the collection cannot be captured.

  • If this parameter is not configured, all collections are monitored.

Important

Monitoring data in system collections is not supported.

connection.options

Additional MongoDB connection parameters.

String

No

None

Additional connection parameters in the key=value format, separated by ampersands (&). For example, connectTimeoutMS=12000&socketTimeoutMS=13000.

Important

By default, MongoDB CDC does not set a socket timeout, so a job can stall for a long time when the network is unstable.

Set socketTimeoutMS to a reasonable value to avoid this issue.
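Because uri replaces scheme, hosts, username, password, and connection.options, it can help to see how those values combine into one connection string. A minimal sketch, assuming the standard MongoDB connection-string layout scheme://[username:password@]host1[,host2,...]/[?key=value&...]; the MongoUriBuilder class is hypothetical, and the percent-encoding step reflects MongoDB's rule that reserved characters in credentials must be escaped.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds a MongoDB connection string from the values you would
// otherwise set as scheme, hosts, username, password, and connection.options.
class MongoUriBuilder {

    // Percent-encodes a credential. URLEncoder uses form encoding ('+' for a space),
    // so '+' is rewritten as %20; a literal '+' is already encoded as %2B.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Joins options in the connection.options format: key=value pairs separated by '&'.
    static String joinOptions(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner("&");
        options.forEach((key, value) -> joiner.add(key + "=" + value));
        return joiner.toString();
    }

    static String build(String scheme, String hosts, String username, String password,
                        Map<String, String> options) {
        StringBuilder uri = new StringBuilder(scheme).append("://");
        if (username != null && !username.isEmpty()) {
            uri.append(encode(username));
            if (password != null) {
                uri.append(':').append(encode(password));
            }
            uri.append('@');
        }
        // hosts is a comma-separated list of host:port pairs, as in the hosts option.
        uri.append(hosts).append('/');
        if (!options.isEmpty()) {
            uri.append('?').append(joinOptions(options));
        }
        return uri.toString();
    }
}
```

In a real job, keep the password in a variable such as ${secret_values.password} instead of embedding it in a URI string in code.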

Source table only

Parameter

Description

Data type

Required

Default value

Notes

scan.startup.mode

The startup mode for MongoDB CDC.

String

No

initial

Valid values:

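  • initial: Reads a full snapshot of the existing data on the first startup, then continues reading incremental changes.

  • latest-offset: Reads only the changes made after the job starts, without reading existing data.

  • timestamp: Reads changes starting from the timestamp specified by scan.startup.timestamp-millis.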

For more information, see Startup Properties.

scan.startup.timestamp-millis

The timestamp from which to start reading changes.

Long

Depends on the value of scan.startup.mode:

  • initial: No

  • latest-offset: No

  • timestamp: Yes

None

The value is the number of milliseconds since the Unix epoch (1970-01-01 00:00:00 UTC).

This parameter applies only to the timestamp startup mode.

initial.snapshotting.queue.size

The queue size limit for the initial snapshot.

Integer

No

10240

This parameter takes effect only when the scan.startup.mode option is set to initial.

batch.size

The batch size for the cursor.

Integer

No

1024

None.

poll.max.batch.size

The maximum number of change documents in a single batch.

Integer

No

1024

This parameter controls the maximum number of change documents pulled at one time during stream processing. A larger value results in a larger buffer allocated within the connector.

poll.await.time.ms

The time interval between two data pulls.

Integer

No

1000

The unit is milliseconds.

heartbeat.interval.ms

The time interval for sending heartbeats.

Integer

No

0

The unit is milliseconds.

The MongoDB CDC connector actively sends heartbeats to the database to ensure the resume token is up to date. A value of 0 means heartbeats are never sent.

Important

For collections that are not frequently updated, we strongly recommend setting this option.

scan.incremental.snapshot.enabled

Specifies whether to enable parallel mode for the initial snapshot.

Boolean

No

false

This is an experimental feature.

scan.incremental.snapshot.chunk.size.mb

The chunk size for reading snapshots in parallel mode.

Integer

No

64

This is an experimental feature.

The unit is MB.

This parameter takes effect only when parallel snapshot is enabled.

scan.full-changelog

Specifies whether to generate a full changelog event stream, including UPDATE_BEFORE (-U) records.

Boolean

No

false

This is an experimental feature.

Note

Requires MongoDB 6.0 or later with the pre-image and post-image feature enabled. For more information about how to enable this feature, see Document Preimages.

scan.flatten-nested-columns.enabled

Specifies whether to interpret field names that contain dots (.) as paths into nested BSON documents.

Boolean

No

false

If enabled, in the following BSON document example, the col field is named nested.col in the schema.

{"nested":{"col":true}}
Note

This parameter is supported only in VVR 8.0.5 and later.

scan.primitive-as-string

Specifies whether to parse all primitive types in a BSON document as strings.

Boolean

No

false

Note

This parameter is supported only in VVR 8.0.5 and later.

scan.ignore-delete.enabled

Specifies whether to ignore delete (-D) messages.

Boolean

No

false

Archiving data in a MongoDB source can generate many DELETE events in the oplog. If you do not want to sync these events downstream, enable this parameter to ignore delete events.

Note
  • This parameter is supported only in VVR 11.1 and later.

  • DELETE events that do not come from archiving operations are also ignored.

scan.incremental.snapshot.backfill.skip

Specifies whether to skip the watermark backfill process of the incremental snapshot algorithm.

Boolean

No

false

Enabling this switch provides only at-least-once semantics.

Note

This parameter is supported only in VVR 11.1 and later.

initial.snapshotting.pipeline

A MongoDB aggregation pipeline. During the snapshot phase, the pipeline is pushed down to MongoDB so that only the required data is read, which improves read efficiency.

String

No

None.

  • Expressed as a JSON array of pipeline stages. For example, [{"$match": {"closed": "false"}}] copies only documents whose closed field is the string "false".

  • This option takes effect only when scan.startup.mode is set to initial. It can be used only in Debezium mode (scan.incremental.snapshot.enabled = false), not in incremental snapshot mode, because it may cause semantic inconsistencies.

    Note

    This parameter is supported only in VVR 11.1 and later.

initial.snapshotting.max.threads

The number of threads used to copy existing data during the initial snapshot.

Integer

No

None.

This parameter takes effect only when the scan.startup.mode option is set to initial.

Note

This parameter is supported only in VVR 11.1 and later.

initial.snapshotting.queue.size

The queue size for the initial snapshot.

Integer

No

16000

This parameter takes effect only when the scan.startup.mode option is set to initial.

Note

This parameter is supported only in VVR 11.1 and later.

scan.change-stream.reading.parallelism

The degree of parallelism for subscribing to the Change Stream.

Integer

No

1

This parameter takes effect only when scan.incremental.snapshot.enabled is set to true.

Important

To subscribe to the Change Stream with multiple concurrent threads, you must also set the heartbeat.interval.ms parameter.

Note

This parameter is supported only in VVR 11.2 and later.

scan.change-stream.reading.queue-size

The message queue size for concurrent Change Stream subscriptions.

Integer

No

16384

This parameter takes effect only when scan.change-stream.reading.parallelism is set to a value greater than 1.

Note

This parameter is supported only in VVR 11.2 and later.
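Several source options take effect only in combination with others. The hypothetical check below encodes the dependencies stated in the parameter table above: timestamp mode needs a start timestamp, parallel Change Stream reading needs incremental snapshot mode and a heartbeat, and initial.snapshotting.pipeline needs initial mode without incremental snapshot. It is a sketch for catching mistakes before you submit a job, not the connector's own validation, which remains authoritative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check over WITH-clause options for a MongoDB CDC source.
// It encodes only the option dependencies stated in the parameter table; the
// connector's own validation is authoritative and may be stricter.
class SourceOptionCheck {

    static List<String> check(Map<String, String> opts) {
        List<String> problems = new ArrayList<>();
        String mode = opts.getOrDefault("scan.startup.mode", "initial");
        boolean incremental = Boolean.parseBoolean(
                opts.getOrDefault("scan.incremental.snapshot.enabled", "false"));
        long heartbeatMs = Long.parseLong(opts.getOrDefault("heartbeat.interval.ms", "0"));
        int changeStreamParallelism = Integer.parseInt(
                opts.getOrDefault("scan.change-stream.reading.parallelism", "1"));

        if (mode.equals("timestamp") && !opts.containsKey("scan.startup.timestamp-millis")) {
            problems.add("timestamp mode requires scan.startup.timestamp-millis");
        }
        if (changeStreamParallelism > 1 && !incremental) {
            problems.add("parallel Change Stream reading requires scan.incremental.snapshot.enabled = true");
        }
        if (changeStreamParallelism > 1 && heartbeatMs <= 0) {
            problems.add("parallel Change Stream reading requires heartbeat.interval.ms > 0");
        }
        if (opts.containsKey("initial.snapshotting.pipeline")
                && (incremental || !mode.equals("initial"))) {
            problems.add("initial.snapshotting.pipeline requires initial mode without incremental snapshot");
        }
        return problems;
    }
}
```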

Dimension table only

Parameter

Description

Data type

Required

Default value

Notes

lookup.cache

The cache policy.

String

No

NONE

The following cache policies are supported:

  • NONE: No data is cached.

  • PARTIAL: Caches only the records that are looked up in the external database.

lookup.max-retries

The maximum number of retries if a database query fails.

Integer

No

3

None.

lookup.retry.interval

The interval between retries when a database query fails.

Duration

No

1s

None.

lookup.partial-cache.expire-after-access

The maximum time that a record is kept in the cache after it was last accessed.

Duration

No

None

Supported time units are ms, s, min, h, and d.

When using this configuration, lookup.cache must be set to PARTIAL.

lookup.partial-cache.expire-after-write

The maximum time that a record is kept in the cache after it is written.

Duration

No

None

When using this configuration, lookup.cache must be set to PARTIAL.

lookup.partial-cache.max-rows

The maximum number of rows in the cache. If this value is exceeded, the oldest rows expire.

Long

No

None

When using this configuration, lookup.cache must be set to PARTIAL.

lookup.partial-cache.cache-missing-key

Specifies whether to cache an empty result when no matching record is found in the physical table.

Boolean

No

true

When using this configuration, lookup.cache must be set to PARTIAL.
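The PARTIAL cache options interact: entries age out after lookup.partial-cache.expire-after-write, the oldest rows are evicted beyond lookup.partial-cache.max-rows, and lookup.partial-cache.cache-missing-key decides whether lookups that find nothing are remembered. A simplified, single-threaded model of that behavior; the PartialLookupCache class is hypothetical, expire-after-access is not modeled, and the connector's actual cache implementation may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical model of lookup.cache = 'PARTIAL' with expire-after-write, max-rows,
// and cache-missing-key. Not the connector's implementation.
class PartialLookupCache<K, V> {

    // A cached lookup result (possibly empty) and the time it was written.
    private static final class CachedRow<T> {
        final Optional<T> value;
        final long writtenAtMs;

        CachedRow(Optional<T> value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long expireAfterWriteMs;
    private final boolean cacheMissingKey;
    private final LongSupplier clockMs;
    private final LinkedHashMap<K, CachedRow<V>> rows;

    PartialLookupCache(long expireAfterWriteMs, int maxRows, boolean cacheMissingKey,
                       LongSupplier clockMs) {
        this.expireAfterWriteMs = expireAfterWriteMs;
        this.cacheMissingKey = cacheMissingKey;
        this.clockMs = clockMs;
        // Insertion-ordered map that evicts its oldest row once maxRows is exceeded.
        this.rows = new LinkedHashMap<K, CachedRow<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedRow<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    // Returns a fresh cached result, or queries through the loader (standing in
    // for a MongoDB lookup) and caches what it returns.
    Optional<V> get(K key, Function<K, Optional<V>> loader) {
        long now = clockMs.getAsLong();
        CachedRow<V> cached = rows.get(key);
        if (cached != null && now - cached.writtenAtMs < expireAfterWriteMs) {
            return cached.value;
        }
        Optional<V> loaded = loader.apply(key);
        rows.remove(key); // re-insert so that the refreshed row counts as the newest
        if (loaded.isPresent() || cacheMissingKey) {
            rows.put(key, new CachedRow<>(loaded, now));
        }
        return loaded;
    }

    int size() {
        return rows.size();
    }
}
```

With cache-missing-key enabled, repeated lookups of a key that does not exist in MongoDB are served from the cache until the entry expires, which saves queries but delays the visibility of documents inserted later.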

Sink table only

Parameter

Description

Data type

Required

Default value

Notes

sink.buffer-flush.max-rows

The maximum number of records for each batch write.

Integer

No

1000

None.

sink.buffer-flush.interval

The interval at which buffered data is flushed to MongoDB.

Duration

No

1s

None.

sink.delivery-guarantee

The semantic guarantee for writing data.

String

No

at-least-once

Valid values:

  • none

  • at-least-once

Note

Exactly-once is not currently supported.

sink.max-retries

The maximum number of retries if writing to the database fails.

Integer

No

3

None.

sink.retry.interval

The interval between retries when writing to the database fails.

Duration

No

1s

None.

sink.parallelism

The parallelism of the sink operator.

Integer

No

Empty

None.

sink.delete-strategy

Specifies how -U (UPDATE_BEFORE) and -D (DELETE) records are handled.

String

No

CHANGELOG_STANDARD

Valid values:

  • CHANGELOG_STANDARD: In standard mode, -U and -D events are applied to the downstream system as usual.

  • IGNORE_DELETE: Ignores only -D events, but still overwrites the entire row on updates.

  • PARTIAL_UPDATE: Ignores -U events to enable partial column updates. However, when a -D event is received, the entire row is still deleted.

  • IGNORE_ALL: Ignores both -U and -D events.
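The four sink.delete-strategy values differ in which -U and -D records reach MongoDB. A minimal model of that filtering; the enum and class names are hypothetical, and the sketch does not model how the remaining records are written (for example, full-row overwrites versus partial column updates).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sink.delete-strategy: decides which changelog records
// would be applied to MongoDB. Row kinds follow Flink's +I, -U, +U, -D notation.
class DeleteStrategyFilter {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum Strategy { CHANGELOG_STANDARD, IGNORE_DELETE, PARTIAL_UPDATE, IGNORE_ALL }

    static boolean isApplied(Strategy strategy, RowKind kind) {
        switch (strategy) {
            case IGNORE_DELETE:
                return kind != RowKind.DELETE;          // drop -D only
            case PARTIAL_UPDATE:
                return kind != RowKind.UPDATE_BEFORE;   // drop -U only
            case IGNORE_ALL:
                return kind != RowKind.DELETE && kind != RowKind.UPDATE_BEFORE;
            default:
                return true;                            // CHANGELOG_STANDARD applies everything
        }
    }

    // Returns the records that would be applied, in their original order.
    static List<RowKind> applied(Strategy strategy, List<RowKind> records) {
        List<RowKind> out = new ArrayList<>();
        for (RowKind kind : records) {
            if (isApplied(strategy, kind)) {
                out.add(kind);
            }
        }
        return out;
    }
}
```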

Type mapping

CDC source table

BSON type → Flink SQL type:

  • Int32 → INT

  • Int64 → BIGINT

  • Double → DOUBLE

  • Decimal128 → DECIMAL(p, s)

  • Boolean → BOOLEAN

  • Date Timestamp → DATE

  • Date Timestamp → TIME

  • DateTime → TIMESTAMP(3) or TIMESTAMP_LTZ(3)

  • Timestamp → TIMESTAMP(0) or TIMESTAMP_LTZ(0)

  • String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex → STRING

  • Binary → BYTES

  • Object → ROW

  • Array → ARRAY

  • DBPointer → ROW<$ref STRING, $id STRING>

  • GeoJSON → Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>>

Dimension and sink tables

BSON type → Flink SQL type:

  • Int32 → INT

  • Int64 → BIGINT

  • Double → DOUBLE

  • Decimal128 → DECIMAL

  • Boolean → BOOLEAN

  • DateTime → TIMESTAMP_LTZ(3)

  • Timestamp → TIMESTAMP_LTZ(0)

  • String, ObjectId → STRING

  • Binary → BYTES

  • Object → ROW

  • Array → ARRAY

Examples

CDC source table

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

Dimension table

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Sink table

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;
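The sink above relies on default batching, controlled by sink.buffer-flush.max-rows (default 1000) and sink.buffer-flush.interval (default 1s). A simplified model of that batching, assuming the two thresholds combine as "whichever comes first"; the BufferedWriterSketch class is hypothetical, uses an injected clock so that it can be tested, and the connector's actual flushing logic may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical model of sink batching: rows are written as one batch once maxRows
// rows are buffered (sink.buffer-flush.max-rows) or, on a timer tick, once
// flushIntervalMs has passed since the last flush (sink.buffer-flush.interval).
class BufferedWriterSketch<T> {

    private final int maxRows;
    private final long flushIntervalMs;
    private final LongSupplier clockMs;
    private final Consumer<List<T>> writeBatch;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs;

    BufferedWriterSketch(int maxRows, long flushIntervalMs, LongSupplier clockMs,
                         Consumer<List<T>> writeBatch) {
        this.maxRows = maxRows;
        this.flushIntervalMs = flushIntervalMs;
        this.clockMs = clockMs;
        this.writeBatch = writeBatch;
        this.lastFlushMs = clockMs.getAsLong();
    }

    // Buffers a row and flushes immediately when the row limit is reached.
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    // Called periodically (for example by a timer) to flush rows that waited too long.
    void onTimer() {
        if (!buffer.isEmpty() && clockMs.getAsLong() - lastFlushMs >= flushIntervalMs) {
            flush();
        }
    }

    // Hands a copy of the buffered rows to the writer (standing in for a MongoDB bulk write).
    void flush() {
        if (!buffer.isEmpty()) {
            writeBatch.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMs = clockMs.getAsLong();
    }
}
```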

Data ingestion

The MongoDB connector can be used as a data source in a data ingestion YAML job.

Limits

This feature is supported only in VVR 11.1 and later.

Syntax

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

Configuration items

Parameter

Description

Required

Data type

Default value

Notes

type

The data source type.

Yes

STRING

None

The value must be mongodb.

scheme

The protocol for connecting to the MongoDB server.

No

STRING

mongodb

Valid values:

  • mongodb

  • mongodb+srv

hosts

The server address for connecting to MongoDB.

Yes

STRING

None

You can specify multiple addresses separated by commas (,).

username

The username for connecting to MongoDB.

No

STRING

None

None.

password

The password for connecting to MongoDB.

No

STRING

None

None.

database

The name of the MongoDB database to capture.

Yes

STRING

None

Supports regular expressions.

collection

The name of the MongoDB collection to capture.

Yes

STRING

None

Supports regular expressions. You must match the full database.collection namespace.

connection.options

Additional connection options to append when connecting to the MongoDB server.

No

STRING

None

Key-value pairs in k=v format, separated by ampersands (&). For example, replicaSet=test&connectTimeoutMS=300000.

schema.inference.strategy

The strategy for document type inference.

Valid values are continuous and static.

No

STRING

continuous

If set to continuous, the MongoDB Source continuously performs type inference. If a subsequent record is inconsistent with the current schema, it issues a schema evolution event to widen the structure to accommodate the new data.

If set to static, the MongoDB source performs schema inference only once, during initialization.

scan.max.pre.fetch.records

The maximum number of records to sample from each captured collection during initial inference.

No

INT

50

None.

scan.startup.mode

Specifies the startup mode for the MongoDB data source.

Valid values are initial, latest-offset, timestamp, and snapshot.

No

STRING

initial

Valid values:

  • initial: Pulls all data from the initial offset and automatically switches to incremental mode.

  • latest-offset: Pulls change data from the latest OpLog offset.

  • timestamp: Pulls change data from a specified timestamp.

  • snapshot: Performs a one-time snapshot of the current database state.

scan.startup.timestamp-millis

When the startup mode is set to timestamp, this parameter specifies the timestamp from which to start capturing change data.

No

LONG

None

None.

chunk-meta.group.size

The group size for chunk metadata. If the chunk metadata exceeds this size, it is split into multiple groups.

No

INT

1000

None.

scan.incremental.close-idle-reader.enabled

Specifies whether to close idle Source Readers after switching to incremental mode.

No

BOOLEAN

false

None.

scan.incremental.snapshot.backfill.skip

Specifies whether to skip the watermark backfill process of the incremental snapshot algorithm.

No

BOOLEAN

false

If the sink connector you are using can automatically remove duplicates based on a primary key, enabling this switch can reduce the time taken for the full-to-incremental transition.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Specifies whether to read unbounded chunks first when executing the incremental snapshot algorithm.

No

BOOLEAN

false

If the collection you are snapshotting is updated frequently, enabling this feature can reduce the likelihood of out of memory errors when reading unbounded chunks.

batch.size

The cursor batch size for reading MongoDB data.

No

INT

1024

None.

poll.max.batch.size

The maximum number of entries per request when pulling the Change Stream.

No

INT

1024

None.

poll.await.time.ms

The minimum waiting time between requests when pulling the Change Stream.

No

INT

1000

The unit is milliseconds.

heartbeat.interval.ms

The time interval for sending heartbeats.

No

INT

0

The unit is milliseconds.

The MongoDB CDC connector actively sends heartbeats to the database to ensure the resume token is up to date. A value of 0 means heartbeats are never sent.

Note

For collections that are not frequently updated, we strongly recommend setting this option.

scan.incremental.snapshot.chunk.size.mb

The chunk size during the snapshot phase.

No

INT

64

The unit is MB.

scan.incremental.snapshot.chunk.samples

The number of samples taken per chunk when the sample-based partitioning strategy is used during the snapshot phase.

No

INT

20

None.

scan.full-changelog

Specifies whether to generate a full changelog event stream based on MongoDB pre-image and post-image records.

No

BOOLEAN

false

Requires MongoDB 6.0 or later with the pre-image and post-image feature enabled. For more information about how to enable this feature, see Document Preimages.

scan.cursor.no-timeout

Specifies whether to set the cursor for reading data to never expire.

No

BOOLEAN

false

By default, a MongoDB server closes a cursor after it has been idle for 10 minutes to prevent excessive memory usage. Setting this option to true keeps the cursor open.

scan.ignore-delete.enabled

Specifies whether to ignore delete event records from the MongoDB source.

No

BOOLEAN

false

None.

scan.flatten.nested-documents.enabled

Specifies whether to flatten nested structures in BSON documents.

No

BOOLEAN

false

When this option is enabled, a schema like {"doc": {"foo": 1, "bar": "two"}} is expanded to doc.foo INT, doc.bar STRING.

scan.all.primitives.as-string.enabled

Specifies whether to infer all primitive types as STRING.

No

BOOLEAN

false

Enabling this option can avoid generating many schema evolution events when dealing with mixed input data types.
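Because database and collection are regular expressions matched against the database.collection namespace, characters such as ., (, ), +, or * in a name are interpreted as regex syntax. A minimal java.util.regex sketch of the pitfall and of escaping such characters; it assumes full-string matching with Java regex semantics, which is an assumption about the connector, and NamespaceRegexDemo is a hypothetical class. The documented workaround for names with special characters remains to specify the fully qualified namespace.

```java
import java.util.regex.Pattern;

// Illustrates how regex metacharacters in database or collection names affect
// namespace matching. Assumes full-string matching with java.util.regex semantics,
// which is an assumption about the connector rather than documented behavior.
class NamespaceRegexDemo {

    private static final String METACHARACTERS = "\\.[]{}()*+?^$|";

    // Full-string match of a "database.collection" namespace against a pattern.
    static boolean matches(String regex, String namespace) {
        return Pattern.matches(regex, namespace);
    }

    // Escapes every regex metacharacter in a name with a backslash.
    static String escape(String name) {
        StringBuilder escaped = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (METACHARACTERS.indexOf(c) >= 0) {
                escaped.append('\\');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }

    // Pattern that matches exactly one namespace, with a literal '.' separator.
    static String literalNamespace(String database, String collection) {
        return escape(database) + "\\." + escape(collection);
    }

    public static void main(String[] args) {
        // The syntax example's collection pattern, written as a full namespace.
        System.out.println(matches("foo_db\\.foo_col_.*", "foo_db.foo_col_1"));
        // Unescaped parentheses form a regex group, so the name does not match itself.
        System.out.println(matches("shop.orders(v2)", "shop.orders(v2)"));
        // After escaping, the pattern matches the namespace literally.
        System.out.println(matches(literalNamespace("shop", "orders(v2)"), "shop.orders(v2)"));
    }
}
```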

Type mapping

BSON type → CDC type (notes in parentheses):

  • STRING → VARCHAR

  • INT32 → INT

  • INT64 → BIGINT

  • DECIMAL128 → DECIMAL

  • DOUBLE → DOUBLE

  • BOOLEAN → BOOLEAN

  • TIMESTAMP → TIMESTAMP

  • DATETIME → LOCALZONEDTIMESTAMP

  • BINARY → VARBINARY

  • DOCUMENT → MAP (the key and value type parameters are inferred)

  • ARRAY → ARRAY (the element type parameter is inferred)

  • OBJECTID → VARCHAR (represented as a hex string)

  • SYMBOL, REGULAREXPRESSION, JAVASCRIPT, JAVASCRIPTWITHSCOPE → VARCHAR (represented as a string)
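In data ingestion jobs, a DOCUMENT value maps to MAP by default. With scan.flatten.nested-documents.enabled, nested fields instead become columns with dot-separated names, such as doc.foo and doc.bar for {"doc": {"foo": 1, "bar": "two"}}; the SQL option scan.flatten-nested-columns.enabled uses the same dotted naming. A minimal recursive sketch of that naming scheme over java.util.Map values; NestedFlattener is hypothetical, the connector works on BSON documents, and type inference is not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of flattening nested documents into dot-separated
// column names. Documents are modeled as java.util.Map values rather than BSON.
class NestedFlattener {

    // Returns a new map whose keys are dotted paths such as "doc.foo".
    static Map<String, Object> flatten(Map<String, Object> document) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", document, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> document,
                                    Map<String, Object> flat) {
        for (Map.Entry<String, Object> field : document.entrySet()) {
            String name = prefix.isEmpty() ? field.getKey() : prefix + "." + field.getKey();
            if (field.getValue() instanceof Map) {
                // Nested document: recurse, extending the dotted path.
                flattenInto(name, (Map<String, Object>) field.getValue(), flat);
            } else {
                // Leaf value (including arrays, which are not flattened here).
                flat.put(name, field.getValue());
            }
        }
    }
}
```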

Metadata

SQL connector

The MongoDB CDC SQL source table supports metadata column syntax. You can access the following metadata through metadata columns.

Metadata key

Metadata type

Description

database_name

STRING NOT NULL

The name of the database that contains the document.

collection_name

STRING NOT NULL

The name of the collection that contains the document.

op_ts

TIMESTAMP_LTZ(3) NOT NULL

The time when the document was changed in the database. If the record comes from the snapshot of existing data rather than from the Change Stream, the value is always 0.

row_kind

STRING NOT NULL

Indicates the type of data change. Valid values:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

Note

This feature is supported only in VVR 11.1 and later.

Data ingestion YAML

The MongoDB CDC data ingestion YAML connector supports reading the following metadata columns:

Metadata key

Metadata type

Description

ts_ms

BIGINT NOT NULL

The time when the document was changed in the database. If the record comes from the snapshot of existing data rather than from the Change Stream, the value is always 0.

You can also use the common metadata columns provided by the Transform module to access database name, collection name, and row_kind information.

About the pre-image and post-image recording feature in MongoDB

By default, MongoDB does not provide the content of a document before it was changed, or of a document that was deleted; versions earlier than 6.0 cannot provide it at all. Without this information, the source can achieve only upsert semantics, which means that UPDATE_BEFORE records are missing. However, many useful Flink operators rely on a complete change stream of INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE events.

To supply the missing UPDATE_BEFORE events, the Flink SQL planner automatically adds a ChangelogNormalize node after upsert sources. This node stores the current version of every document in Flink state, so when it receives an updated or deleted document it can look up the previous version. However, because it keeps every document in state, this operator can require a large amount of state storage.


MongoDB 6.0 supports the pre- and post-images feature for databases. For more information, see Use MongoDB Change Streams to Capture Data Changes in Real Time. After you enable this feature, MongoDB records the complete state of a document before and after each change in a special collection. Then, when you enable the scan.full-changelog configuration item in a job, MongoDB CDC generates Update Before records from the change document records. This process creates a complete event stream and eliminates the dependency on the ChangelogNormalize node.
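The state cost of ChangelogNormalize comes from remembering the latest version of every document. A simplified, single-threaded sketch of the idea: an upsert stream of (id, document) events, with null meaning a delete, becomes a full changelog with -U records. ChangelogNormalizeSketch is hypothetical; Flink's operator keeps this map in managed state and handles more cases.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a ChangelogNormalize-style operator: it remembers the latest
// version of every document (keyed by _id) and uses it to turn upsert events into
// a full changelog. Flink keeps this map in managed state; here it is a HashMap.
class ChangelogNormalizeSketch {

    private final Map<String, String> latestById = new HashMap<>();

    // Processes one upsert event; document == null means the document was deleted.
    List<String> process(String id, String document) {
        List<String> out = new ArrayList<>();
        String before = latestById.get(id);
        if (document == null) {
            if (before != null) {           // deletes of unknown documents are ignored
                out.add("-D " + before);
                latestById.remove(id);
            }
        } else if (before == null) {
            out.add("+I " + document);
            latestById.put(id, document);
        } else if (!before.equals(document)) {
            out.add("-U " + before);        // previous version recovered from state
            out.add("+U " + document);
            latestById.put(id, document);
        }                                   // identical re-sends produce no output
        return out;
    }
}
```

With scan.full-changelog enabled on MongoDB 6.0 or later, the previous version comes from MongoDB's pre-image records instead, so this per-document map is no longer needed.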

MongoDB CDC DataStream API

Important

When you read or write data using the DataStream API, you need to use the corresponding DataStream connector to connect to Flink. For setup instructions, see Use DataStream connectors.

You can create a DataStream API program and use MongoDBSource. The following code shows an example:

Java

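import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// Incremental-snapshot source that emits each change event as a JSON string.
MongoDBSource<String> source = MongoDBSource.<String>builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();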

XML

The VVR MongoDB connector is available in the Maven Central Repository. You can use it directly in your job development.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
Note

When you use the DataStream API, to enable the incremental snapshot feature, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb.source package when constructing the MongoDBSource. Otherwise, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb package.

When you construct a MongoDBSource, you can configure the following parameters:

Parameter

Description

hosts

The hostname of the MongoDB database to connect to.

username

The username for the MongoDB database service.

Note

If authentication is not enabled on the MongoDB server, you do not need to configure this parameter.

password

The password for the MongoDB database service.

Note

If authentication is not enabled on the MongoDB server, you do not need to configure this parameter.

databaseList

The name of the MongoDB database to monitor.

Note

The database name supports regular expressions to read data from multiple databases. You can use .* to match all databases.

collectionList

The name of the MongoDB collection to monitor.

Note

The collection name supports regular expressions to read data from multiple collections. You can use .* to match all collections.

startupOptions

Selects the startup mode for MongoDB CDC.

Valid values:

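  • StartupOptions.initial()

    • Reads a full snapshot of the existing data, then continues reading incremental changes.

  • StartupOptions.latest()

    • Reads only the changes made after the job starts.

  • StartupOptions.timestamp(long startupTimestampMillis)

    • Reads changes starting from the specified timestamp, in milliseconds since the Unix epoch.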

For more information, see Startup Properties.

deserializer

A deserializer that deserializes SourceRecord type records to a specified type. Valid values:

  • MongoDBConnectorDeserializationSchema: Converts SourceRecord generated in Upsert mode to the internal data structure RowData for Flink Table API or SQL API.

  • MongoDBConnectorFullChangelogDeserializationSchema: Converts SourceRecord generated in Full Changelog mode to the internal data structure RowData for Flink Table or SQL.

  • JsonDebeziumDeserializationSchema: Converts SourceRecord to a JSON-formatted String.