全部产品
Search
文档中心

实时计算Flink版:MongoDB

更新时间:Jun 24, 2025

本文为您介绍如何使用MongoDB连接器。

背景信息

MongoDB是一个面向文档的非结构化数据库,能够简化应用程序的开发及扩展。MongoDB连接器支持的信息如下:

类别

详情

支持类型

源表、维表和结果表

运行模式

仅支持流模式

特有监控指标

  • 源表

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • 维表和结果表:无。

说明

指标含义详情,请参见监控指标说明

API 种类

DataStream和SQL

是否支持更新或删除结果表数据

特色功能

  • MongoDB的CDC源表,即MongoDB的流式源表,会先读取数据库的历史全量数据,并平滑切换到oplog读取上,保证不多读一条也不少读一条。即使发生故障,也能保证通过Exactly Once语义处理数据。MongoDB CDC支持通过Change Stream API高效地捕获MongoDB的数据库和集合中的文档变更,监控文档的插入、修改、替换、删除事件,并将其转换为Flink能够处理的Changelog数据流。作为源表,支持以下功能特性:

    • 支持利用MongoDB 3.6新增的Change Stream API,更高效地监控变化。

    • 精确一次处理:在作业任何阶段失败都能保证Exactly-once语义。

    • 支持全增量一体化监测:支持快照阶段完成后自动切换为增量读取阶段。

    • 支持初始快照阶段的并行读取,需要MongoDB >= 4.0。

    • 支持多种启动模式:

      • initial模式:在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的oplog。

      • latest-offset模式:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。

      • timestamp:跳过快照阶段,从指定的时间戳开始读取oplog事件,需要MongoDB >= 4.0。

    • 支持产生Full Changelog事件流,需要MongoDB >= 6.0,详情请参见关于MongoDB的变更前后像记录功能

  • 实时计算Flink VVR 8.0.6及以上版本支持通过CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句将MongoDB的数据和Schema变更同步到下游表。使用时需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见关于MongoDB的变更前后像记录功能

  • 实时计算Flink VVR 8.0.9及以上版本扩展维表关联读取能力,支持读取内置ObjectId 类型的_id字段。

前提条件

  • CDC源表

    • CDC连接器支持通过副本集或分片集架构模式读取阿里云云数据库MongoDB版的数据,也支持读取自建MongoDB数据库的数据 。

    • 使用MongoDB CDC连接器的基础功能时,必须开启待监控的MongoDB数据库的副本集(Replica Set)功能,详情请参见Replication

    • 如需使用Full Changelog事件流功能,则需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见Document Preimages关于MongoDB的变更前后像记录功能

    • 如果启用了MongoDB的鉴权功能,则需要使用具有以下数据库权限的MongoDB用户:

      • splitVector权限

      • listDatabases权限

      • listCollections权限

      • collStats权限

      • find权限

      • changeStream权限

      • config.collections和config.chunks集合的访问权限

  • 维表和结果表

    • 已创建MongoDB数据库和表

    • 已设置IP白名单

使用限制

  • 仅支持读写3.6及以上版本的MongoDB。

  • CDC源表

    • 实时计算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC连接器。

    • MongoDB 6.0及以上版本支持产生Full Changelog事件流。

    • MongoDB 4.0及以上版本支持指定时间戳的启动模式。

    • MongoDB 4.0及以上版本支持初始快照阶段并行读取。如果您需要启用并行模式进行初始快照,则需要将scan.incremental.snapshot.enabled配置项设置为true。

    • 由于MongoDB Change Stream流订阅限制,不支持读取admin、local、config数据库及system集合中的数据,详情请参见MongoDB文档

  • 结果表

    • 实时计算引擎VVR 8.0.5以下版本仅支持插入数据。

    • 实时计算引擎VVR 8.0.5及以上版本,结果表中声明主键时,支持插入、更新和删除数据,未声明主键时仅支持插入数据。

  • 维表

    • 实时计算引擎VVR 8.0.5及以上版本支持使用MongoDB维表。

