This topic describes how to use the MongoDB connector.
Background information
MongoDB is a document-oriented database for unstructured data. It simplifies application development and scaling. The MongoDB connector supports the following features:
| Category | Details |
| --- | --- |
| Supported types | Source table, dimension table, sink table, and data ingestion |
| Runtime mode | Only stream mode is supported. |
| Unique monitoring metrics | None |
| API types | DataStream, SQL, and data ingestion YAML |
| Support for updates or deletions in sink tables | Yes |
Features
The MongoDB Change Data Capture (CDC) source table uses the Change Stream API to capture both full and incremental data. It first reads a full snapshot of historical data and then seamlessly switches to reading the incremental oplog. This process ensures that data is neither duplicated nor lost. The connector also supports Exactly-Once semantics to guarantee data consistency during fault recovery.
Based on the Change Stream API
Uses the Change Stream API from MongoDB 3.6 to efficiently capture change events such as inserts, updates, replacements, and deletions from a database or collection. It converts these events into a changelog stream that Flink can process.
Full and incremental integration
Automatically reads the initial snapshot and smoothly transitions to incremental mode without manual intervention.
Parallel snapshot reading
Supports parallel reading of historical data to improve performance. This requires MongoDB 4.0 or later.
Multiple startup modes
- initial: Performs a full snapshot on the first startup and then continuously reads the oplog.
- latest-offset: Starts from the end of the current oplog without reading historical data.
- timestamp: Starts reading the oplog from a specified timestamp and skips the snapshot. This requires MongoDB 4.0 or later. See the sketch after this list.
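For example, to resume from a known point in the oplog, you can combine the timestamp mode with the scan.startup.timestamp-millis option (both are documented in the WITH parameters below). The following is a minimal sketch; the host, database, and collection names are placeholders:

```sql
CREATE TABLE mongo_ts_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',     -- placeholder host
  'database' = 'testdb',           -- placeholder database
  'collection' = 'testcoll',       -- placeholder collection
  'scan.startup.mode' = 'timestamp',
  -- Milliseconds since the Unix epoch; reading starts from this oplog position.
  'scan.startup.timestamp-millis' = '1700000000000'
);
```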
Full changelog support
Supports outputting a complete changelog that includes both before-images and after-images (for MongoDB 6.0 or later with the before-image/after-image recording feature enabled).
Flink integration enhancements
VVR 8.0.6 and later
Supports using CREATE TABLE AS (CTAS) statements or CREATE DATABASE AS (CDAS) statements to synchronize data and schema changes from MongoDB to downstream systems and to enable the before-image/after-image recording feature.
VVR 8.0.9 and later
Extends the dimension table join capability to support reading the built-in _id field of the ObjectId type.
Prerequisites
MongoDB instance requirements
Only Alibaba Cloud MongoDB 3.6 or later (ReplicaSet or sharded cluster) and self-managed MongoDB 3.6 or later are supported.
The ReplicaSet feature must be enabled for the MongoDB database that you want to monitor. For more information, see Replication.
MongoDB feature dependencies
To use the Full Changelog event stream feature, you must enable the before-image/after-image recording feature.
If authentication is enabled for MongoDB, the account that you use must have the required read permissions on the databases that you want to monitor.
MongoDB network and other preparations
An IP address whitelist is configured to allow Flink to access MongoDB.
The target MongoDB collections and data have been created.
Limits
CDC source table
MongoDB 4.0 and later support parallel reading during the initial snapshot phase. To enable parallel mode for the initial snapshot, set the scan.incremental.snapshot.enabled parameter to true.
Due to MongoDB Change Stream subscription limits, you cannot read data from the admin, local, or config databases, or from system collections. For more information, see the MongoDB documentation.
Sink table
Ververica Runtime (VVR) versions earlier than 8.0.5 only support inserting data.
In VVR 8.0.5 and later, if a primary key is declared in the sink table, you can insert, update, and delete data. If no primary key is declared, you can only insert data.
Dimension table
VVR 8.0.5 and later support using MongoDB dimension tables.
SQL
Syntax
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
When you create a CDC source table, you must declare the _id STRING column and set it as the unique primary key.
WITH parameters
General parameters
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| connector | The name of the connector. | String | Yes | None | Set the value to mongodb. |
| uri | The MongoDB connection URI. | String | No | None | Note: You must specify either the uri or the hosts parameter. |
| hosts | The hostname of the MongoDB instance. | String | No | None | You can provide multiple hostnames separated by commas (,). |
| scheme | The connection protocol used by MongoDB. | String | No | mongodb | Valid values: mongodb and mongodb+srv. |
| username | The username for connecting to MongoDB. | String | No | None | This parameter is required if identity verification is enabled. |
| password | The password for connecting to MongoDB. | String | No | None | This parameter is required if identity verification is enabled. Important: To prevent your password from being leaked, we recommend that you use a variable to set the password value. For more information, see Project Variables. |
| database | The name of the MongoDB database. | String | No | None | Important: Monitoring data in the admin, local, or config databases is not supported. |
| collection | The name of the MongoDB collection. | String | No | None | Important: Monitoring data in system collections is not supported. |
| connection.options | Connection parameters for the MongoDB side. | String | No | None | Additional connection parameters specified as ampersand-separated key=value pairs. Important: By default, MongoDB CDC does not set a socket connection timeout, which can cause long interruptions during network jitter. Set socketTimeoutMS to a reasonable value to avoid this issue (see the example after this table). |
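As the connection.options note suggests, you can pass client-side timeouts directly through the connection string options. A minimal sketch, with placeholder connection details and illustrative timeout values:

```sql
CREATE TABLE mongo_source_with_timeouts (
  `_id` STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',   -- placeholder host
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Ampersand-separated key=value pairs passed to the MongoDB driver.
  -- socketTimeoutMS bounds how long a stalled read blocks during network jitter.
  'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000'
);
```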
Source table only
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| scan.startup.mode | The startup mode for MongoDB CDC. | String | No | initial | Valid values: initial (full snapshot first, then incremental oplog), latest-offset (start from the end of the current oplog), and timestamp (start from a specified timestamp). For more information, see Startup Properties. |
| scan.startup.timestamp-millis | The starting timestamp for consumption at a specific offset. | Long | Required only when scan.startup.mode is set to timestamp. | None | The format is the number of milliseconds since the Unix epoch. This parameter applies only to the timestamp startup mode. |
| initial.snapshotting.queue.size | The queue size limit for the initial snapshot. | Integer | No | 10240 | This parameter takes effect only when the scan.startup.mode parameter is set to initial. |
| batch.size | The batch size for the cursor. | Integer | No | 1024 | None. |
| poll.max.batch.size | The maximum number of change documents in a single batch. | Integer | No | 1024 | This parameter controls the maximum number of change documents pulled at one time during stream processing. A larger value results in a larger buffer allocated within the connector. |
| poll.await.time.ms | The time interval between two data pulls. | Integer | No | 1000 | The unit is milliseconds. |
| heartbeat.interval.ms | The time interval for sending heartbeats. | Integer | No | 0 | The unit is milliseconds. The MongoDB CDC connector actively sends heartbeats to the database to keep the resume token up to date. A value of 0 means heartbeats are never sent. Important: For collections that are not frequently updated, we strongly recommend setting this option. |
| scan.incremental.snapshot.enabled | Specifies whether to enable parallel mode for the initial snapshot. | Boolean | No | false | This is an experimental feature. |
| scan.incremental.snapshot.chunk.size.mb | The chunk size for reading snapshots in parallel mode. | Integer | No | 64 | This is an experimental feature. The unit is MB. This parameter takes effect only when parallel snapshot reading is enabled. |
| scan.full-changelog | Generates a complete full changelog event stream. | Boolean | No | false | This is an experimental feature. Note: Requires MongoDB 6.0 or later with the pre-image and post-image feature enabled. For more information about how to enable this feature, see Document Preimages. |
| scan.flatten-nested-columns.enabled | Specifies whether to flatten nested fields and read them as columns whose names join the field path with dots (.). | Boolean | No | false | If enabled, a nested field is read as a top-level column named after its dot-separated path. For example, the col field in the document {"nested": {"col": true}} is read as a column named nested.col. Note: This parameter is supported only in VVR 8.0.5 and later. |
| scan.primitive-as-string | Specifies whether to parse all primitive types in a BSON document as strings. | Boolean | No | false | Note: This parameter is supported only in VVR 8.0.5 and later. |
| scan.ignore-delete.enabled | Specifies whether to ignore delete (-D) messages. | Boolean | No | false | When archiving data from a MongoDB source, many DELETE events may be generated in the oplog. If you do not want to sync these events downstream, you can enable this parameter to ignore delete events. |
| scan.incremental.snapshot.backfill.skip | Specifies whether to skip the watermark backfill process of the incremental snapshot algorithm. | Boolean | No | false | Enabling this switch provides only at-least-once semantics. Note: This parameter is supported only in VVR 11.1 and later. |
| initial.snapshotting.pipeline | A MongoDB aggregation pipeline. During the snapshot reading phase, this pipeline is pushed down to MongoDB to filter only the required data, improving read efficiency. | String | No | None | This parameter takes effect only when the scan.startup.mode parameter is set to initial. |
| initial.snapshotting.max.threads | The number of threads used for data replication. | Integer | No | None | This parameter takes effect only when the scan.startup.mode parameter is set to initial. Note: This parameter is supported only in VVR 11.1 and later. |
| initial.snapshotting.queue.size | The queue size for the initial snapshot. | Integer | No | 16000 | This parameter takes effect only when the scan.startup.mode parameter is set to initial. Note: This parameter is supported only in VVR 11.1 and later. |
| scan.change-stream.reading.parallelism | The degree of parallelism for subscribing to the Change Stream. | Integer | No | 1 | This parameter takes effect only when the scan.incremental.snapshot.enabled parameter is enabled. Important: To subscribe to the Change Stream with multiple concurrent threads, you must also set the heartbeat.interval.ms parameter (see the example after this table). Note: This parameter is supported only in VVR 11.2 and later. |
| scan.change-stream.reading.queue-size | The message queue size for concurrent Change Stream subscriptions. | Integer | No | 16384 | This parameter takes effect only when the scan.change-stream.reading.parallelism parameter is set. Note: This parameter is supported only in VVR 11.2 and later. |
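The following sketch enables parallel Change Stream subscription together with the heartbeat that the note above requires. Connection details and the parallelism value are placeholders:

```sql
CREATE TABLE mongo_parallel_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',                    -- placeholder host
  'database' = 'testdb',
  'collection' = 'testcoll',
  'scan.incremental.snapshot.enabled' = 'true',   -- required for parallel reading
  'scan.change-stream.reading.parallelism' = '4', -- VVR 11.2 and later
  -- Heartbeats keep the resume token fresh; required for multi-threaded subscription.
  'heartbeat.interval.ms' = '5000'
);
```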
Dimension table only
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| lookup.cache | The cache policy. | String | No | NONE | Valid values: NONE (no cache) and PARTIAL (cache only the records that have been looked up in the external database). |
| lookup.max-retries | The maximum number of retries if a database query fails. | Integer | No | 3 | None. |
| lookup.retry.interval | The retry interval if a database query fails. | Duration | No | 1s | None. |
| lookup.partial-cache.expire-after-access | The maximum retention period for a record after it is last accessed in the cache. | Duration | No | None | Supported time units are ms, s, min, h, and d. To use this configuration, lookup.cache must be set to PARTIAL. |
| lookup.partial-cache.expire-after-write | The maximum retention period for a record after it is written to the cache. | Duration | No | None | To use this configuration, lookup.cache must be set to PARTIAL. |
| lookup.partial-cache.max-rows | The maximum number of rows in the cache. If this value is exceeded, the oldest rows expire. | Long | No | None | To use this configuration, lookup.cache must be set to PARTIAL. |
| lookup.partial-cache.cache-missing-key | Specifies whether to cache empty records when no data is found in the physical table. | Boolean | No | true | To use this configuration, lookup.cache must be set to PARTIAL. |
Sink table only
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| sink.buffer-flush.max-rows | The maximum number of records for each batch write. | Integer | No | 1000 | None. |
| sink.buffer-flush.interval | The flush interval for writing data. | Duration | No | 1s | None. |
| sink.delivery-guarantee | The semantic guarantee for writing data. | String | No | at-least-once | Valid values: none and at-least-once. Note: exactly-once is not currently supported. |
| sink.max-retries | The maximum number of retries if writing to the database fails. | Integer | No | 3 | None. |
| sink.retry.interval | The retry interval if writing to the database fails. | Duration | No | 1s | None. |
| sink.parallelism | A custom degree of parallelism for the sink. | Integer | No | None | None. |
| sink.delete-strategy | Configures how retraction (-D and -U) messages are handled. | String | No | CHANGELOG_STANDARD | With the default value CHANGELOG_STANDARD, -D and -U messages are processed according to standard Flink changelog semantics and result in deletions. |
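A minimal sink sketch that combines the batching and retry options above; the connection values and the tuning numbers are illustrative placeholders:

```sql
CREATE TABLE mongo_sink_tuned (
  `_id` STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED  -- with a primary key, updates and deletes are supported (VVR 8.0.5+)
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',           -- placeholder host
  'database' = 'testdb',
  'collection' = 'testcoll',
  'sink.buffer-flush.max-rows' = '2000', -- flush after 2000 buffered records...
  'sink.buffer-flush.interval' = '5s',   -- ...or every 5 seconds, whichever comes first
  'sink.max-retries' = '5',
  'sink.retry.interval' = '2s'
);
```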
Type mapping
CDC source table
| BSON type | Flink SQL type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL(p, s) |
| Boolean | BOOLEAN |
| Date, Timestamp | DATE |
| Date, Timestamp | TIME |
| DateTime | TIMESTAMP(3), TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP(0), TIMESTAMP_LTZ(0) |
| String, ObjectId, UUID, Symbol, MD5, JavaScript, Regex | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
| DBPointer | ROW<$ref STRING, $id STRING> |
| GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>; Line: ROW<type STRING, coordinates ARRAY<ARRAY<DOUBLE>>> |
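For example, a GeoJSON Point field can be declared as a nested ROW so that its coordinates are directly accessible in SQL. A minimal sketch with placeholder names:

```sql
CREATE TABLE geo_source (
  `_id` STRING,
  -- GeoJSON Point, for example {"type": "Point", "coordinates": [120.15, 30.28]}
  location ROW<`type` STRING, coordinates ARRAY<DOUBLE>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',  -- placeholder host
  'database' = 'testdb',
  'collection' = 'geocoll'
);

-- Access the longitude (the first array element; Flink arrays are 1-based):
-- SELECT location.coordinates[1] AS longitude FROM geo_source;
```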
Dimension and sink tables
| BSON type | Flink SQL type |
| --- | --- |
| Int32 | INT |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Boolean | BOOLEAN |
| DateTime | TIMESTAMP_LTZ(3) |
| Timestamp | TIMESTAMP_LTZ(0) |
| String, ObjectId | STRING |
| Binary | BYTES |
| Object | ROW |
| Array | ARRAY |
Examples
CDC source table
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, -- must be declared
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
  mongo_source;

Dimension table
CREATE TEMPORARY TABLE datagen_source (
id STRING,
  a INT,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

Sink table
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

Data ingestion
The MongoDB connector can be used as a data source in a data ingestion YAML job.
Limits
This feature is supported only in VVR 11.1 and later.
Syntax
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
  type: ...

Configuration items
| Parameter | Description | Required | Data type | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| type | The data source type. | Yes | STRING | None | The value must be mongodb. |
| scheme | The protocol for connecting to the MongoDB server. | No | STRING | mongodb | Valid values: mongodb and mongodb+srv. |
| hosts | The server address for connecting to MongoDB. | Yes | STRING | None | You can specify multiple addresses separated by commas (,). |
| username | The username for connecting to MongoDB. | No | STRING | None | None. |
| password | The password for connecting to MongoDB. | No | STRING | None | None. |
| database | The name of the MongoDB database to capture. | Yes | STRING | None | Supports regular expressions. |
| collection | The name of the MongoDB collection to capture. | Yes | STRING | None | Supports regular expressions. The regular expression must match the full collection name. |
| connection.options | Additional connection options to append when connecting to the MongoDB server. | No | STRING | None | Specified as ampersand-separated key=value pairs. |
| schema.inference.strategy | The strategy for document type inference. | No | STRING | None | None. |
| scan.max.pre.fetch.records | The maximum number of records to sample from each captured collection during initial schema inference. | No | INT | 50 | None. |
| scan.startup.mode | The startup mode for the MongoDB data source. | No | STRING | initial | Valid values: initial (full snapshot first, then incremental oplog), latest-offset (start from the end of the current oplog), and timestamp (start from a specified timestamp). |
| scan.startup.timestamp-millis | The starting timestamp, in milliseconds since the Unix epoch, when the startup mode is set to timestamp. | No | LONG | None | None. |
| chunk-meta.group.size | The metadata chunk size limit. | No | INT | 1000 | None. |
| scan.incremental.close-idle-reader.enabled | Specifies whether to close idle source readers after switching to incremental mode. | No | BOOLEAN | false | None. |
| scan.incremental.snapshot.backfill.skip | Specifies whether to skip the watermark backfill process of the incremental snapshot algorithm. | No | BOOLEAN | false | If the sink connector you are using can automatically remove duplicates based on a primary key, enabling this switch can reduce the time taken for the full-to-incremental transition. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | Specifies whether to read unbounded chunks first when executing the incremental snapshot algorithm. | No | BOOLEAN | false | If the collection you are snapshotting is updated frequently, enabling this feature can reduce the likelihood of out-of-memory errors when reading unbounded chunks. |
| batch.size | The cursor batch size for reading MongoDB data. | No | INT | 1024 | None. |
| poll.max.batch.size | The maximum number of entries per request when pulling the Change Stream. | No | INT | 1024 | None. |
| poll.await.time.ms | The minimum waiting time between requests when pulling the Change Stream. | No | INT | 1000 | The unit is milliseconds. |
| heartbeat.interval.ms | The time interval for sending heartbeats. | No | INT | 0 | The unit is milliseconds. The MongoDB CDC connector actively sends heartbeats to the database to keep the resume token up to date. A value of 0 means heartbeats are never sent. Note: For collections that are not frequently updated, we strongly recommend setting this option. |
| scan.incremental.snapshot.chunk.size.mb | The chunk size during the snapshot phase. | No | INT | 64 | The unit is MB. |
| scan.incremental.snapshot.chunk.samples | The number of samples to use when estimating the collection size during the snapshot phase. | No | INT | 20 | None. |
| scan.full-changelog | Specifies whether to generate a complete changelog event stream based on MongoDB pre- and post-image records. | No | BOOLEAN | false | Requires MongoDB 6.0 or later with the pre-image and post-image feature enabled. For more information about how to enable this feature, see Document Preimages. |
| scan.cursor.no-timeout | Specifies whether the cursor used for reading data never expires. | No | BOOLEAN | false | A MongoDB server typically closes a cursor after it has been idle for a period of time (10 minutes) to prevent high memory usage. Setting this option to true prevents idle cursors from being closed. |
| scan.ignore-delete.enabled | Specifies whether to ignore delete event records from the MongoDB source. | No | BOOLEAN | false | None. |
| scan.flatten.nested-documents.enabled | Specifies whether to flatten nested structures in BSON documents. | No | BOOLEAN | false | When this option is enabled, a nested document such as {"nested": {"col": true}} produces a flattened column named nested.col. |
| scan.all.primitives.as-string.enabled | Specifies whether to infer all primitive types as STRING. | No | BOOLEAN | false | Enabling this option can avoid generating many schema evolution events when dealing with mixed input data types. |
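Combining several of the options above, the following sketch starts the ingestion job from a specific timestamp and enables the full changelog; the host, credentials, timestamp, and sink are placeholders:

```yaml
source:
  type: mongodb
  name: MongoDB Source
  hosts: localhost:27017            # placeholder address
  username: ${mongo.username}
  password: ${mongo.password}
  database: foo_db
  collection: foo_col_.*            # regular expression matching collection names
  scan.startup.mode: timestamp
  scan.startup.timestamp-millis: 1700000000000
  # Requires MongoDB 6.0 or later with pre- and post-images enabled.
  scan.full-changelog: true

sink:
  type: ...                         # your sink configuration
```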
Type mapping
| BSON type | CDC type | Notes |
| --- | --- | --- |
| STRING | VARCHAR | None. |
| INT32 | INT | None. |
| INT64 | BIGINT | None. |
| DECIMAL128 | DECIMAL | None. |
| DOUBLE | DOUBLE | None. |
| BOOLEAN | BOOLEAN | None. |
| TIMESTAMP | TIMESTAMP | None. |
| DATETIME | LOCALZONEDTIMESTAMP | None. |
| BINARY | VARBINARY | None. |
| DOCUMENT | MAP | The key and value type parameters need to be inferred. |
| ARRAY | ARRAY | The element type parameter needs to be inferred. |
| OBJECTID | VARCHAR | Represented as a hex string. |
| SYMBOL, REGULAREXPRESSION, JAVASCRIPT, JAVASCRIPTWITHSCOPE | VARCHAR | Represented as a string. |
Metadata
SQL connector
The MongoDB CDC SQL source table supports metadata column syntax. You can access the following metadata through metadata columns.
| Metadata key | Metadata type | Description |
| --- | --- | --- |
| database_name | STRING NOT NULL | The name of the database that contains the document. |
| collection_name | STRING NOT NULL | The name of the collection that contains the document. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time when the document was changed in the database. If the document comes from the table's existing historical data instead of from the Change Stream, this value is always 0. |
| row_kind | STRING NOT NULL | The type of the data change. Valid values: +I (insert), -U (update before), +U (update after), and -D (delete). Note: This metadata key is supported only in VVR 11.1 and later. |
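For example, you can expose row_kind alongside the other metadata columns. A minimal sketch; the connection values are placeholders and the column name op_type is an arbitrary choice:

```sql
CREATE TEMPORARY TABLE mongo_meta_source (
  `_id` STRING,
  name STRING,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  coll_name STRING METADATA FROM 'collection_name' VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  op_type STRING METADATA FROM 'row_kind' VIRTUAL,  -- VVR 11.1 and later
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',  -- placeholder host
  'database' = 'testdb',
  'collection' = 'testcoll'
);
```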
Data ingestion YAML
The MongoDB CDC data ingestion YAML connector supports reading the following metadata columns:
| Metadata key | Metadata type | Description |
| --- | --- | --- |
| ts_ms | BIGINT NOT NULL | The time when the document was changed in the database. If the document comes from the table's existing historical data instead of from the Change Stream, this value is always 0. |
You can also use the common metadata columns provided by the Transform module to access database name, collection name, and row_kind information.
About the pre-image and post-image recording feature in MongoDB
By default, MongoDB versions earlier than 6.0 do not provide the pre-change state of updated documents or the content of deleted documents. Without this information, the connector can only achieve Upsert semantics, which means that Update Before entries are missing. However, many useful Flink operators rely on a complete change stream of Insert, Update Before, Update After, and Delete events.
To supplement the missing pre-change events, the Flink SQL Planner automatically generates a ChangelogNormalize node for Upsert-type data sources. This node caches a snapshot of the current version of all documents in Flink state. When it encounters an updated or deleted document, it can look up the pre-change state from the cached snapshot. However, this operator node requires a large amount of state data.

MongoDB 6.0 supports the pre- and post-images feature for databases. For more information, see Use MongoDB Change Streams to Capture Data Changes in Real Time. After you enable this feature, MongoDB records the complete state of a document before and after each change in a special collection. Then, when you enable the scan.full-changelog configuration item in a job, MongoDB CDC generates Update Before records from the change document records. This process creates a complete event stream and eliminates the dependency on the ChangelogNormalize node.
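To enable pre- and post-images on a collection in MongoDB 6.0 or later, you can run collMod against the collection. A mongosh sketch; the collection name is a placeholder:

```javascript
// Run in mongosh against the database that contains the collection.
db.runCommand({
  collMod: "testcoll",  // placeholder collection name
  changeStreamPreAndPostImages: { enabled: true }
})
```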
Mongo CDC DataStream API
When you read or write data using the DataStream API, you need to use the corresponding DataStream connector to connect to Flink. For setup instructions, see Use DataStream connectors.
You can create a DataStream API program and use MongoDBSource. The following code shows an example:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

XML
The VVR MongoDB connector is available in the Maven Central Repository. You can use it directly in your job development.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>

When you use the DataStream API, to enable the incremental snapshot feature, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb.source package when you construct the MongoDBSource. Otherwise, use MongoDBSource#builder() from the com.ververica.cdc.connectors.mongodb package.
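The following is a minimal sketch of wiring the incremental-snapshot source into a job, assuming the com.ververica.cdc.connectors.mongodb.source package described above; the hostnames, credentials, and job names are placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoCdcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required so the source can resume from the oplog after a failover.
        env.enableCheckpointing(60_000);

        MongoDBSource<String> source = MongoDBSource.<String>builder()
                .hosts("mongo.example.com:27017")   // placeholder host
                .username("mongouser")              // placeholder credentials
                .password("mongopasswd")
                .databaseList("testdb")
                .collectionList("testdb.testcoll")  // fully qualified collection name
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
                .print();
        env.execute("MongoDB CDC job");
    }
}
```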
When you construct a MongoDBSource, you can configure the following parameters:
| Parameter | Description |
| --- | --- |
| hosts | The hostname of the MongoDB database to connect to. |
| username | The username for the MongoDB database service. Note: If authentication is not enabled on the MongoDB server, you do not need to configure this parameter. |
| password | The password for the MongoDB database service. Note: If authentication is not enabled on the MongoDB server, you do not need to configure this parameter. |
| databaseList | The name of the MongoDB database to monitor. Note: The database name supports regular expressions so that you can read data from multiple databases. You can use .* to match all databases. |
| collectionList | The name of the MongoDB collection to monitor. Note: The collection name supports regular expressions so that you can read data from multiple collections. You can use .* to match all collections. |
| startupOptions | The startup mode for MongoDB CDC. Valid values: StartupOptions.initial() performs a full snapshot on the first startup and then continuously reads the oplog; StartupOptions.latestOffset() starts from the end of the current oplog; StartupOptions.timestamp(timestampMillis) starts reading the oplog from the specified timestamp. For more information, see Startup Properties. |
| deserializer | A deserializer that deserializes SourceRecord records into a specified type. For example, JsonDebeziumDeserializationSchema deserializes records into JSON strings. |
|