Configure the MySQL CDC connector to optimize performance, prevent conflicts, and ensure reliable data synchronization in Realtime Compute for Apache Flink.
Set the server ID to avoid binary log conflicts
Each MySQL CDC client requires a unique server ID. Multiple source tables sharing the same server ID cause conflicts and job errors. Set a server ID in the Flink table's DDL statement or via SQL Hints. Using SQL Hints is recommended.
Configure the server ID based on your deployment scenario:
- Incremental snapshot disabled, or parallelism = 1: Specify a single server ID.

  ```sql
  SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;
  ```

- Incremental snapshot enabled and parallelism > 1: Specify a range. The range must contain at least as many IDs as the parallelism value. For example, with parallelism = 3:

  ```sql
  SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */;
  ```

- CTAS-based synchronization: MySQL CDC source tables with identical configurations are automatically merged. Assign the same server ID to all merged source tables. For details, see the "Example 4: execution of multiple CREATE TABLE AS statements" section of the CREATE TABLE AS statement topic.

- Multiple MySQL CDC source tables without CTAS: If source reuse is disabled (see Enable source reuse to reduce binary log connections), assign a unique server ID to each source table. If incremental snapshot is enabled and parallelism > 1, also specify a range per table:

  ```sql
  SELECT *
  FROM source_table1 /*+ OPTIONS('server-id'='123456-123457') */
  LEFT JOIN source_table2 /*+ OPTIONS('server-id'='123458-123459') */
    ON source_table1.id = source_table2.id;
  ```
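For the CTAS case, the shared server ID can be attached as a hint on each merged source. The sketch below is illustrative only: the catalogs `holo` and `mysql`, the database `app_db`, and the table names are hypothetical placeholders.

```sql
BEGIN STATEMENT SET;

-- Both CTAS statements use the same server ID range, so the merged
-- sources can share a single binary log connection.
CREATE TABLE IF NOT EXISTS holo.app_db.orders
AS TABLE mysql.app_db.orders /*+ OPTIONS('server-id'='123456-123457') */;

CREATE TABLE IF NOT EXISTS holo.app_db.shipments
AS TABLE mysql.app_db.shipments /*+ OPTIONS('server-id'='123456-123457') */;

END;
```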
Configure chunk options for memory optimization
When the MySQL CDC source connector starts, it scans the entire table, splits data into chunks by primary key, and records the binary log position. Checkpoints track progress at chunk granularity. On failover, the connector resumes from the last unread chunk. After all chunks are read, the connector switches to incremental reading from the recorded binary log position, ensuring exactly-once semantics.
For a detailed description of the incremental snapshot algorithm, see MySQL CDC connector in the Apache Flink documentation.
By default, tables with a single-column primary key are split by that key, and tables with a composite primary key are split by the first column. Ververica Runtime (VVR) 6.0.7 and later support tables without a primary key: use `scan.incremental.snapshot.chunk.key-column` to specify a non-null column as the chunk key.
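As an illustrative sketch for a table without a primary key, the chunk key column can be set in the table's WITH clause. The table name, columns, and connection values below are placeholders, not values from this document.

```sql
CREATE TEMPORARY TABLE orders_source (
  order_id BIGINT NOT NULL,
  customer_id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = 'app_db',
  'table-name' = 'orders',
  -- Split snapshot chunks by this non-null column,
  -- since the table has no primary key
  'scan.incremental.snapshot.chunk.key-column' = 'order_id'
);
```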
Tune chunk size
Both chunk data and metadata are stored in memory. `scan.incremental.snapshot.chunk.size` controls how many rows each chunk contains. Increasing the chunk size reduces the total chunk count and improves throughput, but requires more memory per chunk. Decreasing it lowers memory usage but increases the total chunk count, which adds overhead to the JobManager. Adjust the value based on where out-of-memory (OOM) errors occur.
JobManager OOM is caused by too many chunks, because the JobManager holds metadata for all of them:

- Increase `scan.incremental.snapshot.chunk.size` to reduce the total number of chunks.
- Alternatively, increase JVM heap memory with `jobmanager.memory.heap.size`. See Memory configuration in the Apache Flink documentation.
TaskManager OOM has three common causes:

- Too many rows per chunk: decrease `scan.incremental.snapshot.chunk.size`, or increase `taskmanager.memory.framework.heap.size`.
- In VVR 8.0.8 and earlier, the last chunk is typically large and can trigger OOM. Upgrade to VVR 8.0.9 or later.
- When the first column of a composite primary key has many duplicate values, chunks become unevenly sized. Set `scan.incremental.snapshot.chunk.key-column` to split by a different column.
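The chunk size can also be adjusted per query with an SQL hint. The value below is only an example starting point for tuning, not a recommended setting.

```sql
SELECT * FROM source_table /*+ OPTIONS('scan.incremental.snapshot.chunk.size'='4096') */;
```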
Accelerate snapshot reads
During the snapshot phase, the MySQL source connector reads data through a JDBC connection. Use the following methods to speed up snapshot reads:
- Increase the source parallelism.
- Increase `scan.incremental.snapshot.chunk.size` to fetch more rows per chunk.
- If the downstream sink supports idempotent writes with a primary key, enable `scan.incremental.snapshot.backfill.skip` to skip binary log reads during backfill.
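The chunk size and backfill options can be combined in a single hint. Treat this as a sketch (the chunk size value is only an example), and verify that your sink deduplicates by primary key before skipping backfill.

```sql
SELECT * FROM source_table /*+ OPTIONS(
  'scan.incremental.snapshot.chunk.size' = '16096',
  'scan.incremental.snapshot.backfill.skip' = 'true'
) */;
```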
Enable source reuse to reduce binary log connections
Source reuse merges multiple MySQL CDC source tables in a job into a single binary log connection, reducing database load. This feature is available only in the Realtime Compute for Apache Flink connector for MySQL CDC.
Enable source reuse in an SQL job:
```sql
SET 'table.optimizer.source-merge.enabled' = 'true';
```
Enable this option on new jobs only. On an existing job, enabling source reuse changes the job topology, so a stateful restart may fail or cause data loss. Perform a stateless restart instead.
After enabling, MySQL CDC source tables with identical configurations are merged. When all source tables share the same configuration, MySQL connection counts are:
| Phase | Connections |
|---|---|
| Snapshot reading | Equal to source parallelism |
| Incremental reading | 1 |
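For example, in a hypothetical job that writes the same CDC source table to two sinks (all table names below are illustrative), the two reads share one binary log connection during incremental reading once source merging is enabled:

```sql
SET 'table.optimizer.source-merge.enabled' = 'true';

BEGIN STATEMENT SET;

-- Both statements read mysql_source with identical configurations,
-- so the sources are merged into a single binary log connection.
INSERT INTO sink_a SELECT id, name FROM mysql_source;
INSERT INTO sink_b SELECT id, COUNT(*) AS cnt FROM mysql_source GROUP BY id;

END;
```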
- In VVR 8.0.8 and 8.0.9, also run `SET 'sql-gateway.exec-plan.enabled' = 'false';` when source reuse is enabled.
- Do not set `pipeline.operator-chaining` to `false` after enabling source reuse. Breaking the operator chain adds serialization and deserialization overhead for data from the source to downstream operators, and the overhead scales with the number of merged sources.
- In VVR 8.0.7, setting `pipeline.operator-chaining` to `false` causes a serialization issue.
Accelerate incremental reading
The MySQL CDC connector parses binary log files to generate change messages. Because binary logs record changes across all tables, parsing can become a bottleneck when many tables are involved.
Enable parallel parsing and event filtering (VVR 8.0.7 and later)
This feature is available only in the Realtime Compute for Apache Flink connector for MySQL CDC.
- `scan.only.deserialize.captured.tables.changelog.enabled`: Parse change events only for the specified tables, skipping events from all other tables.
- `scan.parallel-deserialize-changelog.enabled`: Use multiple threads for binary log parsing. Increase the TaskManager CPU allocation when this option is enabled.
Tune Debezium options
Three Debezium options directly affect parsing throughput:
| Option | Description | Default | Recommended |
|---|---|---|---|
| `debezium.max.queue.size` | Maximum number of records the blocking queue can hold. Acts as a backpressure buffer between the connector and downstream processing. | 8192 | 162580 |
| `debezium.max.batch.size` | Maximum number of events processed per iteration. | 2048 | 40960 |
| `debezium.poll.interval.ms` | Interval in milliseconds between requests for new change events. | 1000 | 50 |
The following example shows a complete configuration with Debezium options and parallel parsing enabled:
```sql
CREATE TABLE mysql_source (...) WITH (
  'connector' = 'mysql-cdc',
  -- Debezium options
  'debezium.max.queue.size' = '162580',
  'debezium.max.batch.size' = '40960',
  'debezium.poll.interval.ms' = '50',
  -- Parallel parsing and event filtering
  'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Parse only the specified tables
  'scan.parallel-deserialize-changelog.enabled' = 'true', -- Use multiple threads for parsing
  ...
);
```
Diagnose data latency
If data latency occurs during the incremental phase, follow these steps to identify and resolve the bottleneck.
Step 1: Check latency metrics
Check currentFetchEventTimeLag and currentEmitEventTimeLag in the Overview:
| Metric | Measures |
|---|---|
| `currentFetchEventTimeLag` | Latency of reading from the binary log |
| `currentEmitEventTimeLag` | End-to-end latency for the job's tables |
Interpret the combination of both metrics:
| Observation | Explanation |
|---|---|
| `currentFetchEventTimeLag` is small; `currentEmitEventTimeLag` is large and updates infrequently | Binary log reads are efficient. Infrequent updates mean the binary log contains few changes for the job's tables. This is expected behavior. |
| Both metrics are large | The source has poor read performance. Continue to the next steps. |
Step 2: Check for backpressure
Backpressure from downstream operators slows data flow and causes both lag metrics to increase continuously. sourceIdleTime may also increase periodically. Identify the bottleneck operator and increase its parallelism.
Step 3: Check CPU and memory
Check TM CPU Usage and TM GC Time. If resources are exhausted, scale up or configure mini-batch options. For details, see Optimize Flink SQL.
Step 4: Check for SinkUpsertMaterializer
A SinkUpsertMaterializer operator with large state degrades read performance. Increase the job parallelism or remove the operator. See Avoid using SinkUpsertMaterializer for details.
After removing the operator, the job topology changes, so perform a stateless restart. A stateful restart may fail or cause data loss.
Read RDS binary logs from OSS to prevent expiration
When using ApsaraDB RDS for MySQL as a data source, Flink can read binary log backups stored in OSS (Object Storage Service). If the requested binary log file is in OSS, Flink downloads it to the cluster before processing. If the file is still on the database server, Flink reads it directly through the database connection. This feature is available only in the Realtime Compute for Apache Flink connector for MySQL CDC.
To enable this, add the following options to your table definition:
```sql
CREATE TABLE mysql_source (...) WITH (
  'connector' = 'mysql-cdc',
  'rds.region-id' = 'cn-beijing',
  'rds.access-key-id' = 'your_access_key_id',
  'rds.access-key-secret' = 'your_access_key_secret',
  'rds.db-instance-id' = 'rm-xxxxxxxx', -- ApsaraDB RDS for MySQL instance ID
  'rds.main-db-id' = '12345678', -- Primary database ID
  'rds.endpoint' = 'rds.aliyuncs.com',
  ...
);
```
Synchronize database data and schema changes with data ingestion
For data synchronization tasks, create a data ingestion deployment, which is optimized for data integration scenarios. See Getting started with a YAML deployment for data ingestion and Develop a Flink CDC job (Beta).
The following example synchronizes data and schema changes from the MySQL database app_db to Hologres:
```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres
```
Control new table discovery
The MySQL data ingestion connector provides two options for controlling how new tables are discovered during a job run:
| Option | Description | Notes |
|---|---|---|
| `scan.newly-added-table.enabled` | When a job restarts from a checkpoint, reads both snapshot and incremental data from tables that were not discovered at the previous startup. | Effective only when `scan.startup.mode` is set to `initial`. Has no effect in other startup modes. |
| `scan.binlog.newly-added-table.enabled` | Synchronizes new tables during incremental reading. | Enable at the first job startup. If you enable it after tables are already created, data loss may occur. In `initial` startup mode, DDL operations are not synced downstream until the snapshot phase completes; tables created during the snapshot phase are not automatically synchronized even with this option enabled. |
Do not enable `scan.newly-added-table.enabled` and `scan.binlog.newly-added-table.enabled` at the same time; doing so causes data duplication.
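As a hedged sketch, incremental-phase new-table discovery can be switched on in the source block of a data ingestion YAML. The connection values below are placeholders.

```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404
  # Synchronize tables created during incremental reading.
  # Enable at the first startup of the job; enabling later may cause data loss.
  scan.binlog.newly-added-table.enabled: true
```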