
Realtime Compute for Apache Flink: FAQ about CDC

Last Updated: Apr 07, 2024

This topic provides answers to some frequently asked questions about Change Data Capture (CDC).

Can I cancel a Flink CDC deployment instead of restarting the deployment if the deployment fails?

You can modify the configuration of the deployment to specify a restart policy. For example, the following configuration specifies that a maximum of two restart attempts can be performed and the interval between the attempts is 10 seconds. If the deployment fails to start after two attempts, the deployment is canceled.

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s

MySQL CDC source tables and Hologres CDC source tables do not support window functions. How do I implement minute-level data aggregation on a MySQL CDC source table or Hologres CDC source table?

To obtain an effect similar to window aggregation on a MySQL CDC source table or Hologres CDC source table, you can perform time-based aggregation by using the following steps:

  1. Use the DATE_FORMAT function to convert time fields into strings that are formatted as minutes and use the strings as window values.

  2. Use a GROUP BY clause to aggregate data by the window values.

The following sample code provides an example on how to collect the statistics about the number of orders and sales every minute for each store.

SELECT 
    shop_id, 
    DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm') AS window,
    COUNT(*) AS order_count, 
    SUM(price) AS amount 
FROM order_mysql_cdc 
GROUP BY shop_id, window

Can a MySQL CDC table be used only as a source table?

Yes. A MySQL CDC table can read full and incremental data from MySQL database tables and can be used only as a source table. A regular MySQL table can be used as a dimension table or a result table.

Why does the MySQL CDC connector not read incremental data after the connector reads full data?

  • Problem description: The MySQL CDC connector reads only full data but does not read incremental data.

    Cause: The MySQL CDC deployment is configured to read data from a secondary instance or a read-only instance of ApsaraDB RDS for MySQL V5.6. Secondary instances and read-only instances of ApsaraDB RDS for MySQL V5.6 do not write data to log files. As a result, the downstream synchronization tool cannot read the incremental change information.

    Solution: We recommend that you use an ApsaraDB RDS for MySQL instance that can process write requests or upgrade the instance to a version later than V5.6.

  • Problem description: The MySQL CDC deployment is suspended after full data is read.

    Cause 1: Full data reading takes an excessively long time and the last shard contains an excessively large amount of data. This causes an out of memory (OOM) error, and the deployment is suspended after a failover.

    Solution 1: Increase the degree of parallelism for reading the MySQL database to accelerate full data reading.

    Cause 2: After full data synchronization is complete, the MySQL CDC connector automatically switches the deployment to the incremental synchronization phase. If the connector runs multiple subtasks in parallel to read full data, the connector waits for one more checkpoint before the deployment enters the incremental synchronization phase. This ensures that full data is written to the sink before incremental data is read and therefore guarantees data accuracy. If the checkpointing interval that you specify is excessively large, the connector waits for a long period of time before it starts to synchronize incremental data. For example, if you set the checkpointing interval to 20 minutes, the connector waits for 20 minutes after full data synchronization is complete.

    Solution 2: Specify an appropriate checkpointing interval based on your business requirements, as shown in the configuration sketch below.
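    A minimal deployment configuration sketch. The 5-minute value is only an illustrative assumption; choose an interval that fits your latency and recovery requirements.

    execution.checkpointing.interval: 5min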

What do I do if commas (,) in the regular expression in the value of the table-name parameter in a MySQL CDC deployment fail to be parsed?

  • Problem description

    For example, if the 'table-name' = 't_process_wi_history_\d{1,2}' configuration is used in the MySQL CDC deployment, a parsing error occurs.

  • Cause

    Debezium uses commas (,) as delimiters and does not support regular expressions that contain commas (,). As a result, a parsing error occurs.

  • Solution

    We recommend that you use the 'table-name' = '(t_process_wi_history_\d{1}|t_process_wi_history_\d{2})' configuration.

When I restart a deployment, does the connector for the MySQL CDC source table consume data from the binary log file position at which the deployment is canceled or from the binary log file position at which the deployment is configured to start?

When you restart a deployment, you can configure the startup policy based on your business requirements. If you set Starting Strategy to NONE in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table re-consumes data from the binary log file position at which the deployment is configured to start. If you set Starting Strategy to Latest State in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table consumes data from the binary log file position at which the deployment is canceled.

For example, a deployment is configured to start from the binary log file position {file=mysql-bin.01, position=40} and the deployment is canceled after it runs for a period of time. In this case, data is consumed at the binary log file position {file=mysql-bin.01, position=210}. If you set Starting Strategy to NONE in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table re-consumes data from the binary log file position {file=mysql-bin.01, position=40}. If you set Starting Strategy to Latest State in the Deployment Starting Configuration dialog box, the connector for the MySQL CDC source table consumes data from the binary log file position {file=mysql-bin.01, position=210}.

Important

When you restart a deployment, make sure that the required binary log file is not deleted from the server due to expiration. Otherwise, an error is returned.

How does the connector for a MySQL CDC source table work? How does a MySQL CDC source table affect a database?

If the scan.startup.mode parameter in the WITH clause for a MySQL CDC source table is set to initial, the connector for the MySQL CDC source table connects to the MySQL database by using a Java Database Connectivity (JDBC) driver, executes a SELECT statement to read full data, and then records the binary log file position. The default value of the scan.startup.mode parameter is initial. After full data reading is complete, the connector reads incremental data from the binary log file at the binary log file position that is recorded.

During full data reading, the query load in the MySQL database may increase because the SELECT statement is executed to query data. During incremental data reading, the binlog client is used to connect to the MySQL database to read binary log data. If the number of data tables that are used increases, excessive connections may exist. You can run the following MySQL command to query the maximum number of connections:

show variables like '%max_connections%';

How do I enable the MySQL CDC connector to skip the full (snapshot) data reading phase and read only the change data?

You can configure the scan.startup.mode parameter in the WITH clause to specify the startup mode for data consumption. You can consume the accessible binary log data from the earliest offset, the most recent binary log data, the binary log data from a specified timestamp, or the binary log data from a specified offset. For more information, see the description of the scan.startup.mode parameter in the "Parameters in the WITH clause" section.
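The following DDL sketch shows where the scan.startup.mode option is placed in the WITH clause. The table schema, connection values, and the chosen mode are illustrative assumptions; check the parameter reference for the startup modes that your VVR version supports.

CREATE TEMPORARY TABLE orders_cdc (
    order_id BIGINT,
    price DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '<yourHostname>',
    'port' = '3306',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'database-name' = '<yourDatabase>',
    'table-name' = 'orders',
    -- Skip the full reading phase and consume only change data from the most recent binary log position.
    'scan.startup.mode' = 'latest-offset'
);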

How does the MySQL CDC connector read data from a MySQL database on which sharding is performed?

For example, a MySQL database has multiple tables such as table user_00, table user_02, and table user_99 after sharding, and the schemas of these tables are the same. In this scenario, you can use the table-name option to specify a regular expression to match multiple tables whose data can be read. For example, you can set table-name to user_.* to monitor all tables whose prefix is user_. If the schemas of all tables in the database are the same, you can use the database-name option to achieve the same effect.
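As a sketch (the table schema and connection values are placeholders), the regular expression is passed directly in the table-name option of the WITH clause:

CREATE TEMPORARY TABLE user_source (
    user_id BIGINT,
    user_name STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '<yourHostname>',
    'port' = '3306',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'database-name' = 'mydb',
    -- Matches user_00 through user_99. All matched tables must have the same schema.
    'table-name' = 'user_.*'
);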

What do I do if the data reading efficiency is low and backpressure exists when full data is read from tables?

If the sink processes data at a low speed, backpressure may occur. Check whether the sink has backpressure. If the sink has backpressure, resolve the backpressure issue on the sink first by using one of the following methods:

  • Increase the degree of parallelism.

  • Enable aggregation optimization features, such as miniBatch.

How do I determine whether full data synchronization is complete in a MySQL CDC deployment?

  • You can determine whether full data synchronization is complete in the deployment based on the value of the currentEmitEventTimeLag metric on the Metrics tab of the Deployments page.

    The currentEmitEventTimeLag metric indicates the difference between the time at which the source sends a data record to the sink and the time at which the data record is generated in the database. This metric measures the delay from the time when the data is generated in the database to the time when the data leaves the source.

    Description of the value of the currentEmitEventTimeLag metric:

    • If the value of currentEmitEventTimeLag is less than or equal to 0, the full data synchronization in the MySQL CDC deployment is not complete.

    • If the value of currentEmitEventTimeLag is greater than 0, the MySQL CDC deployment completes full data synchronization and starts to read binary log data.

  • Check whether the logs of the TaskManager of the MySQL CDC source table contain the message "BinlogSplitReader is created". If this message appears, full data reading is complete.

What do I do if the load on the MySQL database becomes high due to multiple MySQL CDC deployments?

The connector for a MySQL CDC source table needs to connect to a database to read binary log data. If the number of source tables increases, the load on the database also increases. To reduce the load on the database, you can synchronize data from a MySQL CDC source table to a Message Queue for Apache Kafka result table and consume data in the result table. This way, MySQL CDC deployments do not depend on the reading of binary log data. For more information, see Synchronize data from all tables in a MySQL database to Kafka.

If the load on the database becomes high due to data synchronization by using the CREATE TABLE AS statement, you can merge multiple deployments that use the CREATE TABLE AS statement into one deployment to run. If the configurations of the deployments that use the CREATE TABLE AS statement are the same, you can configure the same server ID for each MySQL CDC source table to reuse the data source and reduce the load on the database. For more information, see Example 4: execution of multiple CREATE TABLE AS statements.
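The following sketch illustrates this approach under the assumption that a MySQL catalog and a Kafka catalog have already been created. The catalog, database, and table names are placeholders, and the exact CREATE TABLE AS syntax may differ by VVR version; see the referenced examples for the authoritative form.

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `kafka_catalog`.`kafka_db`.`orders`
AS TABLE `mysql_catalog`.`shop_db`.`orders`
/*+ OPTIONS('server-id' = '8001-8004') */;

-- The second statement uses the same connection options and the same server-id range,
-- so the MySQL source can be reused within this deployment.
CREATE TABLE IF NOT EXISTS `kafka_catalog`.`kafka_db`.`users`
AS TABLE `mysql_catalog`.`shop_db`.`users`
/*+ OPTIONS('server-id' = '8001-8004') */;

END;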

When I use the MySQL CDC connector to read incremental data from a table, a time difference of eight hours exists between the data of a timestamp field that is read and the time zone of my MySQL server. Why?

  • The value of the server-time-zone parameter that is configured in the CDC deployment is inconsistent with the time zone of your MySQL server. As a result, the time difference occurs when timestamp fields in the binary log data are parsed.

  • A custom deserializer, such as a MyDeserializer class that implements DebeziumDeserializationSchema, is used in a DataStream deployment, and the deserializer parses data of the TIMESTAMP type incorrectly. In this case, you can refer to how RowDataDebeziumDeserializeSchema parses data of the TIMESTAMP type and specify serverTimeZone in your code.

      private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
          if (dbzObj instanceof Long) {
              switch (schema.name()) {
                  case Timestamp.SCHEMA_NAME:
                      return TimestampData.fromEpochMillis((Long) dbzObj);
                  case MicroTimestamp.SCHEMA_NAME:
                      long micro = (long) dbzObj;
                      return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                  case NanoTimestamp.SCHEMA_NAME:
                      long nano = (long) dbzObj;
                      return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
              }
          }
          LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
          return TimestampData.fromLocalDateTime(localDateTime);
      }

Can the MySQL CDC connector listen to secondary databases? How do I configure secondary databases?

Yes, the MySQL CDC connector can listen to secondary databases. To allow the MySQL CDC connector to listen to secondary databases, add the following configuration to the MySQL configuration of the secondary databases. After the configuration takes effect, the data that is synchronized from the primary database is written to the binary log files of the secondary databases.

log-slave-updates = 1

If the global transaction identifier (GTID) mode is enabled for the primary database, you must also enable the GTID mode for the secondary databases. To enable the GTID mode, add the following configuration to the MySQL configuration of the primary and secondary databases:

gtid_mode = on
enforce_gtid_consistency = on

How do I obtain DDL events from a database?

When you use a CDC connector for Apache Flink, you can use the MySqlSource class of the DataStream API and call includeSchemaChanges(true) to obtain DDL events. After you obtain DDL events, you can write code for subsequent processing. Sample code:

MySqlSource<xxx> mySqlSource =
    MySqlSource.<xxx>builder()
        .hostname(...)
        .port(...)
        .databaseList("<databaseName>")
        .tableList("<databaseName>.<tableName>")
        .username(...)
        .password(...)
        .serverId(...)
        .deserializer(...)
        .includeSchemaChanges(true) // Configure this option to obtain DDL events.
        .build();
... // Write other processing logic.

Does the MySQL CDC connector support synchronization of data from all tables in a MySQL database? How do I synchronize data from all tables in a MySQL database?

Yes, Realtime Compute for Apache Flink allows you to execute the CREATE TABLE AS or CREATE DATABASE AS statement to synchronize data from all tables in a MySQL database. For more information, see CREATE TABLE AS statement or CREATE DATABASE AS statement.

Note

Secondary instances and read-only instances of ApsaraDB RDS for MySQL V5.6 do not write data to log files. As a result, the downstream synchronization tool cannot read the incremental change information.

Why does incremental data of a table in a specific database in an instance fail to be synchronized?

A binary log filter is configured for the MySQL server, which filters out the binary logs of specific databases. You can run the show master status command to query the values of Binlog_Ignore_DB and Binlog_Do_DB. The following example shows the results.

mysql> show master status;
+------------------+----------+--------------+------------------+----------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |  Executed_Gtid_Set   |
+------------------+----------+--------------+------------------+----------------------+
| mysql-bin.000006 |     4594 |              |                  | xxx:1-15             |
+------------------+----------+--------------+------------------+----------------------+

How do I configure the tableList option when I use the DataStream API to create a MySQL CDC source table?

In the DataStream API, the value of the tableList option must contain both the database name and the table name, not only the table name. When you create a MySQL CDC source table, configure tableList("yourDatabaseName.yourTableName").

Can the MongoDB CDC connector continue to read data from the checkpoints of a deployment if the deployment fails during the full reading phase?

Yes. You can configure 'scan.incremental.snapshot.enabled' = 'true' in the WITH clause of the deployment to allow the MongoDB CDC connector to continue to read data from the checkpoints of the deployment if the deployment fails during the full reading phase.

Does the MongoDB CDC connector support reading both full and incremental data or reading only incremental data?

Yes. By default, the MongoDB CDC connector reads both full and incremental data. If you want to allow the MongoDB CDC connector to read only incremental data, configure 'scan.startup.mode' = 'latest-offset' in the WITH clause of your deployment.
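A minimal sketch of a MongoDB CDC source table (the schema and connection values are placeholders). The default initial mode reads full data first and then change streams; setting scan.startup.mode to latest-offset skips the full reading phase.

CREATE TEMPORARY TABLE mongo_orders (
    _id STRING,
    price DOUBLE,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '<yourHosts>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'database' = 'mgdb',
    'collection' = 'orders',
    -- Remove this option to keep the default behavior of reading full data before incremental data.
    'scan.startup.mode' = 'latest-offset'
);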

Does the MongoDB CDC connector support subscription to only specific collections of a database?

No, the MongoDB CDC connector does not support subscription to only specific collections of a database. You can use the MongoDB CDC connector to subscribe to all collections of a database. For example, if you configure 'database' = 'mgdb' and 'collection' = '' in the WITH clause, the connector subscribes to all collections of the MongoDB database.

Does the MongoDB CDC connector support concurrent reading?

If you set the scan.incremental.snapshot.enabled parameter to true, concurrent reading is supported during the initial full (snapshot) reading phase.

Which MongoDB versions are supported by the MongoDB CDC connector?

The MongoDB CDC connector is implemented based on the change stream feature. This feature is introduced in MongoDB 3.6. Theoretically, the MongoDB CDC connector supports MongoDB 3.6 and later. We recommend that you use MongoDB 4.0 or later. If the MongoDB version is earlier than 3.6, the error message "Unrecognized pipeline stage name: '$changeStream'" may appear when the MongoDB CDC connector reads change streams.

Which MongoDB database architectures are supported by the MongoDB CDC connector?

Change streams require that a MongoDB database run in a replica set architecture or sharded cluster architecture. To simplify operations during on-premises testing, you can run a standalone MongoDB instance as a single-node replica set and run the rs.initiate() command to initialize it so that change streams become available. If you use the MongoDB CDC connector to read data from a MongoDB instance that does not run in a replica set architecture, the error message "The $changeStream stage is only supported on replica sets" may appear.

Does the MongoDB CDC connector support Debezium-related parameters?

No, the MongoDB CDC connector does not support Debezium-related parameters. The MongoDB CDC connector is independently developed in Flink CDC and does not depend on Debezium.

The MongoDB CDC connector cannot access a MongoDB database based on a username and its password and an error message indicating invalid username or password appears. However, other components can access the MongoDB database based on the username and password. Why?

This occurs because the user credentials were created in a specific authentication database. To access the MongoDB database, add 'connection.options' = 'authSource=<database to which the user belongs>' to the WITH clause.
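For example, assuming that the user was created in the admin database, the option looks like the following line in the WITH clause:

'connection.options' = 'authSource=admin'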

Can the MongoDB CDC connector read data from the checkpoints of a deployment after the deployment restarts? What is the working principle?

Yes, the MongoDB CDC connector can read data from the checkpoints of a deployment after the deployment restarts. Checkpoints record resume tokens for change streams. The connector resumes reading the change stream based on the related resume token. A resume token corresponds to the location of the oplog.rs collection. The oplog.rs collection is a collection of MongoDB change logs and has a fixed capacity.

If the data record of a resume token no longer exists in the oplog.rs collection, the resume token may become invalid. In this case, you can set the size of the oplog.rs collection to a proper value to prevent the oplog.rs collection from covering an excessively short period of time. For more information, see Change the Size of the Oplog.

The resume token can be updated based on newly arrived change records and heartbeat records.

Does the MongoDB CDC connector support the output of UPDATE_BEFORE messages (pre-update images)?

  • If the pre-image or post-image feature is enabled for a MongoDB database of 6.0 or later, you can configure 'scan.full-changelog' = 'true' for your SQL deployment. This way, MongoDBSource can generate UPDATE_BEFORE messages and the ChangelogNormalize operator is not used.

  • The original oplog.rs collection of a MongoDB database of a version earlier than 6.0 includes the INSERT, UPDATE, REPLACE, and DELETE change types but does not include the UPDATE_BEFORE change type. Therefore, MongoDBSource cannot directly generate UPDATE_BEFORE messages; Flink supports only UPDATE-based semantics in this case.

    When you use MongoDBTableSource, the Flink planner automatically optimizes data by using the ChangelogNormalize operator, supplements UPDATE_BEFORE messages, and then generates INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE messages. However, the ChangelogNormalize operator causes a large overhead because the operator saves the state of all keys before the update.

    If a DataStream deployment uses MongoDBSource directly, the Flink planner does not apply the ChangelogNormalize optimization. As a result, MongoDBSource cannot generate UPDATE_BEFORE messages, and you must manage state data on your own if you want to obtain pre-update images. If you do not want to manage state data on your own, you can use MongoDBTableSource to convert the original streams in the oplog.rs collection into a changelog stream or retract stream, and let the Flink planner supplement the pre-update images. Sample code:

    tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector' = 'mongodb-cdc', ... )");

    Table table = tEnv.from("orders")
        .select($("*"));

    tEnv.toChangelogStream(table)
        .print()
        .setParallelism(1);

    env.execute();

How do I configure a parameter to filter out invalid date values?

To filter out invalid date values, you can add one of the following configurations to the WITH clause for the Postgres CDC connector based on your business requirements (a placement sketch follows the list):

  • 'debezium.event.deserialization.failure.handling.mode'='warn': Skip dirty data and print the dirty data to WARN logs.

  • 'debezium.event.deserialization.failure.handling.mode'='ignore': Skip dirty data. Do not print dirty data to logs.
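The following DDL sketch shows where the option is placed in a Postgres CDC source table. The schema, connection values, and slot name are illustrative assumptions.

CREATE TEMPORARY TABLE pg_orders (
    order_id BIGINT,
    order_date DATE,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    'database-name' = '<yourDatabase>',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_slot',
    -- Skip dirty date values and print them to WARN logs. Use 'ignore' to skip them silently.
    'debezium.event.deserialization.failure.handling.mode' = 'warn'
);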

Why does an error message indicating that TOAST data is not transmitted appear when I use the PostgreSQL CDC connector?

Make sure that REPLICA IDENTITY of the table is set to FULL. TOAST data can be large. To reduce the size of WAL logs, the wal2json plug-in does not include TOAST data in the updated data if the TOAST data remains unchanged and the 'debezium.schema.refresh.mode'='columns_diff_exclude_unchanged_toast' configuration is used.

Why are WAL logs not released when the disk usage on the PostgreSQL database server is high?

The PostgreSQL CDC connector updates the log sequence number (LSN) in the replication slots of the PostgreSQL database only when checkpointing is complete. If the disk usage is high, check whether checkpointing of the PostgreSQL database is enabled and whether a replication slot is not used or has a synchronization latency.

What is returned if the precision of the DECIMAL data that is synchronized from PostgreSQL by using the PostgreSQL CDC connector exceeds the maximum precision?

If the precision of the DECIMAL data that the PostgreSQL CDC connector receives is greater than the precision of the data type declared in the statement of the source table that uses the Postgres CDC connector, the DECIMAL data is processed as null. In this case, you can configure 'debezium.decimal.handling.mode' = 'string' to process the data that is read from the PostgreSQL database as strings.

How do I configure the tableList option when I create a PostgreSQL CDC source table by using the DataStream API?

In the DataStream API, the value of the tableList option must contain both the database name and the table name, not only the table name. When you create a PostgreSQL CDC source table, configure tableList("yourDatabaseName.yourTableName").

Why do I fail to download flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar? Why do the xxx-SNAPSHOT dependencies not exist in the Maven repository?

The xxx-SNAPSHOT version corresponds to the code of the development branch based on the version management mechanism of mainstream Maven projects. If you want to use this version, you must download the source code and compile the related JAR package. You can use the package of a stable version, such as flink-sql-connector-mysql-cdc-2.1.0.jar. You can obtain the package from the Maven central repository.

What are the differences between flink-sql-connector-xxx.jar and flink-connector-xxx.jar?

The package naming conventions of Flink CDC connectors are consistent with the package naming conventions of other Flink connectors.

  • flink-sql-connector-xx is a fat JAR. In addition to the code of the connector, all third-party packages that the connector depends on are shaded into the fat JAR. flink-sql-connector-xx is provided for SQL deployments. You only need to add the fat JAR to the lib directory.

  • flink-connector-xx contains only the code of the connector and does not contain the required dependencies of the connector. flink-connector-xx is provided for DataStream deployments. You must manage the required third-party package dependencies, and perform the exclude and shade operations to handle dependency conflicts.

Why am I unable to find the package of a connector of 2.X version in the Maven repository?

The group ID is changed from com.alibaba.ververica to com.ververica in Flink CDC connectors of 2.0.0. Therefore, the package path of the 2.X version in the Maven repository is changed to /com/ververica.

When the DataStream API uses the JsonDebeziumDeserializationSchema deserializer, data of the numeric type is displayed as strings. What do I do?

Different conversion methods are used when Debezium parses data of the numeric type. For more information, see Debezium connector for MySQL. The following sample code shows the conversion method that is configured in Flink CDC.

Properties properties = new Properties();
// ...
properties.setProperty("bigint.unsigned.handling.mode", "long");
properties.setProperty("decimal.handling.mode", "double");

MySqlSource.<String>builder()
    .hostname(config.getHostname())
    // ...
    .debeziumProperties(properties);

What do I do if the error message "Replication slot "xxxx" is active" appears?

  • Problem description

    After your PostgreSQL CDC deployment ends, the slot may not be correctly released.

  • Solutions

    Use one of the following methods to release the slot:

    • Connect to the PostgreSQL database and run the following command to manually release the slot:

      select pg_drop_replication_slot('rep_slot');

      If the error message "ERROR: replication slot "rep_slot" is active for PID 162564" appears, the slot is being occupied by the process whose ID is specified in the message. You must terminate the process before you can release the slot. To terminate the process and release the slot, run the following commands:

      select pg_terminate_backend(162564);
      select pg_drop_replication_slot('rep_slot');
    • Enable automatic slot cleanup. To enable this feature, add the 'debezium.slot.drop.on.stop' = 'true' configuration to the Postgres source of the deployment. This way, the slot can be automatically dropped when the PostgreSQL CDC deployment stops.

      Warning

      If you enable automatic slot cleanup, WAL logs are reclaimed. When the deployment is restarted, data may be lost and at-least-once semantics cannot be ensured.

What do I do if the error message "Lock wait timeout exceeded; try restarting transaction" appears?

  • Problem description

    org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; try restarting transaction Error code: 1205; SQLSTATE: 40001.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218)
        at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:857)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
    Caused by: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1200)
        at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:554)
        at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:497)
        at io.debezium.connector.mysql.SnapshotReader.readTableSchema(SnapshotReader.java:888)
        at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:550)
        ... 3 more
  • Cause

    If incremental savepoint reading is not enabled for a MySQL CDC source table, a timeout error may occur when the source table applies for a lock.

  • Solution

    Upgrade the Ververica Runtime (VVR) version to 4.0.8 or later. In VVR 4.0.8 and later, the lock-free algorithm is used by default, and source tables do not need to apply for locks.

What do I do if the error message "Cause by: java.lang.ArrayIndexOutOfBoundsException" appears?

  • Problem description

  • Cause

    An error occurs in the binary log reading tool that is used in VVR of a version earlier than 4.0.12.

  • Solution

    Upgrade the VVR version to 4.0.12 or later.

What do I do if the error message "Caused by: io.debezium.DebeziumException: Received DML 'xxx' for processing, binlog probably contains events generated with statement or mixed based replication format" appears?

  • Problem description

    Caused by: io.debezium.DebeziumException: Received DML 'insert into gd_chat_fetch_log (
    
    id,
    c_cursor,
    d_timestamp,
    msg_cnt,
    state,
    ext1,
    ext2,
    cost_time
    
    ) values (
    null,
    null,
    '2022-03-23 16:51:00.616',
    0,
    1,
    null,
    null,
    0
    )' for processing, binlog probably contains events generated with statement or mixed based replication format
  • Cause

    The binary log format is MIXED. MySQL CDC source tables support only binary logs in the ROW format.

  • Solution

    1. Run the show global variables like "binlog_format" command to query the binary log format.

      Note

      The show variables like "binlog_format" command can be used only to query the current binary log format.

    2. If the binary log format is not ROW, change the format to ROW on the MySQL server. For more information, see Setting The Binary Log Format.

    3. Restart the deployment.

What do I do if the error message "Encountered change event for table xxx.xxx whose schema isn't known to this connector" appears?

  • Problem description

    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Encountered change event 'Event{header=EventHeaderV4{timestamp=xxx, eventType=TABLE_MAP, serverId=xxx, headerLength=xxx, dataLength=xxx, nextPosition=xxx, flags=xxx}, data=TableMapEventData{tableId=xxx, database='xxx', table='xxx', columnTypes=xxx, xxx..., columnMetadata=xxx,xxx..., columnNullability={xxx,xxx...}, eventMetadata=null}}' at offset {ts_sec=xxx, file=mysql-bin.xxx, pos=xxx, gtids=xxx, server_id=xxx, event=xxx} for table xxx.xxx whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
    Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=30946 --stop-position=31028 --verbose mysql-bin.004419
    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.xxx/xxx
    202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader                     [] - Failed due to error: Error processing binlog event
    org.apache.kafka.connect.errors.ConnectException: Encountered change event for table statistic.apk_info whose schema isn't known to this connector
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:607) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1104) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:955) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
    Caused by: org.apache.kafka.connect.errors.ConnectException: Encountered change event for table xxx.xxx whose schema isn't known to this connector
        at io.debezium.connector.mysql.BinlogReader.informAboutUnknownTableIfRequired(BinlogReader.java:875) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleUpdateTableMetadata(BinlogReader.java:849) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:590) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT]
        ... 5 more
  • Causes

    • You do not have the required permissions on specific databases that are used in the deployment.

    • The 'debezium.snapshot.mode'='never' configuration is used in the deployment. If the debezium.snapshot.mode option is set to never, the data is read from the beginning of binary logs. However, the table schema that corresponds to the change event in the beginning of binary logs does not match the schema of the current table. As a result, this error occurs.

    • A change that cannot be interpreted by Debezium exists. For example, if `DEFAULT (now())` exists, this error may occur.

  • Solutions

    • Check whether you have the required permissions on all databases that are used in the deployment. For more information, see Configure a MySQL database.

    • We recommend that you do not use the 'debezium.snapshot.mode'='never' configuration. To avoid this error, you can use the 'debezium.inconsistent.schema.handling.mode' = 'warn' configuration.

    • Query the io.debezium.connector.mysql.MySqlSchema WARN log to check for the change that cannot be interpreted by Debezium. For example, `DEFAULT (now())` cannot be interpreted.

What do I do if the error message "org.apache.kafka.connect.errors.DataException: xxx is not a valid field name" appears?

  • Problem description

    org.apache.kafka.connect.errors.DataException: xxx is not a valid field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$createRowConverter$508c5858$1(RowDataDebeziumDeserializeSchema.java:369)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$wrapIntoNullableConverter$7b91dc26$1(RowDataDebeziumDeserializeSchema.java:394)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:127) 
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:102)
        at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:124)
        at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812) 
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • Cause

    A physical table in the sharded database does not contain specific fields that you declare in the MySQL CDC table. As a result, the schema of the table in the sharded database is inconsistent with the schema of the MySQL CDC table.

    For example, you use the regular expression mydb.users_\d{3} to monitor the users_001, users_002,…, and users_999 tables in the mydb database, and you declare the user_name field in the DDL statement of the MySQL CDC table. If the users_002 table does not contain the user_name field, this error occurs when the binary logs of the users_002 table are parsed.

  • Solutions

    Make sure that each table in the sharded database contains the fields declared in the DDL statement of the MySQL CDC table.

    You can also upgrade the VVR version of your deployment to 6.0.2 or later. In VVR 6.0.2 and later, MySQL CDC deployments automatically use the widest schema in a sharded database.

What do I do if the error message "Caused by: java.sql.SQLSyntaxErrorException: Unknown storage engine 'BLACKHOLE'" appears?

  • Problem description

  • Cause

    An unsupported syntax is used in the DDL statement of MySQL V5.6. As a result, a parsing error occurs.

  • Solutions

    • Add the 'debezium.database.history.store.only.monitored.tables.ddl'='true' and 'debezium.database.exclude.list'='mysql' configurations to the WITH clause in the DDL statement of the MySQL CDC table.

    • Upgrade the VVR version of your MySQL CDC deployment to 6.0.2 or later. In VVR 6.0.2 or later, MySQL CDC deployments provide enhanced parsing capabilities for DDL statements.

What do I do if the error message "Caused by: org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed." appears?

  • Problem description

    org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)
        at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • Causes and solutions

    • Cause: The binary log file that is being read by the deployment has been deleted from the MySQL server because the retention period of binary logs on the MySQL server is excessively short.

      Solution: Increase the retention period of binary logs. For example, you can change the retention period to 7 days by running the following commands:

      show variables like 'expire_logs_days';
      set global expire_logs_days=7;

    • Cause: The MySQL CDC deployment consumes binary logs at a low speed. For example, a downstream aggregate operator or sink operator has backpressure for a long period of time and the backpressure is transferred to the source. As a result, the source cannot consume data.

      Solution: Optimize the resource configuration of the deployment to allow the source to consume data as expected.

    • Cause: Local binary logs of ApsaraDB RDS for MySQL are retained for a maximum of 18 hours and can occupy up to 30% of the storage space. If the retention period exceeds 18 hours or the logs occupy more than 30% of the storage space, log deletion is triggered. If more than 30% of the storage space is occupied because an excessive amount of data is written, binary logs may be deleted and become unavailable.

      Solution: Modify the binary log expiration policy of ApsaraDB RDS for MySQL to make sure that binary logs can be read as expected.

    • Cause: If you use a read-only ApsaraDB RDS for MySQL instance to consume CDC data, the availability of binary logs is not ensured. The binary logs of a read-only instance can be retained on the on-premises machine for as little as 10 seconds before they are uploaded to Object Storage Service (OSS). If the connector for a MySQL CDC source table reads data from a read-only instance and the deployment cannot resume within 10 seconds after a failover, this error occurs.

      Solution: We recommend that you do not configure the connector for a MySQL CDC source table to read data from read-only ApsaraDB RDS for MySQL instances.

      Note: The hostname of a read-only instance starts with rr, and the hostname of a common instance starts with rm.

    • Cause: An internal data migration is performed on the ApsaraDB RDS for MySQL instance.

      Solution: Restart the deployment to re-read the data.

What do I do if the error message "EventDataDeserializationException: Failed to deserialize data of EventHeaderV4.... Caused by: java.net.SocketException: Connection reset" appears?

  • Problem description

    EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:304)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:227)
        at io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:252)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:934)
    ... 3 more
    Caused by: java.io.EOFException
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:192)
        at java.io.InputStream.read(InputStream.java:170)
        at java.io.InputStream.skip(InputStream.java:224)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:301)
    ...    6 more
  • Causes

    • A network issue occurs.

    • The deployment has backpressure.

      If a MySQL CDC deployment has backpressure, the Binlog client that is used in the MySQL CDC source table cannot continue to read data due to the backpressure. To minimize the number of remaining connections on the database, the MySQL database automatically cancels the connection when the connection of a Binlog client is inactive for a period of time that exceeds the specified timeout period on the database. As a result, the deployment becomes abnormal.

  • Solutions

    • If this error is caused by a network issue, perform the following steps to handle this error:

      1. Run the following code to increase the values of network-related parameters of the MySQL server:

        set global slave_net_timeout = 120; -- The default value is 30 seconds.
        set global thread_pool_idle_timeout = 120;

        For more information, see Debezium documentation.

      2. Configure the following Flink parameters:

        execution.checkpointing.interval=10min
        execution.checkpointing.tolerable-failed-checkpoints=100
        restart-strategy=fixed-delay
        restart-strategy.fixed-delay.attempts=2147483647
        restart-strategy.fixed-delay.delay= 30s
    • If the deployment has backpressure, adjust the deployment configuration to resolve the backpressure issue first.

      You can also increase the values of network-related parameters on the MySQL server. This prevents the MySQL database from automatically closing the Binlog client connection when the connection of the MySQL CDC source table is idle due to backpressure.

      set global slave_net_timeout = 120; -- The default value is 30 seconds.
      set global thread_pool_idle_timeout = 120;

What do I do if the error message "The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires." appears?

  • Problem description

    org.apache.kafka.connect.errors.ConnectException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDS that the slave requires. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) 
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
        at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1142)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:962)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839)
        at java.lang.Thread.run(Thread.java:834)
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:926)
    ... 3 more
  • Cause

    The time required for reading full data is excessively long. As a result, when full data reading is complete and binary log reading starts, the binary logs that contain the previously recorded GTID set have already been deleted from the MySQL database.

  • Solution

    Increase the retention period of binary logs or increase the maximum storage size that is allowed for binary log files. To change the retention period of binary logs, run the following commands:

    mysql> show variables like 'expire_logs_days';
    mysql> set global expire_logs_days=7;

What do I do if the error message "java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message is null,please check the Postgres table has been set REPLICA IDENTITY to FULL level." appears?

  • Problem description

    java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE xxx.xxx REPLICA IDENTITY FULL'. Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.2/connectors/postgresql.html#postgresql-replica-identity
        at com.alibaba.ververica.cdc.connectors.postgres.table.PostgresValueValidator.validate(PostgresValueValidator.java:46)
        at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:113)
        at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:158)
        at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
        at java.lang.Thread.run(Thread.java:834)
  • Cause

    REPLICA IDENTITY of the PostgreSQL table is not set to FULL.

  • Solution

    Execute the ALTER TABLE yourTableName REPLICA IDENTITY FULL; statement as prompted. If the error persists after you execute the statement and restart the deployment, add the 'debezium.slot.drop.on.stop' = 'true' configuration to the code of the deployment.

What do I do if the error message "Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: xxx and table-name: xxxx" appears?

  • Causes

    • The configured table name cannot be found in the database.

    • The MySQL CDC deployment contains tables of different databases. However, the account that you use does not have permissions on specific databases.

  • Solutions

    1. Check whether the configured table name exists in the database.

    2. Grant required permissions on all databases in the deployment to the account that you use.

What do I do if the error message "com.github.shyiko.mysql.binlog.network.ServerException" appears?

  • Problem description

    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
  • Cause

    The MySQL CDC connector records the position of the binary log file before the connector starts to read full data. After full data reading is complete, the connector reads incremental data from the position of the binary log file that is recorded. In most cases, this error occurs because the binary log file at the recorded position is deleted from the MySQL server before full data reading is complete.

  • Solution

    Check the rules based on which MySQL binary log files can be deleted, including the rules that are related to the time, the storage space, and the number of files. We recommend that you retain binary log files for more than one day. For more information about binary log files of ApsaraDB RDS for MySQL, see Delete the binary log files of an ApsaraDB RDS for MySQL instance.

    Note

    In VVR 4.0.8 and later, the MySQL CDC connector supports full data reading in multiple parallel subtasks. This way, full data reading is accelerated and the probability of the preceding error is reduced.

What do I do if the error message "The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'" appears?

  • Problem description

    When a syntax check is performed on a MySQL CDC source table in VVR 4.0.X, the following error message appears:

    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
  • Cause

    No primary key is defined in the DDL statement that is used to create the MySQL CDC source table. In VVR 6.0.X and in VVR 4.0.8 and later, data can be sharded based on the primary key to support data reading in multiple parallel subtasks.

    Important

    In a VVR version that is earlier than 4.0.8, MySQL CDC source tables support only data reading in a single subtask.

  • Solutions

    • In VVR 6.0.X, VVR 4.0.8, or a version later than VVR 4.0.8, if you want to read MySQL data in multiple parallel subtasks, add the primary key information to the DDL statement.

    • In a VVR version that is earlier than 4.0.8, MySQL CDC source tables do not support data reading from a MySQL database in multiple parallel subtasks. In this case, add the scan.incremental.snapshot.enabled parameter to the WITH clause and set the parameter to false. You do not need to add the primary key information to the DDL statement. A sketch is shown after this list.
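      The following DDL sketch shows this configuration for a VVR version earlier than 4.0.8. The schema and connection values are placeholders.

      CREATE TEMPORARY TABLE user_cdc (
          user_id BIGINT,
          user_name STRING
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '<yourHostname>',
          'port' = '3306',
          'username' = '<yourUsername>',
          'password' = '<yourPassword>',
          'database-name' = '<yourDatabase>',
          'table-name' = 'users',
          -- Disable incremental snapshot reading so that no primary key needs to be declared.
          -- The table is then read in a single subtask.
          'scan.incremental.snapshot.enabled' = 'false'
      );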

What do I do if the error message "java.io.EOFException: SSL peer shut down incorrectly" appears?

  • Problem description

    Caused by: java.io.EOFException: SSL peer shut down incorrectly
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302]
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302]
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302]
        at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?]
        at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?]
        at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?]
        at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?]
        at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?]
        at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?]
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?]
        ... 14 more
  • Cause

    In MySQL V8.0.27, the MySQL database is connected over SSL by default. However, the database cannot be accessed over SSL by using the JDBC driver that the connector uses. As a result, this error occurs.

  • Solutions

    • If you can upgrade the VVR version of your deployment to 6.0.2 or later, add the 'jdbc.properties.useSSL'='false' configuration to the WITH clause of the MySQL CDC table, as shown in the sketch after this list.

    • If the table is declared only as a dimension table, set the connector parameter to rds in the WITH clause of the MySQL CDC table and add the characterEncoding=utf-8&useSSL=false configuration to the URL parameter. Example:

      'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'
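
    For the first solution, the following sketch shows where the jdbc.properties.useSSL configuration is placed in the WITH clause of the MySQL CDC table. The table definition and connection values are hypothetical placeholders, and the connector name depends on your VVR version; replace them with the values of your environment.

      CREATE TABLE mysql_cdc_source (
          id BIGINT,
          item STRING,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '<yourHostname>',
          'port' = '3306',
          'username' = '<yourUsername>',
          'password' = '<yourPassword>',
          'database-name' = '<yourDatabaseName>',
          'table-name' = '<yourTableName>',
          'jdbc.properties.useSSL' = 'false'
      );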

What do I do if the error message "com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master" appears?

  • Problem description

    Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ~[?:?]
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx, the last byte read from '/home/mysql/dataxxx/mysql/mysql-bin.xxx' at xxx.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?]
        ... 1 more
  • Cause

    Each parallel subtask of a MySQL CDC source table must be configured with a unique server ID when it reads data. This error occurs if the server ID that is used by the MySQL CDC source table conflicts with the server ID of another CDC source table in the same deployment, a CDC source table in another deployment, or another synchronization tool that reads from the same MySQL instance.

  • Solution

    Specify a globally unique server ID for each parallel subtask of the MySQL CDC source table. For more information, see the "Precautions" section in Create a MySQL CDC source table.
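
    For example, if the MySQL CDC source table runs with a parallelism of 4, you can specify a range of four server IDs in the WITH clause of the table. The range below is a hypothetical example; choose values that do not overlap with those of any other deployment or synchronization tool that reads from the same MySQL instance.

    'server-id' = '5401-5404'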

What do I do if the error message "NullPointerException" for TableMapEventDataDeserializer.readMetadata appears?

  • Problem description

    Caused by: java.lang.NullPointerException
        at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.readMetadata(TableMapEventDataDeserializer.java:81)
        at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:42)
        at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:27)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:281)
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:228)
        at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:952)
        ... 3 more
  • Cause

    MySQL V8.0.18 and later support new data types. Realtime Compute for Apache Flink that uses a VVR version earlier than 6.0.6 is not compatible with these data types. As a result, the error message appears when Realtime Compute for Apache Flink parses binary log data.

  • Solution

    Upgrade the VVR version of Realtime Compute for Apache Flink to VVR 6.0.6 or later. Realtime Compute for Apache Flink that uses VVR 6.0.6 or later is compatible with the new data types supported by MySQL.

What do I do if the error message "NullPointerException" appears after I add a column to a MySQL table during full data reading of the table?

  • Problem description

    Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=iplus.retail_detail, splitId='iplus.retail_detail:68', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[212974500236****], splitEnd=[213118153601****], highWatermark=null} error due to java.lang.NullPointerException.
      at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:361)
      at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:293)
      at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:124)
      at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:86)
      at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
      at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
      ... 6 more
  • Cause

    During full data reading of a table in a deployment, the schema of the table is determined when the deployment is started, and the schema is recorded in checkpoints. If a column is added to the table during full data reading, the actual schema no longer matches the recorded schema. As a result, an error is returned.

  • Solution

    Cancel the deployment and delete the downstream table to which data is synchronized. Then, restart the deployment without states.

What do I do if the error message "The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed" appears?

  • Problem description

    This error indicates that the binary log file that the CDC deployment attempts to read has been cleared on the MySQL server.

  • Causes

    • The retention period that is configured for the binary log file on the MySQL server is excessively short. As a result, the file is automatically deleted.

    • The CDC deployment processes binary log data at an excessively low speed.

  • Solutions

    • Increase the retention period of the binary log file. For example, you can set the retention period to seven days.

      mysql> show variables like 'expire_logs_days';
      mysql> set global expire_logs_days=7;
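
      If the MySQL server runs version 8.0 or later, the expire_logs_days variable is deprecated, and the retention period of binary log files is controlled by the binlog_expire_logs_seconds variable instead. The following commands are a sketch that sets a seven-day retention (604,800 seconds), assuming that you have the privileges to change global variables.

      mysql> show variables like 'binlog_expire_logs_seconds';
      mysql> set global binlog_expire_logs_seconds=604800;
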
    • Allocate more resources to the Flink deployment to help accelerate the processing of binary log data.

What do I do if the error message "Mysql8.0 Public Key Retrieval is not allowed" appears?

  • Cause

    The MySQL user is configured to use SHA-256 password authentication, which requires the password to be transmitted over Transport Layer Security (TLS).

  • Solution

    Allow the MySQL user to access databases by using the native password. To change the authentication method, run the following commands:

    mysql> ALTER USER 'username'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
    mysql> FLUSH PRIVILEGES;
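
    To check which authentication plugin the account currently uses before you change it, you can query the mysql.user system table. The account name in the following statement is a hypothetical placeholder, and the host part of the account must match the host from which Realtime Compute for Apache Flink connects (for example, '%' for remote connections).

    mysql> SELECT user, host, plugin FROM mysql.user WHERE user = 'username';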