All Products
Search
Document Center

Realtime Compute for Apache Flink:MySQL connector

Last Updated:Dec 19, 2025

This topic describes how to use the MySQL connector.

Overview

The MySQL connector supports all databases that are compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL mode), and self-managed MySQL.

Important

When you use the MySQL connector to read from OceanBase, ensure that OceanBase Binlog is enabled and correctly configured. For more information, see Related operations. This feature is in public preview. Please evaluate it thoroughly and use it with caution.

The following table describes the support for the MySQL connector.

Category

Details

Supported type

Source table, dimension table, sink table, and data ingestion source

Runtime mode

Streaming mode only

Data format

Not applicable

Specific monitoring metrics

Monitoring metrics

  • Source table

    • currentFetchEventTimeLag: The time interval from when data is generated to when it is pulled by the Source Operator.

      This metric is valid only in the binlog phase. The value is 0 during the snapshot phase.

    • currentEmitEventTimeLag: The time interval from when data is generated to when it leaves the Source Operator.

      This metric is valid only in the binlog phase. The value is 0 during the snapshot phase.

    • sourceIdleTime: The duration for which the source table has not generated new data.

  • Dimension table and sink table: None.

Note

For more information about the metrics, see Metrics.

API type

DataStream, SQL, and data ingestion YAML

Update or delete data in sink tables

Supported

Features

The MySQL CDC source first takes a consistent snapshot of the existing data in a database and then seamlessly switches to reading the binary log (binlog) for change events. This process guarantees exactly-once semantics, ensuring no data is missed or duplicated, even during failures. The MySQL CDC source table supports concurrent reading of full data and implements lock-free reading and resumable data transfer by using an incremental snapshot algorithm. For more information, see About MySQL CDC source tables.

  • Unified batch and stream processing: Reads both full and incremental data, which eliminates the need to maintain separate pipelines.

  • Concurrent full data reading: Horizontally scales performance.

  • Seamless switch from full to incremental reading: Automatically scales in to save computing resources.

  • Resumable data transfer: Supports resumable data transfer during the full data reading phase for enhanced stability.

  • Lock-free reading: Reads full data without affecting online business operations.

  • Supports reading backup logs from ApsaraDB RDS for MySQL.

  • Parallel parsing of binlog files reduces read latency.

Prerequisites

Before you use a MySQL CDC source table, configure your MySQL database as described in Configure MySQL. These configurations are required to use a MySQL CDC source table.

ApsaraDB RDS for MySQL

  • Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.

  • MySQL versions: 5.6, 5.7, and 8.0.x.

  • Enable the binary log (binlog). This is enabled by default.

  • Set the binlog format to ROW. This is the default format.

  • Set binlog_row_image to FULL. This is the default setting.

  • Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.

  • Create a MySQL user and grant the SELECTSHOW DATABASESREPLICATION SLAVE, and REPLICATION CLIENT permissions.

  • Create a MySQL database and table. For more information, see Create a database and an account for ApsaraDB RDS for MySQL. Use a privileged account to create the MySQL database to avoid operational failures due to insufficient permissions.

  • Configure an IP whitelist. For more information, see Create a database and an account for ApsaraDB RDS for MySQL.

PolarDB for MySQL

  • Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.

  • MySQL versions: 5.6, 5.7, and 8.0.x.

  • Enable the binary log (binlog). This is disabled by default.

  • Set the binlog format to ROW. This is the default format.

  • Set binlog_row_image to FULL. This is the default setting.

  • Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.

  • Create a MySQL user and grant the SELECTSHOW DATABASESREPLICATION SLAVE, and REPLICATION CLIENT permissions.

  • Create a MySQL database and table. For more information, see Create a database and an account for PolarDB for MySQL. Use a privileged account to create the MySQL database to avoid operational failures due to insufficient permissions.

  • Configure an IP whitelist. For more information, see Configure a whitelist for a PolarDB for MySQL cluster.

Self-managed MySQL

  • Perform a network probe with Realtime Compute for Apache Flink to ensure network connectivity.

  • MySQL versions: 5.6, 5.7, and 8.0.x.

  • Enable the binary log (binlog). This is disabled by default.

  • Set the binlog format to ROW. The default format is STATEMENT.

  • Set binlog_row_image to FULL. This is the default setting.

  • Disable Binary Log Transaction Compression. This feature was introduced in MySQL 8.0.20 and is disabled by default.

  • Create a MySQL user and grant the SELECTSHOW DATABASESREPLICATION SLAVE, and REPLICATION CLIENT permissions.

  • Create a MySQL database and table. For more information, see Create a database and an account for a self-managed MySQL instance. Use a privileged account to create the MySQL database to avoid operational failures due to insufficient permissions.

  • Configure an IP whitelist. For more information, see Configure a whitelist for a self-managed MySQL instance.

Limitations

General limitations

  • The MySQL CDC source table does not support defining a watermark.

  • In CTAS and CDAS jobs, the MySQL CDC source table can synchronize partial schema changes. For more information about the supported change types, see Schema evolution synchronization policies.

  • The MySQL CDC connector does not support Binary Log Transaction Compression. Therefore, when you use the MySQL CDC connector to consume incremental data, ensure that this feature is disabled. Otherwise, incremental data may fail to be retrieved.

Limitations for ApsaraDB RDS for MySQL

  • For ApsaraDB RDS for MySQL, we do not recommend reading data from a secondary database or a read-only replica. The binlog retention period for these instances is short by default. If the binlog expires and is cleared, the job may fail to consume binlog data and report an error.

  • By default, ApsaraDB RDS for MySQL enables parallel synchronization between the primary and secondary databases and does not guarantee transaction order consistency. This may cause some data to be missed during a primary-secondary switchover and checkpoint recovery. You can manually enable the slave_preserve_commit_order option in ApsaraDB RDS for MySQL to resolve this issue.

Limitations for PolarDB for MySQL

MySQL CDC source tables do not support reading from a Multi-master Cluster of PolarDB for MySQL 1.0.19 or earlier. For more information, see What is a Multi-master Cluster?. The binlog generated by these clusters may contain duplicate table IDs, which can cause schema mapping errors in the CDC source table and lead to errors when parsing binlog data.

Limitations for open source MySQL

By default, MySQL maintains transaction order during primary-replica binlog replication. If a MySQL replica enables parallel replication (slave_parallel_workers> 1) but does not have slave_preserve_commit_order=ON, its transaction commit order may be inconsistent with the primary database. When Flink CDC recovers from a checkpoint, it may miss data due to this order inconsistency. We recommend setting slave_preserve_commit_order = ON on the MySQL replica, or setting slave_parallel_workers = 1, which may sacrifice replication performance.

