OceanBase: An Integrated Full and Incremental Data Integration Solution

1. Introduction to OceanBase

OceanBase is a distributed database developed in-house at Ant Group. The project was founded in 2010 and has been iterated on ever since; its earliest user was Taobao Favorites. In 2014, the OceanBase R&D team moved from Taobao to Ant Group, where it was mainly responsible for supporting Alipay's internal de-IOE initiative, that is, replacing the databases Alipay depended on. Today, all of Ant Group's databases have been migrated to OceanBase. On June 1, 2021, OceanBase was officially open-sourced, and a MySQL-compatible version was released.

The OceanBase database has undergone three generations of architectural upgrades: from a distributed storage system initially built for e-commerce, to a general-purpose distributed database, to today's enterprise-level distributed database.

The figure above shows the architecture of OceanBase.

Applications at the top access the server side of the OceanBase database through OBProxy, a load-balancing proxy. Data on the server side is kept in multiple replicas. The relationship between replicas is similar to the master-slave relationship in traditional database architectures, but it operates at the partition level: each table is split into partitions, each partition has multiple replicas, and those replicas are scattered across multiple servers.

The architecture of OceanBase has the following characteristics:

Shared-nothing architecture: each node has its own complete SQL engine, storage engine, and transaction processing logic; the nodes are fully peer-to-peer with no hierarchical structure.
Partition-level availability: in OceanBase, the partition is the basic unit of reliability and scalability, and the level at which access routing, load balancing, and automatic failure recovery are implemented.
High availability + strong consistency: since data has multiple replicas, the Paxos consensus protocol is used to provide high reliability across replicas, ensuring that a log entry is persisted on a majority of nodes before it is considered committed.
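The majority-persistence rule above is what lets a Paxos group survive minority failures. The following sketch (illustrative arithmetic only, not OceanBase code) shows the quorum size and fault tolerance for common replica counts:

```python
# Illustrative sketch: with a Paxos group of n replicas, a log entry commits
# once it is persisted on a majority, so the group tolerates the failure of
# any minority of replicas.

def majority(n: int) -> int:
    """Smallest number of replicas that forms a quorum."""
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    """Number of replicas that may fail while commits can still proceed."""
    return n - majority(n)

for n in (3, 5):
    print(f"{n} replicas: quorum={majority(n)}, tolerates {tolerated_failures(n)} failures")
# 3 replicas: quorum=2, tolerates 1 failure(s)
# 5 replicas: quorum=3, tolerates 2 failure(s)
```

This is why 3-replica and 5-replica deployments are typical: they tolerate 1 and 2 simultaneous replica failures respectively.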

OceanBase has the following six core features:

High availability: based on the Paxos protocol with strong consistency; if a minority of replicas fail, no data is lost and service continues.
High scalability: supports online horizontal scale-out and scale-in, with automatic load balancing between nodes.
High compatibility: the community edition is compatible with the MySQL protocol and syntax.
Low cost: storage cost is roughly 1/3 that of MySQL, because OceanBase has low hardware requirements and heavily optimized storage, giving an extremely high storage compression ratio.
Multi-tenancy: resources are fully isolated between tenants, and each business only manages data within its own tenant, which reduces cost.
HTAP: both OLTP and OLAP workloads are served by a single engine.

2. Implementation principle of Flink CDC OceanBase Connector

Mainstream CDC implementations rely primarily on the database log. After obtaining the database's incremental log, they must guarantee its order and completeness, process the entries, and then write them to a destination such as a data warehouse or query engine.

OceanBase provides components for acquiring incremental data. Because it is a distributed database, its data is also scattered across logs on different nodes. The obcdc component interacts with OceanBase servers over RPC to pull the raw log entries and, after processing, emits a single ordered log stream; downstream consumers subscribe to obcdc to consume that ordered stream.
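The core job described above, turning per-partition log sequences into one ordered stream, can be modeled in a few lines. This is a hypothetical sketch with made-up entries, not the obcdc implementation:

```python
# Illustrative model: each partition produces its own committed-log sequence
# of (commit_ts, entry) pairs, already sorted locally; a globally ordered
# stream is formed by merging the per-partition sequences on commit timestamp.
import heapq

partition_logs = {
    "p1": [(1, "insert a"), (4, "update a")],
    "p2": [(2, "insert b"), (3, "delete c")],
}

ordered = list(heapq.merge(*partition_logs.values(), key=lambda e: e[0]))
print(ordered)
# [(1, 'insert a'), (2, 'insert b'), (3, 'delete c'), (4, 'update a')]
```

The real component additionally has to assemble distributed transactions and handle cluster membership changes, but the merge-by-commit-order idea is the same.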

At present, there are three main types of downstream consumers:

oblogproxy: an open-source service that consumes the log stream; Flink CDC relies on this component for incremental data pull.
OMS store: the data migration service provided by OceanBase. The commercial edition of OMS has gone through many iterations and supports many data sources; last year a community edition of OMS was released, mainly supporting two data sources, OceanBase Community Edition and MySQL.
JNI client: a log client that uses obcdc directly to interact with OBServer and pull incremental logs; open-sourcing it is on the roadmap.

Currently, there are two main components of OceanBase CDC provided by the open source community:

OceanBase Canal: Canal is an open-source MySQL incremental log puller released by Alibaba. Based on the latest code of the open-source Canal, the OceanBase community has added the ability to pull and parse OceanBase incremental logs.
Flink CDC: uses obcdc through oblogproxy to pull incremental logs from OceanBase, and consumes and processes those logs through another open-source component, logproxy-client.

The lower left corner of the figure above shows the definition of a dynamic table: in Flink, a data stream is converted into a dynamic table, because only in table form can SQL operations be applied to it. Continuous queries then run over this ever-growing table; their results are again tables, which are converted back into a stream and sent downstream.
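The stream-to-table duality can be made concrete with a tiny simulation. This is a hedged sketch with hypothetical change events, not the Flink API; a real continuous query would also emit retractions for updated results:

```python
# Minimal model of a dynamic table: a changelog stream updates table state,
# and a "continuous query" (here a running count per user) emits an updated
# result row after every change.

changelog = [
    ("+I", {"user": "a"}),  # +I marks an insert event
    ("+I", {"user": "b"}),
    ("+I", {"user": "a"}),
]

counts = {}    # materialized result of the continuous query
emitted = []   # downstream update stream produced from the result table
for op, row in changelog:
    if op == "+I":
        counts[row["user"]] = counts.get(row["user"], 0) + 1
        emitted.append((row["user"], counts[row["user"]]))

print(emitted)  # [('a', 1), ('b', 1), ('a', 2)]
```

Note how the same logical table is visible in two forms: as accumulated state (`counts`) and as a stream of updates (`emitted`).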

Flink CDC Connector only reads source data, that is, it is only responsible for reading data from the data source to the Flink engine.

The current Flink CDC Connector is mainly divided into the following three categories:

MySqlSource: implements the latest Source interface and supports concurrent reading.
DebeziumSourceFunction: implements SourceFunction based on Debezium; supports older MySQL versions as well as MongoDB, SQL Server, and PostgreSQL.
OceanBaseSourceFunction: implements the SourceFunction interface, with full reads based on JDBC and incremental reads based on logproxy-client.

Incremental data is pulled through oblogproxy: logproxy-client subscribes to the incremental log stream, and once the data enters Flink CDC it is processed by Flink CDC's logic and handed to the Flink engine. Full data is pulled through JDBC.

The capabilities currently supported by the Flink CDC OceanBase Connector are mainly limited by logproxy, which supports pulling data starting from a specified timestamp. However, since OceanBase is a distributed database, it is impossible to locate the exact starting point of the incremental data, so starting from a timestamp may yield some duplicate records.

In the full-read phase, because OceanBase Community Edition has no table lock, the full read cannot lock the table to determine a precise data boundary.

Based on these two constraints, only at-least-once semantics are currently supported; exactly-once has not yet been implemented.
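Why at-least-once is still usable in practice: the incremental replay starts at or before the snapshot point, so some events overlap rows already captured in the full read, and applying everything as a primary-key upsert makes those duplicates harmless. A hedged sketch with hypothetical rows:

```python
# Model of the full/incremental handoff under at-least-once semantics:
# the incremental stream replays from a timestamp that overlaps the
# snapshot, and primary-key upserts make the duplicates idempotent.

snapshot = {1: "v1", 2: "v2"}                     # full read via JDBC
incremental = [(1, "v1"),    # duplicate of a snapshot row (overlap)
               (2, "v2b"),   # genuine update
               (3, "v3")]    # genuine insert

state = dict(snapshot)
for key, value in incremental:
    state[key] = value   # upsert: re-applying row 1 changes nothing

print(state)  # {1: 'v1', 2: 'v2b', 3: 'v3'}
```

This is why at-least-once delivery converges to the correct final state for keyed sinks, even though individual events may be seen twice.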

3. Flink CDC + OceanBase application scenarios

3.1 Scenario 1: Data integration based on sub-database and sub-table
Flink CDC integrates full and incremental reads, and the OceanBase Connector supports regular-expression matching when selecting source tables. In sharded (sub-database, sub-table) scenarios, you can create a dynamic table through the OceanBase Connector that reads all matching shards, then write the results into a single table to aggregate the sharded data.
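The shard-selection step can be pictured as a regex filter over table names. The table names below are made up for illustration; the connector applies the same idea to its table-name option:

```python
# Hypothetical sketch of regex-based table selection for sharded tables:
# one pattern matches every shard, and rows from all matched shards are
# merged into a single sink table.
import re

all_tables = ["orders_00", "orders_01", "orders_02", "users"]
pattern = re.compile(r"orders_\d+")

shards = [t for t in all_tables if pattern.fullmatch(t)]
print(shards)  # ['orders_00', 'orders_01', 'orders_02']
```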

3.2 Scenario 2: Data integration across clusters/tenants
OceanBase is a multi-tenant system, and the MySQL tenant of the community edition does not yet support cross-tenant access. To read data across tenants, you therefore need to connect to each tenant's database separately. Flink CDC is naturally suited to this: each tenant corresponds to one dynamic table that serves as the channel for reading that data source, and the streams are then aggregated in Flink.


3.3 Scenario 3: Data Integration of Multiple Data Sources
Data from different types of sources can also be aggregated. For sources compatible with the MySQL protocol, such as MySQL and TiDB, the data formats are the same, so there is essentially no migration cost.

3.4 Scenario 4: Building OLAP applications
OceanBase is an HTAP database: it not only has strong TP capabilities but can also serve as a data warehouse. Flink's JDBC connector supports writing data to any database compatible with the MySQL protocol, so you can use Flink CDC to read source data and then write it into OceanBase through the Flink JDBC connector, with OceanBase as the sink.
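When the sink speaks the MySQL dialect, upserts are expressed as `INSERT ... ON DUPLICATE KEY UPDATE`, which is the statement shape Flink's JDBC connector uses for MySQL-compatible databases. The helper below builds such a statement for an arbitrary (hypothetical) table and column list:

```python
# Hedged sketch: build a MySQL-dialect upsert statement of the kind a JDBC
# sink issues against a MySQL-compatible database such as OceanBase.
# Table and column names are placeholders, not a real schema.

def build_upsert(table: str, columns: list) -> str:
    cols = ", ".join(columns)
    placeholders = ", ".join(["?"] * len(columns))
    updates = ", ".join(f"{c} = VALUES({c})" for c in columns)
    return (f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {updates}")

print(build_upsert("orders", ["id", "amount"]))
# INSERT INTO orders (id, amount) VALUES (?, ?) ON DUPLICATE KEY UPDATE id = VALUES(id), amount = VALUES(amount)
```

Because the statement is an upsert keyed on the primary key, it pairs naturally with the at-least-once delivery discussed earlier: replayed rows simply overwrite themselves.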

Currently, OceanBase provides three data access methods, SQL, Table API, and HBase API, and the components required for all three are open source.

4. Future Outlook of OceanBase Connector

The figure above lists the status quo of the OceanBase CDC solution.

OMS Community Edition: a functional subset of OMS Commercial Edition, but not open source. As a GUI tool it is user-friendly, offers integrated full and incremental migration, and provides data verification and operations capabilities. Its shortcomings are a somewhat cumbersome deployment process, support for only two data sources (MySQL and OceanBase Community Edition), and no support for incremental DDL.

DataX + Canal/Otter: an open-source solution that uses DataX together with Canal/Otter for data migration. Otter is the parent project of Canal; it mainly targets geo-distributed active-active deployments and supports bidirectional data synchronization, with incremental reads based on Canal. The advantage of this solution is its variety of destinations, including HBase, Elasticsearch, and relational databases (RDB). The disadvantage is that increments (Canal/Otter) and full reads (DataX) are handled by separate tools, which makes the pipeline fragmented and produces data redundancy at the handoff between the two.

Flink CDC: a fully open-source solution with an active, fast-growing community, supporting multiple sources and destinations with integrated full and incremental reads. Flink is also an excellent big data processing engine in its own right, so it can perform ETL along the way. Its disadvantages are that the OceanBase Connector does not yet support incremental DDL or exactly-once semantics, so there may be duplicate data in the overlap between the full and incremental phases.

In the future, we will first optimize data reading: parallelize the full phase using the parallel processing framework of the new Source interface, and, for the incremental phase, skip the logproxy service and pull incremental data directly from the OceanBase database, that is, use the obcdc component directly through the JNI client.

Second, enrich the feature set. Flink CDC currently supports only OceanBase Community Edition, but the community and enterprise editions use exactly the same components for incremental log reading, so only minor changes are needed to support incremental reads for the enterprise edition. We also plan to support incremental DDL, exactly-once mode, and rate limiting.

Finally, improve code quality: add end-to-end tests; replace JdbcValueConverters with runtime converters in the format-conversion path to improve performance; and implement support for the new version of the Source interface (the parallel processing framework).