SQL

语法结构

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
说明

在创建CDC源表时,您必须声明_id STRING列,并将其作为唯一的主键。

WITH参数

通用

参数

说明

数据类型

是否必填

默认值

备注

connector

连接器名称。

String

  • 作为源表:

    • 实时计算引擎VVR 8.0.4及之前版本,填写为mongodb-cdc。

    • 实时计算引擎VVR 8.0.5及之后版本,填写为mongodb或mongodb-cdc。

  • 作为维表或结果表时,固定值为mongodb。

uri

MongoDB连接uri。

String

说明

参数urihosts必须指定其中之一。若指定uri,则无需指定schemehostsusernamepasswordconnector.options。当两者均指定时将使用uri进行连接。

hosts

MongoDB所在的主机名称。

String

可以使用英文逗号(,)分隔提供多个主机名。

scheme

MongoDB使用的连接协议。

String

mongodb

可选的取值包括:

  • mongodb:代表使用默认的MongoDB协议进行连接

  • mongodb+srv:代表使用DNS SRV记录协议进行连接

username

连接到MongoDB时使用的用户名。

String

开启身份验证功能时,必须配置该参数。

password

连接到MongoDB时使用的密码。

String

开启身份验证功能时,必须配置该参数。

重要

为了避免您的密码信息泄露,建议您使用变量的方式填写密码取值,详情请参见项目变量

database

MongoDB数据库名称。

String

  • 作为源表时,数据库名称支持正则表达式匹配。

  • 不配置该参数代表监控全部数据库。

重要

不支持监控admin、local、config数据库中的数据。

collection

MongoDB集合名称。

String

  • 作为源表时,集合名称支持正则表达式匹配。

    重要

    如果您要监控的集合名称中包含正则表达式特殊字符,则必须提供完全限定的名字空间(数据库名称.集合名称),否则无法捕获对应集合的变更。

  • 不配置该参数代表监控全部集合。

重要

不支持监控system集合中的数据。

connection.options

MongoDB侧的连接参数。

String

使用&分隔的key=value式额外连接参数。例如connectTimeoutMS=12000&socketTimeoutMS=13000。

源表独有

参数

说明

数据类型

是否必填

默认值

备注

scan.startup.mode

MongoDB CDC的启动模式。

String

initial

参数取值如下:

  • initial:从初始位点开始拉取全部数据。

  • latest-offset:从当前位点开始拉取变更数据。

  • timestamp:从指定的时间戳开始拉取变更数据。

详情请参见Startup Properties

scan.startup.timestamp-millis

指定位点消费的起始时间戳。

Long

取决于 scan.startup.mode的取值

  • initial:否

  • latest-offset:否

  • timestamp:是

参数格式为自Linux Epoch时间戳以来的毫秒数。

仅适用于timestamp启动模式。

initial.snapshotting.queue.size

进行初始快照时的队列大小限制。

Integer

10240

仅在scan.startup.mode选项设置为initial 时生效。

batch.size

游标的批处理大小。

Integer

1024

无。

poll.max.batch.size

同一批处理的最多变更文档数量。

Integer

1024

此参数控制流处理时一次拉取最多变更文档的个数。取值越大,连接器内部分配的缓冲区越大。

poll.await.time.ms

两次拉取数据之间的时间间隔。

Integer

1000

单位为毫秒。

heartbeat.interval.ms

发送心跳包的时间间隔。

Integer

0

单位为毫秒。

MongoDB CDC连接器主动向数据库发送心跳包来保证回溯状态最新。设置为0代表永不发送心跳包。

重要

对于更新不频繁的集合,强烈建议设定此选项。

scan.incremental.snapshot.enabled

是否启用并行模式进行初始快照。

Boolean

false

实验性功能。

