Extract Oracle data in real time

1. Unable to connect to the database

According to the official documentation, enter the following statement in the Flink SQL CLI:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
'table-name'='TEST' );

After trying to observe through select * from TEST, it was found that Oracle could not be connected normally, and the error was reported as follows:

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

Judging from the error message, it may be that Flink CDC mistook the MY_SERVICE_NAME (Oracle service name) provided in the connection information for the SID. So I tried to read the source code of Flink CDC related to Oracle Connector and found it in com.ververica.cdc.connectors.oracle.OracleValidator.

It can be seen from the above that in the current version of Flink CDC, there is no distinction between the connection method of SID and Service Name, but the connection method of SID is directly written in the code (that is, ":" is used between port and dbname separated).

Starting from Oracle 8i, Oracle has introduced the concept of Service Name to support database cluster (RAC) deployment. A Service Name can be used as a logical concept of a database to unify the connection of different SID instances of the database. Accordingly, the following two approaches can be considered:

In the create table statement of Flink CDC, replace database-name with Service Name with one of the SIDs. This method can solve the connection problem, but it cannot adapt to the real scenario of mainstream Oracle cluster deployment;

Modify the source code. Specifically, in the new project, rewrite the com.ververica.cdc.connectors.oracle.OracleValidator method and change it to the connection method of Service Name.

The author adopts the second method, which realizes the normal connection to the database while retaining the use of the Oracle Service Name feature.

2. The Oracle table cannot be found

According to the above steps, observe through select * from TEST again, and find that the data still cannot be obtained normally, and the error is as follows:

Observe that the table mentioned in the error log is MY_SERVICE_NAME.MY_SCHEMA.test, why the database name and schema name are all uppercase, but the table name is lowercase?

Note that this error is reported by the io.debezium package. By analyzing the source code of the package (from the pom.xml file of Flink CDC, we can see that the current version of debezium 1.5.4 is used), in io.debezium.relational.Tables There is the following code in it.

It can be seen that the developers of Debezium have uniformly defined "case insensitivity" as "table names need to be converted to lowercase". This is true for PostgreSQL, Mysql etc supported by Debezium. However, for the Oracle database, "case-insensitive" means that the table name needs to be converted to uppercase when storing internal meta-information

Therefore, after Debezium reads the "case-insensitive" configuration, according to the above code logic, it will only report an error because it tries to read the lowercase table name.

Since Debezium has not fixed this problem until the latest stable version 1.7.1, and the latest development version 1.8.0, we can bypass this problem by the following two methods:

If you want to use Oracle's "case-insensitive" feature, you can directly modify the source code and change the above toLowercase to toUppercase (this is also the method I chose);
If you are unwilling to modify the source code and do not need to use Oracle's "case insensitive" feature, you can add 'debezium.database.tablename.case.insensitive'='false' to the create statement, as shown in the following example.

The disadvantage of this method is that it loses the "case-insensitive" feature of Oracle, and the uppercase table name must be explicitly specified in 'table-name'.

It should be noted that for the database.tablename.case.insensitive parameter, Debezium currently only sets it to true by default for Oracle 11g, and sets it to false by default for other Oracle versions. Therefore, if the reader is not using the Oracle 11g version, there is no need to modify this parameter, but it is still necessary to explicitly specify the uppercase table name.

3. Large data delay

Data latency is high, sometimes it takes 3-5 minutes to capture data changes. For this problem, a clear solution has been given in the Flink CDC FAQ: add the following two configuration items to the create statement:

So why do it? We can still deepen our understanding of the tool by analyzing the source code and logs, combined with the working principle of Oracle Logminer.

The extraction of Logminer is mainly carried out in the execute method of Debezium's io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource. In order to save space, this article does not list the actual source code, but only extracts the key process and draws the flow chart below. Interested readers can compare the flow chart and analyze it in combination with the actual source code:

With the redo_log_catalog method, the DDL information of the data table can be monitored, and since the archive logs are permanently saved to the disk, all DDL and DML operations before the downtime can be obtained normally after the database goes down. But because it involves more information monitoring than the online catalog, and the resulting frequent log switching and log dump operations, the cost is also staggering.