Notes

  • Explicitly configure a different Server ID for each MySQL CDC data source

    Purpose of server ID

    You must explicitly configure a different Server ID for each MySQL CDC data source. If multiple MySQL CDC data sources share the same Server ID and cannot be reused, it causes disordered binlog offsets and can cause data to be over-read or under-read.

    Configure server ID in different scenarios

    You can specify the Server ID in the DDL statement, but we recommend that you configure it by using dynamic hints instead.

    • Parallelism = 1 or incremental snapshot disabled

      ## When the incremental snapshot framework is not enabled or the parallelism is 1, you can specify a specific Server ID.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • Parallelism > 1 and incremental snapshot enabled

      ## You must specify a Server ID range. The number of available Server IDs in the range must be greater than or equal to the parallelism. Assume the parallelism is 3.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • Data synchronization with CTAS

      When you use CTAS for data synchronization, if the CDC data sources have the same configuration, the sources are automatically reused. In this case, you can configure the same Server ID for multiple CDC data sources. For more information, see Example 4: Multiple CTAS statements.

    • Multiple non-CTAS source tables that cannot be reused

      If a job contains multiple MySQL CDC source tables and does not use CTAS statements for synchronization, the data sources cannot be reused. You must provide a different Server ID for each CDC source table. Similarly, if the incremental snapshot framework is enabled and the parallelism is greater than 1, you must specify a Server ID range.

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • Sink table

    • Do not declare an auto-increment primary key in the DDL statement. MySQL automatically populates this field when writing data.

    • Declare at least one non-primary key field, otherwise an error is reported.

    • NOT ENFORCED in the DDL statement indicates that Flink does not perform a validity check on the primary key. You must ensure the correctness and integrity of the primary key. For more information, see Validity Check.

  • Dimension table

    To use indexes for query acceleration, the field order in the JOIN clause must match the index definition order. This is known as the leftmost prefix rule. For example, if the index is on (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y.

    Flink-generated SQL may be rewritten by the optimizer, which can prevent the index from being used during the actual database query. To verify whether an index is used, check the execution plan (EXPLAIN) or the slow query log in MySQL to see the actual SELECT statement that is executed.

SQL

You can use the MySQL connector in SQL jobs as a source table, dimension table, or sink table.

Syntax

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Note
  • How the connector writes to a sink table: For each record received, the connector constructs and executes a single SQL statement. The specific SQL statement depends on the table structure:

    • For a sink table without a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement.

    • For a sink table with a primary key, the connector executes an INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement. Note: If the physical table has a unique index constraint other than the primary key, inserting two records with different primary keys but the same unique index value causes a unique index conflict in the downstream database. This conflict results in data overwrites and potential data loss.

  • If you define an auto-increment primary key in the MySQL database, do not declare the auto-increment field in the Flink DDL statement. The database automatically populates this field during data insertion. The connector only supports writing and deleting data with an auto-increment field and does not support updates.

Connector options

  • General

    Option

    Description

    Required

    Data type

    Default value

    Remarks

    connector

    The type of the table.

    Yes

    STRING

    None

    When used as a source table, set this option to mysql-cdc or mysql. They are equivalent. When used as a dimension or sink table, set this option to mysql.

    hostname

    The IP address or hostname of the MySQL database.

    Yes

    STRING

    None

    We recommend that you use a VPC endpoint.

    Note

    If the MySQL database and Realtime Compute for Apache Flink are not in the same VPC, you must establish a cross-VPC network connection or use the Internet for access. For more information, see Manage and operate workspaces and How can a fully managed Flink cluster access the Internet?.

    username

    The username for the MySQL database service.

    Yes

    STRING

    None

    None.

    password

    The password for the MySQL database service.

    Yes

    STRING

    None

    None.

    database-name

    The name of the MySQL database.

    Yes

    STRING

    None

    • When used as a source table, this option supports regular expressions to read data from multiple databases.

    • When you use a regular expression, avoid using the ^ and $ symbols to match the start and end of the string. For more information, see the Remarks column for the table-name option.

    table-name

    The name of the MySQL table.

    Yes

    STRING

    None

    • When used as a source table, this option supports regular expressions to read data from multiple tables.

      When you read data from multiple MySQL tables, submit multiple CTAS statements as a single job. This avoids enabling multiple Binlog listeners and improves performance and efficiency. For more information, see Multiple CTAS statements: Submit as a single job.

    • When you use a regular expression, avoid using the ^ and $ symbols to match the start and end of the string. For more information, see the following note.

    Note

    When a MySQL CDC source table matches table names, it combines the database-name and table-name that you specify into a full-path regular expression using the string \\. (the character . is used in VVR versions earlier than 8.0.1). It then uses this regular expression to match the fully qualified names of tables in the MySQL database.

    For example, if you set 'database-name'='db_.*' and 'table-name'='tb_.+', the connector uses the regular expression db_.*\\.tb_.+ (db_.*.tb_.+ in versions earlier than 8.0.1) to match the fully qualified table names to determine which tables to read.

    port

    The port number of the MySQL database service.

    No

    INTEGER

    3306

    None.

  • Source-specific

    Option

    Description

    Required

    Data type

    Default value

    Remarks

    server-id

    A numeric ID for the database client.

    No

    STRING

    A random value between 5400 and 6400 is generated.

    This ID must be globally unique within the MySQL cluster. We recommend that you set a different ID for each job that connects to the same database.

    This option also supports an ID range, such as 5400-5408. When incremental reading is enabled, concurrent reading is supported. In this case, we recommend that you set an ID range so that each concurrent reader uses a different ID. For more information, see Use of server ID.

    scan.incremental.snapshot.enabled

    Specifies whether to enable incremental snapshots.

    No

    BOOLEAN

    true

    Incremental snapshot is enabled by default. Incremental snapshot is a new mechanism for reading full data snapshots. Compared to the old snapshot reading method, incremental snapshots offer several advantages:

    • The source can read full data in parallel.

    • The source supports chunk-level checkpoints when reading full data.

    • The source does not need to acquire a global read lock (FLUSH TABLES WITH read lock) when reading full data.

    If you want the source to support concurrent reading, each concurrent reader needs a unique server ID. Therefore, server-id must be a range, such as 5400-6400, and the range must be greater than or equal to the degree of parallelism.

    Note

    This configuration item is removed in Ververica Runtime (VVR) 11.1 and later.

    scan.incremental.snapshot.chunk.size

    The size of each chunk in number of rows.

    No

    INTEGER

    8096

    When incremental snapshot reading is enabled, the table is split into multiple chunks for reading. The data of a chunk is buffered in memory before it is fully read.

    A smaller number of rows per chunk results in a larger total number of chunks in the table. While this improves the granularity of fault recovery, it may lead to out-of-memory (OOM) errors and lower overall throughput. Therefore, you need to find a balance and set a reasonable chunk size.

    scan.snapshot.fetch.size

    The maximum number of records to fetch at a time when reading the full data of a table.

    No

    INTEGER

    1024

    None.

    scan.startup.mode

    The startup mode for data consumption.

    No

    STRING

    initial

    Valid values:

    • initial (Default): Scans the full historical data first and then reads the latest Binlog data upon the first startup.

    • latest-offset: Does not scan historical data upon the first startup. It starts reading from the end of the Binlog, which means it only reads the latest changes made after the connector starts.

    • earliest-offset: Does not scan historical data. It starts reading from the earliest available Binlog.

    • specific-offset: Does not scan historical data. It starts from a specific Binlog offset that you specify. You can specify the offset by configuring both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by configuring only scan.startup.specific-offset.gtid-set to start from a specific GTID set.

    • timestamp: Does not scan historical data. It starts reading the Binlog from a specified timestamp. The timestamp is specified by scan.startup.timestamp-millis in milliseconds.

    Important

    When you use the earliest-offset, specific-offset, or timestamp startup mode, make sure that the schema of the corresponding table does not change between the specified Binlog consumption position and the job startup time to avoid errors due to schema mismatch.

    scan.startup.specific-offset.file

    The Binlog filename of the start offset when using the specific offset startup mode.

    No

    STRING

    None

    When you use this configuration, you must set scan.startup.mode to specific-offset. The filename format is, for example, mysql-bin.000003.

    scan.startup.specific-offset.pos

    The offset in the specified Binlog file to start from when using the specific offset startup mode.

    No

    INTEGER

    None

    When you use this configuration, you must set scan.startup.mode to specific-offset.

    scan.startup.specific-offset.gtid-set

    The GTID set of the start offset when using the specific offset startup mode.

    No

    STRING

    None

    When you use this configuration, you must set scan.startup.mode to specific-offset. The GTID set format is, for example, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    The start offset as a millisecond timestamp when using the timestamp startup mode.

    No

    LONG

    None

    When you use this configuration, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds.

    Important

    When you specify a time, MySQL CDC attempts to read the initial event of each Binlog file to determine its timestamp and locate the Binlog file corresponding to the specified time. Make sure that the Binlog file for the specified timestamp has not been cleared from the database and can be read.

    server-time-zone

    The session time zone used by the database.

    No

    STRING

    If you do not specify this option, the system uses the time zone of the Flink job's runtime environment as the database server time zone. This is the time zone of the zone you selected.

    Example: Asia/Shanghai. This option controls how TIMESTAMP types in MySQL are converted to STRING types. For more information, see Debezium temporal values.

    debezium.min.row.count.to.stream.results

    When the number of rows in a table exceeds this value, batch reading mode is used.

    No

    INTEGER

    1000

    Flink reads data from a MySQL source table in the following ways:

    • Full read: Reads the entire table's data directly into memory. This method is fast but consumes a corresponding amount of memory. If the source table is very large, it may pose an OOM risk.

    • Batch read: Reads data in multiple batches, with a certain number of rows per batch, until all data is read. This method avoids OOM risks when reading large tables but is relatively slower.

    connect.timeout

    The maximum time to wait for a connection to the MySQL database server to time out before retrying.

    No

    DURATION

    30s

    None.

    connect.max-retries

    The maximum number of retries after a failed connection to the MySQL database service.

    No

    INTEGER

    3

    None.

    connection.pool.size

    The size of the database connection pool.

    No

    INTEGER

    20

    The database connection pool is used to reuse connections, which can reduce the number of database connections.

    jdbc.properties.*

    Custom connection options in the JDBC URL.

    No

    STRING

    None

    You can pass custom connection options. For example, to not use the SSL protocol, you can set 'jdbc.properties.useSSL' = 'false'.

    For more information about supported connection options, see MySQL Configuration Properties.

    debezium.*

    Custom options for Debezium to read Binlogs.

    No

    STRING

    None

    You can pass custom Debezium options. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling logic for parsing errors.

    heartbeat.interval

    The interval at which the source uses heartbeat events to advance the Binlog offset.

    No

    DURATION

    30s

    Heartbeat events are used to advance the Binlog offset in the source, which is very useful for slowly updated tables in MySQL. For such tables, the Binlog offset does not advance automatically. Heartbeat events can push the Binlog offset forward, preventing issues where an expired Binlog offset causes the job to fail and require a stateless restart.

    scan.incremental.snapshot.chunk.key-column

    The column used to split chunks during the snapshot phase.

    See Remarks.

    STRING

    None

    • Required for tables without a primary key. The selected column must be of a non-null type (NOT NULL).

    • Optional for tables with a primary key. You can only select one column from the primary key.

    rds.region-id

    The region ID of the ApsaraDB RDS for MySQL instance.

    Required when reading archived logs from OSS.

    STRING

    None

    For more information about region IDs, see Regions and zones.

    rds.access-key-id

    The AccessKey ID of the account for the ApsaraDB RDS for MySQL instance.

    Required when reading archived logs from OSS.

    STRING

    None

    For more information, see How do I view the AccessKey ID and AccessKey secret?.

    Important

    To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey ID using secrets management. For more information, see Manage variables.

    rds.access-key-secret

    The AccessKey secret of the account for the ApsaraDB RDS for MySQL instance.

    Required when reading archived logs from OSS.

    STRING

    None

    For more information, see How do I view the AccessKey ID and AccessKey secret?

    Important

    To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey secret using secrets management. For more information, see Manage variables.

    rds.db-instance-id

    The instance ID of the ApsaraDB RDS for MySQL instance.

    Required when reading archived logs from OSS.

    STRING

    None

    None.

    rds.main-db-id

    The ID of the primary database of the ApsaraDB RDS for MySQL instance.

    No

    STRING

    None

    rds.download.timeout

    The timeout period for downloading a single archived log from OSS.

    No

    DURATION

    60s

    None.

    rds.endpoint

    The service endpoint for obtaining OSS Binlog information.

    No

    STRING

    None

    • For more information about valid values, see Endpoints.

    • Supported only in VVR 8.0.8 and later.

    scan.incremental.close-idle-reader.enabled

    Specifies whether to close idle readers after the snapshot phase ends.

    No

    BOOLEAN

    false

    • Supported only in VVR 8.0.1 and later.

    • For this configuration to take effect, you must set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

    scan.read-changelog-as-append-only.enabled

    Specifies whether to convert the changelog stream to an append-only stream.

    No

    BOOLEAN

    false

    Valid values:

    • true: All types of messages (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted to INSERT messages. Enable this only in special scenarios, such as when you need to save delete messages from the upstream table.

    • false (Default): All types of messages are sent downstream as they are.

    Note

    Supported only in VVR 8.0.8 and later.

    scan.only.deserialize.captured.tables.changelog.enabled

    During the incremental phase, specifies whether to deserialize change events only for the specified tables.

    No

    BOOLEAN

    • The default value is false in VVR 8.x versions.

    • The default value is true in VVR 11.1 and later.

    Valid values:

    • true: Deserializes change data only for the target tables to accelerate Binlog reading.

    • false (Default): Deserializes change data for all tables.

    Note
    • Supported only in VVR 8.0.7 and later.

    • When used in VVR 8.0.8 and earlier, the option name must be changed to debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    During the incremental phase, specifies whether to attempt to parse RDS lock-free DDL events.

    No

    BOOLEAN

    false

    Valid values:

    • true: Parses RDS lock-free DDL events.

    • false (Default): Does not parse RDS lock-free DDL events.

    This is an experimental feature. We recommend that you take a snapshot of the Flink job for recovery before performing online lock-free changes.

    Note

    Supported only in VVR 11.1 and later.

    scan.incremental.snapshot.backfill.skip

    Specifies whether to skip backfill during the snapshot reading phase.

    No

    BOOLEAN

    false

    Valid values:

    • true: Skips backfill during the snapshot reading phase.

    • false (Default): Does not skip backfill during the snapshot reading phase.

    If you skip backfill, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot.

    Important

    Skipping backfill may lead to data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed.

    Note

    Supported only in VVR 11.1 and later.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Specifies whether to distribute unbounded chunks first during the snapshot reading phase.

    No

    BOOELEAN

    false

    Valid values:

    • true: Distributes unbounded chunks first during the snapshot reading phase.

    • false (Default): Does not distribute unbounded chunks first during the snapshot reading phase.

    This is an experimental feature. Enabling it can reduce the risk of OOM errors when a TaskManager synchronizes the last chunk during the snapshot phase. We recommend adding this before the job's first startup.

    Note

    Supported only in VVR 11.1 and later.

  • Dimension table-specific

    Option

    Description

    Required

    Data type

    Default value

    Remarks

    url

    The MySQL JDBC URL.

    No

    STRING

    None

    The URL format is jdbc:mysql://<endpoint>:<port>/<database_name>.

    lookup.max-retries

    The maximum number of retries after a failed data read.

    No

    INTEGER

    3

    Supported only in VVR 6.0.7 and later.

    lookup.cache.strategy

    The cache policy.

    No

    STRING

    None

    Supports three cache policies: None, LRU, and ALL. For more information about the values, see Background information.

    Note

    When you use the LRU cache policy, you must also configure the lookup.cache.max-rows option.

    lookup.cache.max-rows

    The maximum number of cached rows.

    No

    INTEGER

    100000

    • If you select the LRU cache policy, you must set the cache size.

    • If you select the ALL cache policy, you do not need to set the cache size.

    lookup.cache.ttl

    The cache time-to-live (TTL).

    No

    DURATION

    10 s

    The configuration of lookup.cache.ttl depends on lookup.cache.strategy:

    • If lookup.cache.strategy is set to None, you do not need to configure lookup.cache.ttl. This means the cache does not expire.

    • If lookup.cache.strategy is set to LRU, lookup.cache.ttl is the cache TTL. By default, it does not expire.

    • If lookup.cache.strategy is set to ALL, lookup.cache.ttl is the cache reload time. By default, it is not reloaded.

    Use a time format, such as 1min or 10s.

    lookup.max-join-rows

    The maximum number of results returned when querying the dimension table for each record in the primary table.

    No

    INTEGER

    1024

    None.

    lookup.filter-push-down.enabled

    Specifies whether to enable filter pushdown for the dimension table.

    No

    BOOLEAN

    false

    Valid values:

    • true: Enables filter pushdown for the dimension table. When loading data from the MySQL database table, the dimension table filters data in advance based on the conditions set in the SQL job.

    • false (Default): Disables filter pushdown for the dimension table. When loading data from the MySQL database table, the dimension table loads all data.

    Note

    Supported only in VVR 8.0.7 and later.

    Important

    Filter pushdown for a dimension table should only be enabled when the Flink table is used as a dimension table. MySQL source tables do not support enabling filter pushdown. If a Flink table is used as both a source table and a dimension table, and filter pushdown is enabled for the dimension table, you must explicitly set this configuration item to false for the source table using SQL Hints. Otherwise, the job may run abnormally.

  • Sink-specific

    Option

    Description

    Required

    Data type

    Default value

    Remarks

    url

    The MySQL JDBC URL.

    No

    STRING

    None

    The URL format is jdbc:mysql://<endpoint>:<port>/<database_name>.

    sink.max-retries

    The maximum number of retries after a failed data write.

    No

    INTEGER

    3

    None.

    sink.buffer-flush.batch-size

    The number of records in a single batch write.

    No

    INTEGER

    4096

    None.

    sink.buffer-flush.max-rows

    The number of data records buffered in memory.

    No

    INTEGER

    10000

    This option takes effect only after a primary key is specified.

    sink.buffer-flush.interval

    The time interval for flushing the buffer. If the data in the buffer does not meet the output conditions after the specified waiting time, the system automatically outputs all data in the buffer.

    No

    DURATION

    1s

    None.

    sink.ignore-delete

    Specifies whether to ignore DELETE operations.

    No

    BOOLEAN

    false

    When the stream generated by Flink SQL contains delete or update-before records, if multiple output tasks update different fields of the same table simultaneously, data inconsistency may occur.

    For example, after a record is deleted, another task updates only some fields. The un-updated fields will become null or their default values, causing data errors.

    By setting sink.ignore-delete to true, you can ignore upstream DELETE and UPDATE_BEFORE operations to avoid such issues.

    Note
    • UPDATE_BEFORE is part of Flink's retraction mechanism, used to "retract" the old value in an update operation.

    • When ignoreDelete = true, all DELETE and UPDATE_BEFORE records are skipped. Only INSERT and UPDATE_AFTER records are processed.

    sink.ignore-null-when-update

    When updating data, specifies whether to update the corresponding field to null or skip the update for that field if the incoming data field is null.

    No

    BOOLEAN

    false

    Valid values:

    • true: Does not update the field. This option can be set to true only when a primary key is set for the Flink table. When set to true:

      • In VVR 8.0.6 and earlier, batch execution is not supported for writing data to the sink table.

      • In VVR 8.0.7 and later, batch execution is supported for writing data to the sink table.

        Although batch writing can significantly improve write efficiency and overall throughput, it can cause data latency and OOM risks. Therefore, make a trade-off based on your actual business scenario.

    • false: Updates the field to null.

    Note

    This option is supported only in VVR 8.0.5 and later.

Type mapping

  • CDC source table

    MySQL CDC field type

    Flink field type

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Important

    We recommend that you do not use the TINYINT(1) type in MySQL to store values other than 0 and 1. When property-version=0, the MySQL CDC source table maps TINYINT(1) to Flink's BOOLEAN type by default, which can cause data inaccuracies. If you need to use the TINYINT(1) type to store values other than 0 and 1, see the configuration option catalog.table.treat-tinyint1-as-boolean.

  • Dimension table and sink table

    MySQL field type

    Flink field type

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Note

    p must be less than or equal to 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Important

    Flink only supports MySQL BLOB type records that are less than or equal to 2,147,483,647 (2^31 - 1) bytes.

    BLOB

    MEDIUMBLOB

    LONGBLOB

Data ingestion

You can use the MySQL connector as a data source in a data ingestion YAML job.

Syntax

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Connector options

Option

Description

Required

Data type

Default value

Remarks

type

The type of the data source.

Yes

STRING

None

Set this option to mysql.

name

The name of the data source.

No

STRING

None

None.

hostname

The IP address or hostname of the MySQL database.

Yes

STRING

None

We recommend that you use a VPC endpoint.

Note

If the MySQL database and Realtime Compute for Apache Flink are not in the same VPC, you must establish a cross-VPC network connection or use the Internet for access. For more information, see Workspace and namespace management and Workspace and namespace management.

username

The username for the MySQL database service.

Yes

STRING

None

None.

password

The password for the MySQL database service.

Yes

STRING

None

None.

tables

The MySQL data tables to synchronize.

Yes

STRING

None

  • This option supports regular expressions to read data from multiple tables.

  • You can use commas to separate multiple regular expressions.

Note
  • Do not use the start and end matching characters ^ and $ in the regular expression. In version 11.2, the database regular expression is obtained by splitting with a period. Start and end matching characters will make the obtained database regular expression unusable. For example, ^db.user_[0-9]+$ needs to be changed to db.user_[0-9]+.

  • A period is used to separate the database name and table name. To use a period to match any character, you must escape the period with a backslash. For example: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

tables.exclude

The tables to exclude from synchronization.

No

STRING

None

  • This option supports regular expressions to exclude data from multiple tables.

  • You can use commas to separate multiple regular expressions.

Note

A period is used to separate the database name and table name. To use a period to match any character, you must escape the period with a backslash. For example: db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*.

port

The port number of the MySQL database service.

No

INTEGER

3306

None.

schema-change.enabled

Specifies whether to send schema change events.

No

BOOLEAN

true

None.

server-id

The numeric ID or range of IDs for the database client used for synchronization.

No

STRING

A random value between 5400 and 6400 is generated.

This ID must be globally unique within the MySQL cluster. We recommend that you set a different ID for each job that connects to the same database.

This option also supports an ID range, such as 5400-5408. When incremental reading is enabled, concurrent reading is supported. In this case, we recommend that you set an ID range so that each concurrent reader uses a different ID.

jdbc.properties.*

Custom connection options in the JDBC URL.

No

STRING

None

You can pass custom connection options. For example, to not use the SSL protocol, you can set 'jdbc.properties.useSSL' = 'false'.

For more information about supported connection options, see MySQL Configuration Properties.

debezium.*

Custom options for Debezium to read Binlogs.

No

STRING

None

You can pass custom Debezium options. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the handling logic for parsing errors.

scan.incremental.snapshot.chunk.size

The size of each chunk in number of rows.

No

INTEGER

8096

A MySQL table is split into multiple chunks for reading. The data of a chunk is buffered in memory before it is fully read.

A smaller number of rows per chunk results in a larger total number of chunks in the table. While this improves the granularity of fault recovery, it may lead to OOM errors and lower overall throughput. Therefore, you need to find a balance and set a reasonable chunk size.

scan.snapshot.fetch.size

The maximum number of records to fetch at a time when reading the full data of a table.

No

INTEGER

1024

None.

scan.startup.mode

The startup mode for data consumption.

No

STRING

initial

Valid values:

  • initial (Default): Scans the full historical data first and then reads the latest Binlog data upon the first startup.

  • latest-offset: Does not scan historical data upon the first startup. It starts reading from the end of the Binlog, which means it only reads the latest changes made after the connector starts.

  • earliest-offset: Does not scan historical data. It starts reading from the earliest available Binlog.

  • specific-offset: Does not scan historical data. It starts from a specific Binlog offset that you specify. You can specify the offset by configuring both scan.startup.specific-offset.file and scan.startup.specific-offset.pos, or by configuring only scan.startup.specific-offset.gtid-set to start from a specific GTID set.

  • timestamp: Does not scan historical data. It starts reading the Binlog from a specified timestamp. The timestamp is specified by scan.startup.timestamp-millis in milliseconds.

Important

For the earliest-offset, specific-offset, and timestamp startup modes, if the table schema at the startup time is different from the schema at the specified start offset time, the job will fail due to a schema mismatch. In other words, when using these three startup modes, you must ensure that the schema of the corresponding table does not change between the specified Binlog consumption position and the job startup time.

scan.startup.specific-offset.file

The Binlog filename of the start offset when using the specific offset startup mode.

No

STRING

None

When you use this configuration, you must set scan.startup.mode to specific-offset. The filename format is, for example, mysql-bin.000003.

scan.startup.specific-offset.pos

The offset in the specified Binlog file to start from when using the specific offset startup mode.

No

INTEGER

None

When you use this configuration, you must set scan.startup.mode to specific-offset.

scan.startup.specific-offset.gtid-set

The GTID set of the start offset when using the specific offset startup mode.

No

STRING

None

When you use this configuration, you must set scan.startup.mode to specific-offset. The GTID set format is, for example, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

The start offset as a millisecond timestamp when using the timestamp startup mode.

No

LONG

None

When you use this configuration, you must set scan.startup.mode to timestamp. The timestamp is in milliseconds.

Important

When you specify a time, MySQL CDC attempts to read the initial event of each Binlog file to determine its timestamp and locate the Binlog file corresponding to the specified time. Make sure that the Binlog file for the specified timestamp has not been cleared from the database and can be read.

server-time-zone

The session time zone used by the database.

No

STRING

If you do not specify this option, the system uses the time zone of the Flink job's runtime environment as the database server time zone. This is the time zone of the zone you selected.

Example: Asia/Shanghai. This option controls how TIMESTAMP types in MySQL are converted to STRING types. For more information, see Debezium temporal values.

scan.startup.specific-offset.skip-events

The number of Binlog events to skip when reading from a specific offset.

No

INTEGER

None

When you use this configuration, you must set scan.startup.mode to specific-offset.

scan.startup.specific-offset.skip-rows

The number of row changes to skip when reading from a specific offset. A single Binlog event may correspond to multiple row changes.

No

INTEGER

None

When you use this configuration, you must set scan.startup.mode to specific-offset.

connect.timeout

The maximum time to wait for a connection to the MySQL database server to time out before retrying.

No

DURATION

30s

None.

connect.max-retries

The maximum number of retries after a failed connection to the MySQL database service.

No

INTEGER

3

None.

connection.pool.size

The size of the database connection pool.

No

INTEGER

20

The database connection pool is used to reuse connections, which can reduce the number of database connections.

heartbeat.interval

The interval at which the source uses heartbeat events to advance the Binlog offset.

No

DURATION

30s

Heartbeat events are used to advance the Binlog offset in the source, which is very useful for slowly updated tables in MySQL. For such tables, the Binlog offset does not advance automatically. Heartbeat events can push the Binlog offset forward, preventing issues where an expired Binlog offset causes the job to fail and require a stateless restart.

scan.incremental.snapshot.chunk.key-column

The column used to split chunks during the snapshot phase.

No.

STRING

None

You can only select one column from the primary key.

rds.region-id

The region ID of the ApsaraDB RDS for MySQL instance.

Required when reading archived logs from OSS.

STRING

None

For more information about region IDs, see Regions and zones.

rds.access-key-id

The AccessKey ID of the account for the ApsaraDB RDS for MySQL instance.

Required when reading archived logs from OSS.

STRING

None

For more information, see How do I view the AccessKey ID and AccessKey secret?

Important

To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey ID using secrets management. For more information, see Manage variables.

rds.access-key-secret

The AccessKey secret of the account for the ApsaraDB RDS for MySQL instance.

Required when reading archived logs from OSS.

STRING

None

For more information, see How do I view the AccessKey ID and AccessKey secret?

Important

To prevent your AccessKey information from being leaked, we recommend that you manage your AccessKey secret using secrets management. For more information, see Manage variables.

rds.db-instance-id

The instance ID of the ApsaraDB RDS for MySQL instance.

Required when reading archived logs from OSS.

STRING

None

None.

rds.main-db-id

The ID of the primary database of the ApsaraDB RDS for MySQL instance.

No

STRING

None

For more information about how to obtain the primary database ID, see Log backup for ApsaraDB RDS for MySQL.

rds.download.timeout

The timeout period for downloading a single archived log from OSS.

No

DURATION

60s

None.

rds.endpoint

The service endpoint for obtaining OSS Binlog information.

No

STRING

None

For more information about valid values, see Endpoints.

rds.binlog-directory-prefix

The directory prefix for storing Binlog files.

No

STRING

rds-binlog-

None.

rds.use-intranet-link

Specifies whether to use an internal network to download Binlog files.

No

BOOLEAN

true

None.

rds.binlog-directories-parent-path

The absolute path of the parent directory where Binlog files are stored.

No

STRING

None

None.

chunk-meta.group.size

The size of the chunk metadata.

No

INTEGER

1000

If the metadata is larger than this value, it is transmitted in multiple parts.

chunk-key.even-distribution.factor.lower-bound

The lower bound of the chunk distribution factor for even splitting.

No

DOUBLE

0.05

If the distribution factor is less than this value, uneven splitting is used.

Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows.

chunk-key.even-distribution.factor.upper-bound

The upper bound of the chunk distribution factor for even splitting.

No

DOUBLE

1000.0

If the distribution factor is greater than this value, uneven splitting is used.

Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of rows.

scan.incremental.close-idle-reader.enabled

Specifies whether to close idle readers after the snapshot phase ends.

No

BOOLEAN

false

For this configuration to take effect, you must set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true.

scan.only.deserialize.captured.tables.changelog.enabled

During the incremental phase, specifies whether to deserialize change events only for the specified tables.

No

BOOLEAN

  • The default value is false in VVR 8.x versions.

  • The default value is true in VVR 11.1 and later.

Valid values:

  • true: Deserializes change data only for the target tables to accelerate Binlog reading.

  • false (Default): Deserializes change data for all tables.

metadata-column.include-list

The metadata columns to pass to the downstream.

No

STRING

None

Available metadata includes table_name, database_name, op_ts, es_ts, and query_log. You can use commas to separate them.

Note

The MySQL CDC YAML connector does not need to and does not support adding the op_type metadata column. You can directly use __data_event_type__ in the Transform expression to get the change data type.

Important

The es_ts metadata column represents the start time of the transaction. It can only be added when using MySQL version 8.0.x. Do not add this metadata column when using earlier versions of MySQL.

scan.newly-added-table.enabled

When restarting from a checkpoint, specifies whether to synchronize newly added tables that were not matched in the previous run or to remove currently unmatched tables that are saved in the state.

No

BOOLEAN

false

This takes effect when restarting from a checkpoint or savepoint.

scan.binlog.newly-added-table.enabled

During the incremental phase, specifies whether to send data from newly added tables that are matched.

No

BOOLEAN

false

Cannot be enabled at the same time as scan.newly-added-table.enabled.

scan.incremental.snapshot.chunk.key-column

Specifies a column for certain tables to be used as the splitting key for chunks during the snapshot phase.

No

STRING

None

  • Use a colon (:) to connect the table name and field name to form a rule. The table name can be a regular expression. You can define multiple rules separated by semicolons (;). For example: db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2.

  • For tables without a primary key, this is required. The selected column must not be null (NOT NULL). For tables with a primary key, this is optional. You can select only one column from the primary key.

scan.parse.online.schema.changes.enabled

Specifies whether to parse RDS lock-free DDL events during the incremental phase.

No

BOOLEAN

false

Valid values:

  • true: Parses RDS lock-free DDL events.

  • false (default): Does not parse RDS lock-free DDL events.

This is an experimental feature. Before performing an online lock-free DDL change, take a snapshot of the Flink job for recovery.

Note

This feature is supported only by Flink compute engine Ververica Runtime (VVR) 11.0 and later.

scan.incremental.snapshot.backfill.skip

Specifies whether to skip the backfill during the snapshot read phase.

No

BOOLEAN

false

Valid values:

  • true: Skips the backfill during the snapshot read phase.

  • false (Default): Does not skip the backfill during the snapshot read phase.

If the backfill is skipped, changes to the table during the snapshot phase are read in a later incremental phase instead of being merged into the snapshot.

Important

Skipping the backfill may cause data inconsistency because changes that occur during the snapshot phase may be replayed. This only guarantees at-least-once semantics.

Note

This option is supported only by Flink compute engine Ververica Runtime (VVR) 11.1 and later.

treat-tinyint1-as-boolean.enabled

Specifies whether to treat the TINYINT(1) type as the Boolean type.

No

BOOLEAN

true

The valid values are:

  • true (Default): Treats the TINYINT(1) type as the Boolean type.

  • false: Does not treat the TINYINT(1) type as the Boolean type.

treat-timestamp-as-datetime-enabled

Specifies whether to process TIMESTAMP as DATETIME.

No

BOOLEAN

false

Valid values:

  • true: Processes MySQL TIMESTAMP data as DATETIME data and converts it to Flink CDC TIMESTAMP.

  • false (default): Converts MySQL TIMESTAMP data to Flink CDC TIMESTAMP_LTZ data.

MySQL TIMESTAMP type stores UTC time and is affected by time zones, while the MySQL DATETIME type stores literal time and is not affected by time zones.

Enabling this will convert MySQL TIMESTAMP type data to DATETIME type based on server-time-zone.

include-comments.enabled

Specifies whether to sync table and field comments.

No

BOOLEAN

false

Valid values:

  • true: Syncs table and field comments.

  • false (Default): Does not sync table and field comments.

Enabling this option increases the memory usage of the job.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Specifies whether to distribute unbounded shards first during the snapshot read phase.

No

BOOLEAN

false

The valid values are as follows:

  • true: Prioritizes the distribution of unbounded shards during the snapshot read phase.

  • false (default): Does not prioritize the distribution of unbounded shards during the snapshot read phase.

This is an experimental feature. Enabling this feature reduces the risk of an out-of-memory (OOM) error when a TaskManager synchronizes the last shard during the snapshot phase. Add this option before the job starts for the first time.

Note

This feature is available only in Flink compute engine Ververica Runtime (VVR) 11.1 and later.

Type mapping

The following table shows the type mappings for data ingestion.

MySQL CDC field type

CDC field type

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

The Flink CDC type is determined by the treat-timestamp-as-datetime-enabled option value:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

STRING

Note

MySQL supports a decimal precision up to 65. Flink limits decimal precision to 38. If a decimal column has a precision greater than 38, map it to a string to prevent precision loss.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Note

The JSON data type is transformed into a JSON-formatted string in Flink.

GEOMETRY

STRING

Note

MySQL spatial data types are transformed into strings with a fixed JSON format. For more information, see MySQL spatial data type mapping.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Note

For MySQL BLOB data types, the maximum supported length is 2,147,483,647 (2**31 - 1).

BLOB

MEDIUMBLOB

LONGBLOB

Examples

  • CDC source table

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Dimension table

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Sink table

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Data ingestion source

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

MySQL CDC source tables

  • How it works

    When the MySQL CDC source table starts, it scans the entire table and splits it into multiple chunks based on the primary key, recording the current binlog position. It then uses the incremental snapshot algorithm to read the data from each chunk one by one by using SELECT statements. The job periodically performs checkpoints to record the completed chunks. If a failover occurs, the job only needs to continue reading the unfinished chunks. After all chunks are read, it starts reading incremental change records from the previously recorded binlog position. The Flink job continues to perform periodic checkpoints to record the binlog position. If the job fails over, it resumes processing from the last recorded binlog position, thus achieving exactly-once semantics.

    For a more detailed explanation of the incremental snapshot algorithm, see MySQL CDC Connector.

  • Metadata

    Metadata is highly useful in scenarios where you merge sharded databases and tables. After merging, businesses often still need to identify the source database and table for each row of data. Metadata columns allow you to access this information. Therefore, you can easily merge multiple sharded tables into a single destination table by using metadata columns.

    The MySQL CDC Source supports metadata column syntax. You can access the following metadata through metadata columns.

    Metadata key

    Metadata type

    Description

    database_name

    STRING NOT NULL

    The name of the database that contains the row.

    table_name

    STRING NOT NULL

    The name of the table that contains the row.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    The time when the row was changed in the database. If the record is from the table's existing historical data instead of the Binlog, this value is always 0.

    op_type

    STRING NOT NULL

    The change type of the row.

    • +I: INSERT message

    • -D: DELETE message

    • -U: UPDATE_BEFORE message

    • +U: UPDATE_AFTER message

    Note

    Supported only in Ververica Runtime (VVR) 8.0.7 and later.

    query_log

    STRING NOT NULL

    The MySQL query log record corresponding to the row read.

    Note

    To log queries, MySQL requires enabling binlog_rows_query_log_events.

    The following example shows how to merge multiple orders tables from different sharded databases in a MySQL instance and synchronize them to a holo_orders table in Hologres.

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Read the database name.
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Read the table name.
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read the change timestamp.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Read the change type.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Match multiple sharded databases using a regular expression.
      'table-name' = 'orders_.*'   -- Match multiple sharded tables using a regular expression.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    Based on the code above, if you set the scan.read-changelog-as-append-only.enabled option to true in the WITH clause, the output varies based on the primary key settings of the downstream table:

    • If the primary key of the downstream table is order_id, the output includes only the last change for each primary key in the source table. For example, if the last change for a primary key was a delete operation, you see a record in the downstream table with the same primary key and an op_type of -D.

    • If the primary key of the downstream table is a composite of order_id, operation_ts, and op_type, the output includes the complete change history for each primary key in the source table.

  • Regular expression support

    The MySQL CDC source table supports using regular expressions in the table name or database name to match multiple tables or databases. The following example shows how to specify multiple tables by using a regular expression.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Match multiple databases using a regular expression.
      'table-name' = '(t[5-8]|tt)' -- Match multiple tables using a regular expression.
    );

    Explanation of the regular expressions in the preceding example:

    • ^(test).* is a prefix match example. This expression can match database names that start with test, such as test1 or test2.

    • .*[p$] is a suffix match example. This expression can match database names that end with p, such as cdcp or edcp.

    • txc is a specific match. It can match a specific database name, such as txc.

    When matching a fully qualified table name, MySQL CDC uses both the database name and table name to uniquely identify a table. It uses the pattern database-name.table-name for matching. For example, the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) can match tables such as txc.tt and test2.test5 in the database.

    Important

    In SQL job configurations, the table-name and database-name options do not support using a comma (,) to specify multiple tables or databases.

    • To match multiple tables or use multiple regular expressions, connect them with a vertical bar (|) and enclose them in parentheses. For example, to read the user and product tables, you can configure table-name as (user|product).

    • If a regular expression contains a comma, you must rewrite it using the vertical bar (|) operator. For example, the regular expression mytable_\d{1, 2} needs to be rewritten as the equivalent (mytable_\d{1}|mytable_\d{2}) to avoid using a comma.

  • Concurrency control

    The MySQL connector supports reading full data with multiple concurrencies, which improves data loading efficiency. When combined with Autopilot in the Realtime Compute for Apache Flink console, it can automatically scale in during the incremental phase after the multi-concurrency reading is complete, thereby saving computing resources.

    In the Realtime Compute development console, you can set the job's parallelism on the Resource Configuration page in either Basic mode or Expert mode. The differences are as follows:

    • The parallelism set in Basic mode is the global parallelism for the entire job.基础模式

    • Expert mode allows you to set the parallelism for a specific VERTEX as needed.vertex并发

    For more information about resource configuration, see Configure a job deployment.

    Important

    Whether you use Basic mode or Expert mode, the server-id range declared in the table must be greater than or equal to the job's parallelism. For example, if the server-id range is 5404-5412, there are 8 unique server IDs, so the job can have a maximum of 8 concurrencies. Different jobs for the same MySQL instance must have non-overlapping server-id ranges, meaning each job must explicitly configure a different server-id.

  • Autopilot auto-scaling

    The full data phase accumulates a large amount of historical data. To improve reading efficiency, historical data is typically read concurrently. In the incremental binlog phase, however, because the volume of binlog data is small and a global order must be maintained, a single concurrency is usually sufficient. Autopilot can automatically balance performance and resources to meet these different requirements of the full and incremental phases.

    Autopilot monitors the traffic of each task in the MySQL CDC Source. When the job enters the binlog phase, if only one task is responsible for reading the binlog and the other tasks are idle, Autopilot will automatically reduce the CU count and parallelism of the Source. To enable Autopilot, set the Autopilot mode to Active on the job's Operations and Maintenance page.

    Note

    The minimum trigger interval for reducing parallelism is 24 hours by default. For more information on Autopilot options and details, see Configure Autopilot.

  • Startup modes

    You can use the scan.startup.mode option to specify the startup mode for the MySQL CDC source table. The options include:

    • initial (default): Performs a full read of the database table upon the first startup, and then switches to incremental mode to read the binlog.

    • earliest-offset: Skips the snapshot phase and starts reading from the earliest available binlog position.

    • latest-offset: Skips the snapshot phase and starts reading from the end of the binlog. In this mode, the source table can only read data changes that occur after the job starts.

    • specific-offset: Skips the snapshot phase and starts reading from a specified binlog position. The position can be specified by the binlog filename and position, or by using a GTID set.

    • timestamp: Skips the snapshot phase and starts reading binlog events from a specified timestamp.

    Example:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Start from the earliest offset.
        'scan.startup.mode' = 'latest-offset', -- Start from the latest offset.
        'scan.startup.mode' = 'specific-offset', -- Start from a specific offset.
        'scan.startup.mode' = 'timestamp', -- Start from a specific timestamp.
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Specify the Binlog file name for the specific-offset mode.
        'scan.startup.specific-offset.pos' = '4', -- Specify the Binlog position for the specific-offset mode.
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Specify the GTID set for the specific-offset mode.
        'scan.startup.timestamp-millis' = '1667232000000' -- Specify the startup timestamp for the timestamp mode.
        ...
    )
    Important
    • The MySQL source logs the current position at the INFO level during a checkpoint. The log prefix is Binlog offset on checkpoint {checkpoint-id}. This log can help you restart the job from a specific checkpoint position.

    • If the schema of the table being read has changed in the past, starting from the earliest-offset, a specific-offset, or a timestamp may cause errors. This is because the Debezium reader internally stores the latest schema, and older data with a mismatched schema cannot be parsed correctly.

  • Keyless CDC source tables

    • To use a keyless table, you must set scan.incremental.snapshot.chunk.key-column and can only choose a non-nullable column.

    • The processing semantics of a keyless CDC source table are determined by the behavior of the column specified in scan.incremental.snapshot.chunk.key-column:

      • If the specified column is not updated, exactly-once semantics can be guaranteed.

      • If the specified column is updated, only at-least-once semantics can be guaranteed. However, you can achieve data correctness by combining it with a downstream system, specifying a downstream primary key, and using idempotent operations.

  • Read backup logs from ApsaraDB RDS for MySQL

    The MySQL CDC source table supports reading backup logs from Alibaba Cloud ApsaraDB RDS for MySQL. This is particularly useful in scenarios where the full snapshot phase takes a long time, causing local binlog files to be automatically cleaned up, while automatically or manually uploaded backup files still exist.

    Example:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60 s'
        ...
    )
  • Enable CDC source reuse

    When a single job has multiple MySQL CDC source tables, each source table starts a corresponding binlog client. If there are many source tables and they all read from the same MySQL instance, it can put significant pressure on the database. For more information, see MySQL CDC FAQ.

    VVR 8.0.7+ versions support MySQL CDC source reuse. Different CDC source tables can be merged if all their configuration options, except for the database, table name, and server-id, are identical. After enabling source reuse, VVR will merge as many compatible MySQL CDC source tables as possible within the same job.

    Procedure

    1. Enable the source reuse feature in your SQL job draft using the SET command:

      SET 'table.optimizer.source-merge.enabled' = 'true';
    2. Start the job without states. After enabling source reuse for an existing job, you must perform a stateless restart. This is because source reuse changes the job topology, and restarting from the old job state may fail or lead to data loss. If source reuse occurs, a MergetableSourceScan operator will show.

    Important
    • After you enable CDC source reuse, do not set pipeline.operator-chaining to false. Disabling operator chaining adds serialization and deserialization overhead. The more sources are merged, the greater the overhead becomes.

    • In VVR 8.0.7, disabling operator chaining causes a serialization issue.

