Production practice of Flink MongoDB CDC in XTransfer

Foreword

XTransfer focuses on providing cross-border financial and risk control services for cross-border B2B e-commerce small and medium-sized enterprises. Comprehensive solutions for various cross-border financial services such as local collection accounts, foreign exchange, and overseas foreign exchange control country declarations.

In the early stage of business development, we chose the traditional offline data warehouse architecture, and adopted the data integration method of full collection, batch processing, and overwrite writing, and the data timeliness was poor. With the development of business, offline data warehouses are increasingly unable to meet the requirements for data timeliness, so we decided to evolve from offline data warehouses to real-time data warehouses. The key to building a real-time data warehouse is to change the selection of data collection tools and real-time computing engines.

After a series of research, in February 2021, we paid attention to the Flink CDC project. Flink CDC is embedded with Debezium, which enables Flink itself to have the ability to capture change data, which greatly reduces the development threshold and simplifies deployment. the complexity. Coupled with Flink's powerful real-time computing capabilities and rich external system access capabilities, it has become a key tool for us to build a real-time data warehouse.

In addition, we also use MongoDB extensively in production, so we implemented the Flink MongoDB CDC Connector through the MongoDB Change Streams feature on the basis of Flink CDC, and contributed it to the Flink CDC community, which has been released in version 2.1. It is a great honor to share with you the implementation details and production practices here.

1. Flink CDC

Dynamic Table (Dynamic Table) is the core concept of Flink's Table API and SQL that supports streaming data. Streams and tables have duality. You can convert a table into a changelog stream, or play back the changelog stream and restore it to a table.

There are two forms of change streams: Append Mode and Update Mode. Append Mode will only add, not change or delete, such as event flow. Update Mode may be added, changed and deleted, such as database operation logs. Before Flink 1.11, only dynamic tables were supported in Append Mode.

Flink 1.11 introduces new TableSource and TableSink in FLIP-95 to support Update Mode changelog. And in FLIP-105, direct support for Debezium and Canal CDC formats was introduced. By implementing ScanTableSource, receiving external system change logs (such as database change logs), interpreting them as Flink-recognizable changlogs and flowing them down, it is possible to support the definition of dynamic tables from change logs.

Inside Flink, changelog records are represented by RowData, which includes 4 types: +I (INSERT), -U (UPDATE_BEFORE), +U (UPDATE_AFTER), -D (DELETE). According to the different record types generated by the changelog, it can be divided into three changelog modes.


2. MongoDB replication mechanism

As mentioned in the previous section, the key to implementing Flink CDC MongoDB is: how to convert MongoDB’s operation log into a changelog supported by Flink. To solve this problem, you first need to understand MongoDB's cluster deployment and replication mechanism.

2.1 Replica sets and fragmented clusters
The replica set is a highly available deployment mode provided by MongoDB. The replica set members replicate the oplog (operation log) to complete the data synchronization between the replica set members.

Sharded cluster is a deployment mode for MongoDB to support large-scale data sets and high-throughput operations, and each shard consists of a replica set.

2.2 Replica Set Oplog

The operation log oplog, in MongoDB, is a special capped collection (fixed-capacity collection), used to record the operation log of data, and used for synchronization between replica set members. The data structure of oplog records is as follows.

It can be seen from the example that the update record of the MongoDB oplog neither contains the information before the update nor the complete record after the update, so it cannot be converted into an ALL type changelog supported by Flink, nor can it be converted into an UPSERT type changelog .

In addition, in a sharded cluster, data writing may occur in different shard replica sets, so the oplog of each shard will only record data changes that occur on that shard. Therefore, it is necessary to obtain complete data changes, and it is necessary to merge the oplogs of each fragment according to the order of operation time, which increases the difficulty and risk of capturing change records.

Before version 1.7, Debezium MongoDB Connector realized change data capture by traversing the oplog. Due to the above reasons, we did not use Debezium MongoDB Connector but chose MongoDB's official Change Streams-based MongoDB Kafka Connector.

2.3 Change Streams

Change Streams is a new feature introduced by MongoDB 3.6, which shields the complexity of traversing the oplog and enables users to subscribe to data changes at the cluster, database, and collection levels through a simple API.

2.3.1 Conditions of use
2.3.2 Change Events
2.3.3 Update Lookup