According to the author's actual test situation, if debezium.log.mining.strategy is the default redo_log_catalog configuration, not only need to perform step ① more (this operation takes about half a minute to 1 minute), in step ④, according to The amount of data in the archived logs will also take time to fluctuate between 1 minute and 4 minutes; in step ⑤, the actual query of the V$LOGMNR_CONTENTS view often takes more than ten seconds to complete.

In addition, since the archive logs grow faster in the actual system, in actual use, it is often combined with the operation of periodically deleting or dumping expired logs. Since the above step ④ takes a long time, the author observed that during the execution of step ④, the archive logs added in step ② will expire and be deleted and dumped under a certain probability, so in the When querying in step ⑤, the following error will be reported because the log added in step ② cannot be found:

Generally speaking, the tables that Flink CDC needs to monitor, especially the tables that are of great significance to the business system, generally do not perform DDL operations, only need to capture DML operations, and for extremely special cases such as database downtime, it can also Use the method of full data update after the database is restored to ensure data consistency. Therefore, the online_catalog approach is sufficient for our needs.

In addition, whether you use online_catalog or the default redo_log_catalog, there will be a problem that the logs found in step ② and the logs actually needed in step ⑤ are out of sync. Therefore, add 'debezium.log.mining.continuous.mine'='true' This problem can be avoided by handing over the real-time log collection work to Oracle for automatic completion.

After the author configures these two parameters, the data delay can generally be reduced from a few minutes to about 5 seconds.

4. Adjust parameters to continue to reduce data delay

Steps ③ and ⑦ of the above flow chart refer to the determination of the LogMiner monitoring timing range and sleep time based on the configuration items. The process is further analyzed below, and a general methodology is given for further tuning of a single table.

By observing the getEndScn method in the io.debezium.connector.oracle.logminer.LogMinerHelper class, you can understand how debezium adjusts the monitoring timing range and sleep time. In order to facilitate the reader's understanding, the method is illustrated as follows with a flow chart:

As can be seen from the above flow chart, debezium provides log.mining.batch.size.* and log.mining.sleep.time.* two sets of parameters in order to make the step size of each logMiner run as close as possible to The increase step size of the database's own SCN is the same. It can be seen from this:

The setting of log.mining.batch.size.* and log.mining.sleep.time.* parameters is related to the overall performance of the database, and has nothing to do with the data changes of a single table;
log.mining.batch.size.default not only monitors the starting value of the timing range, but also monitors the threshold for timing range changes. Therefore, if you want to achieve a more flexible monitoring timing range adjustment, you can consider reducing this parameter appropriately;
Since the sleepTime will be adjusted according to the size of topScn and currentScn every time the monitoring timing range is determined, in order to achieve a more flexible adjustment of the sleep time, it may be considered to increase log.mining.sleep.time.increment.ms appropriately;
log.mining.batch.size.max cannot be too small, otherwise there will be a risk that the monitoring timing range will never be able to catch up with the current SCN of the database. For this, debezium has the following logic in io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics:

If the current monitoring timing range reaches log.mining.batch.size.max, then debezium will give the above prompt in the log. In practical applications, you can know whether the value of log.mining.batch.size.max is reasonable by observing whether the log generated by Flink CDC contains this hint.

5. Hidden parameters of Debezium Oracle Connector

In fact, we have learned two hidden parameters from the above: debezium.database.tablename.case.insensitive (see the second section) and debezium.log.mining.continuous.mine (see the third section), these two None of the parameters are actually explained in Debezium's official documentation, but they can actually be used. By analyzing the source code, all hidden parameters of Debezium Oracle Connector are given, and their descriptions are as follows:

The author thinks that apart from the two parameters we have used above, the log.mining.history.recorder.class parameter is also worth paying attention to. Since this parameter currently defaults to io.debezium.connector.oracle.logminer.NeverHistoryRecorder, which is an empty class, when we analyze the behavior of Flink CDC, we customize the class that implements the io.debezium.connector.oracle.logminer.HistoryRecorder interface , which can realize personalized monitoring of Flink CDC behavior without modifying the source code

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us