scan.incremental.snapshot.chunk.size.mb

并行模式读取快照时的分片大小。

Integer

64

实验性功能。

单位为MB。

仅在启用并行快照时生效。

scan.full-changelog

产生完整的Full Changelog事件流。

Boolean

false

实验性功能。

说明

MongoDB数据库需要为6.0及以上版本,并且已开启前像后像功能,开启方法请参见Document Preimages

scan.flatten-nested-columns.enabled

是否将以.分隔的字段名解析为嵌套BSON文档读取。

Boolean

false

若开启,在如下示例的BSON文档中,col字段在schema中名称为nested.col

{"nested":{"col":true}}
说明

仅VVR 8.0.5及以上版本支持该参数。

scan.primitive-as-string

是否将BSON文档中的原始类型都解析为字符串类型。

Boolean

false

说明

仅VVR 8.0.5及以上版本支持该参数。

scan.ignore-delete.enabled

是否忽略delete(-D)类型的消息。

Boolean

false

参数取值如下:

  • true:忽略删除事件。

  • false:不忽略删除事件。

说明

仅实时计算引擎VVR 11.1及以上版本支持该参数。

scan.incremental.snapshot.backfill.skip

是否跳过增量快照算法的回填水位过程。

Boolean

false

启用此开关只能提供at-least-once语义。

说明

仅VVR 11.1及以上版本支持该参数。

initial.snapshotting.pipeline

MongoDB 管道操作,在快照读取阶段,会把该操作下推到 MongoDB,只筛选所需的数据,从而提高读取效率。

String

无。

  • 以JSON 对象数组格式表示,例如: [{"$match": {"closed": "false"}}] 表示只复制 closed 字段为 "false" 的文档。

  • 该选项仅在 scan.startup.mode 选项设置为 initial 时生效,且仅限于在 Debezium 模式下使用,不能用于增量快照模式,否则会出现语义不一致的问题。

    说明

    仅VVR 11.1及以上版本支持该参数。

initial.snapshotting.max.threads

执行数据复制时使用的线程数。

Integer

无。

仅在 scan.startup.mode 选项设置为 initial 时生效。

说明

仅VVR 11.1及以上版本支持该参数。

initial.snapshotting.queue.size

进行初始快照时的队列大小。

Integer

16000

仅在 scan.startup.mode 选项设置为 initial 时生效。

说明

仅VVR 11.1及以上版本支持该参数。

维表独有

参数

说明

数据类型

是否必填

默认值

备注

lookup.cache

Cache策略。

String

NONE

目前支持以下两种缓存策略:

  • None:无缓存。

  • Partial:只在外部数据库中查找数据时缓存。

lookup.max-retries

查询数据库失败的最大重试次数。

Integer

3

无。

lookup.retry.interval

如果查询数据库失败,重试的时间间隔。

Duration

1s

无。

lookup.partial-cache.expire-after-access

缓存中的记录最长保留时间。

Duration

支持时间单位ms、s、min、h和d。

使用该配置时 lookup.cache 必须设置为 PARTIAL

lookup.partial-cache.expire-after-write

在记录写入缓存后该记录的最大保留时间。

Duration

使用该配置时 lookup.cache 必须设置为 PARTIAL

lookup.partial-cache.max-rows

缓存的最大条数。超过该值,最旧的行将过期。

Long

使用该配置时 lookup.cache 必须设置为 PARTIAL

lookup.partial-cache.cache-missing-key

在物理表中未关联到数据时,是否缓存空记录。

Boolean

True

使用该配置时 lookup.cache 必须设置为 PARTIAL

结果表独有

参数

说明

数据类型

是否必填

默认值

备注

sink.buffer-flush.max-rows

每次按批写入数据时的最大记录数。

Integer

1000

无。

sink.buffer-flush.interval

写入数据的刷新间隔。

Duration

1s

无。

sink.delivery-guarantee

写入数据时的语义保证。

