Implementation Principle and Practice of Flink CDC MongoDB Connector

1. Introduction to MongoDB Change Stream technology

MongoDB is a document-oriented NoSQL database that supports semi-structured data storage. It is also a distributed database offering two cluster deployment modes, replica sets and sharded clusters, which provide high availability and horizontal scalability and suit large-scale data storage. In addition, MongoDB 4.0 introduced multi-document transactions, which makes it friendlier to more complex business scenarios.

MongoDB uses a flexible schema that supports flexible data structures and rich data types, and suits business scenarios such as JSON documents, tags, snapshots, geolocation, and content storage. Its natively distributed architecture provides out-of-the-box sharding and automatic chunk rebalancing. MongoDB also provides GridFS, a distributed file storage facility suited to storing large files such as images, audio, and video.

MongoDB provides two cluster deployment modes: replica sets and sharded clusters.

Replica set: a high-availability deployment mode in which secondary nodes replicate data by applying the primary node's operation log. When the primary fails, the secondaries and the arbiter hold an election to choose a new primary, achieving failover. Secondaries can also serve read requests, reducing the query load on the primary.

Sharded cluster: a horizontally scalable deployment mode that distributes data evenly across shards. Each shard can be deployed as a replica set. The primary in each shard handles reads and writes, and the secondaries replicate the primary's operation log. Data is divided into chunks (64 MB by default before MongoDB 6.0) based on the specified shard key and sharding strategy. These chunks are distributed across the shards. The mapping between shards and chunks is recorded on the config servers.

MongoDB's oplog is similar to MySQL's binlog: it records every data operation in MongoDB. The oplog is a capped collection. Once it exceeds its configured size, the oldest entries are discarded.

Unlike MySQL's binlog, the oplog does not record the complete before and after images of a change. Traversing the oplog does capture MongoDB data changes, but converting them into a changelog that Flink supports has limitations.

First, subscribing to the oplog is difficult. Each replica set maintains its own oplog, and in a sharded cluster each shard may be an independent replica set. You must therefore traverse the oplog of every shard and sort the entries by operation time. Second, the oplog does not contain the complete state of the document before and after the change. It can therefore be converted into neither a standard Flink changelog nor an upsert changelog. This is the main reason we did not subscribe to the oplog directly when implementing the MongoDB CDC Connector.
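The following Python sketch illustrates the first difficulty. It merges per-shard oplogs into one stream ordered by operation time. The `OplogEntry` shape and the shard names are hypothetical simplifications, not the real oplog schema. The sketch shows that a direct-oplog design would have to perform this k-way merge itself. It also shows that an update entry still carries only the modified fields afterwards.

```python
import heapq
from typing import Iterable, Iterator, NamedTuple


class OplogEntry(NamedTuple):
    """Simplified oplog entry; not the real oplog document schema."""
    ts: tuple      # (seconds, increment), ordered like a BSON Timestamp
    shard: str
    op: str        # "i" insert, "u" update, "d" delete
    change: dict   # for "u", only the modified fields, not the full document


def merge_shard_oplogs(*oplogs: Iterable[OplogEntry]) -> Iterator[OplogEntry]:
    """k-way merge of per-shard oplogs, each already sorted by ts."""
    return heapq.merge(*oplogs, key=lambda entry: entry.ts)


shard_a = [
    OplogEntry((100, 1), "rs0", "i", {"_id": 1, "status": "new", "amount": 5}),
    OplogEntry((102, 1), "rs0", "u", {"$set": {"status": "paid"}}),
]
shard_b = [
    OplogEntry((101, 1), "rs1", "i", {"_id": 7, "status": "new", "amount": 9}),
    OplogEntry((102, 2), "rs1", "d", {"_id": 7}),
]
merged = list(merge_shard_oplogs(shard_a, shard_b))
for entry in merged:
    print(entry.ts, entry.shard, entry.op, entry.change)
```

Even in the merged order, the update at `(102, 1)` says only that `status` became `paid`. Turning it into a full row would require a separate lookup, which is what the Change Streams approach avoids.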

In the end, we chose to use the MongoDB Change Streams solution to implement the MongoDB CDC Connector.

Change Streams, introduced in MongoDB 3.6, provide a simpler change data capture interface that hides the complexity of traversing the oplog directly. Change Streams can also return the complete current state of the changed document, which converts easily into a Flink upsert changelog. They also provide fairly complete failure recovery. Each change event carries a resume token that records the current position in the stream. After a failure, the stream can be resumed from that position using the token.
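The resume-token mechanism can be illustrated with a small in-memory simulation. `FakeChangeStream` and `Consumer` are hypothetical classes. Tokens are plain strings here, whereas real resume tokens are opaque documents. The sketch only shows the pattern of persisting the last processed token and restarting the stream after it. pymongo's `watch()` accepts a `resume_after` argument for the same purpose.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class FakeChangeStream:
    """In-memory stand-in for a change stream (not the pymongo API)."""
    events: List[dict]  # each event carries its resume token under "_id"

    def watch(self, resume_after: Optional[str] = None) -> Iterator[dict]:
        start = 0
        if resume_after is not None:
            tokens = [event["_id"] for event in self.events]
            start = tokens.index(resume_after) + 1  # continue after that event
        yield from self.events[start:]


@dataclass
class Consumer:
    processed: List[int] = field(default_factory=list)
    last_token: Optional[str] = None  # what a checkpoint would persist

    def run(self, stream: FakeChangeStream, fail_after: Optional[int] = None) -> None:
        for n, event in enumerate(stream.watch(resume_after=self.last_token)):
            if fail_after is not None and n == fail_after:
                raise RuntimeError("simulated failure")
            self.processed.append(event["documentKey"]["_id"])
            self.last_token = event["_id"]  # advance only after processing


events = [
    {"_id": f"token-{i}", "operationType": "insert", "documentKey": {"_id": i}}
    for i in range(5)
]
stream = FakeChangeStream(events)
consumer = Consumer()
try:
    consumer.run(stream, fail_after=3)  # crashes before handling the 4th event
except RuntimeError:
    pass
consumer.run(stream)  # restarts right after consumer.last_token
print(consumer.processed)
```

Because the token is updated only after an event is handled, the restart neither skips nor repeats events in this simplified model.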

In addition, Change Streams support filtering and customizing change events. For example, regular-expression filters on database and collection names can be pushed down to MongoDB, which can significantly reduce network overhead. Change streams can be opened at the collection, database, or whole-cluster level, each with corresponding access control.

The figure above shows the CDC Connector built on MongoDB Change Streams. It first subscribes to MongoDB changes through Change Streams, which report four types of changes: insert, update, delete, and replace. These are converted into an upsert changelog supported by Flink. A dynamic table can then be defined on top of that changelog and processed with Flink SQL.
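Here is a minimal sketch of the conversion step. It assumes events shaped like change stream documents, with `operationType`, `documentKey` and `fullDocument` fields. Inserts become `+I`. Updates and replaces become `+U` carrying the full document. Deletes become `-D` carrying only the key. This mapping illustrates upsert semantics and is not the connector's actual source code.

```python
from enum import Enum
from typing import Tuple


class RowKind(Enum):
    """Short-string names matching Flink's RowKind values."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def to_upsert_changelog(event: dict) -> Tuple[RowKind, dict]:
    """Map one change event to an upsert-changelog record keyed by _id."""
    op = event["operationType"]
    if op == "insert":
        return RowKind.INSERT, event["fullDocument"]
    if op in ("update", "replace"):
        # Upsert semantics: only the new full image is emitted; no -U record.
        # For updates this needs the fullDocument lookup to be enabled.
        return RowKind.UPDATE_AFTER, event["fullDocument"]
    if op == "delete":
        # A delete carries only the document key, which an upsert consumer needs.
        return RowKind.DELETE, dict(event["documentKey"])
    raise ValueError(f"unsupported operationType: {op}")


sample = [
    {"operationType": "insert", "documentKey": {"_id": 1}, "fullDocument": {"_id": 1, "qty": 1}},
    {"operationType": "update", "documentKey": {"_id": 1}, "fullDocument": {"_id": 1, "qty": 2}},
    {"operationType": "delete", "documentKey": {"_id": 1}},
]
for kind, row in map(to_upsert_changelog, sample):
    print(kind.value, row)
```

The missing `-U` records are exactly what a downstream normalization step must reconstruct.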

Currently, the MongoDB CDC Connector supports:

* exactly-once semantics;
* full plus incremental subscription;
* recovery from checkpoints and savepoints;
* filtering of snapshot data;
* extraction of metadata such as database and collection names;
* regular-expression filtering of databases and collections.

2. MongoDB CDC Connector business practice

Founded in 2017, XTransfer focuses on B2B cross-border payments. It provides foreign trade collection and risk control services to small, medium, and micro enterprises engaged in cross-border e-commerce exports. The business chain in cross-border B2B settlement is very long. From inquiry to final transaction, it involves logistics terms, payment terms, and more. Every link requires risk control to comply with regulatory requirements for cross-border capital transactions.

All of these factors place high demands on the security and accuracy of XTransfer's data processing. On this basis, XTransfer has built its own big data platform on Flink. The platform ensures that data across the full cross-border B2B chain is collected, processed, and computed effectively, meeting requirements for high security, low latency, and high accuracy.

Change Data Capture (CDC) is a critical part of data integration. Before Flink CDC, traditional CDC tools such as Debezium and Canal were generally used to extract database change logs and forward them to Kafka, where downstream consumers read them. This architecture has the following pain points:

* Many components must be deployed, which leads to high operation and maintenance costs;
* Downstream consumption logic must be adapted to the writing side, which adds development cost;
* Subscription configuration is complicated, and a complete synchronization pipeline cannot be defined with SQL statements alone, as it can with Flink CDC;
* Full plus incremental collection is hard to achieve completely, and a separate full-load tool such as DataX may be needed;
* The tools focus on collecting changed data, and their data processing and filtering capabilities are relatively weak;
* Building wide tables that join data from heterogeneous sources is difficult.

At present, our big data platform mainly uses Flink CDC for change data capture, which has the following advantages:

1. Real-time data integration

No additional components such as Debezium, Canal, or DataX need to be deployed, greatly reducing operation and maintenance costs;
It supports a rich set of data sources and can reuse Flink's existing connectors for reading and writing, covering most business scenarios;
It lowers development difficulty: a complete data integration workflow can be defined with Flink SQL alone;
Its data processing capability is strong: relying on Flink's computing engine, it can perform streaming ETL and even joins and group-bys across heterogeneous data sources.

2. Build a real-time data warehouse

Flink CDC greatly simplifies deploying a real-time data warehouse. Database changes are collected in real time through Flink CDC and written to Kafka, Iceberg, Hudi, TiDB, and other storage systems. Flink is then used for in-depth data mining and processing.
Flink's engine supports unified stream and batch computation. Multiple computing engines therefore no longer need to be maintained, which greatly reduces data development costs.

3. Real-time risk control

In the past, real-time risk control was usually implemented by sending business events to Kafka. With Flink CDC, risk control events can be captured directly from the business database and then analyzed with complex event processing in Flink CEP.
Models can be run through Flink ML and Alink to add machine learning capabilities. Finally, the real-time risk control results are written to Kafka, and risk control commands are issued.

3. MongoDB CDC Connector Production Tuning

Using the MongoDB CDC Connector has the following requirements:

Because the connector is built on Change Streams, the minimum supported MongoDB version is 3.6. Version 4.0.8 or later is recommended.
A cluster deployment must be used. Change Streams rely on the oplog that nodes use to replicate data to one another. A standalone MongoDB instance does not replicate and has no oplog; only replica sets and sharded clusters have a replication mechanism.
The WiredTiger storage engine and the pv1 replication protocol (protocol version 1) are required.
The user needs the changeStream and find privileges.
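The requirements above can be expressed as a pre-flight check. The `info` dictionary and its keys (`version`, `topology`, `storage_engine`, `protocol_version`, `privileges`) are hypothetical. In practice, the values would be gathered from server commands; this sketch only encodes the rules listed above.

```python
from typing import List, Tuple


def parse_version(v: str) -> Tuple[int, ...]:
    """'4.0.8' -> (4, 0, 8); any '-rc1' style suffix is ignored."""
    return tuple(int(part) for part in v.split("-")[0].split("."))


def check_requirements(info: dict) -> List[str]:
    """Return a list of problems; an empty list means every check passed."""
    problems = []
    version = parse_version(info["version"])
    if version < (3, 6):
        problems.append("MongoDB >= 3.6 is required for Change Streams")
    elif version < (4, 0, 8):
        problems.append("warning: 4.0.8 or later is recommended")
    if info["topology"] not in ("replica_set", "sharded"):
        problems.append("a replica set or sharded cluster is required (standalone has no oplog)")
    if info["storage_engine"] != "wiredTiger":
        problems.append("the WiredTiger storage engine is required")
    if info.get("protocol_version") != 1:
        problems.append("replication protocol version 1 (pv1) is required")
    missing = {"changeStream", "find"} - set(info["privileges"])
    if missing:
        problems.append(f"missing privileges: {sorted(missing)}")
    return problems


ok = {"version": "4.4.6", "topology": "replica_set", "storage_engine": "wiredTiger",
      "protocol_version": 1, "privileges": ["find", "changeStream"]}
print(check_requirements(ok))
print(check_requirements(dict(ok, version="3.4.1", topology="standalone")))
```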

When using the MongoDB CDC Connector, pay attention to the oplog's size and retention time. The oplog is a capped collection: once it reaches its maximum size, the oldest entries are discarded. Change Streams, however, resume from resume tokens. If the oplog is too small, the entry corresponding to a resume token may no longer exist. The token has then expired, and the change stream cannot be resumed.
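The following toy model assumes integer timestamps and a fixed entry count instead of a byte size. It shows how a capped oplog silently drops old entries. It also shows why a resume token that points before the oldest remaining entry can no longer be used.

```python
from collections import deque


class CappedOplog:
    """Toy capped collection that keeps only the newest `capacity` entries."""

    def __init__(self, capacity: int) -> None:
        self.entries = deque(maxlen=capacity)  # deque evicts the oldest itself

    def append(self, ts: int) -> None:
        self.entries.append(ts)

    def can_resume_from(self, token_ts: int) -> bool:
        # A token is usable only if its position is still inside the oplog.
        return bool(self.entries) and token_ts >= self.entries[0]


oplog = CappedOplog(capacity=3)
for ts in range(1, 6):  # five writes into room for three: entries 1 and 2 are gone
    oplog.append(ts)
print(list(oplog.entries), oplog.can_resume_from(4), oplog.can_resume_from(2))
```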

You can use the replSetResizeOplog command to set the oplog size. MongoDB 4.4 and later also support setting a minimum retention period. In general, it is recommended to retain at least 7 days of oplog in production.
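A rough sizing helper and the command document for `replSetResizeOplog` might look as follows. The write rate is an input you would measure for your own deployment. The 1.5 `headroom` factor is an arbitrary assumption, not a MongoDB recommendation. The command takes `size` in megabytes, and `minRetentionHours` requires MongoDB 4.4 or later.

```python
import math
from typing import Optional


def required_oplog_mb(oplog_gb_per_hour: float, retention_days: float,
                      headroom: float = 1.5) -> int:
    """Estimate an oplog size in MB that holds `retention_days` of writes.

    `headroom` is an assumed safety factor for write bursts, not a MongoDB default.
    """
    hours = retention_days * 24
    return math.ceil(oplog_gb_per_hour * hours * 1024 * headroom)


def resize_oplog_command(size_mb: int, min_retention_hours: Optional[float] = None) -> dict:
    """Build the command document for the replSetResizeOplog admin command."""
    cmd = {"replSetResizeOplog": 1, "size": float(size_mb)}  # command name must come first
    if min_retention_hours is not None:  # supported from MongoDB 4.4
        cmd["minRetentionHours"] = float(min_retention_hours)
    return cmd


size = required_oplog_mb(0.5, 7)  # 0.5 GB of oplog per hour, 7 days retention
print(size, resize_oplog_command(size, min_retention_hours=7 * 24))
```

The resulting dictionary could be passed to pymongo's `client.admin.command()`. Resizing is done member by member, so check the MongoDB manual for the procedure in your topology.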

For collections that change slowly, it is recommended to enable heartbeat events in the configuration. Both change events and heartbeat events advance the resume token. For slowly changing collections, heartbeat events therefore keep refreshing the resume token and prevent it from expiring.

The heartbeat interval can be set in the connector's configuration options.
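The effect of heartbeats can be simulated with a simplified, time-based oplog window. The functions below are hypothetical and rest on two assumptions. First, the held resume token is the newest of the collection's own change events and, when enabled, the latest heartbeat tick. Second, the oplog covers exactly the last `oplog_window_s` seconds.

```python
from typing import List


def last_token_time(change_times: List[int], now: int, heartbeat_s: int = 0) -> int:
    """Time of the newest resume token the consumer holds at `now` (simplified)."""
    candidates = [t for t in change_times if t <= now]
    if heartbeat_s > 0:
        candidates.append(now - now % heartbeat_s)  # most recent heartbeat tick
    return max(candidates, default=0)


def token_still_valid(token_time: int, now: int, oplog_window_s: int) -> bool:
    """The oplog is assumed to cover only [now - oplog_window_s, now]."""
    return token_time >= now - oplog_window_s


DAY = 24 * 3600
window = 7 * DAY      # oplog keeps roughly 7 days
slow_changes = [0]    # the collection changed once, at t = 0
now = 10 * DAY        # a failure forces a resume ten days after that change

without_hb = last_token_time(slow_changes, now)
with_hb = last_token_time(slow_changes, now, heartbeat_s=60)
print(token_still_valid(without_hb, now, window), token_still_valid(with_hb, now, window))
```

Without heartbeats, the only token is ten days old and has fallen out of the seven-day window. With heartbeats, the token tracks the current time.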

MongoDB Change Streams can only be converted into a Flink upsert changelog, similar to Upsert Kafka. Flink therefore adds a ChangelogNormalize operator to fill in the missing -U (update-before) images, which incurs additional state overhead. For this reason, the RocksDB state backend is recommended in production.
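What ChangelogNormalize has to do can be sketched with a dictionary standing in for Flink's keyed state. The dictionary remembers the last image per key, so the sketch can emit the missing `-U` before each `+U`. This is a simplified model for illustration, not Flink's operator. In particular, dropping identical re-upserts is a choice made for this sketch. The state grows with the number of distinct keys, which is why a disk-backed state backend such as RocksDB helps.

```python
from typing import Dict, Iterable, Iterator, Tuple

Record = Tuple[str, dict]


def changelog_normalize(upserts: Iterable[Record], key: str = "_id") -> Iterator[Record]:
    """Turn an upsert stream (+I/+U as upserts, -D by key) into a full changelog."""
    state: Dict[object, dict] = {}  # stands in for Flink keyed state (e.g. RocksDB)
    for kind, row in upserts:
        k = row[key]
        previous = state.get(k)
        if kind == "-D":
            if previous is not None:
                del state[k]
                yield "-D", previous  # emit the last full image, not just the key
        elif previous is None:
            state[k] = row
            yield "+I", row
        elif previous != row:
            state[k] = row
            yield "-U", previous  # the pre-image exists only in state
            yield "+U", row
        # an identical re-upsert produces no output in this sketch


upsert_stream = [
    ("+U", {"_id": 1, "status": "new"}),
    ("+U", {"_id": 1, "status": "paid"}),
    ("+U", {"_id": 1, "status": "paid"}),
    ("-D", {"_id": 1}),
]
for kind, row in changelog_normalize(upsert_stream):
    print(kind, row)
```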

When the default connection parameters do not meet your needs, you can pass any connection parameters supported by MongoDB through the connection.options configuration item.

For example, suppose the user connecting to MongoDB was not created in the admin database. You can then set a parameter specifying which database to authenticate the user against. You can also set the maximum size of the connection pool. MongoDB connection strings support these parameters natively.
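As an illustration, the helper below builds a `connection.options` value from standard MongoDB connection-string options. It uses `authSource` for the authentication database and `maxPoolSize` for the pool size, then embeds the value in a Flink SQL DDL string. The table, host, database, and collection names are hypothetical. The option keys `connector`, `hosts`, `database`, and `collection` follow the mongodb-cdc connector documentation; verify them against the connector version you use.

```python
from urllib.parse import urlencode


def connection_options(**params: object) -> str:
    """Join MongoDB connection-string options as key=value pairs separated by '&'."""
    return urlencode(params)


def mongodb_cdc_ddl(table: str, hosts: str, database: str, collection: str,
                    options: str) -> str:
    """Build a Flink SQL DDL string for a hypothetical mongodb-cdc table."""
    return (
        f"CREATE TABLE {table} (\n"
        "  _id STRING,\n"
        "  status STRING,\n"
        "  PRIMARY KEY (_id) NOT ENFORCED\n"
        ") WITH (\n"
        "  'connector' = 'mongodb-cdc',\n"
        f"  'hosts' = '{hosts}',\n"
        f"  'database' = '{database}',\n"
        f"  'collection' = '{collection}',\n"
        f"  'connection.options' = '{options}'\n"
        ")"
    )


opts = connection_options(authSource="users_db", maxPoolSize=20)
print(mongodb_cdc_ddl("orders", "mongo1:27017,mongo2:27017", "shop", "orders", opts))
```

`urlencode` also percent-encodes any special characters in option values, which keeps the string valid as connection-string syntax.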

Regular-expression matching of multiple databases and collections is a feature added to the MongoDB CDC Connector after version 2.0. Note that if the database name is given as a regular expression, the user needs the readAnyDatabase role. MongoDB Change Streams can only be opened on a whole cluster, a single database, or a single collection. To filter databases by regular expression, the change stream must therefore be opened on the whole cluster, and database changes are then filtered through the pipeline. To subscribe to multiple databases and collections, write regular expressions in the database and collection parameters.
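The push-down filter for regex subscriptions can be pictured as a `$match` stage on the change event's `ns.db` and `ns.coll` fields. The sketch assumes the patterns should match whole names, so it wraps them in `^(?:...)$`. That anchoring is a choice of this example and may differ from the connector's behavior. The `matches` helper evaluates the stage locally, only to show which namespaces pass.

```python
import re
from typing import List


def build_ns_filter(db_regex: str, coll_regex: str) -> List[dict]:
    """Pipeline with one $match stage on the change event namespace."""
    # Unanchored $regex matches anywhere in the string, so anchors are added here.
    return [{"$match": {
        "ns.db": {"$regex": f"^(?:{db_regex})$"},
        "ns.coll": {"$regex": f"^(?:{coll_regex})$"},
    }}]


def matches(pipeline: List[dict], event: dict) -> bool:
    """Evaluate the $match locally to show which events would pass."""
    cond = pipeline[0]["$match"]
    return all(
        re.search(cond[f"ns.{part}"]["$regex"], event["ns"][part]) is not None
        for part in ("db", "coll")
    )


pipeline = build_ns_filter(r"shop_\d+", r"orders|payments")
for ns in ({"db": "shop_01", "coll": "orders"},
           {"db": "shop_01", "coll": "orders_bak"},
           {"db": "analytics", "coll": "orders"}):
    print(ns, matches(pipeline, {"ns": ns}))
```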

4. MongoDB CDC Connector Parallel Snapshot Improvement

To speed up the snapshot, the source can be reworked for parallel reading using the source interface introduced in FLIP-27. A split enumerator first divides a complete snapshot task into several subtasks (splits) according to a splitting strategy. It then assigns them to multiple split readers that take snapshots in parallel, improving overall job speed.
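A FLIP-27 split enumerator typically hands out splits when readers ask for them rather than pushing a fixed plan. The Python class below mimics that pull model. The method name echoes the `handleSplitRequest` callback of Flink's Java `SplitEnumerator` interface. However, this is a hypothetical illustration, not Flink's API, and the `_id` ranges are made up.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

Split = Tuple[int, int]  # hypothetical [lower, upper) range of _id values


class SplitEnumerator:
    """Hands out snapshot splits to readers when they ask (pull model)."""

    def __init__(self, splits: List[Split]) -> None:
        self.pending = deque(splits)
        self.assigned: Dict[int, List[Split]] = {}

    def handle_split_request(self, reader_id: int) -> Optional[Split]:
        if not self.pending:
            return None  # nothing left; the reader's snapshot work is done
        split = self.pending.popleft()
        self.assigned.setdefault(reader_id, []).append(split)
        return split


enumerator = SplitEnumerator([(0, 100), (100, 200), (200, 300)])
# Readers request work in this order; reader 1's second request finds the queue empty.
results = [enumerator.handle_split_request(reader) for reader in (0, 1, 0, 1)]
print(results, enumerator.assigned)
```

In a pull model, faster readers naturally take more splits, which balances uneven chunks better than a fixed up-front assignment.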

In MongoDB, however, the primary key (_id) is in most cases an ObjectId. Its first four bytes are a Unix timestamp in seconds, the middle five bytes are a random value, and the last three bytes are an incrementing counter. Documents inserted within the same second are not strictly increasing, because the random value in the middle can break strict local ordering. Overall, however, the values still trend upward.
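The ObjectId layout described above can be decoded with the standard library alone. The sketch splits the 12 bytes into timestamp, random value, and counter. The two hand-made ids at the end (hypothetical values, as if produced by two different clients) show how documents created in the same second can sort in a different order than they were inserted.

```python
from datetime import datetime, timezone


def decode_object_id(oid_hex: str) -> dict:
    """Split a 24-hex-digit ObjectId into timestamp, random value and counter."""
    raw = bytes.fromhex(oid_hex)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes")
    seconds = int.from_bytes(raw[0:4], "big")  # 4-byte Unix timestamp
    return {
        "timestamp": datetime.fromtimestamp(seconds, tz=timezone.utc),
        "random": raw[4:9].hex(),                  # 5-byte random value
        "counter": int.from_bytes(raw[9:12], "big"),  # 3-byte counter
    }


print(decode_object_id("507f1f77bcf86cd799439011"))

# Same second, different random parts: insertion order is first, then second...
inserted = ["65000000ffffffffff000001", "650000000000000000000002"]
# ...but _id order puts the second one first.
print(sorted(inserted))
```

Across seconds, the leading timestamp still dominates the order, which is the overall increasing trend the text describes.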

Therefore, unlike tables with MySQL auto-increment primary keys, MongoDB collections are not suited to simple splitting with an offset + limit strategy. They need a splitting strategy tailored to ObjectId.

In the end, we adopted the following three splitting strategies for MongoDB:

Sample bucketing: randomly sample the collection with the $sample aggregation stage, then estimate the number of buckets needed from the average document size and the chunk size. It requires query permission on the collection. Its advantage is speed, and it suits collections that hold a large amount of data but are not sharded. Its disadvantage is that, because it relies on sampling estimates, the buckets cannot be perfectly uniform.

SplitVector index splitting: splitVector is an internal MongoDB command for computing chunk split points. It scans a specified index to find the boundary of each chunk. It requires the splitVector privilege. Its advantages are speed and uniform chunks. Its disadvantage is that for a large collection that is already sharded, reading the existing chunk metadata directly from the config database is the better choice.

Chunks metadata reading: MongoDB stores the actual chunk ranges of sharded collections in the config database, so they can be read from config directly. This requires read access to the config database and works only for sharded collections. Its advantages are that it is fast, needs no recomputation of split points, and yields uniform chunks (64 MB by default). Its disadvantage is that it does not fit every scenario, being limited to sharded collections.

The figure above shows an example of sample bucketing. On the left is the complete collection. A set number of documents is sampled from it, shrinking the collection to a much smaller sample. Buckets are then formed from the sampled documents, and the resulting bucket boundaries are the chunk boundaries we want.

$sample is MongoDB's built-in aggregation stage for sampling. When the sample size is less than 5% of the collection, it samples with a pseudo-random cursor. Otherwise, it randomly sorts the documents and selects the top N. Its uniformity and time cost depend mainly on the random algorithm and the sample size. It is a compromise between uniformity and splitting speed, suitable for scenarios that need fast splitting but can tolerate somewhat uneven results.

In actual tests, sample bucketing produced fairly uniform results.
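The bucketing procedure can be sketched in Python under two assumptions. The sampled `_id` values have already been fetched, for example with an aggregation whose first stage is `{"$sample": {"size": n}}`, and the average document size is known. The function name and parameters are hypothetical. The function estimates the chunk count from the data size and takes evenly spaced values of the sorted sample as boundaries.

```python
import math
import random
from typing import List


def sample_bucket_boundaries(sampled_ids: List, total_docs: int,
                             avg_doc_bytes: int, chunk_bytes: int) -> List:
    """Estimate chunk boundaries from a random sample of _id values.

    Returns sorted cut points; chunk i covers [cut[i-1], cut[i]).
    """
    if not sampled_ids:
        return []
    # Number of chunks implied by the estimated collection size.
    num_chunks = max(1, math.ceil(total_docs * avg_doc_bytes / chunk_bytes))
    num_chunks = min(num_chunks, len(sampled_ids))  # cannot cut finer than the sample
    ordered = sorted(sampled_ids)
    step = len(ordered) / num_chunks
    # Take evenly spaced sample values as the boundaries between chunks.
    return [ordered[int(i * step)] for i in range(1, num_chunks)]


# Deterministic case: 10,000 docs of 1 KiB with 1 MiB chunks gives 10 chunks.
evenly = list(range(0, 10000, 10))
print(sample_bucket_boundaries(evenly, 10000, 1024, 1024 * 1024))

# A random sample gives boundaries that are only approximately even.
rng = random.Random(7)
random_sample = rng.sample(range(10000), 500)
print(sample_bucket_boundaries(random_sample, 10000, 1024, 1024 * 1024))
```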

The figure above shows an example of SplitVector index splitting. On the left is the original collection. The splitVector command is given the index to scan, here the _id index, and a chunk size in MB. It then scans the index and computes the boundaries of each chunk.

It is fast and produces very uniform chunks, which makes it suitable for most scenarios.
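For comparison, a splitVector-based plan comes down to one command plus a conversion of its split keys into ranges. The command document below uses the `keyPattern` and `maxChunkSize` (in MB) fields commonly passed to splitVector. Because splitVector is an internal command, treat these field names as an assumption to check against your MongoDB version. The response's `splitKeys` list is turned into contiguous `[lower, upper)` ranges, with `None` standing in for MinKey and MaxKey.

```python
from typing import Any, Dict, List, Optional, Tuple


def split_vector_command(namespace: str, max_chunk_mb: int) -> Dict[str, Any]:
    """Command document asking splitVector to cut the _id index into chunks."""
    return {"splitVector": namespace, "keyPattern": {"_id": 1}, "maxChunkSize": max_chunk_mb}


def ranges_from_split_keys(split_keys: List[Dict[str, Any]]) -> List[Tuple[Optional[Any], Optional[Any]]]:
    """Turn [{'_id': k1}, ..., {'_id': kn}] into [(None, k1), (k1, k2), ..., (kn, None)]."""
    bounds = [None] + [key["_id"] for key in split_keys] + [None]
    return list(zip(bounds, bounds[1:]))


cmd = split_vector_command("shop.orders", 64)
# A response looks roughly like {"splitKeys": [...], "ok": 1}; these values are made up.
response = {"splitKeys": [{"_id": 1000}, {"_id": 2000}, {"_id": 3000}], "ok": 1}
print(cmd)
print(ranges_from_split_keys(response["splitKeys"]))
```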

The figure above shows an example of reading config.chunks, that is, directly reading the metadata of the chunks MongoDB has already split. The config servers store each shard, the host it resides on, and the boundaries of each chunk. For sharded collections, the boundary information can be read directly from the chunks metadata without recomputing split points. Each chunk can also be read entirely on a single machine. This is extremely fast and performs well for large sharded collections.
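Reading config.chunks yields documents that each record a chunk's `min` and `max` shard-key bounds and the owning `shard`. The sketch assumes the shard key is `_id` and the documents have already been fetched, filtered by `ns` or by the collection `uuid` depending on the server version. Under those assumptions, a hypothetical helper can group the ranges per shard so that each reader works against one shard.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def splits_by_shard(chunk_docs: List[dict]) -> Dict[str, List[Tuple]]:
    """Group [min, max) chunk ranges from config.chunks by the owning shard."""
    grouped = defaultdict(list)
    # Sort by lower bound so each shard's ranges come out in key order.
    for chunk in sorted(chunk_docs, key=lambda c: c["min"]["_id"]):
        grouped[chunk["shard"]].append((chunk["min"]["_id"], chunk["max"]["_id"]))
    return dict(grouped)


# Made-up chunk documents with integer bounds (real ones may use MinKey/MaxKey).
chunks = [
    {"min": {"_id": 100}, "max": {"_id": 200}, "shard": "rs1"},
    {"min": {"_id": 0}, "max": {"_id": 100}, "shard": "rs0"},
    {"min": {"_id": 200}, "max": {"_id": 300}, "shard": "rs0"},
]
print(splits_by_shard(chunks))
```

No split points are computed here. The sketch reuses boundaries MongoDB already maintains, which is where the speed of this strategy comes from.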

5. Future plans

Future plans for Flink CDC fall into the following five areas:

First, help improve the Flink CDC incremental snapshot framework;
Second, integrate MongoDB CDC with the Flink CDC incremental snapshot framework so that it supports parallel snapshots;
Third, support Flink RawType in MongoDB CDC, providing RawType conversion for more flexible storage structures so that users can parse them with custom UDFs;
Fourth, support capturing changed data in MongoDB CDC from a specified position;
Fifth, improve the stability of MongoDB CDC.
