All Products
Search
Document Center

Realtime Compute for Apache Flink:MongoDB connector

Last Updated:Oct 10, 2024

This topic describes how to use the MongoDB connector.

Background information

MongoDB is a document-oriented unstructured database that simplifies application development and expansion. The following table describes the capabilities supported by the MongoDB connector.

Item

Description

Table type

Source table, dimension table, and sink table

Running mode

Streaming mode

Metric

  • Metrics for source tables

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • Metrics for dimension tables and sink tables: none

Note

For more information about the metrics, see Metrics.

API type

DataStream API and SQL API

Data update or deletion in a sink table

Yes

Features

  • A MongoDB Change Data Capture (CDC) source table is a streaming source table of MongoDB databases. The MongoDB connector for a MongoDB CDC source table is referred to as a MongoDB CDC connector. The MongoDB CDC connector reads full historical data from a MongoDB database and then reads operations log data. This way, data accuracy is ensured. If an error occurs, the exactly-once semantics can be used to ensure data accuracy. The MongoDB CDC connector can use the Change Stream API to efficiently capture document changes in MongoDB databases and collections, monitor document insertion, modification, replacement, and deletion events, and convert the events into changelog streams that can be processed by Realtime Compute for Apache Flink. The MongoDB CDC connector provides the following features:

    • Efficiently monitors document changes by using the Change Stream API that is supported in MongoDB 3.6.

    • Ensures the exactly-once semantics for deployments that fail at any phase.

    • Supports full and incremental data monitoring. After the snapshot reading phase is complete, Realtime Compute for Apache Flink automatically switches to the incremental reading phase.

    • Supports parallel reading in the initial snapshot phase. Only MongoDB 4.0 or later supports this feature.

    • Supports the following startup modes:

      • initial: If the MongoDB CDC connector starts for the first time, the connector performs an initial snapshot for the monitored database table and continues to read the latest operations log data.

      • latest-offset: If the MongoDB CDC connector starts for the first time, the connector does not perform a snapshot for the monitored database table. The connector only reads data from the end of the operations log data. This indicates that the connector can read only data changes after the connector starts.

      • timestamp: The MongoDB CDC connector skips the snapshot reading phase and reads the operations log data events from a specific timestamp. Only MongoDB 4.0 or later supports this mode.

    • Supports full changelog event streams. Only MongoDB 6.0 or later supports this feature. For more information, see the Preimage and postimage features section of this topic.

  • Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.6 or later allows you to use the CREATE TABLE AS statement or CREATE DATABASE AS statement to synchronize data and schema changes from a MongoDB database to downstream tables. To perform the synchronization, you must enable the preimage and postimage features for the MongoDB database. For more information, see the Preimage and postimage features section of this topic.

  • Realtime Compute for Apache Flink that uses VVR 8.0.9 or later provides extended capability of reading the _id field of the built-in ObjectId type in dimension tables.

Prerequisites

  • MongoDB CDC source table

    • The MongoDB CDC connector can read data from ApsaraDB for MongoDB replica set or sharded cluster database instances. The MongoDB CDC connector can also read data from self-managed MongoDB databases.

    • The replica set feature is enabled for the MongoDB database that you want to monitor. This ensures that you can use the basic features of the MongoDB CDC connector. For more information, see Replication.

    • If you want to use full changelog event streams, you must make sure that the preimage and postimage features are enabled for the MongoDB database. For more information, see Document Preimages and the Preimage and postimage features section of this topic.

    • If the authentication feature of MongoDB is enabled, you must use a MongoDB user that has the following database permissions:

      • splitVector

      • listDatabases

      • listCollections

      • collStats

      • find

      • changeStream

      • Permissions to access the config.collections and config.chunks collections

  • MongoDB dimension table and sink table

    • A MongoDB database and table are created.

    • An IP address whitelist is configured to access MongoDB.

Limits

  • You can read data from or write data to only MongoDB 3.6 or later.

  • MongoDB CDC source table

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports the MongoDB CDC connector.

    • Only MongoDB 6.0 or later supports full changelog event streams.

    • Only MongoDB 4.0 or later allows you to set the scan.startup.mode parameter to timestamp.

    • Only MongoDB 4.0 or later supports parallel reading in the initial snapshot phase. To enable parallel reading in the initial snapshot phase, set the scan.incremental.snapshot.enabled parameter to true.

  • MongoDB sink table

    • In Realtime Compute for Apache Flink that uses a VVR version earlier than 8.0.5, data can only be inserted into a sink table.

    • In Realtime Compute for Apache Flink that uses VVR 8.0.5 or later, if a primary key is defined in the DDL statement that is used to create a sink table, data can be inserted into, updated in, or deleted from the sink table. If no primary key is defined in the DDL statement that is used to create a sink table, data can only be inserted into the sink table.

  • MongoDB dimension table

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports MongoDB dimension tables.