String

at-least-once

可选的取值包括:

  • none

  • at-least-once

说明

目前不支持exactly-once。

sink.max-retries

写入数据库失败时的最大重试次数。

Integer

3

无。

sink.retry.interval

写入数据库失败时的重试时间间隔。

Duration

1s

无。

sink.parallelism

自定义sink并行度。

Integer

无。

类型映射

CDC源表

BSON类型

Flink SQL类型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

维表和结果表

BSON类型

Flink SQL类型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

使用示例

CDC源表

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

维表

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

结果表

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

元数据

MongoDB CDC源表支持元数据列语法,您可以通过元数据列访问以下元数据。

元数据key

元数据类型

描述

database_name

STRING NOT NULL

包含该文档的数据库名。

collection_name

STRING NOT NULL

包含该文档的集合名。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

该文档在数据库中的变更时间,如果该文档来自表的存量历史数据而不是从ChangeStream中获取,则该值总是0。

row_kind

STRING NOT NULL

表示数据变更类型,取值如下:

  • +I:INSERT

  • -D:DELETE

  • -U:UPDATE_BEFORE

  • +U:UPDATE_AFTER

说明

仅VVR 11.1及以上版本支持使用。

关于MongoDB的变更前后像记录功能

MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据,在未开启变更前后像记录功能时,利用已有信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。

为了补充缺失的变更前事件,目前 Flink SQL Planner 会自动为 Upsert 类型的数据源生成一个 ChangelogNormalize 节点,该节点会在 Flink 状态中缓存所有文档的当前版本快照,在遇到被更新或删除的文档时,查表即可得知变更前的状态,但该算子节点需要存储体积巨大的状态数据。

image.png

MongoDB 6.0版本支持开启数据库的前像后像(Pre- and Post-images)记录功能,详情可参考Document Preimages。开启该功能后,MongoDB会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。此时在作业中启用scan.full-changelog配置项,MongoDB CDC会从变更文档记录中生成Update Before记录,从而支持产生完整事件流,消除了对ChangelogNormalize节点的依赖。

Mongo CDC DataStream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法

创建DataStream API程序并使用MongoDBSource。代码示例如下:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Maven中央仓库已经放置了VVR MongoDB连接器,以供您在作业开发时直接使用。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
说明

在使用DataStream API时,若要启用增量快照功能,请在构造MongoDBSource数据源时,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否则,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()

在构造MongoDBSource时,可以配置以下参数:

参数

说明

hosts

需要连接的MongoDB数据库的主机名称。

username

MongoDB数据库服务的用户名。

说明

若MongoDB服务器未启用鉴权,则无需配置此参数。

password

MongoDB数据库服务的密码。

说明

若MongoDB服务器未启用鉴权,则无需配置此参数。

databaseList

需要监控的MongoDB数据库名称。

说明

数据库名称支持正则表达式以读取多个数据库的数据,您可以使用.*匹配所有数据库。

collectionList

需要监控的MongoDB集合名称。

说明

集合名称支持正则表达式以读取多个集合的数据,您可以使用.*匹配所有集合。

startupOptions

选择MongoDB CDC的启动模式。

合法的取值包括:

  • StartupOptions.initial()

    • 从初始位点开始拉取全部数据

  • StartupOptions.latest-offset()

    • 从当前位点开始拉取变更数据

  • StartupOptions.timestamp()

    • 从指定的时间戳开始拉取变更数据

详情请参见Startup Properties

deserializer

反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:

  • MongoDBConnectorDeserializationSchema:将Upsert模式下产生的SourceRecord转成Flink Table API或SQL API内部数据结构RowData。

  • MongoDBConnectorFullChangelogDeserializationSchema:将Full Changelog模式下产生的SourceRecord转成Flink Table或SQL内部数据结构RowData。

  • JsonDebeziumDeserializationSchema:将SourceRecord转成JSON格式的String。