Accelerate binlog reading

When you use the MySQL connector as a source table or a data ingestion source, it parses binlog files to generate various change messages during the incremental phase. The binlog file records all table changes in a binary format. You can accelerate binlog file parsing in the following ways.

  • Enable parsing filter configuration

    • Use the scan.only.deserialize.captured.tables.changelog.enabled option to parse change events only for the specified tables.

  • Optimize Debezium options

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: The maximum number of records that the blocking queue can hold. When Debezium reads an event stream from the database, it places the events in a blocking queue before writing them downstream. The default value is 8192.

    • debezium.max.batch.size: The maximum number of events that the connector processes in each iteration. The default value is 2048.

    • debezium.poll.interval.ms: The number of milliseconds the connector should wait before requesting new change events. The default value is 1000 milliseconds, or 1 second.

Example:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium configuration
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Enable parsing filter
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Parses change events only for specified tables.
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium configuration
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parsing filter
  scan.only.deserialize.captured.tables.changelog.enabled: true

The enterprise version of MySQL CDC has a binlog consumption capacity of 85 MB/s, which is about twice that of the open-source community version. If the binlog generation speed exceeds 85 MB/s (equivalent to one 512 MB file every 6 seconds), the Flink job's latency will continue to increase. The processing latency will gradually decrease after the binlog generation speed slows down. When the binlog file contains large transactions, processing latency may temporarily increase and will decrease after the transaction's log is read.