Since the update operation of the oplog only includes the changed fields, the complete document after the change cannot be obtained directly from the oplog, but when converting to a changelog in UPSERT mode, UPDATE_AFTER RowData must have a complete row record. Change Streams can return the latest state of the document when fetching the change record by setting fullDocument = updateLookup. In addition, each record of Change Event contains documentKey (_id and shard key), which identifies the primary key information of the changed record, which satisfies the condition of idempotent update. Therefore, through the Update Lookup feature, the change records of MongoDB can be converted into Flink's UPSERT changelog.

3. Flink MongoDB CDC

In terms of specific implementation, we integrated MongoDB's official MongoDB Kafka Connector implemented based on Change Streams. With Debezium EmbeddedEngine, it is easy to drive MongoDB Kafka Connector to run in Flink. MongoDB CDC TableSource is implemented by converting Change Stream into Flink UPSERT changelog. With the resume mechanism of Change Streams, the function of restoring from checkpoint and savepoint is realized.

As noted in FLIP-149, some operations (such as aggregations) have difficulty handling correctly without the -U message. For a changelog of UPSERT type, Flink Planner will introduce an additional computing node (Changelog Normalize) to normalize it to a changelog of type ALL.


Support features

Support for Exactly-Once semantics
Support full and incremental subscription
Support for Snapshot data filtering
Support restore from checkpoint, savepoint
Support metadata extraction

4. Production practice

4.1 Using RocksDB State Backend

Changelog Normalize will bring additional state overhead in order to complement the pre-mirror value of -U. It is recommended to use RocksDB State Backend in a production environment.

4.2 Appropriate oplog capacity and expiration time

MongoDB oplog.rs is a special collection with capacity. When the capacity of oplog.rs reaches the maximum value, historical data will be discarded. Change Streams are restored through the resume token. Too small oplog capacity may cause the oplog record corresponding to the resume token to no longer exist, resulting in recovery failure.

When the specified oplog capacity is not displayed, the default oplog capacity of the WiredTiger engine is 5% of the disk size, the lower limit is 990MB, and the upper limit is 50GB. After MongoDB 4.4, it supports setting the minimum retention time of oplog. When the oplog is full and the oplog record exceeds the minimum retention time, the oplog record will be recycled.

The oplog capacity and minimum retention time can be reset using the replSetResizeOplog command. In a production environment, it is recommended to set the oplog capacity to no less than 20GB, and the oplog retention time to no less than 7 days.

4.3 Change the slow table to open the heartbeat event

Flink MongoDB CDC will periodically write the resume token to the checkpoint to restore the Change Stream. MongoDB change events or heartbeat events can trigger the update of the resume token. If the subscribed collection changes slowly, it may cause the resume token corresponding to the last change record to expire, making it impossible to recover from the checkpoint. Therefore, for collections that change slowly, it is recommended to enable heartbeat events (set heartbeat.interval.ms > 0) to keep the resume token updated.

4.4 Customize MongoDB connection parameters

When the default connection cannot meet the usage requirements, you can pass the connection parameters supported by MongoDB through the connection.options configuration item.

4.5 Change Stream parameter tuning
Pulling of configuration change events can be refined through poll.await.time.ms and poll.max.batch.size in Flink DDL.

poll.await.time.ms
Change event pull time interval, the default is 1500ms. For collections that change frequently, the pull interval can be appropriately reduced to improve processing efficiency; for collections that change slowly, the pull interval can be appropriately increased to reduce database pressure.

poll.max.batch.size
The maximum number of pull change events for each batch, the default is 1000. Increasing the change parameter will speed up the speed of pulling change events from Cursor, but it will increase the memory overhead.

4.6 Subscribe to the whole database and cluster changes
database = "db", collection = "", you can subscribe to the changes of the entire db database; database = "", collection = "", you can subscribe to the changes of the entire cluster.

The DataStream API can use the pipeline to filter the db and collection that need to be subscribed, and the filtering of the Snapshot collection is not currently supported.

4.7 Access control
MongoDB supports fine-grained control over users, roles, and permissions. Users who enable Change Stream need to have two permissions: find and changeStream.

single set

cluster

{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
In a production environment, it is recommended to create Flink users and roles, and perform fine-grained authorization on the roles. It should be noted that MongoDB can create users and roles under any database. If the user is not created under admin, you need to specify authSource = in the connection parameters.

In the development environment and test environment, two built-in roles of read and readAnyDatabase can be granted to Flink users to enable change streams for any collection.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us