Syntax

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
Note

When you create a MongoDB CDC source table, you must declare the _id STRING column as the unique primary key.

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    connector

    The name of the connector.

    String

    Yes

    No default value

    • If you use the MongoDB connector for a source table, you must specify a value for this parameter based on the VVR version that Realtime Compute for Apache Flink uses.

      • VVR 8.0.4 and earlier: Set this parameter to mongodb-cdc.

      • VVR 8.0.5 and later: Set this parameter to mongodb or mongodb-cdc.

    • If you use the MongoDB connector for a dimension table or a sink table, you must set this parameter to mongodb.

    uri

    The uniform resource identifier (URI) that is used to access the MongoDB database.

    String

    No

    No default value

    Note

    You must specify one of the uri and hosts parameters. If you specify the uri parameter, you do not need to specify the scheme, hosts, username, password, or connector.options parameter. If you specify both the uri and hosts parameters, the URI specified by the uri parameter is used to access the MongoDB database.

    hosts

    The name of the host where the MongoDB database resides.

    String

    No

    No default value

    Separate multiple hostnames with commas (,).

    scheme

    The connection protocol that is used to access the MongoDB database.

    String

    No

    mongodb

    Valid values:

    • mongodb: The default MongoDB protocol is used to access the MongoDB database.

    • mongodb+srv: The DNS SRV record protocol is used to access the MongoDB database.

    username

    The username that is used to access the MongoDB database.

    String

    No

    No default value

    This parameter is required if the identity verification feature is enabled for the MongoDB database.

    password

    The password that is used to access the MongoDB database.

    String

    No

    No default value

    This parameter is required if the identity verification feature is enabled for the MongoDB database.

    Important

    To prevent password leaks, we recommend that you use the key management method to configure your password. For more information, see Manage variables and keys.

    database

    The name of the MongoDB database.

    String

    No

    No default value

    • If you use the MongoDB connector for a source table, a regular expression can be used to match the name of the MongoDB database.

    • If you do not specify this parameter, all databases are monitored.

    collection

    The name of the MongoDB collection.

    String

    No

    No default value

    • If you use the MongoDB connector for a source table, a regular expression can be used to match the name of the collection.

      Important

      If the name of the collection that you want to monitor contains special characters of regular expressions, you must provide a fully qualified namespace (database name.collection name). Otherwise, the changes to the collection cannot be captured.

    • If you do not specify this parameter, all collections are monitored.

    connection.options

    The parameters that are specified to access the MongoDB database.

    String

    No

    No default value

    The parameters are key-value pairs that are in the key=value format and separated by ampersands (&), such as connectTimeoutMS=12000&socketTimeoutMS=13000.

  • Parameters only for source tables

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    scan.startup.mode

    The startup mode of the MongoDB CDC connector.

    String

    No

    initial

    Valid values:

    • initial: pulls all data from the initial offset.

    • latest-offset: pulls change data from the current offset.

    • timestamp: pulls change data from a specific timestamp.

    For more information, see Startup Properties.

    scan.startup.timestamp-millis

    The start timestamp for the consumption at the specified offset.

    Long

    Depending on the value of the scan.startup.mode parameter:

    • initial: no

    • latest-offset: no

    • timestamp: yes

    No default value

    The value of this parameter is the number of milliseconds that have elapsed since 00:00:00 UTC on January 1, 1970. The timestamp follows the UNIX time format.

    This parameter is required only when the scan.startup.mode parameter is set to timestamp.

    initial.snapshotting.queue.size

    The maximum queue size for the initial snapshot phase.

    Integer

    No

    10240

    This parameter takes effect only when the scan.startup.mode parameter is set to initial.

    batch.size

    The batch processing size of the cursor.

    Integer

    No

    1024

    None.

    poll.max.batch.size

    The maximum number of change documents that can be processed in a batch.

    Integer

    No

    1024

    This parameter determines the maximum number of change documents that can be pulled at a time during stream processing. A large value of this parameter indicates a large buffer that is allocated in the connector.

    poll.await.time.ms

    The interval at which data is pulled.

    Integer

    No

    1000

    Unit: milliseconds.

    heartbeat.interval.ms

    The interval at which heartbeat packets are sent.

    Integer

    No

    0

    Unit: milliseconds.

    The MongoDB CDC connector periodically sends heartbeat packets to the MongoDB database to ensure the latest backtracking status. If you set this parameter to 0, heartbeat packets are never sent.

    Important

    We strongly recommend that you specify this parameter for collections that are not frequently updated.

    scan.incremental.snapshot.enabled

    Specifies whether to enable the parallel reading mode in the initial snapshot phase.

    Boolean

    No

    false

    This is an experimental feature.

    scan.incremental.snapshot.chunk.size.mb

    The size of the shard when the parallel snapshot reading mode is enabled.

    Integer

    No

    64

    This is an experimental feature.

    Unit: MB.

    This parameter takes effect only when the parallel snapshot reading mode is enabled.

    scan.full-changelog

    Specifies whether to generate a complete full changelog event stream.

    Boolean

    No

    false

    This is an experimental feature.

    Note

    This parameter is available only when the version of the MongoDB database is MongoDB 6.0 or later and the preimage and postimage features are enabled for the MongoDB database. For more information about how to enable the preimage and postimage features, see Document Preimages.

    scan.flatten-nested-columns.enabled

    Specifies whether to read a nested field in a Binary JSON (BSON)-formatted document as a field whose name is separated with a period (.).

    Boolean

    No

    false

    If you set this parameter to true, the col field in the following BSON-formatted document is named nested.col in the obtained schema.

    {"nested":{"col":true}}
    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    scan.primitive-as-string

    Specifies whether to infer all basic data types in BSON-formatted documents as the STRING type.

    Boolean

    No

    false

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

  • Parameters only for dimension tables

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    lookup.cache

    The cache policy.

    String

    No

    NONE

    Valid values:

    • None: No data is cached.

    • Partial: Specific data that is looked up in an external database is cached.

    lookup.max-retries

    The maximum number of retries allowed when the database fails to be queried.

    Integer

    No

    3

    None.

    lookup.retry.interval

    The interval between retries when the database fails to be queried.

    Duration

    No

    1s

    None.

    lookup.partial-cache.expire-after-access

    The maximum period of time for which data records in the cache can be retained.

    Duration

    No

    No default value

    Unit: milliseconds, seconds, minutes, hour, or day.

    To use this parameter, you must set the lookup.cache parameter to PARTIAL.

    lookup.partial-cache.expire-after-write

    The maximum period of time for which data records can be retained after they are written to the cache.

    Duration

    No

    No default value

    To use this parameter, you must set the lookup.cache parameter to PARTIAL.

    lookup.partial-cache.max-rows

    The maximum number of data records that can be cached. If the number of data records that are cached exceeds the value of this parameter, the earliest data records expire.

    Long

    No

    No default value

    To use this parameter, you must set the lookup.cache parameter to PARTIAL.

    lookup.partial-cache.cache-missing-key

    Specifies whether to cache empty data records if no data is associated with the physical table.

    Boolean

    No

    True

    To use this parameter, you must set the lookup.cache parameter to PARTIAL.

  • Parameters only for sink tables

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    sink.buffer-flush.max-rows

    The maximum number of data records that can be written at a time.

    Integer

    No

    1000

    None.

    sink.buffer-flush.interval

    The interval at which data is flushed.

    Duration

    No

    1s

    None.

    sink.delivery-guarantee

    The semantics used when data is written.

    String

    No

    at-least-once

    Valid values:

    • none

    • at-least-once

    Note

    The exactly-once semantics is not supported.

    sink.max-retries

    The maximum number of retries allowed when data fails to be written to the database.

    Integer

    No

    3

    None.

    sink.retry.interval

    The interval between retries when data fails to be written to the database.

    Duration

    No

    1s

    None.

    sink.parallelism

    The degree of parallelism of the sink.

    Integer

    No

    No default value

    None.