MySQL CDC DataStream API

Important

To read and write data by using the DataStream API, you must use the corresponding DataStream connector to connect to Flink. For information on how to set up the DataStream connector, see Integrate and use connectors in DataStream programs.

The following examples show how to create a DataStream API program and use MySqlSource, including the required pom dependencies.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // Set the database to capture.
        .tableList("yourDatabaseName.yourTableName") // Set the table to capture.
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // Converts a SourceRecord to a JSON string.
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing.
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // Set 4 parallel source tasks.
      .setParallelism(4)
      .print().setParallelism(1); // Use a parallelism of 1 for the sink to maintain message order.
    env.execute("Print MySQL Snapshot + Binary Log");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

When you build MySqlSource, you can specify the following parameters in your code:

Parameter

Description

hostname

The IP address or hostname of the MySQL database.

port

The port number of the MySQL database service.

databaseList

The name of the MySQL database.

Note

The database name supports regular expressions to read data from multiple databases. Use .* to match all databases.

username

The username for the MySQL database service.

password

The password for the MySQL database service.

deserializer

A deserializer that deserializes records of the SourceRecord type to a specified type. The parameter can be set to one of the following values:

  • RowDataDebeziumDeserializeSchema: Converts a SourceRecord to RowData, which is the internal data structure for Flink Table or SQL.

  • JsonDebeziumDeserializationSchema: Converts a SourceRecord to a string in JSON format.

You must specify the following parameters in your pom dependencies:

${vvr.version}

The version of the Realtime Compute for Apache Flink engine. For example, 1.17-vvr-8.0.4-3.

Note

Use the version number displayed on Maven. We periodically release hotfix versions, and these updates might not be announced through other channels.

${flink.version}

The Apache Flink version. For example: 1.17.2.

Important

Ensure that your Apache Flink version matches the engine version of Realtime Compute for Apache Flink to avoid incompatibility issues during job runtime. For more information about the version mapping, see Engine.

FAQ

For information about issues you might encounter when using a CDC source table, see CDC issues.