Learn how to use the MySQL connector.
Overview
The MySQL connector supports all databases compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL compatible mode), and self-managed MySQL databases.
To use the MySQL connector to read data from OceanBase, you must enable and configure binlog on the OceanBase instance. For setup instructions, see Binary logging operations.
Note: This feature is currently in public preview. Please evaluate it thoroughly for your use case before deploying to production.
The MySQL connector supports the following features.
Category | Details |
Supported types | Source table, dimension table, sink table, and Flink CDC data source |
Execution mode | Streaming |
Data format | Not applicable |
Metrics | |
API types | SQL, Flink CDC, and DataStream |
Update/delete sink table data | Supported |
Benefits
The MySQL connector provides a unified stream of database changes by performing an initial snapshot followed by a seamless transition to binlog consumption. This architecture ensures exactly-once processing, preventing data loss or duplication even in the event of job failures. For more information, see Understanding MySQL source. Key features:
Unified batch and streaming: Processes both snapshot and incremental data in a single pipeline, eliminating the need for separate workflows.
Incremental snapshot algorithm: Enables lockless reads during the snapshot phase, so no global read lock is held on the production database.
Horizontal scalability: Supports concurrent snapshot reading, allowing you to parallelize the initial data load.
Elastic resource management: Automatically scales down resources after transitioning from the snapshot phase to the incremental binlog phase.
Checkpoint-based recovery: Provides stateful recovery during both snapshot and streaming phases, improving overall pipeline stability.
Optimized performance: Features parallel parsing of binlog files to minimize end-to-end latency.
Cloud integration: Supports reading from binary logs on ApsaraDB RDS for MySQL.
Prerequisites
Before you use a MySQL CDC source table, complete the steps in Configure a MySQL database.
ApsaraDB RDS for MySQL
Connectivity & permissions
Network connectivity: Ensure network connectivity between your database and the Flink cluster. See How do I use the network detection feature? to verify.
IP whitelist: Configure the database whitelist to allow traffic from the Flink cluster. See Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.
Permissions: A MySQL user with SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges.
Database configuration
Version: MySQL 5.6, 5.7, or 8.0.x.
Binlog: Ensure binlog is enabled.
Binlog format: Set it to ROW (the default).
Binlog row image: Set binlog_row_image to FULL.
Transaction compression: Ensure Binary Log Transaction Compression is disabled (applies to MySQL 8.0.20 and later).
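To check these settings on an existing instance, you can run read-only queries such as the following from any MySQL client. This is a sketch, not an official check. On versions earlier than MySQL 8.0.20, the last query returns an empty result because binlog_transaction_compression does not exist there.

```sql
-- Binlog must be enabled: expect log_bin = ON.
SHOW VARIABLES LIKE 'log_bin';

-- Expect binlog_format = ROW and binlog_row_image = FULL.
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';

-- MySQL 8.0.20 and later only: expect binlog_transaction_compression = OFF.
SHOW VARIABLES LIKE 'binlog_transaction_compression';
```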
Database objects
Ensure the target database and tables exist and are accessible with the user credentials provided. See Create a database and account for ApsaraDB RDS for MySQL.
PolarDB for MySQL
Connectivity & permissions
Network connectivity: Ensure network connectivity between your database and the Flink cluster. See How do I use the network detection feature? to verify.
IP whitelist: Configure the database whitelist to allow traffic from the Flink cluster. See Configure an IP address whitelist for PolarDB for MySQL.
Permissions: A MySQL user with SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges.
Database configuration
Version: MySQL 5.6, 5.7, or 8.0.x.
Binlog: Ensure binlog is enabled.
Binlog format: Set it to ROW (the default).
Binlog row image: Set binlog_row_image to FULL.
Transaction compression: Ensure Binary Log Transaction Compression is disabled (applies to MySQL 8.0.20 and later).
Database objects
Ensure the target database and tables exist and are accessible with the user credentials provided. See Create a database and account for PolarDB for MySQL.
Self-managed MySQL
Connectivity & permissions
Network connectivity: Ensure network connectivity between your database and the Flink cluster. See How do I use the network detection feature? to verify.
IP whitelist: Configure the database whitelist to allow traffic from the Flink cluster. See Add a security group rule.
Permissions: A MySQL user with SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges.
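On a self-managed instance, you can create an account with these privileges using statements like the following. The account name flink_cdc is hypothetical, so replace it and the password with your own. Where possible, narrow the host part ('%') to the addresses of your Flink cluster.

```sql
-- Hypothetical account; replace the name, host, and password.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '<yourPassword>';

-- SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT are global
-- privileges, so they are granted ON *.* together with SELECT.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';
```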
Database configuration
Version: MySQL 5.6, 5.7, or 8.0.x.
Binlog: Ensure binlog is enabled.
Binlog format: Set it to ROW (the default in MySQL 5.7.7 and later; MySQL 5.6 defaults to STATEMENT).
Binlog row image: Set binlog_row_image to FULL.
Transaction compression: Ensure Binary Log Transaction Compression is disabled (applies to MySQL 8.0.20 and later).
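The following is a minimal sketch for applying these settings on a self-managed MySQL 8.0 server. It assumes an account that is allowed to change global system variables. SET PERSIST writes the value to mysqld-auto.cnf so that it survives a restart. On MySQL 5.6 and 5.7, use SET GLOBAL and also add the settings to the server configuration file.

Binlog itself (log_bin) cannot be turned on at runtime and requires a server restart with binary logging enabled. Existing sessions keep their old binlog_format until they reconnect.

```sql
-- MySQL 8.0: change the running value and persist it across restarts.
SET PERSIST binlog_format = 'ROW';
SET PERSIST binlog_row_image = 'FULL';

-- MySQL 8.0.20 and later: disable binary log transaction compression.
SET PERSIST binlog_transaction_compression = OFF;
```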
Database objects
Ensure the target database and tables exist and are accessible with the user credentials provided. See Create a database and account for a self-managed MySQL database.
Limitations
General
Watermarks: MySQL CDC source tables do not support watermarks. If you need window-style aggregation, use non-windowed aggregation instead.
Schema evolution: In CTAS and CDAS jobs, only specific schema changes are supported. Refer to CREATE TABLE AS (CTAS) for details.
Binary log transaction compression: This feature is not supported and must be disabled on your MySQL instance.
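Without a watermark, you can still aggregate over fixed time intervals by writing a regular GROUP BY on a derived time bucket. The following sketch assumes the mysqlcdc_source table defined in the SQL Syntax section and counts orders per minute of order_date. The result is an updating stream: a minute's totals are revised whenever a change event for that minute arrives, including updates and deletes.

```sql
-- Per-minute order totals with a regular (non-windowed) GROUP BY.
SELECT
  DATE_FORMAT(order_date, 'yyyy-MM-dd HH:mm') AS order_minute,
  COUNT(*) AS order_count,
  SUM(price) AS total_price
FROM mysqlcdc_source
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd HH:mm');
```

Because new grouping keys keep arriving, state grows over time. For long-running jobs, consider setting a state TTL, for example through Flink's table.exec.state.ttl option.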
ApsaraDB RDS for MySQL
Do not point the connector to read-only or standby instances; the short binary log retention period on these instances leads to log expiration and job failure.
Primary-secondary switchover: To prevent data loss during switchovers, ensure that slave_preserve_commit_order is enabled on the instance.
PolarDB for MySQL
For versions 1.0.19 and earlier, reading from a multi-master cluster is not supported due to potential table ID conflicts and schema parsing failures.
Open-source MySQL
If parallel replication is enabled (slave_parallel_workers > 1), you must set slave_preserve_commit_order = ON.
Risk: Failing to preserve commit order can cause the transaction commit sequence on the replica to deviate from the primary, leading to data loss when the Flink job resumes from a checkpoint.
Alternative: If you cannot enable slave_preserve_commit_order, set slave_parallel_workers = 1.
Usage notes
Sink tables
Do not declare auto-increment primary keys in the DDL. MySQL fills them automatically.
At least one non-primary-key field must be declared. Otherwise, an error occurs.
NOT ENFORCED means Flink does not validate primary keys. You must ensure primary key correctness. For more information, see Validity check.
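A minimal sink definition that follows these rules might look like the following. It assumes two things:
- The physical MySQL table already exists with order_id as its primary key.
- mysqlcdc_source is the source table defined in the SQL Syntax section.

The table name mysql_sink is hypothetical.

```sql
-- order_id is the declared primary key; customer_name and price are the
-- non-primary-key fields that the sink requires.
CREATE TEMPORARY TABLE mysql_sink (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO mysql_sink
SELECT order_id, customer_name, price FROM mysqlcdc_source;
```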
Dimension tables
To accelerate queries using indexes, the JOIN condition fields must match the index definition order (leftmost prefix rule). For example, if the index is (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y.
Flink-generated SQL may be rewritten by the optimizer, which can prevent index usage. To verify index usage, check the execution plan (EXPLAIN) or slow query logs in MySQL.
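The following sketch illustrates the leftmost prefix rule end to end. All table, index, and column names (dim_table, idx_a_b_c, orders, mysql_dim, a, b, c) are hypothetical. It assumes orders declares a processing-time attribute proc_time AS PROCTIME(), and mysql_dim is a MySQL connector table mapped to dim_table.

```sql
-- MySQL: composite index on the physical dimension table.
CREATE INDEX idx_a_b_c ON dim_table (a, b, c);

-- MySQL: check that a lookup on the leftmost prefix (a, b) can use the index;
-- the key column of the EXPLAIN output should name idx_a_b_c.
EXPLAIN SELECT * FROM dim_table WHERE a = 1 AND b = 2;

-- Flink SQL: lookup join whose ON condition follows the index order a, b.
SELECT o.order_id, d.c
FROM orders AS o
JOIN mysql_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON d.a = o.a AND d.b = o.b;
```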
SQL
Use the MySQL connector in SQL jobs as a source, dimension, or sink table.
Syntax
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
Sink behavior and write semantics
The MySQL connector converts incoming records into SQL statements based on the target table's schema:
Tables without primary keys: Executes standard INSERT statements: INSERT INTO table_name (...) VALUES (...);
Tables with primary keys: Executes UPSERT operations using INSERT INTO table_name (...) VALUES (...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), ...;
Unique index conflicts
If your physical table contains unique index constraints other than the primary key, inserting records with different primary keys but identical unique index values may cause data loss due to constraint violations.
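The following MySQL statements reproduce the problem on a hypothetical users table. The table has a primary key on id and a separate unique index on email. The upsert statements follow the form described above, but the exact column list in the connector's UPDATE clause may differ from this sketch.

```sql
CREATE TABLE users (
  id INT PRIMARY KEY,
  email VARCHAR(64) NOT NULL,
  name VARCHAR(64),
  UNIQUE KEY uk_email (email)
);

INSERT INTO users (id, email, name) VALUES (1, 'a@example.com', 'Alice')
  ON DUPLICATE KEY UPDATE email = VALUES(email), name = VALUES(name);

-- Different primary key, same email: the statement collides on uk_email,
-- so MySQL updates the existing row instead of inserting id = 2.
INSERT INTO users (id, email, name) VALUES (2, 'a@example.com', 'Bob')
  ON DUPLICATE KEY UPDATE email = VALUES(email), name = VALUES(name);

-- Returns a single row: id = 1, email = 'a@example.com', name = 'Bob'.
SELECT * FROM users;
```

Instead of adding a second row for id = 2, MySQL overwrites the existing row for id = 1. Alice's data is lost, and Bob's record is not stored under its own key.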
Auto-increment columns
If your target table uses an AUTO_INCREMENT primary key, do not include this column in your Flink DDL. MySQL will generate the values automatically during the write. While the connector handles INSERT and DELETE operations for tables with auto-increment keys, it does not support updates to the auto-increment column itself.
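For example, suppose the physical table order_log (a hypothetical name) generates its id column automatically. The Flink DDL then simply omits that column. Because this Flink table declares no primary key, the connector writes with plain INSERT statements and MySQL assigns the id values.

```sql
-- MySQL: physical table whose primary key is generated by MySQL.
CREATE TABLE order_log (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  order_id INT,
  order_status BOOLEAN
);

-- Flink SQL: the sink DDL omits id, so MySQL fills it in on every INSERT.
CREATE TEMPORARY TABLE order_log_sink (
  order_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'order_log'
);
```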
Connector options
General
Option
Description
Required
Data type
Default value
Notes
connector
The connector to use.
Yes
STRING
-
Use mysql-cdc or mysql for sources; use mysql for sinks and lookups.
hostname
The IP address or hostname of the database.
Yes
STRING
-
We recommend using a VPC endpoint. If your Flink workspace and MySQL database are in different networks, ensure cross-VPC or public connectivity is configured. For more information, see Storage management and operations and How do I access the Internet from a fully managed Flink cluster?.
username
The database username.
Yes
STRING
-
-
password
The database password.
Yes
STRING
-
-
database-name
The database name.
Yes
STRING
-
Regex matching:
Supports regex for source tables.
Do not use ^ or $ in your regex patterns. See the note for table-name for details.
table-name
The table name. Supports regex for source tables.
Yes
STRING
None
Regex matching:
To read multiple databases or tables, use regular expressions.
Avoid anchors: Do not use ^ or $ in your regex patterns.
Full-path resolution: The connector concatenates database-name and table-name with \\. (or . for VVR 8.0.1 and earlier) to form a fully qualified name for matching (e.g., db_name.table_name).
Optimization: When reading multiple tables, combine them into a single CTAS job rather than launching individual jobs to reduce the number of binlog listeners. For details, see Execute multiple CTAS statements in a single job.
port
The port number of the MySQL database service.
No
INTEGER
3306
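To illustrate the regex rules above, the following source reads all sharded order tables in one job. The database and table name patterns (order_db_[0-9]+, orders_[0-9]+) and the column list are hypothetical, and order_id is assumed to be unique across all shards. Neither pattern uses ^ or $, because the connector matches them against the fully qualified database.table name. The server-id option is described under Source-specific options.

```sql
CREATE TEMPORARY TABLE sharded_orders (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Matches order_db_0, order_db_1, ... and orders_0, orders_1, ...
  'database-name' = 'order_db_[0-9]+',
  'table-name' = 'orders_[0-9]+',
  -- Four server IDs: enough for a source parallelism of up to 4.
  'server-id' = '5401-5404'
);
```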
Source-specific
Option
Description
Required
Data type
Default value
Notes
server-id
Unique ID for the database client. Use a range (e.g., 5400-6400) to support concurrent reading.
No
STRING
Random
Best practice: Assign a different ID to each job that accesses the same database.
For more information, see Using server ID.
scan.incremental.snapshot.enabled
Enables incremental snapshot.
No
BOOLEAN
true
Incremental snapshots are enabled by default. An incremental snapshot is a new mechanism for reading snapshots. Compared to traditional snapshots, incremental snapshots offer several advantages:
Concurrent snapshot reading.
Checkpointing at the chunk level during snapshot reading.
No global read lock (FLUSH TABLES WITH READ LOCK) required during snapshot reading.
If you want the source to support concurrent reading, each reader needs a unique server ID. Therefore, the server-id must be a range like 5400-6400, and the range size must be at least equal to the parallelism.
Note: This configuration is removed in VVR 11.1 and later.
scan.incremental.snapshot.chunk.size
Number of rows per snapshot chunk. Tune it to balance throughput against memory usage.
No
INTEGER
8096
scan.snapshot.fetch.size
The maximum number of records to fetch per read when scanning full table data.
No
INTEGER
1024
scan.startup.mode
Startup mode: initial, latest-offset, earliest-offset, specific-offset, or timestamp.
No
STRING
initial
See Startup modes for details.
Important: When using earliest-offset, specific-offset, or timestamp, ensure the table schema remains unchanged between the specified binary log position and job startup. Schema changes may cause errors.
scan.startup.specific-offset.file
The binary log filename for the specified startup offset.
No
STRING
-
When using this configuration, scan.startup.mode must be set to specific-offset. Example filename: mysql-bin.000003
scan.startup.specific-offset.pos
The offset within the specified binary log file for the startup position.
No
INTEGER
-
When using this configuration, scan.startup.mode must be set to specific-offset.
scan.startup.specific-offset.gtid-set
The GTID set for the startup position.
No
STRING
-
When using this configuration, scan.startup.mode must be set to specific-offset. Example GTID set: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19
scan.startup.timestamp-millis
The startup timestamp in milliseconds.
No
LONG
-
When using this configuration, scan.startup.mode must be set to timestamp. Timestamp units are milliseconds.
Important: When using a timestamp, MySQL CDC attempts to read the initial event of each binary log file to determine its timestamp and locate the corresponding file. Ensure the specified timestamp corresponds to a binary log file that exists and is readable in the database.
server-time-zone
Sets the session time zone (e.g., Asia/Shanghai) for temporal type conversion.
No
STRING
Local
For more information, see Debezium temporal values.
debezium.min.row.count.to.stream.results
If the number of rows in a table exceeds this value, the connector reads the table in batches.
No
INTEGER
1000
Flink reads data from the MySQL source table in one of two ways:
Full mode: Loads the entire table into memory at once. Fast but memory-intensive; large tables risk OOM errors.
Batch mode: Reads the table in multiple batches. Memory-efficient but slower for large tables.
connect.timeout
Max wait time for a connection.
No
DURATION
30s
connect.max-retries
Max retries after a connection failure.
No
INTEGER
3
connection.pool.size
Size of the connection pool. Connections are reused to reduce the number of connections opened to the database.
No
INTEGER
20
jdbc.properties.*
Custom connection parameters for the JDBC URL.
No
STRING
-
You can pass custom connection parameters. For example, to disable SSL, set 'jdbc.properties.useSSL' = 'false'.
For supported connection parameters, see MySQL Configuration Properties.
debezium.*
Custom Debezium parameters for reading binary logs.
No
STRING
-
You can pass custom Debezium parameters. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to define how to handle parsing errors.
heartbeat.interval
Heartbeat frequency to advance offsets and prevent log expiration.
No
DURATION
30s
Useful for tables with infrequent updates. Without heartbeats, the binary log offset for such tables may stall, causing expiration and job failure.
scan.incremental.snapshot.chunk.key-column
The column used to split the table into chunks during the snapshot phase.
No
STRING
-
Tables without primary keys: Required. The selected column must be non-nullable.
Tables with primary keys: Optional. Supports only a primary key column.
rds.region-id
The region ID of the ApsaraDB RDS for MySQL instance.
No
STRING
-
Required when reading archived logs from OSS. For region IDs, see Regions and zones.
rds.access-key-id
The AccessKey ID for the account with access to ApsaraDB RDS for MySQL.
No
STRING
-
Required when reading archived logs from OSS. For more information, see How do I view my AccessKey ID and AccessKey secret?.
Important: For security, use variables instead of hardcoding your credentials. For more information, see Manage variables.
rds.access-key-secret
The AccessKey secret for the account with access to ApsaraDB RDS for MySQL.
No
STRING
-
rds.db-instance-id
The ApsaraDB RDS for MySQL instance ID.
No
STRING
-
Required when reading archived logs from OSS.
rds.main-db-id
The primary ApsaraDB RDS for MySQL database ID.
No
STRING
-
Requires VVR 8.0.7+.
rds.download.timeout
Max wait time for downloading a single archived log from OSS.
No
DURATION
60s
rds.endpoint
The endpoint for accessing OSS binary log information.
No
STRING
-
For valid values, see Service endpoints.
Requires VVR 8.0.8+.
scan.incremental.close-idle-reader.enabled
Closes idle readers after snapshot reading.
No
BOOLEAN
false
Requires VVR 8.0.1+.
Dependency: execution.checkpointing.checkpoints-after-tasks-finish.enabled must be set to true.
scan.read-changelog-as-append-only.enabled
Converts all change events into INSERT messages.
No
BOOLEAN
false
Use this option with caution: it discards the DELETE and UPDATE semantics of the original database changes.
Note: Requires VVR 8.0.8+.
scan.only.deserialize.captured.tables.changelog.enabled
Deserializes only events for tables defined in the query.
No
BOOLEAN
true (VVR 11.1+)
Valid values:
true: Deserialize only change data for target tables to speed up binary log reading.
false (default before VVR 11.1): Deserialize change data for all tables.
Note: Supported only in VVR 8.0.7+.
In VVR 8.0.8 and earlier, use debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Parses RDS lockless DDL changes (Experimental).
No
BOOLEAN
false
Valid values:
true
false
Before performing online lockless changes, take a savepoint for recovery.
Note: Requires VVR 11.1+.
scan.incremental.snapshot.backfill.skip
Skip backfill during snapshot reading.
No
BOOLEAN
false
If backfill is skipped, changes made during the snapshot phase are consumed in the incremental phase.
Important: Skipping backfill may cause data inconsistency because changes made during the snapshot phase may be replayed. Only at-least-once semantics are guaranteed.
Note: Requires VVR 11.1+.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Distributes unbounded chunks first during snapshot reading. (Experimental)
No
BOOLEAN
false
Enabling this reduces the risk of TaskManager OOM errors during the final chunk sync in the snapshot phase. We recommend enabling it before the first job startup.
Note: Requires VVR 11.1+.
binlog.session.network.timeout
The network timeout for binary log connections.
No
DURATION
10m
Setting this to 0s uses the MySQL server's default timeout.
Note: Requires VVR 11.5+.
scan.rate-limit.records-per-second
Limits throughput to prevent source overloading.
No
LONG
-
The numRecordsOutPerSecond metric reflects the total records emitted per second. Adjust this option based on that metric.
During snapshot reading, a best practice is to use this option together with a smaller chunk size (scan.incremental.snapshot.chunk.size).
Note: Requires VVR 11.5+.
scan.binlog.tolerate.gtid-holes
Allows ignoring gaps in GTID sequences to keep the job running.
No
BOOLEAN
false
Before enabling this option, ensure the job's startup offset has not expired. If the job starts from a cleaned or expired GTID offset, VVR silently skips missing logs, resulting in data loss.
Note: Requires VVR 11.6+.
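The following sketch shows two of the non-default startup modes, assuming the mysqlcdc_source table from the SQL Syntax section. You look up the binlog file name and position on the MySQL server, and OPTIONS hints then override the table's startup options for a single query. As noted for scan.startup.mode, make sure the table schema has not changed between the chosen position and job startup.

```sql
-- MySQL (5.6 to 8.0): current binlog file and position.
SHOW MASTER STATUS;
-- MySQL: binlog files that still exist on the server.
SHOW BINARY LOGS;

-- Flink SQL: start from a specific binlog file and position
-- (4 is the position of the first event in a binlog file).
SELECT * FROM mysqlcdc_source /*+ OPTIONS(
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = 'mysql-bin.000003',
  'scan.startup.specific-offset.pos' = '4'
) */;

-- Flink SQL: start from the binlog file that covers the given time
-- (epoch milliseconds; 1700000000000 is 2023-11-14 22:13:20 UTC).
SELECT * FROM mysqlcdc_source /*+ OPTIONS(
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000'
) */;
```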
Lookup-specific (dimension table)
Option
Description
Required
Data type
Default value
Notes
url
The JDBC URL (jdbc:mysql://<host>:<port>/<db>).
No
STRING
-
The URL format is:
jdbc:mysql://<endpoint>:<port>/<database-name>
lookup.max-retries
Max retries for failed lookup requests.
No
INTEGER
3
Requires VVR 6.0.7+.
lookup.cache.strategy
Caching policy: None, LRU, and ALL. For descriptions, see Background information.
No
STRING
None
Dependency: The LRU cache policy relies on the lookup.cache.max-rows option.
lookup.cache.max-rows
Max number of rows in the cache.
No
INTEGER
100000
Required for LRU.
lookup.cache.ttl
Cache expiration (TTL) for LRU or reload interval for ALL.
No
DURATION
10s
The lookup.cache.ttl setting depends on lookup.cache.strategy:
lookup.cache.strategy is None: lookup.cache.ttl can be left unset, because no cache is used.
lookup.cache.strategy is LRU: lookup.cache.ttl is the cache TTL, that is, how long a cached entry remains valid.
lookup.cache.strategy is ALL: lookup.cache.ttl is the cache reload interval. By default, the cache is not reloaded.
Specify time in formats like 1min or 10s.
lookup.max-join-rows
Max result rows returned per join query.
No
INTEGER
1024
lookup.filter-push-down.enabled
Pushes filters to the database to reduce data loaded into memory.
No
BOOLEAN
false
Requires VVR 8.0.7+.
Important: Filter pushdown is not supported for MySQL source tables. If a table is used as both a source and dimension table, and filter pushdown is enabled for the dimension table, explicitly set this configuration to false using SQL hints when using it as a source table to prevent failures.
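The following sketch defines a dimension table with an LRU cache and uses it in a lookup join. The table and column names (mysql_dim, product_name, orders, proc_time) and the cache sizes are hypothetical. It assumes orders declares proc_time AS PROCTIME().

```sql
CREATE TEMPORARY TABLE mysql_dim (
  product_id INT,
  product_name STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourDimTableName>',
  -- Keep at most 10,000 rows; each cached row is valid for 10 minutes.
  'lookup.cache.strategy' = 'LRU',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);

-- Enrich each order with the product name at processing time.
SELECT o.order_id, d.product_name
FROM orders AS o
JOIN mysql_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON d.product_id = o.product_id;
```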
Sink-specific
Option
Description
Required
Data type
Default value
Notes
url
The JDBC URL (jdbc:mysql://<host>:<port>/<db>).
No
STRING
-
The URL format is:
jdbc:mysql://<endpoint>:<port>/<database-name>
sink.max-retries
Max retries for failed write operations.
No
INTEGER
3
sink.buffer-flush.batch-size
Number of records per batch write.
No
INTEGER
4096
sink.buffer-flush.max-rows
Max rows cached in memory (requires primary key).
No
INTEGER
10000
This parameter takes effect only when a primary key is specified.
sink.buffer-flush.interval
Max time to hold buffered data before flushing.
No
DURATION
1s
None.
sink.ignore-delete
Whether to ignore DELETE and UPDATE_BEFORE messages.
No
BOOLEAN
false
In high-concurrency environments, multiple tasks updating the same table may cause race conditions when processing DELETE or UPDATE_BEFORE messages.
Setting this to true skips all DELETE and UPDATE_BEFORE records and processes only INSERT and UPDATE_AFTER records. Use this to prevent records from being accidentally deleted or nullified during concurrent updates.
Note: UPDATE_BEFORE is part of Flink's retraction mechanism, used to "retract" old values during updates.
sink.ignore-null-when-update
If true, skips updating fields that contain null values.
No
BOOLEAN
false
Valid values:
true: Skip updating the field. Supported only when the Flink table has a primary key. When set to true:
In VVR 8.0.6 and earlier, batch writes are not supported for sink tables.
In VVR 8.0.7 and later, batch writes are supported for sink tables.
Batch writes improve write efficiency and throughput but introduce latency and OOM risks. Balance these trade-offs based on your business needs.
false: Update the field to null.
Note: Supported only in VVR 8.0.5 and later.
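As a sketch, the sink options above might be combined as follows. The table name and option values are illustrative, not recommendations. Tune them against your throughput, latency, and memory budget.

```sql
CREATE TEMPORARY TABLE mysql_sink_tuned (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Write in batches of up to 1,000 records.
  'sink.buffer-flush.batch-size' = '1000',
  -- Hold buffered data for at most 2 seconds before flushing.
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries' = '5',
  -- Keep existing column values when an update carries NULL (needs a primary key).
  'sink.ignore-null-when-update' = 'true'
);
```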
Type mappings
Source
MySQL | Flink |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] or TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB | |
Important: Do not use the TINYINT(1) type in MySQL to store values other than 0 and 1. When property-version is set to 0, the MySQL connector maps TINYINT(1) to the Flink BOOLEAN type by default, which can cause inaccurate data. To store values other than 0 and 1 in a TINYINT(1) column, see the catalog.table.treat-tinyint1-as-boolean configuration parameter.
Lookup and sink
MySQL | Flink |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
BIGINT | BIGINT |
INT UNSIGNED | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) Note: p <= 38 is required. |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
TINYBLOB | BYTES Important: Flink supports MySQL BLOB records up to 2,147,483,647 bytes (2^31 − 1). |
BLOB | |
MEDIUMBLOB | |
LONGBLOB | |
Flink CDC
The MySQL connector can be used as a data source for Flink CDC.
Syntax
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: <username>
  password: <password>
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404
sink:
  type: xxx
Configuration options
Option | Description | Required | Data type | Default value | Remarks |
type | The connector type. | Yes | STRING | - | Must be mysql. |
name | The name of the data source. | No | STRING | - | - |
hostname | MySQL host or IP. | Yes | STRING | - | VPC endpoint recommended. Note If your MySQL database and Flink workspace are not in the same VPC, establish a cross-VPC network connection or access the database over the public network. For more information, see Workspace and namespace FAQ. |
username | Database username. | Yes | STRING | - | - |
password | Database password. | Yes | STRING | - | - |
tables | The MySQL tables to sync. | Yes | STRING | - |
Note
|
tables.exclude | The MySQL tables to exclude. | No | STRING | - | |
port | Database port. | No | INTEGER | 3306 | - |
schema-change.enabled | Whether to propagate schema evolution. | No | BOOLEAN |
| - |
server-id | Unique ID or range (e.g., 5400-6400). | No | STRING | Random | Best practices:
Default behavior: If left blank, Flink automatically generates a random ID between |
jdbc.properties.* | Custom JDBC connection parameters. | No | STRING | - | For more information, see MySQL Configuration Properties. |
debezium.* | Custom Debezium configuration properties. | No | STRING | - | - |
scan.incremental.snapshot.chunk.size | Number of rows per snapshot chunk. | No | INTEGER |
| Tables are partitioned into chunks for reading. Each chunk is cached in memory until fully processed. Trade-offs:
|
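scan.snapshot.fetch.size | Records fetched per read during full scan. | No | INTEGER | 1024 | - |
scan.startup.mode | The startup mode for consuming data. | No | STRING | initial | See Startup modes for details. Important When using earliest-offset, specific-offset, or timestamp, ensure the table schema remains unchanged between the specified binlog position and job startup. |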
scan.startup.specific-offset.file | Binlog file name for specific-offset mode. | No | STRING | - | Dependency: scan.startup.mode must be set to specific-offset. Example: mysql-bin.000003 |
scan.startup.specific-offset.pos | Binlog file offset for specific-offset mode. | No | INTEGER | - | Dependency: This option is valid only when scan.startup.mode is set to specific-offset. |
scan.startup.specific-offset.gtid-set | GTID set for specific-offset mode. | No | STRING | - | Dependency: This option is valid only when scan.startup.mode is set to specific-offset. Example: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19 |
scan.startup.timestamp-millis | Start timestamp (ms) for timestamp mode. | No | LONG | - | Dependency: scan.startup.mode must be set to timestamp. Important The MySQL CDC connector scans the initial event of each binlog file to locate the file matching your specified timestamp. Ensure that the binlog file for the specified timestamp exists and is readable. |
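server-time-zone | Database session time zone (e.g., Asia/Shanghai). | No | STRING | Local | Affects the conversion of temporal types. See also: Debezium temporal types. |
scan.startup.specific-offset.skip-events | Binlog events to skip in specific-offset mode. | No | INTEGER | - | Dependency: scan.startup.mode must be set to specific-offset. |
scan.startup.specific-offset.skip-rows | Row changes to skip in specific-offset mode. | No | INTEGER | - | Dependency: scan.startup.mode must be set to specific-offset. |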
connect.timeout | Max wait time for a connection. | No | DURATION | 30s | - |
connect.max-retries | Max retries for connection failures. | No | INTEGER | 3 | - |
connection.pool.size | Size of the JDBC connection pool, for connection reuse. | No | INTEGER | 20 | - |
heartbeat.interval | Interval to emit heartbeats to advance offsets. | No | DURATION | 30s | Prevents log expiration for inactive tables. |
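scan.incremental.snapshot.chunk.key-column | The chunk column. | No | STRING | - | Tables without primary keys: Required; the column must be non-nullable. Tables with primary keys: Optional; only a primary key column is supported. |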
rds.region-id | Region ID of the ApsaraDB RDS for MySQL instance. | No | STRING | - | Required for OSS archived log consumption. See Regions and zones. |
rds.access-key-id | Alibaba Cloud AccessKey ID. | Required when you read archived logs from OSS. | STRING | - | Required for OSS archived log consumption. See How do I view the AccessKey ID and AccessKey secret? Important To ensure security, use variables instead of hardcoding your credentials. For more information, see Manage variables. |
rds.access-key-secret | Alibaba Cloud AccessKey Secret. | Required when you read archived logs from OSS. | STRING | - | |
rds.db-instance-id | ApsaraDB RDS for MySQL instance ID. | Required when you read archived logs from OSS. | STRING | - | - |
rds.main-db-id | ApsaraDB RDS for MySQL primary database ID. | No | STRING | - | For details, see RDS for MySQL log backup. |
rds.download.timeout | Timeout for downloading archived logs. | No | DURATION | 60s | |
rds.endpoint | OSS service endpoint. | No | STRING | - | For details, see Endpoints. |
rds.binlog-directory-prefix | Directory prefix for binlog files. | No | STRING |
| |
rds.use-intranet-link | Whether to use internal network for OSS logs. | No | BOOLEAN |
| |
rds.binlog-directories-parent-path | Parent path for binlog storage. | No | STRING | - | |
chunk-meta.group.size | Chunk metadata grouping size. | No | INTEGER |
| If the metadata is larger than this value, it is passed in multiple parts. |
chunk-key.even-distribution.factor.lower-bound | Lower bound for even chunk splitting. | No | DOUBLE |
| If the distribution factor is less than this value, uneven chunking occurs. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of data rows. |
chunk-key.even-distribution.factor.upper-bound | Upper bound for even chunk splitting. | No | DOUBLE |
| If the distribution factor is greater than this value, uneven chunking occurs. Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of data rows. |
scan.incremental.close-idle-reader.enabled | Closes idle readers after snapshot completion. | No | BOOLEAN | false | Dependency: execution.checkpointing.checkpoints-after-tasks-finish.enabled must be set to true. |
scan.only.deserialize.captured.tables.changelog.enabled | Deserializes only specified tables for efficiency. | No | BOOLEAN |
| - |
scan.parallel-deserialize-changelog.enabled | Enables multi-threaded binlog deserialization. | No | BOOLEAN |
| - Note Requires VVR 8.0.11+. |
scan.parallel-deserialize-changelog.handler.size | Thread count for multi-threaded deserialization. | No | INTEGER |
| Note Requires VVR 8.0.11+. |
metadata-column.include-list | Metadata columns to expose. | No | STRING | - | Available metadata columns include
Note
|
scan.newly-added-table.enabled | Sync newly discovered tables at restart. | No | BOOLEAN |
| This takes effect when restarting from a checkpoint or savepoint. |
scan.binlog.newly-added-table.enabled | Sync newly discovered tables during binlog phase. | No | BOOLEAN |
| Conflict with |
scan.parse.online.schema.changes.enabled | Parse RDS lockless DDL events (Experimental). | No | BOOLEAN | false | Before you perform an online lockless change, create a savepoint of the job. Note Requires VVR 11.0+. |
scan.incremental.snapshot.backfill.skip | Skip backfill during snapshot. | No | BOOLEAN | false | If backfill is skipped, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot. Important Skipping backfill may cause data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed. Note Requires VVR 11.1+. |
treat-tinyint1-as-boolean.enabled | Maps | No | BOOLEAN |
| - |
treat-timestamp-as-datetime-enabled | Maps | No | BOOLEAN |
| The TIMESTAMP type stores UTC time and is affected by the time zone, while the DATETIME type stores literal time and is not affected by the time zone. If this option is enabled, the MySQL TIMESTAMP type data is converted to the DATETIME type based on the server-time-zone. |
include-comments.enabled | Synchronizes table/column comments. | No | BOOLEAN |
| This option increases the memory usage of the job. |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Distribute unbounded chunks first (Experimental). | No | BOOLEAN | false | This option reduces OOM risk during the snapshot phase. We recommend enabling this option before first-time startup. Note Requires VVR 11.1+. |
binlog.session.network.timeout | Network timeout for binlog connection. | No | DURATION | 10m | If this is set to 0s, the default timeout of the MySQL server is used. Note Requires VVR 11.5+. |
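scan.rate-limit.records-per-second | Limit records emitted per second. | No | LONG | - | Applies to both snapshot and binlog phases. The numRecordsOutPerSecond metric reflects the total records emitted per second; adjust this option based on that metric. During the snapshot phase, you usually also need to reduce the number of records read in each batch by lowering scan.incremental.snapshot.chunk.size. Note Requires VVR 11.5+. |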
include-binlog-meta.enable | Include original GTID/offset in message. | No | Boolean |
| This is suitable for original binlog synchronization scenarios, such as replacing existing canal synchronization. Note Requires VVR 11.6+. |
scan.binlog.tolerate.gtid-holes | Ignores GTID gaps in sequence. | No | Boolean |
| Before enabling this option, ensure that the start offset of the job has not expired. If the job starts from a cleared or expired GTID offset, Flink skips the missing logs, which results in data loss. Note Requires VVR 11.6+. |
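The conversion that treat-timestamp-as-datetime-enabled performs can be illustrated with plain java.time. The sketch below is not connector code: the class and method names are hypothetical, and it assumes server-time-zone is set to Asia/Shanghai. It shows that the same stored TIMESTAMP instant turns into different literal DATETIME values depending on the zone used for the conversion.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper (not part of the connector): a MySQL TIMESTAMP stores a UTC
// instant; interpreting it in the server-time-zone and dropping the zone yields the
// literal wall-clock value that a DATETIME column would hold.
class TimestampAsDatetime {

    static LocalDateTime toDatetime(Instant storedUtc, String serverTimeZone) {
        // Attach the configured zone, then keep only the local date and time.
        return storedUtc.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime();
    }

    public static void main(String[] args) {
        Instant stored = Instant.parse("2022-10-31T16:00:00Z");
        // The same stored instant yields different literal values in different zones.
        System.out.println(toDatetime(stored, "Asia/Shanghai")); // 2022-11-01T00:00
        System.out.println(toDatetime(stored, "UTC"));           // 2022-10-31T16:00
    }
}
```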
Type mappings
The following table shows the data type mappings for data ingestion.
MySQL | Flink CDC |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | The mapping depends on the value of the treat-timestamp-as-datetime-enabled option. |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
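NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | STRING Note In MySQL, the precision of a DECIMAL value can be up to 65, whereas Flink limits DECIMAL precision to 38. To avoid precision loss, such values are mapped to STRING. |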
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING Note The JSON data type is converted to a JSON-formatted string in Flink. |
GEOMETRY | STRING Note Spatial data types in MySQL are converted to strings in a fixed JSON format. For more information, see MySQL spatial data type mapping. |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES Note For BLOB data types in MySQL, only blobs with a length no greater than 2,147,483,647 (2^31 - 1) are supported. |
BLOB | |
MEDIUMBLOB | |
LONGBLOB | |
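Two rows of the table above follow from simple arithmetic, sketched below in plain Java with hypothetical helper names: BIT(n) with n > 1 needs ⌈n / 8⌉ whole bytes (BIT(1) maps to BOOLEAN instead), and the largest BIGINT UNSIGNED value, 2^64 - 1, has 20 decimal digits and does not fit in a signed 64-bit BIGINT, which is why it maps to DECIMAL(20, 0).

```java
import java.math.BigInteger;

// Hypothetical helpers illustrating two rows of the type mapping table.
class TypeMappingMath {

    // BIT(n) packs n bits into whole bytes: ceil(n / 8), computed with integer math.
    static int bitToBinaryLength(int n) {
        if (n < 1 || n > 64) {
            throw new IllegalArgumentException("MySQL BIT(n) requires 1 <= n <= 64");
        }
        return (n + 7) / 8;
    }

    // Largest BIGINT UNSIGNED value, 2^64 - 1. A signed long cannot hold it,
    // so a 20-digit DECIMAL(20, 0) is needed on the Flink side.
    static BigInteger maxBigintUnsigned() {
        return BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE);
    }

    public static void main(String[] args) {
        System.out.println(bitToBinaryLength(2));  // 1
        System.out.println(bitToBinaryLength(9));  // 2
        System.out.println(bitToBinaryLength(64)); // 8
        BigInteger max = maxBigintUnsigned();
        System.out.println(max + " has " + max.toString().length() + " digits");
        System.out.println(max.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0); // true
    }
}
```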
Examples
Source table
CREATE TEMPORARY TABLE mysqlcdc_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  order_id INT,
  customer_name STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT order_id, customer_name FROM mysqlcdc_source;
Dimension (lookup) table
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
Sink table
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO mysql_sink SELECT * FROM datagen_source;
Flink CDC source
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
Understanding MySQL source
How it works
The MySQL CDC source utilizes an incremental snapshot algorithm to provide a lockless, high-performance, and exactly-once streaming experience:
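Snapshot phase: The table is split into chunks based on the primary key. The chunks are distributed across parallel source readers, and each reader reads its assigned chunks one at a time. Checkpoints are taken at chunk granularity, so after a failure the job resumes from the chunks that have not yet been completed.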
Binlog phase: Once all chunks are processed, the connector transitions to reading binlogs from the offset recorded at the start of the snapshot phase.
Failover: The job resumes from the last successful checkpoint (either a specific chunk or a binlog offset), ensuring exactly-once semantics.
See also: MySQL CDC connector in Apache Flink documentation.
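The snapshot phase described above can be modeled in a few lines. This is a simplified sketch of the chunk-and-backfill idea behind Flink CDC's incremental snapshot algorithm, not the connector's code. It assumes an integer primary key and in-memory rows, and all class and method names are hypothetical. The backfill step, which merges changes observed while a chunk was being read into that chunk's rows, is the part that scan.incremental.snapshot.backfill.skip turns off.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of the snapshot phase (not the connector's code).
class IncrementalSnapshotSketch {

    // A chunk covers primary keys in the half-open range [start, end).
    static final class Chunk {
        final long start;
        final long end;
        Chunk(long start, long end) { this.start = start; this.end = end; }
    }

    // A binlog change on one primary key; value == null models a delete.
    static final class Change {
        final long key;
        final String value;
        Change(long key, String value) { this.key = key; this.value = value; }
    }

    // Split [minKey, maxKey] into chunks of at most chunkSize keys.
    static List<Chunk> splitChunks(long minKey, long maxKey, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += chunkSize) {
            chunks.add(new Chunk(start, Math.min(start + chunkSize, maxKey + 1)));
        }
        return chunks;
    }

    // Backfill: apply changes captured while the chunk was being read to the rows
    // returned by the chunk's SELECT, ignoring keys that belong to other chunks.
    static Map<Long, String> backfill(Chunk chunk, Map<Long, String> selectedRows, List<Change> changesInWindow) {
        Map<Long, String> rows = new TreeMap<>(selectedRows);
        for (Change c : changesInWindow) {
            if (c.key < chunk.start || c.key >= chunk.end) {
                continue; // outside this chunk
            }
            if (c.value == null) {
                rows.remove(c.key);
            } else {
                rows.put(c.key, c.value);
            }
        }
        return rows;
    }

    // Chunk [1, 5) read as {1=a, 2=b, 3=c} while an update, a delete, an insert,
    // and an out-of-chunk change happened in the binlog.
    static Map<Long, String> demo() {
        List<Chunk> chunks = splitChunks(1, 10, 4); // [1,5), [5,9), [9,11)
        Map<Long, String> selected = new TreeMap<>();
        selected.put(1L, "a");
        selected.put(2L, "b");
        selected.put(3L, "c");
        List<Change> window = new ArrayList<>();
        window.add(new Change(2, "b2")); // update
        window.add(new Change(3, null)); // delete
        window.add(new Change(4, "d"));  // insert
        window.add(new Change(7, "x"));  // belongs to chunk [5, 9)
        return backfill(chunks.get(0), selected, window);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {1=a, 2=b2, 4=d}
    }
}
```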
Metadata columns
Metadata is essential when merging sharded tables into a single sink. Use the METADATA FROM syntax to access the following metadata columns:
Metadata key | Type | Description |
database_name | STRING NOT NULL | The name of the source database. |
table_name | STRING NOT NULL | The name of the source table. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time when the change was made in the database. The value is 0 for historical data read during the snapshot phase. |
op_type | STRING NOT NULL | The change type: +I (insert), -D (delete), -U (update, before image), +U (update, after image). Note Requires VVR 8.0.7+. |
query_log | STRING NOT NULL | The raw MySQL query that produced the change. Note MySQL must have the binlog_rows_query_log_events parameter enabled to record query logs. |
Example: Merging sharded tables into a single Hologres sink table, holo_orders
CREATE TEMPORARY TABLE mysql_orders (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read change time.
  op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Read change type.
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb_.*', -- Regex match multiple sharded databases.
  'table-name' = 'orders_.*' -- Regex match multiple sharded tables.
);

INSERT INTO holo_orders SELECT * FROM mysql_orders;
Additionally, if you set scan.read-changelog-as-append-only.enabled to true, the output varies based on the sink table's primary key configuration:
If the sink table's primary key is order_id, the output contains only the last change for each primary key from the source table. For a primary key whose last change was a delete operation, the sink table shows a record with the same primary key and an op_type of -D.
If the sink table's primary key is (order_id, operation_ts, op_type), the output contains the complete change history for each primary key from the source table.
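The effect of the sink primary key can be simulated with an in-memory upsert. The sketch below is a hypothetical model, not connector code: it assumes every change arrives as an appended row carrying op_ts and op_type, and that the sink keeps one row per primary key, with later rows overwriting earlier ones.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an append-only changelog written to upsert sinks
// that use different primary keys.
class AppendOnlyChangelogSketch {

    static final class Row {
        final int orderId;
        final long opTs;
        final String opType;
        Row(int orderId, long opTs, String opType) {
            this.orderId = orderId;
            this.opTs = opTs;
            this.opType = opType;
        }
    }

    // Order 1 is inserted and then updated; order 2 is inserted and then deleted.
    static List<Row> sampleChangelog() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1, 100, "+I"));
        rows.add(new Row(1, 200, "-U"));
        rows.add(new Row(1, 200, "+U"));
        rows.add(new Row(2, 150, "+I"));
        rows.add(new Row(2, 300, "-D"));
        return rows;
    }

    // Sink primary key = order_id: each new row overwrites the previous one for that order.
    static Map<String, Row> upsertByOrderId(List<Row> rows) {
        Map<String, Row> sink = new LinkedHashMap<>();
        for (Row r : rows) {
            sink.put(String.valueOf(r.orderId), r);
        }
        return sink;
    }

    // Sink primary key = (order_id, operation_ts, op_type): every change keeps its own row.
    static Map<String, Row> upsertByChangeKey(List<Row> rows) {
        Map<String, Row> sink = new LinkedHashMap<>();
        for (Row r : rows) {
            sink.put(r.orderId + "|" + r.opTs + "|" + r.opType, r);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<Row> log = sampleChangelog();
        System.out.println(upsertByOrderId(log).size());          // 2
        System.out.println(upsertByOrderId(log).get("2").opType); // -D
        System.out.println(upsertByChangeKey(log).size());        // 5
    }
}
```

Note that the -U and +U rows of the same update share the same op_ts in this model, which is why op_type also has to be part of the sink key to keep both.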
Regular expression support
The MySQL connector supports matching multiple tables or databases using regular expressions. Example:
CREATE TABLE products (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex match multiple databases.
  'table-name' = '(t[5-8]|tt)' -- Regex match multiple tables.
);
Regular expression explanation:
^(test).*: Prefix match. Matches database names starting with "test", such as test1 or test2.
^(tpc).*: Prefix match. Matches database names starting with "tpc".
.*[p$]: Suffix match. Matches database names ending with "p", such as cdcp or edcp. Inside the brackets, $ is a literal character, so names ending with "$" also match.
txc: Exact match. Matches only the database named txc.
t{2}: Matches the database named tt.
(t[5-8]|tt): Matches the tables named t5, t6, t7, t8, and tt.
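The patterns from the example can be checked locally with java.util.regex before deploying a job. In this sketch, matching each name as a whole against its own pattern is an illustrative assumption (the connector's internal matching may differ in detail), and the class name is hypothetical. It also checks that a comma-free alternation such as t(5|55) accepts the same names as the quantifier t5{1,2}.

```java
import java.util.regex.Pattern;

// Hypothetical local check of the database-name and table-name patterns.
class TablePatternCheck {

    static final String DATABASE_PATTERN = "(^(test).*|^(tpc).*|txc|.*[p$]|t{2})";
    static final String TABLE_PATTERN = "(t[5-8]|tt)";

    // Pattern.matches requires the whole name to match, not just a substring.
    static boolean isCaptured(String database, String table) {
        return Pattern.matches(DATABASE_PATTERN, database) && Pattern.matches(TABLE_PATTERN, table);
    }

    // t(5|55) is a comma-free rewrite of t5{1,2}; both should accept the same names.
    static boolean rewriteAgrees(String name) {
        return Pattern.matches("t5{1,2}", name) == Pattern.matches("t(5|55)", name);
    }

    public static void main(String[] args) {
        System.out.println(isCaptured("test1", "t5")); // true  (prefix match)
        System.out.println(isCaptured("cdcp", "tt"));  // true  (suffix match)
        System.out.println(isCaptured("txc", "t9"));   // false (t9 is outside t[5-8])
        System.out.println(isCaptured("mydb", "t5"));  // false (no database pattern matches)
    }
}
```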
The MySQL connector identifies tables by their fully qualified name: database-name.table-name.
Important:
No comma support: The table-name and database-name options do not support comma-separated lists. Use the vertical bar (|) operator inside parentheses to specify multiple values.
Escaping special characters: If a pattern needs a comma, for example in a quantifier such as {1,2}, rewrite it with the | operator. For example, rewrite t5{1,2} as t(5|55).
Resource management
Parallelism: The server-id range must contain at least as many IDs as the job parallelism. The range is inclusive, so 5404-5412 provides 9 IDs and supports a parallelism of up to 9. Each job must use a unique, non-overlapping server-id range.
Autopilot: When enabled, Flink automatically scales down resources after the snapshot phase completes, because the incremental binlog phase typically requires less parallelism than the initial snapshot load.
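The server-id rule can be expressed as a small validation helper. The sketch below is hypothetical and unrelated to any connector class: it treats "start-end" as an inclusive range, checks it against a planned parallelism, and detects overlap with another job's range. Mapping subtask i to the i-th ID is an assumption made for illustration.

```java
// Hypothetical helper (not connector code) modeling the server-id range rule.
class ServerIdRangeCheck {
    final long start;
    final long end;

    ServerIdRangeCheck(long start, long end) {
        if (start > end) {
            throw new IllegalArgumentException("start must not exceed end: " + start + "-" + end);
        }
        this.start = start;
        this.end = end;
    }

    // Accepts a single ID ("5400") or an inclusive range ("5404-5412").
    static ServerIdRangeCheck parse(String spec) {
        String[] parts = spec.trim().split("-");
        if (parts.length == 1) {
            long id = Long.parseLong(parts[0].trim());
            return new ServerIdRangeCheck(id, id);
        }
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected start-end, got " + spec);
        }
        return new ServerIdRangeCheck(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
    }

    // Both ends are included, hence the + 1.
    long size() {
        return end - start + 1;
    }

    boolean supportsParallelism(int parallelism) {
        return parallelism >= 1 && size() >= parallelism;
    }

    // Assumption for illustration: subtask i uses the i-th ID of the range.
    long idForSubtask(int subtaskIndex) {
        if (subtaskIndex < 0 || subtaskIndex >= size()) {
            throw new IllegalArgumentException("no server-id left for subtask " + subtaskIndex);
        }
        return start + subtaskIndex;
    }

    // Two inclusive ranges overlap if each starts before the other ends.
    boolean overlaps(ServerIdRangeCheck other) {
        return start <= other.end && other.start <= end;
    }

    public static void main(String[] args) {
        ServerIdRangeCheck range = parse("5404-5412");
        System.out.println(range.size());                       // 9
        System.out.println(range.supportsParallelism(8));       // true
        System.out.println(range.supportsParallelism(10));      // false
        System.out.println(range.overlaps(parse("5412-5420"))); // true: 5412 is shared
    }
}
```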
In the Development Console, set the job's parallelism in basic or expert mode. The difference is as follows:
Basic mode: A single parallelism setting applies to the entire job.

Expert mode: Parallelism can be set for each vertex individually.

For details, see Configure job deployments.
Startup modes
Use the scan.startup.mode configuration item to specify the startup mode for the MySQL CDC source table. Valid values:
initial: Performs an initial snapshot of the table, then transitions to reading binary logs.
latest-offset: Skips the snapshot; starts reading changes from the current end of the binary log.
earliest-offset: Skips the snapshot; starts reading from the earliest available binary log.
specific-offset: Skips the snapshot; starts from a user-defined position:
File/Pos: Use scan.startup.specific-offset.file and scan.startup.specific-offset.pos.
GTID: Use scan.startup.specific-offset.gtid-set.
timestamp: Skips the snapshot; starts from a specific time, given in milliseconds since the Unix epoch by scan.startup.timestamp-millis.
Example:
CREATE TABLE mysql_source (...) WITH (
  'connector' = 'mysql-cdc',
  -- Set exactly one startup mode. Valid values: 'initial', 'latest-offset', 'earliest-offset', 'specific-offset', 'timestamp'.
  'scan.startup.mode' = 'specific-offset',
  -- specific-offset mode: specify a binary log filename and position...
  'scan.startup.specific-offset.file' = 'mysql-bin.000003',
  'scan.startup.specific-offset.pos' = '4',
  -- ...or, instead of the file and position, a GTID set:
  -- 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19',
  -- timestamp mode: specify the startup timestamp in milliseconds:
  -- 'scan.startup.timestamp-millis' = '1667232000000',
  ...
)
Important:
The MySQL source logs the current offset at INFO level at each checkpoint, with the prefix Binlog offset on checkpoint {checkpoint-id}. You can use this log to start the job from the offset of a specific checkpoint.
If the table being read has undergone schema changes, starting in earliest-offset, specific-offset, or timestamp mode may cause errors. This is because the Debezium reader internally maintains only the latest table schema, so earlier data with a mismatched schema cannot be parsed correctly.
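A value for scan.startup.timestamp-millis can be derived from a wall-clock time with java.time. In this sketch the class name is hypothetical and the zone is an assumption: use the zone in which you reason about the start time. For example, the value 1667232000000 used in the example above corresponds to 2022-11-01 00:00 in UTC+8.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: converts a local date and time in a given zone into
// epoch milliseconds, the unit expected by scan.startup.timestamp-millis.
class StartupTimestamp {

    static long toEpochMillis(LocalDateTime wallClock, String zone) {
        return wallClock.atZone(ZoneId.of(zone)).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2022, 11, 1, 0, 0);
        System.out.println(toEpochMillis(start, "Asia/Shanghai")); // 1667232000000
        System.out.println(toEpochMillis(start, "UTC"));           // 1667260800000
    }
}
```

The two results differ by exactly 8 hours (28,800,000 ms), so choosing the wrong zone shifts the start position by the zone offset.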
MySQL source tables without PK
If your source table lacks a primary key, you must specify a chunk key column using the scan.incremental.snapshot.chunk.key-column option. The processing semantics depend on whether the values in that column are ever updated:
Immutable chunk key: If the selected column is never updated, the connector guarantees exactly-once semantics.
Mutable chunk key: If the column is updated, the connector provides only at-least-once semantics.
Best practice for data integrity: To ensure data consistency under at-least-once semantics, define a primary key in your downstream system and make the sink operation idempotent (for example, by using UPSERT or REPLACE logic) so that duplicates are handled.
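The reason an idempotent sink absorbs duplicates can be shown with an in-memory model. This is a hypothetical sketch, not connector code: a map stands in for a table written with UPSERT or REPLACE semantics, and it assumes replayed changes arrive in their original order, as happens when a job resumes from an earlier position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model comparing an upsert sink and an append-only sink
// under at-least-once delivery.
class IdempotentSinkSketch {

    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static List<Event> deliveredOnce() {
        return Arrays.asList(new Event("k1", "a"), new Event("k2", "b"), new Event("k1", "c"));
    }

    // Same stream, but the last two events are delivered again, as can happen after a failover.
    static List<Event> deliveredWithReplay() {
        List<Event> events = new ArrayList<>(deliveredOnce());
        events.add(new Event("k2", "b"));
        events.add(new Event("k1", "c"));
        return events;
    }

    // UPSERT-style sink: the latest value per primary key wins, so in-order replays are harmless.
    static Map<String, String> upsertAll(List<Event> events) {
        Map<String, String> table = new HashMap<>();
        for (Event e : events) {
            table.put(e.key, e.value);
        }
        return table;
    }

    // Append-only sink without a primary key: every delivery becomes a new row.
    static List<String> appendAll(List<Event> events) {
        List<String> rows = new ArrayList<>();
        for (Event e : events) {
            rows.add(e.key + "=" + e.value);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(upsertAll(deliveredOnce()).equals(upsertAll(deliveredWithReplay()))); // true
        System.out.println(appendAll(deliveredWithReplay()).size()); // 5 rows, 2 of them duplicates
    }
}
```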
Reading ApsaraDB RDS for MySQL backup logs
The MySQL CDC source can read archived binary logs from Object Storage Service (OSS). This is particularly useful in two scenarios:
Slow snapshot phase: When a snapshot takes a long time and the MySQL instance purges its local binary logs before the snapshot phase completes.
Log retention: When local binary logs have been deleted, but the backup files uploaded to OSS are still available.
Example:
CREATE TABLE mysql_source (...) WITH (
  'connector' = 'mysql-cdc',
  'rds.region-id' = 'cn-beijing',
  'rds.access-key-id' = 'xxxxxxxxx',
  'rds.access-key-secret' = 'xxxxxxxxx',
  'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx',
  'rds.main-db-id' = '12345678',
  'rds.download.timeout' = '60s'
  ...
)
Source reuse
In a single job, each MySQL source table starts its own binlog client. When all source tables connect to the same instance, this increases the database load. For more information, see Flink CDC FAQ.
Solution
VVR 8.0.7 and later support MySQL CDC source reuse. Source reuse merges CDC source tables with compatible configurations: tables are merged when their configurations are identical except for the database name, table name, and server-id.
Procedure
Use the SET command in your SQL job:
SET 'table.optimizer.source-merge.enabled' = 'true';
-- For VVR 8.0.8 and 8.0.9, also set:
SET 'sql-gateway.exec-plan.enabled' = 'false';
VVR 11.1 and later have reuse enabled by default.
Start the job without state. Changing the source reuse configuration changes the job topology, so the job must start statelessly; otherwise, it may fail to start or lose data. If sources are merged, a MergetableSourceScan node appears in the job topology.
Important:
After you enable reuse, do not disable operator chaining. Setting pipeline.operator-chaining to false increases data serialization and deserialization overhead, and the more sources are merged, the greater the overhead.
In VVR 8.0.7, disabling operator chaining causes serialization issues.
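The merge rule stated above (identical configurations apart from database name, table name, and server-id) can be checked with a few lines of Java. This is a hypothetical sketch of the rule as described, not the optimizer's implementation; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical check: two MySQL CDC source tables are reuse candidates when their
// WITH options are identical after ignoring database-name, table-name, and server-id.
class SourceReuseCheck {

    static final Set<String> IGNORED_KEYS =
            new HashSet<>(Arrays.asList("database-name", "table-name", "server-id"));

    // Builds an options map from alternating key/value arguments.
    static Map<String, String> options(String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Map<String, String> map = new TreeMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    // The options that must match for two sources to be merged.
    static Map<String, String> reuseKey(Map<String, String> options) {
        Map<String, String> key = new TreeMap<>(options);
        key.keySet().removeAll(IGNORED_KEYS);
        return key;
    }

    static boolean canMerge(Map<String, String> a, Map<String, String> b) {
        return reuseKey(a).equals(reuseKey(b));
    }

    public static void main(String[] args) {
        Map<String, String> orders = options("connector", "mysql-cdc", "hostname", "db1", "table-name", "orders", "server-id", "5400-5403");
        Map<String, String> users = options("connector", "mysql-cdc", "hostname", "db1", "table-name", "users", "server-id", "5404-5407");
        Map<String, String> other = options("connector", "mysql-cdc", "hostname", "db2", "table-name", "orders");
        System.out.println(canMerge(orders, users)); // true: only table-name and server-id differ
        System.out.println(canMerge(orders, other)); // false: hostname differs
    }
}
```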
Accelerate binlog reading
When the MySQL connector is used as a source, it parses binary logs to generate change events during the incremental phase. Binlog files record all table changes in binary format. Accelerate binary log file parsing using the following methods:
Enable parsing filter configuration
Set scan.only.deserialize.captured.tables.changelog.enabled to true so that change events are deserialized only for the tables that the job captures.
Optimize Debezium options
debezium.max.queue.size: The maximum number of records the blocking queue can hold. When Debezium reads an event stream from the database, it places events into a blocking queue before writing them downstream. The default value is 8192.
debezium.max.batch.size: The maximum number of events the connector processes in each iteration. The default value is 2048.
debezium.poll.interval.ms: The number of milliseconds the connector waits before requesting new change events. The default value is 1000 (1 second).
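How the queue size and batch size interact can be modeled with a bounded java.util.concurrent queue. This is an illustrative model, not Debezium's code, and the class name is hypothetical: events are offered into a queue of capacity max.queue.size, and each poll drains at most max.batch.size of them. Time, and therefore poll.interval.ms, is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a bounded event queue drained in capped batches.
class QueueBatchSketch {

    // Returns the size of each batch drained from the queue.
    static List<Integer> drainInBatches(int queueCapacity, int maxBatchSize, int events) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);
        for (int i = 0; i < events; i++) {
            if (!queue.offer(i)) {
                // A real reader thread would block here until the consumer makes room.
                throw new IllegalStateException("queue full after " + i + " events");
            }
        }
        List<Integer> batchSizes = new ArrayList<>();
        List<Integer> batch = new ArrayList<>();
        while (!queue.isEmpty()) {
            batch.clear();
            queue.drainTo(batch, maxBatchSize); // takes at most maxBatchSize events
            batchSizes.add(batch.size());
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        System.out.println(drainInBatches(8192, 2048, 5000));    // defaults: [2048, 2048, 904]
        System.out.println(drainInBatches(162580, 40960, 5000)); // tuned values: [5000]
    }
}
```

With the tuned values from the example, a burst that needs several iterations at the default settings is handed downstream in a single batch, which is the intended effect of raising both limits.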
Example:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium configuration
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- Enable parsing filter
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- Only parse change events for specified tables.
...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium configuration
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parsing filter
  scan.only.deserialize.captured.tables.changelog.enabled: true
The fully managed MySQL connector can consume binary logs at up to 85 MB/s, approximately twice the throughput of the Apache Flink CDC connector for MySQL. If the binlog generation rate exceeds 85 MB/s (roughly one 512 MB file every 6 seconds), job latency increases; it decreases again once the generation rate drops below that level. Large transactions may also briefly increase processing latency.
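The latency behavior described above can be estimated with back-of-the-envelope arithmetic. The sketch below is an idealized model under stated assumptions: constant rates, no other bottlenecks, and the 85 MB/s figure treated as a fixed ceiling even though actual throughput varies. The class name and burst scenario are hypothetical.

```java
// Hypothetical, idealized model of binlog lag: backlog grows while generation exceeds
// the consumption ceiling and drains once generation falls below it.
class BinlogLagEstimate {

    static final double CONSUME_MB_PER_S = 85.0; // upper bound quoted above

    // MB of unread binlog accumulated during a burst.
    static double backlogAfterBurst(double generateMbPerS, double burstSeconds) {
        return Math.max(0.0, generateMbPerS - CONSUME_MB_PER_S) * burstSeconds;
    }

    // Seconds needed to drain a backlog once generation falls below the ceiling.
    static double secondsToCatchUp(double backlogMb, double generateMbPerS) {
        double spare = CONSUME_MB_PER_S - generateMbPerS;
        if (spare <= 0) {
            return Double.POSITIVE_INFINITY; // the lag keeps growing
        }
        return backlogMb / spare;
    }

    public static void main(String[] args) {
        System.out.printf("512 MB every 6 s = %.1f MB/s%n", 512.0 / 6.0); // 85.3
        double backlog = backlogAfterBurst(120.0, 60.0);                 // 2100 MB
        System.out.printf("backlog %.0f MB, catch-up %.1f s%n", backlog, secondsToCatchUp(backlog, 40.0));
    }
}
```

In this model, a one-minute burst at 120 MB/s leaves about 2,100 MB of unread binlog, which drains in roughly 47 seconds once generation falls back to 40 MB/s.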
DataStream API
To read and write data using DataStream API, use the corresponding DataStream connector. For setup instructions, see Integrate DataStream connectors.
Create a DataStream API program and use MySqlSource. The following shows the code and POM dependency:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("yourHostname")
                .port(3306) // replace with your MySQL port
                .databaseList("yourDatabaseName") // set captured database
                .tableList("yourDatabaseName.yourTableName") // set captured table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing every 3 seconds
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // set 4 parallel source tasks
            .setParallelism(4)
            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
When building MySqlSource, specify the following parameters:
Parameter | Description |
hostname | The IP address or hostname. |
port | The port number. |
databaseList | The MySQL database name. Note The database name supports regular expressions to read data from multiple databases. You can use |
username | The database username. |
password | The database password. |
deserializer | The deserializer converts SourceRecord type records to the specified type. Valid values:
|
Required pom parameters:
Parameter | Description |
${vvr.version} | The engine version of Realtime Compute for Apache Flink. Example: Note Refer to the version displayed on Maven, as hotfix versions may be released periodically without other notifications. |
${flink.version} | The Apache Flink version. Example: Important Ensure |
Troubleshooting & FAQ
See Flink CDC FAQ.