Data type mappings

  • MongoDB CDC source table

    Data type of BSON

    Data type of Flink SQL

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL(p, s)

    Boolean

    BOOLEAN

    Date Timestamp

    DATE

    Date Timestamp

    TIME

    DateTime

    TIMESTAMP(3)

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP(0)

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    UUID

    Symbol

    MD5

    JavaScript

    Regex

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

    DBPointer

    ROW<$ref STRING, $id STRING>

    GeoJSON

    Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

    Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

  • MongoDB dimension table and sink table

    Data type of BSON

    Data type of Flink SQL

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL

    Boolean

    BOOLEAN

    DateTime

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

Sample code

  • Sample code for a MongoDB CDC source table

    CREATE TEMPORARY TABLE mongo_source (
      `_id` STRING, --must be declared
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      collection_name STRING METADATA VIRTUAL,
      op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'scan.incremental.snapshot.enabled' = 'true',
      'scan.full-changelog' = 'true'
    );
    
    CREATE TEMPORARY TABLE  productssink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING,
      db_name STRING,
      collection_name STRING,
      op_ts TIMESTAMP_LTZ(3)
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO productssink  
    SELECT
      name,
      weight,
      tags,
      price.amount,
      suppliers[1].name,
      db_name,
      collection_name,
      op_ts
    FROM
      mongo_source;
  • Sample code for a dimension table

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      a int,
      b BIGINT,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_dim (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'lookup.cache' = 'PARTIAL',
      'lookup.partial-cache.expire-after-access' = '10min',
      'lookup.partial-cache.expire-after-write' = '10min',
      'lookup.partial-cache.max-rows' = '100'
    );
    
    CREATE TEMPORARY TABLE print_sink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO print_sink
    SELECT
      T.id,
      T.a,
      T.b,
      H.name
    FROM
      datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
  • Sample code for a sink table

    CREATE TEMPORARY TABLE datagen_source (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_sink (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection'
    );
    
    INSERT INTO mongo_sink
    SELECT * FROM datagen_source;

Metadata

MongoDB CDC source tables support the metadata column syntax. The following table describes the metadata that you can access by using metadata columns.

Metadata key

Metadata type

Description

database_name

STRING NOT NULL

The name of the database to which the document belongs.

collection_name

STRING NOT NULL

The name of the collection to which the document belongs.

op_ts

TIMESTAMP_LTZ(3) NOT NULL

The time when the document changes in the database. If the document is generated based on the historical data of the table instead of the changelog streams, the value of the metadata key is fixed to 0.

Preimage and postimage features

By default, a MongoDB database whose version is earlier than 6.0 does not retain pre-change documents or deleted documents. If you do not enable the preimage and postimage features for a MongoDB database, the MongoDB database supports only the UPSERT semantics based on the data in the database. As a result, UPDATE_BEFORE messages are missing. However, multiple operators of Realtime Compute for Apache Flink require a complete changelog stream, which contains change messages, including INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE.

To supplement the missing UPDATE_BEFORE messages, the Flink SQL planner automatically generates the ChangelogNormalize operator for data sources of the UPSERT change type. This operator caches the savepoints of the current version of all documents in the state data of deployments. If a document is updated or deleted, you can obtain the pre-update state of the document by querying the state data that is stored in the ChangelogNormalize operator. However, this method requires the operator to store a huge amount of state data.

image.png

MongoDB 6.0 supports the preimage and postimage features for the MongoDB database. For more information about the preimage and postimage features, see Document Preimages. After the preimage and postimage features are enabled for the MongoDB database, the MongoDB database records the complete state data of a document in a specific collection each time the document changes. If you configure the scan.full-changelog parameter for your deployment, the MongoDB CDC connector can generate UPDATE_BEFORE messages from the collection that stores the state data of the change document. This helps generate a complete changelog event stream and the ChangelogNormalize operator is not used.

Mongo CDC DataStream API

Important

If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.

Create a DataStream API program and use MongoDBSource. Sample code:

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();
Note

To enable the incremental snapshot feature when you use the DataStream API, use the MongoDBSource#builder() method in the com.ververica.cdc.connectors.mongodb.source package during the construction of the MongoDBSource data source. If you do not need to enable the incremental snapshot feature, use the MongoDBSource#builder() method in the com.ververica.cdc.connectors.mongodb package.

The following table describes the parameters that you must specify during the construction of the MongoDBSource data source.

Parameter

Description

hosts

The hostname of the MongoDB database that you want to access.

username

The username of the MongoDB database service.

Note

If authentication is not enabled on the MongoDB server, you do not need to specify this parameter.

password

The password of the MongoDB database service.

Note

If authentication is not enabled on the MongoDB server, you do not need to specify this parameter.

databaseList

The name of the MongoDB database that you want to monitor.

Note

If you want to read data from multiple databases, you can set this parameter to a regular expression. You can use .* to match all databases.

collectionList

The name of the MongoDB collection that you want to monitor.

Note

If you want to read data from multiple collections, you can set this parameter to a regular expression. You can use .* to match all collections.

startupOptions

The startup mode of the MongoDB CDC connector.

Valid values:

  • StartupOptions.initial()

    • Pulls all data from the initial offset.

  • StartupOptions.latest-offset()

    • Pulls change data from the current offset.

  • StartupOptions.timestamp()

    • Pulls change data from a specific timestamp.

For more information, see Startup Properties.

deserializer

A deserializer, which deserializes SourceRecords into a specific type. Valid values:

  • MongoDBConnectorDeserializationSchema: deserializes SourceRecords that are generated in Upsert mode into the internal data structure RowData of the Flink Table API or Flink SQL API.

  • MongoDBConnectorFullChangelogDeserializationSchema: deserializes SourceRecords that are generated in full changelog mode into the internal data structure RowData of the Flink Table API or Flink SQL API.

  • JsonDebeziumDeserializationSchema: deserializes SourceRecords into JSON strings.