本文为您介绍如何使用MongoDB连接器。
背景信息
MongoDB是一个面向文档的非结构化数据库,能够简化应用程序的开发及扩展。MongoDB连接器支持的信息如下:
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 仅支持流模式 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API 种类 | DataStream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
MongoDB的CDC源表,即MongoDB的流式源表,会先读取数据库的历史全量数据,并平滑切换到oplog读取上,保证不多读一条也不少读一条。即使发生故障,也能保证通过Exactly Once语义处理数据。MongoDB CDC支持通过Change Stream API高效地捕获MongoDB的数据库和集合中的文档变更,监控文档的插入、修改、替换、删除事件,并将其转换为Flink能够处理的Changelog数据流。作为源表,支持以下功能特性:
支持利用MongoDB 3.6新增的Change Stream API,更高效地监控变化。
精确一次处理:在作业任何阶段失败都能保证Exactly-once语义。
支持全增量一体化监测:支持快照阶段完成后自动切换为增量读取阶段。
支持初始快照阶段的并行读取,需要MongoDB >= 4.0。
支持多种启动模式:
initial模式:在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的oplog。
latest-offset模式:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
timestamp:跳过快照阶段,从指定的时间戳开始读取oplog事件,需要MongoDB >= 4.0。
支持产生Full Changelog事件流,需要MongoDB >= 6.0,详情请参见关于MongoDB的变更前后像记录功能。
实时计算Flink VVR 8.0.6及以上版本支持通过CREATE TABLE AS(CTAS)语句或CREATE DATABASE AS(CDAS)语句将MongoDB的数据和Schema变更同步到下游表。使用时需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见关于MongoDB的变更前后像记录功能。
实时计算Flink VVR 8.0.9及以上版本扩展维表关联读取能力,支持读取内置ObjectId 类型的
_id
字段。
前提条件
CDC源表
CDC连接器支持通过副本集或分片集架构模式读取阿里云云数据库MongoDB版的数据,也支持读取自建MongoDB数据库的数据 。
使用MongoDB CDC连接器的基础功能时,必须开启待监控的MongoDB数据库的副本集(Replica Set)功能,详情请参见Replication。
如需使用Full Changelog事件流功能,则需开启MongoDB数据库的前像后像(Pre- and Post-images)记录功能,详情请参见Document Preimages和关于MongoDB的变更前后像记录功能。
如果启用了MongoDB的鉴权功能,则需要使用具有以下数据库权限的MongoDB用户:
splitVector权限
listDatabases权限
listCollections权限
collStats权限
find权限
changeStream权限
config.collections和config.chunks集合的访问权限
维表和结果表
已创建MongoDB数据库和表
已设置IP白名单
使用限制
仅支持读写3.6及以上版本的MongoDB。
CDC源表
实时计算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC连接器。
MongoDB 6.0及以上版本支持产生Full Changelog事件流。
MongoDB 4.0及以上版本支持指定时间戳的启动模式。
MongoDB 4.0及以上版本支持初始快照阶段并行读取。如果您需要启用并行模式进行初始快照,则需要将
scan.incremental.snapshot.enabled
配置项设置为true。由于MongoDB Change Stream流订阅限制,不支持读取admin、local、config数据库及system集合中的数据,详情请参见MongoDB文档。
结果表
实时计算引擎VVR 8.0.5以下版本仅支持插入数据。
实时计算引擎VVR 8.0.5及以上版本,结果表中声明主键时,支持插入、更新和删除数据,未声明主键时仅支持插入数据。
维表
实时计算引擎VVR 8.0.5及以上版本支持使用MongoDB维表。
SQL
语法结构
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
在创建CDC源表时,您必须声明_id STRING
列,并将其作为唯一的主键。
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 连接器名称。 | String | 是 | 无 |
|
uri | MongoDB连接uri。 | String | 否 | 无 | 说明 参数 |
hosts | MongoDB所在的主机名称。 | String | 否 | 无 | 可以使用英文逗号( |
scheme | MongoDB使用的连接协议。 | String | 否 | mongodb | 可选的取值包括:
|
username | 连接到MongoDB时使用的用户名。 | String | 否 | 无 | 开启身份验证功能时,必须配置该参数。 |
password | 连接到MongoDB时使用的密码。 | String | 否 | 无 | 开启身份验证功能时,必须配置该参数。 重要 为了避免您的密码信息泄露,建议您使用变量的方式填写密码取值,详情请参见项目变量。 |
database | MongoDB数据库名称。 | String | 否 | 无 |
重要 不支持监控admin、local、config数据库中的数据。 |
collection | MongoDB集合名称。 | String | 否 | 无 |
重要 不支持监控system集合中的数据。 |
connection.options | MongoDB侧的连接参数。 | String | 否 | 无 | 使用 |
源表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
scan.startup.mode | MongoDB CDC的启动模式。 | String | 否 | initial | 参数取值如下:
详情请参见Startup Properties。 |
scan.startup.timestamp-millis | 指定位点消费的起始时间戳。 | Long | 取决于 scan.startup.mode的取值
| 无 | 参数格式为自Linux Epoch时间戳以来的毫秒数。 仅适用于 |
initial.snapshotting.queue.size | 进行初始快照时的队列大小限制。 | Integer | 否 | 10240 | 仅在 |
batch.size | 游标的批处理大小。 | Integer | 否 | 1024 | 无。 |
poll.max.batch.size | 同一批处理的最多变更文档数量。 | Integer | 否 | 1024 | 此参数控制流处理时一次拉取最多变更文档的个数。取值越大,连接器内部分配的缓冲区越大。 |
poll.await.time.ms | 两次拉取数据之间的时间间隔。 | Integer | 否 | 1000 | 单位为毫秒。 |
heartbeat.interval.ms | 发送心跳包的时间间隔。 | Integer | 否 | 0 | 单位为毫秒。 MongoDB CDC连接器主动向数据库发送心跳包来保证回溯状态最新。设置为0代表永不发送心跳包。 重要 对于更新不频繁的集合,强烈建议设定此选项。 |
scan.incremental.snapshot.enabled | 是否启用并行模式进行初始快照。 | Boolean | 否 | false | 实验性功能。 |
scan.incremental.snapshot.chunk.size.mb | 并行模式读取快照时的分片大小。 | Integer | 否 | 64 | 实验性功能。 单位为MB。 仅在启用并行快照时生效。 |
scan.full-changelog | 产生完整的Full Changelog事件流。 | Boolean | 否 | false | 实验性功能。 说明 MongoDB数据库需要为6.0及以上版本,并且已开启前像后像功能,开启方法请参见Document Preimages。 |
scan.flatten-nested-columns.enabled | 是否将以 | Boolean | 否 | false | 若开启,在如下示例的BSON文档中,
说明 仅VVR 8.0.5及以上版本支持该参数。 |
scan.primitive-as-string | 是否将BSON文档中的原始类型都解析为字符串类型。 | Boolean | 否 | false | 说明 仅VVR 8.0.5及以上版本支持该参数。 |
scan.ignore-delete.enabled | 是否忽略delete(-D)类型的消息。 | Boolean | 否 | false | 参数取值如下:
说明 仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
scan.incremental.snapshot.backfill.skip | 是否跳过增量快照算法的回填水位过程。 | Boolean | 否 | false | 启用此开关只能提供at-least-once语义。 说明 仅VVR 11.1及以上版本支持该参数。 |
initial.snapshotting.pipeline | MongoDB 管道操作,在快照读取阶段,会把该操作下推到 MongoDB,只筛选所需的数据,从而提高读取效率。 | String | 否 | 无。 |
|
initial.snapshotting.max.threads | 执行数据复制时使用的线程数。 | Integer | 否 | 无。 | 仅在 scan.startup.mode 选项设置为 initial 时生效。 说明 仅VVR 11.1及以上版本支持该参数。 |
initial.snapshotting.queue.size | 进行初始快照时的队列大小。 | Integer | 否 | 16000 | 仅在 scan.startup.mode 选项设置为 initial 时生效。 说明 仅VVR 11.1及以上版本支持该参数。 |
维表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
lookup.cache | Cache策略。 | String | 否 | NONE | 目前支持以下两种缓存策略:
|
lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 否 | 3 | 无。 |
lookup.retry.interval | 如果查询数据库失败,重试的时间间隔。 | Duration | 否 | 1s | 无。 |
lookup.partial-cache.expire-after-access | 缓存中的记录最长保留时间。 | Duration | 否 | 无 | 支持时间单位ms、s、min、h和d。 使用该配置时 |
lookup.partial-cache.expire-after-write | 在记录写入缓存后该记录的最大保留时间。 | Duration | 否 | 无 | 使用该配置时 |
lookup.partial-cache.max-rows | 缓存的最大条数。超过该值,最旧的行将过期。 | Long | 否 | 无 | 使用该配置时 |
lookup.partial-cache.cache-missing-key | 在物理表中未关联到数据时,是否缓存空记录。 | Boolean | 否 | True | 使用该配置时 |
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
sink.buffer-flush.max-rows | 每次按批写入数据时的最大记录数。 | Integer | 否 | 1000 | 无。 |
sink.buffer-flush.interval | 写入数据的刷新间隔。 | Duration | 否 | 1s | 无。 |
sink.delivery-guarantee | 写入数据时的语义保证。 | String | 否 | at-least-once | 可选的取值包括:
说明 目前不支持exactly-once。 |
sink.max-retries | 写入数据库失败时的最大重试次数。 | Integer | 否 | 3 | 无。 |
sink.retry.interval | 写入数据库失败时的重试时间间隔。 | Duration | 否 | 1s | 无。 |
sink.parallelism | 自定义sink并行度。 | Integer | 否 | 空 | 无。 |
类型映射
CDC源表
BSON类型 | Flink SQL类型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
维表和结果表
BSON类型 | Flink SQL类型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
使用示例
CDC源表
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --must be declared
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;
维表
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
结果表
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;
元数据
MongoDB CDC源表支持元数据列语法,您可以通过元数据列访问以下元数据。
元数据key | 元数据类型 | 描述 |
database_name | STRING NOT NULL | 包含该文档的数据库名。 |
collection_name | STRING NOT NULL | 包含该文档的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该文档在数据库中的变更时间,如果该文档来自表的存量历史数据而不是从ChangeStream中获取,则该值总是0。 |
row_kind | STRING NOT NULL | 表示数据变更类型,取值如下:
说明 仅VVR 11.1及以上版本支持使用。 |
关于MongoDB的变更前后像记录功能
MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据,在未开启变更前后像记录功能时,利用已有信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。
为了补充缺失的变更前事件,目前 Flink SQL Planner 会自动为 Upsert 类型的数据源生成一个 ChangelogNormalize 节点,该节点会在 Flink 状态中缓存所有文档的当前版本快照,在遇到被更新或删除的文档时,查表即可得知变更前的状态,但该算子节点需要存储体积巨大的状态数据。
MongoDB 6.0版本支持开启数据库的前像后像(Pre- and Post-images)记录功能,详情可参考Document Preimages。开启该功能后,MongoDB会在每次变更发生时,在一个特殊的集合中记录文档变更前后的完整状态。此时在作业中启用scan.full-changelog
配置项,MongoDB CDC会从变更文档记录中生成Update Before记录,从而支持产生完整事件流,消除了对ChangelogNormalize节点的依赖。
Mongo CDC DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法。
创建DataStream API程序并使用MongoDBSource。代码示例如下:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
XML
Maven中央仓库已经放置了VVR MongoDB连接器,以供您在作业开发时直接使用。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>
在使用DataStream API时,若要启用增量快照功能,请在构造MongoDBSource数据源时,使用com.ververica.cdc.connectors.mongodb.source
包中的MongoDBSource#builder()
;否则,使用com.ververica.cdc.connectors.mongodb
中的MongoDBSource#builder()
。
在构造MongoDBSource时,可以配置以下参数:
参数 | 说明 |
hosts | 需要连接的MongoDB数据库的主机名称。 |
username | MongoDB数据库服务的用户名。 说明 若MongoDB服务器未启用鉴权,则无需配置此参数。 |
password | MongoDB数据库服务的密码。 说明 若MongoDB服务器未启用鉴权,则无需配置此参数。 |
databaseList | 需要监控的MongoDB数据库名称。 说明 数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 |
collectionList | 需要监控的MongoDB集合名称。 说明 集合名称支持正则表达式以读取多个集合的数据,您可以使用 |
startupOptions | 选择MongoDB CDC的启动模式。 合法的取值包括:
详情请参见Startup Properties。 |
deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
|