Realtime Compute for Apache Flink:MySQL

Last Updated:Mar 25, 2026

Learn how to use the MySQL connector.

Overview

The MySQL connector supports all databases compatible with the MySQL protocol, including ApsaraDB RDS for MySQL, PolarDB for MySQL, OceanBase (MySQL compatible mode), and self-managed MySQL databases.

Important

To read data from OceanBase with the MySQL connector, you must enable and configure binlog on the OceanBase instance. For setup instructions, see Binary logging operations.

Note: This feature is currently in public preview. Please evaluate it thoroughly for your use case before deploying to production.

The MySQL connector supports the following features.

Category

Details

Supported types

  • SQL source, sink, and dimension (lookup)

  • Flink CDC source

  • DataStream source

Execution mode

Streaming

Data format

Not applicable

Metrics

  • Source table

    • currentFetchEventTimeLag: The latency (in milliseconds) between the event generation time and the time the record is fetched by the source operator.

      Note: This metric is only active during the binary log (CDC) phase; it is 0 during the initial snapshot phase.

    • currentEmitEventTimeLag: The latency (in milliseconds) between the event generation time and the time the record is emitted by the source operator.

      Note: This metric is only active during the binary log (CDC) phase; it is 0 during the initial snapshot phase.

    • sourceIdleTime: The time elapsed (in milliseconds) since the source operator last emitted a record.

  • Dimension table and sink table: None

Note

For more information, see Monitoring metrics.

API types

SQL, Flink CDC, and DataStream

Update/delete sink table data

Supported

Benefits

The MySQL connector provides a unified stream of database changes by performing an initial snapshot followed by a seamless transition to binlog consumption. This architecture ensures Exactly-Once processing, preventing data loss or duplication even in the event of job failures. For more information, see Understanding MySQL source. Features:

  • Unified batch and streaming: Processes both snapshot and incremental data in a single pipeline, eliminating the need for separate workflows.

  • Incremental snapshot algorithm: Enables lockless reads during the snapshot phase, ensuring no impact on production database performance.

  • Horizontal scalability: Supports concurrent snapshot reading, allowing you to parallelize the initial data load.

  • Elastic resource management: Automatically scales down resources after transitioning from the snapshot phase to the incremental binlog phase.

  • Checkpoint-based recovery: Provides stateful recovery during both snapshot and streaming phases, improving overall pipeline stability.

  • Optimized performance: Features parallel parsing of binlog files to minimize end-to-end latency.

  • Cloud integration: Supports reading from binary logs on ApsaraDB RDS for MySQL.

Prerequisites

Before you use a MySQL CDC source table, complete the steps in Configure a MySQL database.

ApsaraDB RDS for MySQL

  1. Connectivity & permissions

  2. Database configuration

    • Version: MySQL 5.6, 5.7, or 8.0.x.

    • Binlog: Ensure binlog is enabled.

    • Binlog format: Set it to ROW (the default).

    • Binlog row image: Set binlog_row_image to FULL.

    • Transaction compression: Ensure Binary Log Transaction Compression is disabled (applies to MySQL 8.0.20+).

  3. Database objects

PolarDB for MySQL

  1. Connectivity & permissions

  2. Database configuration

    • Version: MySQL 5.6, 5.7, or 8.0.x.

    • Binlog: Ensure binlog is enabled.

    • Binlog format: Set it to ROW (the default).

    • Binlog row image: Set binlog_row_image to FULL.

    • Transaction compression: Ensure Binary Log Transaction Compression is disabled (applies to MySQL 8.0.20+).

  3. Database objects

Self-managed MySQL

  1. Connectivity & permissions

    • Network connectivity: Ensure network connectivity between your database and the Flink cluster. See How do I use the network detection feature? to verify.

    • IP whitelist: Configure the database whitelist to allow traffic from the Flink cluster. See Add a security group rule.

    • Permissions: A MySQL user with SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges.

  2. Database configuration

    • Version: MySQL 5.6, 5.7, or 8.0.x.

    • Binlog: Ensure binlog is enabled.

    • Binlog format: Set it to ROW (the default is STATEMENT).

    • Binlog row image: Set binlog_row_image to FULL.

    • Transaction compression: Ensure Binary Log Transaction Compression is disabled (applies to MySQL 8.0.20+).

  3. Database objects
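The permission and configuration requirements above can be checked from a MySQL client. The following is a minimal sketch, not an official setup script: the account name `flink_cdc` and the password placeholder are hypothetical, and the statements assume you are connected with an account that may create users and grant privileges.

```sql
-- Hypothetical account for the connector; replace the name, host, and password.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '<yourPassword>';

-- Privileges listed in the prerequisites. REPLICATION SLAVE and
-- REPLICATION CLIENT are global privileges, so they are granted ON *.*.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink_cdc'@'%';

-- Check the binlog settings that the connector requires.
SHOW VARIABLES LIKE 'log_bin';           -- required value: ON
SHOW VARIABLES LIKE 'binlog_format';     -- required value: ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- required value: FULL

-- This variable exists only on MySQL 8.0.20 and later.
SHOW VARIABLES LIKE 'binlog_transaction_compression';  -- required value: OFF
```

In practice you would probably restrict the host part of the account (`'%'` here) to the address range of your Flink cluster.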

Limitations

General

  • Watermarks: MySQL CDC source tables do not support watermarks. For windowed aggregations, use non-windowed aggregation patterns.

  • Schema evolution: In CTAS and CDAS jobs, only specific schema changes are supported. Refer to CREATE TABLE AS (CTAS) for details.

  • Binary log transaction compression: This feature is not supported and must be disabled on your MySQL instance.
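Because watermarks are unavailable, a per-day count can be written as a regular GROUP BY aggregation instead of a window. The following is a minimal sketch that assumes a CDC source table shaped like the mysqlcdc_source table in the Syntax section of this topic; the aliases `order_day` and `order_cnt` are illustrative.

```sql
-- Non-windowed aggregation: group by a day string derived from order_date.
-- The result is an updating table: each change captured from MySQL
-- updates the count of the day it belongs to.
SELECT
  DATE_FORMAT(order_date, 'yyyy-MM-dd') AS order_day,
  COUNT(*) AS order_cnt
FROM mysqlcdc_source
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
```

Unlike a window, this aggregation never closes a group on its own, so state for old days is kept until a state TTL, if you configure one, expires it.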

ApsaraDB RDS for MySQL

  • Do not point the connector to read-only or standby instances; the short binary log retention period on these instances leads to log expiration and job failure.

  • Primary-secondary switchover: To prevent data loss during switchovers, ensure slave_preserve_commit_order is enabled on the instance.

PolarDB for MySQL

For versions 1.0.19 and earlier, reading from a multi-master cluster is not supported due to potential table ID conflicts and schema parsing failures.

Open-source MySQL

  • If parallel replication is enabled (slave_parallel_workers > 1), you must set slave_preserve_commit_order = ON.

  • Risk: Failing to preserve commit order can cause the transaction commit sequence on the replica to deviate from the primary, leading to data loss when the Flink job resumes from a checkpoint.

  • Alternative: If you cannot enable slave_preserve_commit_order, set slave_parallel_workers = 1.
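The replica settings above can be inspected and changed from a MySQL client. This is a sketch under assumptions: it uses the variable names given in this topic (newer MySQL 8.0 releases also expose `replica_*` aliases), it requires an account allowed to change global variables, and `SET GLOBAL` changes do not survive a server restart unless you also persist them in the server configuration. Depending on the MySQL version, enabling commit-order preservation may require additional replication settings.

```sql
-- Run on the replica that the connector reads from.
SHOW VARIABLES LIKE 'slave_parallel_workers';
SHOW VARIABLES LIKE 'slave_preserve_commit_order';

-- Option 1: preserve the commit order. The replication SQL thread
-- must be stopped while the variable is changed.
STOP SLAVE SQL_THREAD;
SET GLOBAL slave_preserve_commit_order = ON;
START SLAVE SQL_THREAD;

-- Option 2 (alternative): use a single applier worker instead.
-- STOP SLAVE SQL_THREAD;
-- SET GLOBAL slave_parallel_workers = 1;
-- START SLAVE SQL_THREAD;
```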

Usage notes

  • Source: Specify a unique server ID for each source.

    Purpose of server ID

    Each MySQL CDC data source must have a unique server ID. If multiple data sources share the same server ID and cannot reuse connections, binary log offsets may become inconsistent, leading to missing or duplicated data.

    Server ID configuration for different scenarios

    You can specify the server ID in DDL statements, but we recommend using dynamic hints instead of DDL parameters.

    • Parallelism = 1 or incremental snapshots disabled

      -- Specify a single server ID when incremental snapshots are disabled or the parallelism is 1.
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;

    • Parallelism > 1 and incremental snapshots enabled

      -- Specify a range of server IDs. The number of IDs in the range must be at least the parallelism.
      -- For example, for a parallelism of 3:
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;

    • Data synchronization using CTAS

      When you synchronize data by using CTAS, CDC data sources with identical configurations are automatically reused. In this case, you can assign the same server ID to multiple CDC data sources. For more information, see Example 4: Multiple CTAS statements.

    • Multiple non-CTAS source tables that cannot be reused

      If your job contains multiple MySQL CDC source tables and does not use CTAS, data sources cannot be reused. Assign a unique server ID to each CDC source table. Similarly, if incremental snapshots are enabled and the parallelism is greater than 1, specify a server ID range.

      SELECT * FROM
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      LEFT JOIN
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      ON source_table1.id = source_table2.id;

  • Sink tables

    • Do not declare auto-increment primary keys in the DDL. MySQL fills them automatically.

    • At least one non-primary-key field must be declared. Otherwise, an error occurs.

    • NOT ENFORCED means Flink does not validate primary keys. You must ensure primary key correctness. For more information, see Validity check.

  • Dimension tables

    To accelerate queries using indexes, the JOIN condition fields must match the index definition order (leftmost prefix rule). For example, if the index is (a, b, c), the JOIN condition should be ON t.a = x AND t.b = y.

    Flink-generated SQL may be rewritten by the optimizer, which can prevent index usage. To verify index usage, check the execution plan (EXPLAIN) or slow query logs in MySQL.
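The leftmost prefix rule can be checked directly in MySQL with EXPLAIN. The following sketch runs against MySQL, not Flink; the table `dim_table`, its columns, and the index name `idx_a_b_c` are hypothetical.

```sql
-- Hypothetical dimension table with a composite index on (a, b, c).
CREATE TABLE dim_table (
  id  BIGINT PRIMARY KEY,
  a   INT,
  b   INT,
  c   INT,
  val VARCHAR(64),
  KEY idx_a_b_c (a, b, c)
);

-- Filters on the leftmost prefix (a, b): the index is a candidate.
EXPLAIN SELECT * FROM dim_table WHERE a = 1 AND b = 2;

-- Skips the leading column a: the index typically cannot be used
-- for a prefix lookup.
EXPLAIN SELECT * FROM dim_table WHERE b = 2 AND c = 3;
```

In the EXPLAIN output, the `key` column shows which index, if any, MySQL chose for each query.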

SQL

Use the MySQL connector in SQL jobs as a source, dimension, or sink table.

Syntax

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Note

Sink behavior and write semantics

  • The MySQL connector converts incoming records into SQL statements based on the target table's schema:

    • Tables without primary keys: Executes standard INSERT statements: INSERT INTO table_name (...) VALUES (...);.

    • Tables with primary keys: Executes UPSERT operations using INSERT INTO table_name (...) VALUES (...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), ...;.

Unique index conflicts

If your physical table contains unique index constraints other than the primary key, inserting records with different primary keys but identical unique index values may cause data loss due to constraint violations.

Auto-increment columns

If your target table uses an AUTO_INCREMENT primary key, do not include this column in your Flink DDL. MySQL will generate the values automatically during the write. While the connector handles INSERT and DELETE operations for tables with auto-increment keys, it does not support updates to the auto-increment column itself.
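As an illustration of the auto-increment rule, the sketch below declares a sink table whose physical MySQL table has an AUTO_INCREMENT `id` column that is left out of the Flink DDL. The table names `orders_sink` and `mysql_sink` and the sample row are hypothetical; the connection values are placeholders.

```sql
-- Physical MySQL table, shown as a comment for reference (create it in MySQL):
-- CREATE TABLE orders_sink (
--   id            BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
--   order_id      INT,
--   customer_name VARCHAR(255),
--   price         DECIMAL(10, 5)
-- );

-- Flink DDL: the auto-increment id column is intentionally not declared.
CREATE TEMPORARY TABLE mysql_sink (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'orders_sink'
);

-- MySQL assigns id during the write.
INSERT INTO mysql_sink
VALUES (1001, 'Alice', CAST(19.99 AS DECIMAL(10, 5)));
```

Because no primary key is declared in the Flink DDL, this sketch writes with plain INSERT statements, as described in the sink behavior note above.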

Connector options

  • General

    Option

    Description

    Required

    Data type

    Default value

    Notes

    connector

    The connector to use.

    Yes

    STRING

    -

    Use mysql-cdc or mysql for sources; use mysql for sinks and lookups.

    hostname

    The IP address or hostname of the database.

    Yes

    STRING

    -

    We recommend using a VPC endpoint. If your Flink workspace and MySQL database are in different networks, ensure cross-VPC or public connectivity is configured. For more information, see Storage management and operations and How do I access the Internet from a fully managed Flink cluster?.

    username

    The database username.

    Yes

    STRING

    -

    -

    password

    The database password.

    Yes

    STRING

    -

    -

    database-name

    The database name.

    Yes

    STRING

    -

    Regex matching:

    • Supports regex for source tables.

    • Do not use ^ or $ in your regex patterns. See the note for table-name for details.

    table-name

    The table name. Supports regex for source tables.

    Yes

    STRING

    -

    Regex matching:

    • To read multiple databases or tables, use regular expressions.

    • Avoid anchors: Do not use ^ or $ in your regex patterns.

    • Full-path resolution: The connector concatenates database-name and table-name with \\. (or . for VVR 8.0.1 and earlier) to form a fully qualified name for matching (e.g., db_name.table_name).

    • Optimization: When reading multiple tables, combine them into a single CTAS job rather than launching individual jobs to reduce the number of binlog listeners. For details, see Execute multiple CTAS statements in a single job.

    port

    The port number of the MySQL database service.

    No

    INTEGER

    3306
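To illustrate the regex matching described for database-name and table-name, the following sketch reads a set of sharded tables through one source table. The shard naming scheme (`user_db_1`, `user_db_2`, ... and `user_1`, `user_2`, ...) and the table name `sharded_users` are assumptions for the example; it also assumes that all matched tables share the same schema and that `id` is unique across shards.

```sql
CREATE TEMPORARY TABLE sharded_users (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Regex without ^ or $ anchors: matches user_db_1, user_db_2, ...
  'database-name' = 'user_db_[0-9]+',
  -- Matches user_1, user_2, ... in every matched database.
  'table-name' = 'user_[0-9]+'
);
```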

  • Source-specific

    Option

    Description

    Required

    Data type

    Default value

    Notes

    server-id

    Unique ID for the database client. Use a range (e.g., 5400-6400) to support concurrent reading.

    No

    STRING

    Random

    Best practice: Assign a different ID to each job that accesses the same database.

    For more information, see Using server ID.

    scan.incremental.snapshot.enabled

    Enables incremental snapshot.

    No

    BOOLEAN

    true

    Incremental snapshots are enabled by default. An incremental snapshot is a new mechanism for reading snapshots. Compared to traditional snapshots, incremental snapshots offer several advantages:

    • Concurrent snapshot reading.

    • Checkpointing at the chunk level during snapshot reading.

    • No global read lock (FLUSH TABLES WITH READ LOCK) required during snapshot reading.

    If you want the source to support concurrent reading, each reader needs a unique server ID. Therefore, the server-id must be a range like 5400-6400, and the range size must be at least equal to the parallelism.

    Note

    This configuration is removed in VVR 11.1 and later.

    scan.incremental.snapshot.chunk.size

    Number of rows per read chunk. Larger chunks favor throughput; smaller chunks use less memory.

    No

    INTEGER

    8096

    scan.snapshot.fetch.size

    The maximum number of records to fetch per read when scanning full table data.

    No

    INTEGER

    1024

    scan.startup.mode

    Initial data position: initial, latest-offset, earliest-offset, specific-offset, or timestamp.

    No

    STRING

    initial

    See Startup modes for details.

    Important

    When using earliest-offset, specific-offset, or timestamp, ensure the table schema remains unchanged between the specified binary log position and job startup. Schema changes may cause errors.

    scan.startup.specific-offset.file

    The binary log filename for the specified startup offset.

    No

    STRING

    -

    When using this configuration, scan.startup.mode must be set to specific-offset. Example filename: mysql-bin.000003.

    scan.startup.specific-offset.pos

    The offset within the specified binary log file for the startup position.

    No

    INTEGER

    -

    When using this configuration, scan.startup.mode must be set to specific-offset.

    scan.startup.specific-offset.gtid-set

    The GTID set for the startup position.

    No

    STRING

    -

    When using this configuration, scan.startup.mode must be set to specific-offset. Example GTID set: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    The startup timestamp in milliseconds.

    No

    LONG

    -

    When using this configuration, scan.startup.mode must be set to timestamp. Timestamp units are milliseconds.

    Important

    When using a timestamp, MySQL CDC attempts to read the initial event of each binary log file to determine its timestamp and locate the corresponding file. Ensure the specified timestamp corresponds to a binary log file that exists and is readable in the database.

    server-time-zone

    Sets the session time zone (e.g., Asia/Shanghai) for temporal type conversion.

    No

    STRING

    Local

    For more information, see Debezium temporal values.

    debezium.min.row.count.to.stream.results

    When the table row count exceeds this value, use batch reading mode.

    No

    INTEGER

    1000

    Flink reads data from the MySQL source table as follows:

    • Snapshot mode: Loads the entire table into memory. Fast but memory-intensive. Large tables risk OOM errors.

    • Batch mode: Reads data in batches. Memory-efficient but slower for large tables.

    connect.timeout

    Max wait time for a connection.

    No

    DURATION

    30s

    connect.max-retries

    Max retries after a connection failure.

    No

    INTEGER

    3

    connection.pool.size

    Size of the connection pool. Pooled connections are reused, which reduces the number of connections to the database.

    No

    INTEGER

    20

    jdbc.properties.*

    Custom connection parameters for the JDBC URL.

    No

    STRING

    -

    You can pass custom connection parameters. For example, to disable SSL, set 'jdbc.properties.useSSL' = 'false'.

    For supported connection parameters, see MySQL Configuration Properties.

    debezium.*

    Custom Debezium parameters for reading binary logs.

    No

    STRING

    -

    You can pass custom Debezium parameters. For example, use 'debezium.event.deserialization.failure.handling.mode'='ignore' to define how to handle parsing errors.

    heartbeat.interval

    Heartbeat frequency to advance offsets and prevent log expiration.

    No

    DURATION

    30s

    Useful for tables with infrequent updates. Without heartbeats, the binary log offset for such tables may stall, causing expiration and job failure.

    scan.incremental.snapshot.chunk.key-column

    Column to split chunks.

    No

    STRING

    -

    • Tables without primary keys: Required. The selected column must be non-nullable.

    • Tables with primary keys: Optional. Supports only a primary key column.

    rds.region-id

    The region ID of the ApsaraDB RDS for MySQL instance.

    No

    STRING

    -

    Required when reading archived logs from OSS. For region IDs, see Regions and zones.

    rds.access-key-id

    The AccessKey ID for the account with access to ApsaraDB RDS for MySQL.

    No

    STRING

    -

    Required when reading archived logs from OSS. For more information, see How do I view my AccessKey ID and AccessKey secret?.

    Important

    Security recommendation: Use variables instead of hardcoding your credentials. For more information, see Manage variables.

    rds.access-key-secret

    The AccessKey secret for the account with access to ApsaraDB RDS for MySQL.

    No

    STRING

    -

    rds.db-instance-id

    The ApsaraDB RDS for MySQL instance ID.

    No

    STRING

    -

    Required when reading archived logs from OSS.

    rds.main-db-id

    The primary ApsaraDB RDS for MySQL database ID.

    No

    STRING

    -

    rds.download.timeout

    Max wait time for downloading a single archived log from OSS.

    No

    DURATION

    60s

    rds.endpoint

    The endpoint for accessing OSS binary log information.

    No

    STRING

    -

    scan.incremental.close-idle-reader.enabled

    Closes idle readers after snapshot reading.

    No

    BOOLEAN

    false

    • Requires VVR 8.0.1+.

    • Dependency: execution.checkpointing.checkpoints-after-tasks-finish.enabled must be set to true.

    scan.read-changelog-as-append-only.enabled

    Transforms all events to INSERTs.

    No

    BOOLEAN

    false

    Use this carefully, as it destroys the "Delete/Update" semantics of the original database.

    Note

    Requires VVR 8.0.8+.

    scan.only.deserialize.captured.tables.changelog.enabled

    Deserializes only events for tables defined in the query.

    No

    BOOLEAN

    true (VVR 11.1+)

    Valid values:

    • true: Deserialize only change data for target tables to speed up binary log reading.

    • false: Deserialize change data for all tables. This is the default in versions earlier than VVR 11.1.

    Note

    • Supported only in VVR 8.0.7+.

    • In VVR 8.0.8 and earlier, use debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Parses RDS lockless DDL changes (Experimental).

    No

    BOOLEAN

    false

    Valid values:

    • true

    • false

    Before performing online lockless changes, take a savepoint for recovery.

    Note

    Requires VVR 11.1+.

    scan.incremental.snapshot.backfill.skip

    Skip backfill during snapshot reading.

    No

    BOOLEAN

    false

    If backfill is skipped, changes made during the snapshot phase are consumed in the incremental phase.

    Important

    Skipping backfill can replay changes made during the snapshot phase, so only at-least-once semantics are guaranteed and data may be inconsistent.

    Note

    Requires VVR 11.1+.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Distributes unbounded chunks first during snapshot reading. (Experimental)

    No

    BOOLEAN

    false

    Enabling this reduces the risk of TaskManager OOMs during the final chunk sync in the snapshot phase. We recommend enabling it before the first job startup.

    Note

    Requires VVR 11.1+.

    binlog.session.network.timeout

    The network timeout for binary log connections.

    No

    DURATION

    10m

    Setting this to 0s uses the MySQL server's default timeout.

    Note

    Requires VVR 11.5+.

    scan.rate-limit.records-per-second

    Limits throughput to prevent source overloading.

    No

    LONG

    -

    The numRecordsOutPerSecond metric reflects the total records emitted per second. Adjust this option based on that metric.

    During snapshot reading, a best practice is to use this option and a smaller chunk size (scan.incremental.snapshot.chunk.size).

    Note

    Requires VVR 11.5+.

    scan.binlog.tolerate.gtid-holes

    Allows ignoring gaps in GTID sequences to keep the job running.

    No

    BOOLEAN

    false

    Before enabling this option, ensure the job's startup offset has not expired. If the job starts from a cleaned or expired GTID offset, VVR silently skips missing logs, resulting in data loss.

    Note

    Requires VVR 11.6+.
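Several of the source options above are typically combined in one DDL statement. The following sketch starts reading from a timestamp and sets a server ID range, a heartbeat interval, and a custom JDBC property. The table name `orders_from_ts`, the ID range, and the timestamp (2025-01-01 00:00:00 UTC in milliseconds) are illustrative values, not recommendations.

```sql
CREATE TEMPORARY TABLE orders_from_ts (
  order_id INT,
  order_status BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Range of 4 IDs: enough for a parallelism of up to 4.
  'server-id' = '5400-5403',
  -- Start from the binary log position closest to this timestamp.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1735689600000',
  -- Keep the offset advancing on tables with infrequent updates.
  'heartbeat.interval' = '30s',
  -- Custom JDBC property, passed through to the connection.
  'jdbc.properties.useSSL' = 'false'
);
```

As noted for scan.startup.mode, the table schema must not have changed between the chosen binary log position and job startup, and the corresponding binary log file must still exist on the server.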

  • Lookup-specific (dimension table)

    Option

    Description

    Required

    Data type

    Default value

    Notes

    url

    The JDBC URL (jdbc:mysql://<host>:<port>/<db>).

    No

    STRING

    -

    The URL format is: jdbc:mysql://<endpoint>:<port>/<database-name>.

    lookup.max-retries

    Max retries for failed lookup requests.

    No

    INTEGER

    3

    Requires VVR 6.0.7+.

    lookup.cache.strategy

    Caching policy: None, LRU, or ALL. For descriptions, see Background information.

    No

    STRING

    None

    Dependency: The LRU cache policy relies on the lookup.cache.max-rows option.

    lookup.cache.max-rows

    Max number of rows in the cache.

    No

    INTEGER

    100000

    Required when lookup.cache.strategy is LRU.

    lookup.cache.ttl

    Cache expiration (TTL) for LRU or reload interval for ALL.

    No

    DURATION

    10s

    The lookup.cache.ttl setting depends on lookup.cache.strategy:

    • lookup.cache.strategy is None: lookup.cache.ttl is optional; no TTL applies.

    • lookup.cache.strategy is LRU: lookup.cache.ttl is the TTL of cached entries. If it is not set, entries do not expire.

    • lookup.cache.strategy is ALL: lookup.cache.ttl is the cache reload interval. If it is not set, the cache is not reloaded.

    Specify time in formats like 1min or 10s.

    lookup.max-join-rows

    Max result rows returned per join query.

    No

    INTEGER

    1024

    lookup.filter-push-down.enabled

    Pushes filters to the database to reduce data loaded into memory.

    No

    BOOLEAN

    false

    Requires VVR 8.0.7+.

    Important

    Filter pushdown is not supported for MySQL source tables. If a table is used as both a source and dimension table, and filter pushdown is enabled for the dimension table, explicitly set this configuration to false using SQL hints when using it as a source table to prevent failures.
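The lookup options above can be sketched in a lookup join as follows. The stream table `orders_stream` (backed by the open-source `datagen` connector, assumed to be available in your environment), the dimension table `product_dim`, and all column names are hypothetical; the cache values are examples only.

```sql
-- Hypothetical stream of orders with a processing-time attribute.
CREATE TEMPORARY TABLE orders_stream (
  order_id INT,
  product_id INT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- MySQL dimension table with an LRU cache.
CREATE TEMPORARY TABLE product_dim (
  product_id INT,
  product_name STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  'lookup.cache.strategy' = 'LRU',
  'lookup.cache.max-rows' = '100000',
  'lookup.cache.ttl' = '1min',
  'lookup.max-retries' = '3'
);

-- Lookup join: each order is enriched with the product name
-- as of its processing time.
SELECT o.order_id, d.product_name
FROM orders_stream AS o
JOIN product_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.product_id = d.product_id;
```

With an LRU cache, a changed row in MySQL may be served from the cache until its TTL expires, so the TTL is a trade-off between freshness and database load.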

  • Sink-specific

    Option

    Description

    Required

    Data type

    Default value

    Notes

    url

    The JDBC URL (jdbc:mysql://<host>:<port>/<db>).

    No

    STRING

    -

    The URL format is: jdbc:mysql://<endpoint>:<port>/<database-name>.

    sink.max-retries

    Max retries for failed write operations.

    No

    INTEGER

    3

    sink.buffer-flush.batch-size

    Number of records per batch write.

    No

    INTEGER

    4096

    sink.buffer-flush.max-rows

    Max rows cached in memory (requires primary key).

    No

    INTEGER

    10000

    This parameter takes effect only when a primary key is specified.

    sink.buffer-flush.interval

    Max time to hold buffered data before flushing.

    No

    DURATION

    1s

    None.

    sink.ignore-delete

    Whether to ignore DELETE and UPDATE_BEFORE messages.

    No

    BOOLEAN

    false

    In high-concurrency environments, multiple tasks updating the same table may cause race conditions when processing DELETE or UPDATE_BEFORE messages.

    Setting this to true ignores these signals and processes only INSERT and UPDATE_AFTER events. Use this to prevent accidental nullification of records during concurrent updates.

    Note
    • UPDATE_BEFORE is part of Flink's retraction mechanism, used to "retract" old values during updates.

    • When this option is true, all DELETE and UPDATE_BEFORE records are skipped. Only INSERT and UPDATE_AFTER records are processed.

    sink.ignore-null-when-update

    If true, skips updating fields that contain null values.

    No

    BOOLEAN

    false

    Valid values:

    • true: Skip updating the field. Supported only when the Flink table has a primary key. When set to true:

      • In VVR 8.0.6 and earlier, batch writes are not supported for sink tables.

      • In VVR 8.0.7 and later, batch writes are supported for sink tables.

        Batch writes improve write efficiency and throughput but introduce latency and OOM risks. Balance these trade-offs based on your business needs.

    • false: Update the field to null.

    Note

    Supported only in VVR 8.0.5 and later.
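The sink options above can be combined as in the following sketch, which writes a changelog into a MySQL table with a primary key. It assumes a source table shaped like the mysqlcdc_source table in the Syntax section of this topic; the sink name `mysql_upsert_sink` and the option values are illustrative.

```sql
CREATE TEMPORARY TABLE mysql_upsert_sink (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>',
  -- Flush when 4096 records are buffered or after 1 second.
  'sink.buffer-flush.batch-size' = '4096',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3',
  -- Skip DELETE and UPDATE_BEFORE records; only INSERT and
  -- UPDATE_AFTER records are written.
  'sink.ignore-delete' = 'true'
);

INSERT INTO mysql_upsert_sink
SELECT order_id, customer_name, price
FROM mysqlcdc_source;
```

With sink.ignore-delete set to true, rows deleted in the source remain in the sink table, so enable it only if that behavior is acceptable for your use case.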

Type mappings

  • Source

    MySQL

    Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Important

    Do not use the TINYINT(1) type in MySQL to store values other than 0 and 1. When property-version is set to 0, the MySQL connector maps TINYINT(1) to the Flink BOOLEAN type by default. This can cause inaccurate data. To use the TINYINT(1) type to store values other than 0 and 1, see the catalog.table.treat-tinyint1-as-boolean configuration parameter.
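As a worked example of the source type mappings, the sketch below shows a hypothetical MySQL table `products` (as a comment) and a Flink source table that declares the corresponding Flink types. The table and column names are invented for illustration.

```sql
-- MySQL table, shown as a comment for reference:
-- CREATE TABLE products (
--   id          BIGINT UNSIGNED NOT NULL PRIMARY KEY,
--   stock       INT UNSIGNED,
--   rating      TINYINT UNSIGNED,
--   in_stock    TINYINT(1),
--   updated_at  DATETIME(3),
--   description TEXT
-- );

CREATE TEMPORARY TABLE products_source (
  id DECIMAL(20, 0),        -- MySQL BIGINT UNSIGNED
  stock BIGINT,             -- MySQL INT UNSIGNED
  rating SMALLINT,          -- MySQL TINYINT UNSIGNED
  in_stock BOOLEAN,         -- MySQL TINYINT(1), values 0 and 1 only
  updated_at TIMESTAMP(3),  -- MySQL DATETIME(3)
  description STRING,       -- MySQL TEXT
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'products'
);
```

Unsigned integer types are widened to the next larger Flink type because Flink integer types are signed; BIGINT UNSIGNED has no larger integer type and therefore maps to DECIMAL(20, 0).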

  • Lookup and sink

    MySQL

    Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Note

    p <= 38 is required.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Important

    Flink supports MySQL BLOB records up to 2,147,483,647 bytes (2^31 − 1).

    BLOB

    MEDIUMBLOB

    LONGBLOB

Flink CDC

The MySQL connector can be used as a data source for Flink CDC.

Syntax

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

Configuration options

Option

Description

Required

Data type

Default value

Remarks

type

The connector type.

Yes

STRING

-

Must be mysql.

name

The name of the data source.

No

STRING

-

-

hostname

MySQL host or IP.

Yes

STRING

-

VPC endpoint recommended.

Note

If your MySQL database and Flink workspace are not in the same VPC, establish a cross-VPC network connection or access the database over the public network. For more information, see Workspace and namespace FAQ.

username

Database username.

Yes

STRING

-

-

password

Database password.

Yes

STRING

-

-

tables

The MySQL tables to sync.

Yes

STRING

-

  • Regex is supported.

  • Separate multiple regular expressions with commas.

Note
  • Avoid anchors: Do not use ^ (start-of-string) or $ (end-of-string) anchors. These will invalidate the internal database-to-table path matching logic. Example: Change ^db.user_[0-9]+$ to db.user_[0-9]+.

  • Separator: The connector internally concatenates the database and table names with a dot (.).

  • Escaping dots: Since a literal dot is used as a separator, you must escape it with a backslash (\\.) if you intend to match a literal dot in your database or table name. Example: To match db0.table_name, use db0\\.table_name.

tables.exclude

The MySQL tables to exclude.

No

STRING

-

port

Database port.

No

INTEGER

3306

-

schema-change.enabled

Whether to propagate schema evolution.

No

BOOLEAN

true

-

server-id

Unique ID or range (e.g., 5400-5408) for the client.

No

STRING

Random

Best practices:

  • Assign a unique ID (or ID range) to every Flink job connecting to the same database.

  • Concurrent reads: When incremental reading is enabled, the job may use multiple concurrent readers. In this case, specify an ID range (e.g., 5400-5408) to ensure each reader is assigned a unique ID.

Default behavior: If left blank, Flink automatically generates a random ID between 5400 and 6400.

jdbc.properties.*

Custom JDBC connection parameters.

No

STRING

-

For more information, see MySQL Configuration Properties.

debezium.*

Custom Debezium configuration properties.

No

STRING

-

-

scan.incremental.snapshot.chunk.size

Number of rows per snapshot chunk.

No

INTEGER

8096

Tables are partitioned into chunks for reading. Each chunk is cached in memory until fully processed.

Trade-offs:

  • Smaller chunk size: Increases the number of chunks. While this improves checkpointing granularity for fault recovery, it may increase overhead and reduce overall throughput.

  • Larger chunk size: Improves throughput but increases memory pressure. If the chunk size is set too high, it may cause OutOfMemory (OOM) errors in the TaskManagers.

scan.snapshot.fetch.size

Records fetched per read during full scan.

No

INTEGER

1024

-

scan.startup.mode

The startup mode for consuming data.

No

STRING

initial

See Startup modes for details.

Important

  • Schema consistency: For earliest-offset, specific-offset, and timestamp modes, the table schema must match the schema at the target binlog position. If the schema has evolved since that position, the job will fail.

  • Log retention: Ensure that the binlog file corresponding to your chosen startup position has not been purged by the MySQL server.

scan.startup.specific-offset.file

Binlog file name for specific-offset mode.

No

STRING

-

Dependency: This option relies on scan.startup.mode set to specific-offset.

Example: mysql-bin.000003.

scan.startup.specific-offset.pos

Binlog file offset for specific-offset mode.

No

INTEGER

-

Dependency: This option is valid only when scan.startup.mode is set to specific-offset.

scan.startup.specific-offset.gtid-set

GTID set for specific-offset mode.

No

STRING

-

Dependency: This option is valid only when scan.startup.mode is set to specific-offset.

Example: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

scan.startup.timestamp-millis

Start timestamp (ms) for timestamp mode.

No

LONG

-

Dependency: scan.startup.mode must be timestamp.

Important

The MySQL CDC connector scans the initial event of each binlog file to locate the file matching your specified scan.startup.timestamp-millis. Ensure the required binlog files exist and have not been purged or archived by the database server.

server-time-zone

Database session time zone (e.g., Asia/Shanghai).

No

STRING

Local

Influences TIMESTAMP to STRING conversion.

See also: Debezium temporal types.

scan.startup.specific-offset.skip-events

Binlog events to skip in specific-offset mode.

No

INTEGER

-

Dependency: scan.startup.mode must be specific-offset.

scan.startup.specific-offset.skip-rows

Row changes to skip in specific-offset mode.

No

INTEGER

-

Dependency: scan.startup.mode must be specific-offset.

connect.timeout

Max wait time for a connection.

No

DURATION

30s

-

connect.max-retries

Max retries for connection failures.

No

INTEGER

3

-

connection.pool.size

Size of the JDBC connection pool, for connection reuse.

No

INTEGER

20

-

heartbeat.interval

Interval to emit heartbeats to advance offsets.

No

DURATION

30s

Prevents log expiration for inactive tables.

scan.incremental.snapshot.chunk.key-column

The chunk column.

No

STRING

-

  • Tables with primary key: Optional. If specified, the column must be part of the primary key.

  • Tables without primary key: Required. Must be a non-nullable column.

rds.region-id

Region ID of the ApsaraDB RDS for MySQL instance.

No

STRING

-

Required for OSS archived log consumption.

See Regions and zones.

rds.access-key-id

Alibaba Cloud AccessKey ID.

Required when you read archived logs from OSS.

STRING

-

Required for OSS archived log consumption.

See How do I view the AccessKey ID and AccessKey secret?

Important

To ensure security, use variables instead of hardcoding your credentials. For more information, see Manage variables.

rds.access-key-secret

Alibaba Cloud AccessKey Secret.

Required when you read archived logs from OSS.

STRING

-

rds.db-instance-id

ApsaraDB RDS for MySQL instance ID.

Required when you read archived logs from OSS.

STRING

-

-

rds.main-db-id

ApsaraDB RDS for MySQL primary database ID.

No

STRING

-

For details, see RDS for MySQL log backup.

rds.download.timeout

Timeout for downloading archived logs.

No

DURATION

60s

rds.endpoint

OSS service endpoint.

No

STRING

-

For details, see Endpoints.

rds.binlog-directory-prefix

Directory prefix for binlog files.

No

STRING

rds-binlog-

rds.use-intranet-link

Whether to use internal network for OSS logs.

No

BOOLEAN

true

rds.binlog-directories-parent-path

Parent path for binlog storage.

No

STRING

-

chunk-meta.group.size

Chunk metadata grouping size.

No

INTEGER

1000

If the metadata is larger than this value, it is passed in multiple parts.

chunk-key.even-distribution.factor.lower-bound

Lower bound for even chunk splitting.

No

DOUBLE

0.05

If the distribution factor is less than this value, uneven chunking occurs.

Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of data rows.

chunk-key.even-distribution.factor.upper-bound

Upper bound for even chunk splitting.

No

DOUBLE

1000.0

If the distribution factor is greater than this value, uneven chunking occurs.

Chunk distribution factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / Total number of data rows.

scan.incremental.close-idle-reader.enabled

Closes idle readers after snapshot completion.

No

BOOLEAN

false

Dependency: execution.checkpointing.checkpoints-after-tasks-finish.enabled must be true.

scan.only.deserialize.captured.tables.changelog.enabled

Deserializes only specified tables for efficiency.

No

BOOLEAN

  • false (VVR 8.x)

  • true (VVR 11.1+)

-

scan.parallel-deserialize-changelog.enabled

Enables multi-threaded binlog deserialization.

No

BOOLEAN

false

-

Note

Requires VVR 8.0.11+.

scan.parallel-deserialize-changelog.handler.size

Thread count for multi-threaded deserialization.

No

INTEGER

2

Note

Requires VVR 8.0.11+.

metadata-column.include-list

Metadata columns to expose.

No

STRING

-

Available metadata columns include op_ts, es_ts, query_log, file, and pos.

  • file: The name of the binary log file. During the snapshot phase, the value is "". During the incremental phase, it displays the actual log file name. (Supported in VVR 11.5 and later).

  • pos: The offset position within the binary log file. During the snapshot phase, the value is 0. During the incremental phase, it displays the actual offset. (Supported in VVR 11.5 and later).

  • es_ts: The transaction start time (event timestamp) in milliseconds. This is supported only for MySQL 8.0.x; do not include this column if you are using an earlier version.

  • op_ts: The operation timestamp in seconds.

Note
  • The MySQL connector does not support adding metadata columns for the database name, table name, or op_type. Directly use __data_event_type__ in a Transform expression to get the change data type, or use __schema_name__ and __table_name__ to get the database and table names.

  • Separate multiple metadata columns with commas.

scan.newly-added-table.enabled

Sync newly discovered tables at restart.

No

BOOLEAN

false

This takes effect when restarting from a checkpoint or savepoint.

scan.binlog.newly-added-table.enabled

Sync newly discovered tables during binlog phase.

No

BOOLEAN

false

Conflicts with scan.newly-added-table.enabled. Do not enable both options at the same time.

scan.parse.online.schema.changes.enabled

Parse RDS lockless DDL events (Experimental).

No

BOOLEAN

false

Before you perform an online lockless change, create a savepoint of the job.

Note

Requires VVR 11.0+.

scan.incremental.snapshot.backfill.skip

Skip backfill during snapshot.

No

BOOLEAN

false

If backfill is skipped, changes to the table during the snapshot phase are read in the later incremental phase instead of being merged into the snapshot.

Important

Skipping backfill may cause data inconsistency because changes that occur during the snapshot phase might be replayed. Only at-least-once semantics are guaranteed.

Note

Requires VVR 11.1+.

treat-tinyint1-as-boolean.enabled

Maps TINYINT(1) to BOOLEAN.

No

BOOLEAN

true

-

treat-timestamp-as-datetime-enabled

Maps TIMESTAMP to DATETIME.

No

BOOLEAN

false

The TIMESTAMP type stores UTC time and is affected by the time zone, while the DATETIME type stores literal time and is not affected by the time zone.

If this option is enabled, the MySQL TIMESTAMP type data is converted to the DATETIME type based on the server-time-zone.

include-comments.enabled

Synchronizes table/column comments.

No

BOOLEAN

false

This option increases the memory usage of the job.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Distribute unbounded chunks first (Experimental).

No

BOOLEAN

false

This option reduces OOM risk during snapshot phase. We recommend enabling this option before first-time startup.

Note

Requires VVR 11.1+.

binlog.session.network.timeout

Network timeout for binlog connection.

No

DURATION

10m

If this is set to 0s, the default timeout of the MySQL server is used.

Note

Requires VVR 11.5+.

scan.rate-limit.records-per-second

Limit records emitted per second.

No

LONG

-

Applies to both snapshot and binlog phases.

The numRecordsOutPerSecond metric of the source reflects the number of records that the entire data stream outputs per second. You can adjust this parameter based on this metric.

To lower the rate during the snapshot phase, you usually need to reduce the number of records read in each batch by lowering the value of the scan.incremental.snapshot.chunk.size option.

Note

Requires VVR 11.5+.

include-binlog-meta.enable

Include original GTID/offset in message.

No

BOOLEAN

false

This is suitable for raw binlog synchronization scenarios, such as replacing an existing Canal synchronization pipeline.

Note

Requires VVR 11.6+.

scan.binlog.tolerate.gtid-holes

Ignores GTID gaps in sequence.

No

BOOLEAN

false

Before enabling this option, ensure that the start offset of the job has not expired. If the job starts from a cleared or expired GTID offset, Flink skips the missing logs, which results in data loss.

Note

Requires VVR 11.6+.
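
The chunk distribution factor used by chunk-key.even-distribution.factor.lower-bound and chunk-key.even-distribution.factor.upper-bound can be worked through with a small calculation. The following is a minimal sketch, not the connector's implementation: the class and method names are hypothetical, the chunk key is assumed to be numeric, and the bounds are treated as inclusive because the descriptions above only call the distribution uneven when the factor is less than the lower bound or greater than the upper bound.

```java
final class ChunkDistribution {

    // Factor = (MAX(chunk-key) - MIN(chunk-key) + 1) / total number of data rows.
    // Assumes a numeric chunk key whose range does not overflow a long.
    static double distributionFactor(long minKey, long maxKey, long rowCount) {
        if (rowCount <= 0) {
            throw new IllegalArgumentException("rowCount must be positive");
        }
        return (maxKey - minKey + 1) / (double) rowCount;
    }

    // The key space counts as evenly distributed when the factor lies within the bounds.
    static boolean isEvenlyDistributed(double factor, double lowerBound, double upperBound) {
        return factor >= lowerBound && factor <= upperBound;
    }

    public static void main(String[] args) {
        // Dense auto-increment key: 1,000,000 rows with keys 1..1,000,000.
        double dense = distributionFactor(1, 1_000_000, 1_000_000);
        // Sparse key: 1,000,000 rows spread over keys 1..2,000,000,000.
        double sparse = distributionFactor(1, 2_000_000_000L, 1_000_000);

        // Default bounds from the option table: 0.05 and 1000.0.
        System.out.println(dense + " even=" + isEvenlyDistributed(dense, 0.05, 1000.0));
        System.out.println(sparse + " even=" + isEvenlyDistributed(sparse, 0.05, 1000.0));
    }
}
```

With the default bounds, the dense key yields a factor of 1.0 and qualifies for even splitting, while the sparse key yields 2000.0 and falls outside the upper bound.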

Type mappings

The following table shows the data type mappings for data ingestion.

MySQL

Flink CDC

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

The mapping depends on the value of the treat-timestamp-as-datetime-enabled parameter:

true: TIMESTAMP[(p)]

false: TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

STRING

Note

In MySQL, DECIMAL types support a precision of up to 65 digits. However, Flink SQL limits DECIMAL precision to 38. To prevent data truncation or loss of precision:

  • For DECIMAL columns with precision > 38: Map these columns to STRING in your Flink DDL.

  • For DECIMAL columns with precision ≤ 38: You may map them directly to Flink DECIMAL types.

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

Note

The JSON data type is converted to a JSON-formatted string in Flink.

GEOMETRY

STRING

Note

Spatial data types in MySQL are converted to strings in a fixed JSON format. For more information, see MySQL spatial data type mapping.

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

Note

For BLOB data types in MySQL, only blobs with a length no greater than 2,147,483,647 (2^31 - 1) are supported.

BLOB

MEDIUMBLOB

LONGBLOB

Examples

  • Source table

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Dimension (lookup) table

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Sink table

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • Flink CDC source

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

Understanding MySQL source

  • How it works

    The MySQL CDC source utilizes an incremental snapshot algorithm to provide a lockless, high-performance, and exactly-once streaming experience:

    1. Snapshot phase: The table is split into chunks based on the primary key. The chunks are distributed across parallel readers, and each reader processes its assigned chunks one at a time. Checkpoints occur regularly, allowing the job to resume from specific chunks upon failure.

    2. Binlog phase: Once all chunks are processed, the connector transitions to reading binlogs from the offset recorded at the start of the snapshot phase.

    3. Failover: The job resumes from the last successful checkpoint (either a specific chunk or a binlog offset), ensuring exactly-once semantics.

      See also: MySQL CDC connector in Apache Flink documentation.
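
To make the snapshot phase more concrete, the following sketch splits a numeric primary key range into consecutive chunks of at most a given size. It is a simplified illustration under stated assumptions, not the connector's algorithm: the class and method names are hypothetical, the key is assumed to be a dense numeric column, and details such as unbounded first and last chunks or uneven splitting are omitted.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkSplitSketch {

    // Splits the inclusive key range [minKey, maxKey] into consecutive half-open
    // ranges [start, end), each covering at most chunkSize key values.
    // Assumes maxKey is well below Long.MAX_VALUE so the arithmetic cannot overflow.
    static List<long[]> splitEvenly(long minKey, long maxKey, int chunkSize) {
        if (chunkSize <= 0 || maxKey < minKey) {
            throw new IllegalArgumentException("invalid key range or chunk size");
        }
        List<long[]> chunks = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            long end = Math.min(start + chunkSize, maxKey + 1);
            chunks.add(new long[] {start, end});
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // 20,000 keys with the default chunk size of 8096 rows.
        for (long[] chunk : splitEvenly(1, 20_000, 8096)) {
            System.out.println("[" + chunk[0] + ", " + chunk[1] + ")");
        }
    }
}
```

Each resulting range can be read independently, which is what allows chunks to be spread across parallel readers and lets a restarted job skip chunks that were already completed before the last checkpoint.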

  • Metadata columns

    Metadata is essential when merging sharded tables into a single sink. Use the METADATA FROM syntax to access metadata columns:

    Metadata key

    Type

    Description

    database_name

    STRING NOT NULL

    The source database name.

    table_name

    STRING NOT NULL

    The source table name.

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    The change timestamp (0 if historical snapshot data).

    op_type

    STRING NOT NULL

    Change type: +I (insert), -D (delete), +U (update, after image), -U (update, before image).

    Note

    Requires VVR 8.0.7+.

    query_log

    STRING NOT NULL

    The raw MySQL query (Requires binlog_rows_query_log_events).

    Note

    MySQL must have the binlog_rows_query_log_events parameter enabled to record query logs.

    Example: Merging sharded tables into a single holo_orders Hologres sink

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, 
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read change time.
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- Read change type.
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Regex match multiple sharded databases.
      'table-name' = 'orders_.*'   -- Regex match multiple sharded tables.
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    Additionally, if you set scan.read-changelog-as-append-only.enabled to true, the output varies based on the sink table's primary key configuration:

    • If the sink table's primary key is order_id, the output contains only the last change for each primary key from the source table. For a primary key whose last change was a delete operation, the sink table shows a record with the same primary key and an op_type of -D.

    • If the sink table's primary key is order_id, operation_ts, and op_type, the output contains the complete change history for each primary key from the source table.

  • Regular expression support

    The MySQL connector supports matching multiple tables or databases using regular expressions. Example:

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Regex match multiple databases.
      'table-name' = '(t[5-8]|tt)' -- Regex match multiple tables.
    );

    Regular expression explanation:

    • ^(test).*: Prefix match. Matches database names starting with "test", such as test1 or test2.

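    • .*[p$]: Suffix match. Matches database names ending with "p", such as cdcp or edcp. Because $ inside a character class is a literal character rather than an anchor, this pattern also matches names ending with "$".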

    • txc: Exact match.

    The MySQL connector identifies tables using the fully qualified format: database-name.table-name.

    Important

    No comma support: The table-name and database-name options do not support comma-separated lists. Use the vertical bar (|) operator inside parentheses to specify multiple values.

    Patterns that contain commas: Because commas are not supported, a pattern that contains one (such as the quantifier {1,2}) must be rewritten with the | operator. For example, rewrite t{1,2} as (t|tt).
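
Before deploying a job, it can help to check which database names a pattern selects. The sketch below uses java.util.regex with the database-name pattern from the example above. It assumes that the connector requires the whole name to match the pattern; that is an assumption made for this illustration, and the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

final class DatabaseNameRegexCheck {

    // Pattern copied from the 'database-name' option in the example above.
    static final Pattern DATABASE_NAME =
            Pattern.compile("(^(test).*|^(tpc).*|txc|.*[p$]|t{2})");

    // Returns true only if the entire name matches the pattern (assumed semantics).
    static boolean matchesDatabase(String name) {
        return DATABASE_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {"test1", "tpch", "txc", "cdcp", "tt", "txc1", "orders"};
        for (String name : candidates) {
            System.out.println(name + " -> " + matchesDatabase(name));
        }
    }
}
```

Under whole-name matching, test1, tpch, txc, cdcp, and tt are selected, while txc1 and orders are not.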

  • Resource management

    • Parallelism: The number of IDs in the server-id range must be greater than or equal to the job parallelism. Both endpoints of the range are inclusive (e.g., range 5404-5412 contains 9 IDs and supports up to 9 concurrent tasks). Each job must use a unique, non-overlapping server-id range.

    • Autopilot: When enabled, Flink automatically scales down resources after the snapshot phase completes, as the incremental binlog phase typically requires less parallelism than the initial snapshot load.

    In the Development Console, set the job's parallelism in basic or expert mode. The difference is as follows:

    • Basic mode: The parallelism applies to the entire job.

    • Expert mode: Vertex-specific parallelism is supported.

    For details, see Configure job deployments.
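
The relationship between the server-id range and the job parallelism can be checked with simple arithmetic before a job is submitted. The following sketch is illustrative only: the class and method names are hypothetical, and it assumes that both endpoints of a range such as 7601-7604 are inclusive.

```java
final class ServerIdRangeCheck {

    // Counts the IDs in a value such as "5400" or "7601-7604".
    // Assumption: both endpoints of the range are inclusive.
    static long idCount(String serverId) {
        String[] parts = serverId.trim().split("-");
        if (parts.length == 1) {
            Long.parseLong(parts[0].trim()); // validates a single numeric ID
            return 1;
        }
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected <id> or <start>-<end>: " + serverId);
        }
        long start = Long.parseLong(parts[0].trim());
        long end = Long.parseLong(parts[1].trim());
        if (end < start) {
            throw new IllegalArgumentException("range end is smaller than range start");
        }
        return end - start + 1;
    }

    // Every parallel reader needs its own ID, so the range must cover the parallelism.
    static boolean supportsParallelism(String serverId, int parallelism) {
        return idCount(serverId) >= parallelism;
    }

    public static void main(String[] args) {
        System.out.println(idCount("7601-7604"));                // 4
        System.out.println(supportsParallelism("7601-7604", 4)); // true
        System.out.println(supportsParallelism("7601-7604", 5)); // false
    }
}
```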

  • Startup modes

    Use the scan.startup.mode configuration item to specify the startup mode for the MySQL CDC source table. Valid values:

    • initial: Performs an initial snapshot of the table, then transitions to reading binary logs.

    • latest-offset: Skips the snapshot; starts reading changes from the current end of the binary log.

    • earliest-offset: Skips the snapshot; starts reading from the earliest available binary log.

    • specific-offset: Skips the snapshot; starts from a user-defined position:

      • File/Pos: Use scan.startup.specific-offset.file and scan.startup.specific-offset.pos.

      • GTID: Use scan.startup.specific-offset.gtid-set.

    • timestamp: Skips the snapshot; starts from a specific time (in ms) provided by scan.startup.timestamp-millis.

    Example:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        -- Set scan.startup.mode to exactly one value; the alternatives are commented out.
        'scan.startup.mode' = 'earliest-offset', -- Start from the earliest offset.
        -- 'scan.startup.mode' = 'latest-offset', -- Start from the latest offset.
        -- 'scan.startup.mode' = 'specific-offset', -- Start from a specific offset.
        -- 'scan.startup.mode' = 'timestamp', -- Start from a specific timestamp.
        -- Only for specific-offset mode: use either file and pos, or gtid-set.
        -- 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Binary log file name.
        -- 'scan.startup.specific-offset.pos' = '4', -- Binary log position.
        -- 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set.
        -- Only for timestamp mode:
        -- 'scan.startup.timestamp-millis' = '1667232000000', -- Startup timestamp in milliseconds.
        ...
    )
    Important
    • The MySQL source prints the current offset at checkpoint time with INFO level logging. The log prefix is Binlog offset on checkpoint {checkpoint-id}. This log helps you start the job from a specific checkpoint offset.

    • If the table being read has undergone schema changes, starting from the earliest-offset, specific-offset, or timestamp modes may cause errors. This is because the Debezium reader internally maintains the latest table schema, and earlier data with mismatched schemas cannot be parsed correctly.
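
The scan.startup.timestamp-millis option expects milliseconds since the Unix epoch. As a small illustration, the sketch below converts a local date and time in a named time zone into that value using java.time. The class and method names are hypothetical, and the time zone Asia/Shanghai is only an example choice.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

final class StartupTimestamp {

    // Converts a wall-clock time in the given time zone to epoch milliseconds.
    static long toEpochMillis(LocalDateTime localTime, String zoneId) {
        return localTime.atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Midnight on 2022-11-01 in Asia/Shanghai (2022-10-31T16:00:00Z).
        long millis = toEpochMillis(LocalDateTime.of(2022, 11, 1, 0, 0), "Asia/Shanghai");
        System.out.println(millis); // 1667232000000
    }
}
```

The printed value is the same timestamp that appears in the startup example above.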

  • MySQL source tables without PK

    If your source table lacks a primary key, you must manually specify a chunk key column using the scan.incremental.snapshot.chunk.key-column option.

    • Processing semantics: The guarantee depends on whether the column specified in scan.incremental.snapshot.chunk.key-column is updated:

      • Immutable chunk key: If the selected column is never updated, the connector guarantees exactly-once semantics.

      • Mutable chunk key: If the column is updated, the connector provides at-least-once semantics.

    • Best practice for data integrity: To ensure data consistency under at-least-once semantics, define a primary key in your downstream system and ensure the sink operation is idempotent (e.g., using UPSERT or REPLACE logic) to handle potential duplicates.
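
The reason an idempotent sink tolerates at-least-once delivery can be shown with an in-memory stand-in for a keyed table. This is a conceptual sketch only: the class is hypothetical, a map keyed by primary key plays the role of the downstream table, and no real sink API is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IdempotentUpsertSketch {

    // Stand-in for a downstream table with a primary key.
    private final Map<Integer, String> rowsByKey = new LinkedHashMap<>();

    // Upsert: insert the row, or overwrite the existing row with the same key.
    void upsert(int key, String value) {
        rowsByKey.put(key, value);
    }

    // Returns a copy of the current table contents.
    Map<Integer, String> rows() {
        return new LinkedHashMap<>(rowsByKey);
    }

    public static void main(String[] args) {
        IdempotentUpsertSketch sink = new IdempotentUpsertSketch();
        sink.upsert(1, "shipped");
        sink.upsert(2, "pending");
        sink.upsert(1, "shipped"); // replayed duplicate of the first record
        System.out.println(sink.rows()); // {1=shipped, 2=pending}
    }
}
```

Because the replayed record overwrites the row with an identical value, the table ends in the same state as if each record had been delivered exactly once.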

  • Reading ApsaraDB RDS for MySQL backup logs

    The MySQL CDC source can read archived binary logs from Object Storage Service (OSS). This is particularly useful in two scenarios:

    • Slow snapshot phase: When a snapshot takes a long time and the MySQL instance purges its local binary logs before the snapshot phase completes.

    • Log retention: When local binary logs have been deleted, but the backup files uploaded to OSS are still available.

      Example:

      CREATE TABLE mysql_source (...) WITH (
          'connector' = 'mysql-cdc',
          'rds.region-id' = 'cn-beijing',
          'rds.access-key-id' = 'xxxxxxxxx', 
          'rds.access-key-secret' = 'xxxxxxxxx', 
          'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
          'rds.main-db-id' = '12345678',
          'rds.download.timeout' = '60s'
          ...
      )
  • Source reuse

    In a single job, each MySQL source table starts its own binlog client. When all source tables connect to the same instance, this increases the database load. For more information, see Flink CDC FAQ.

    Solution

    VVR 8.0.7 and later support MySQL CDC source reuse. Source reuse merges CDC source tables with compatible configurations. Tables are merged when their configurations are identical except for the database name, table name, and server-id.

    Procedure

    1. Use the SET command in your SQL job:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      -- (VVR 8.0.8 and 8.0.9) Additionally set this:
      SET 'sql-gateway.exec-plan.enabled' = 'false';

      Note: In VVR 11.1 and later, source reuse is enabled by default.
    2. Start the job stateless. Changing source reuse configurations changes the job topology, so the job must start without state. Otherwise, the job may fail to start or lose data. If sources are merged, you will see a MergetableSourceScan node.

    Important
    • After you enable reuse, do not disable operator chaining. Setting pipeline.operator-chaining to false increases data serialization and deserialization overhead. The more sources are merged, the greater the overhead.

    • In VVR 8.0.7, disabling operator chaining causes serialization issues.
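
The compatibility rule for source reuse (identical configurations apart from the database name, table name, and server-id) can be expressed as a comparison of option maps. The sketch below is a hypothetical helper written for illustration; it is not the optimizer's actual logic, and the option values in it are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class SourceReuseCheck {

    // Options that are allowed to differ between sources that get merged.
    static final Set<String> IGNORED_OPTIONS = Set.of("database-name", "table-name", "server-id");

    // Returns the options with the ignored keys removed.
    private static Map<String, String> comparable(Map<String, String> options) {
        Map<String, String> copy = new HashMap<>(options);
        copy.keySet().removeAll(IGNORED_OPTIONS);
        return copy;
    }

    // Two sources are merge candidates when all remaining options are identical.
    static boolean canMerge(Map<String, String> first, Map<String, String> second) {
        return comparable(first).equals(comparable(second));
    }

    public static void main(String[] args) {
        Map<String, String> orders = Map.of(
                "hostname", "db-host", "port", "3306",
                "database-name", "shop", "table-name", "orders", "server-id", "5400-5404");
        Map<String, String> products = Map.of(
                "hostname", "db-host", "port", "3306",
                "database-name", "shop", "table-name", "products", "server-id", "5405-5409");
        Map<String, String> otherHost = Map.of(
                "hostname", "other-host", "port", "3306",
                "database-name", "shop", "table-name", "orders", "server-id", "5410-5414");

        System.out.println(canMerge(orders, products));  // true
        System.out.println(canMerge(orders, otherHost)); // false
    }
}
```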

Accelerate binlog reading

When the MySQL connector is used as a source, it parses binary logs to generate change events during the incremental phase. Binlog files record all table changes in binary format. Accelerate binary log file parsing using the following methods:

  1. Enable parsing filter configuration

    Use the configuration item scan.only.deserialize.captured.tables.changelog.enabled to parse change events only for the specified tables.

  2. Optimize Debezium options

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: Max records the blocking queue can hold. When Debezium reads an event stream from the database, it places events into a blocking queue before writing them downstream. The default value is 8192.

    • debezium.max.batch.size: Max events the connector processes in each iteration. The default value is 2048.

    • debezium.poll.interval.ms: The number of milliseconds the connector waits before requesting new change events. The default value is 1000 milliseconds (1 second).

Example:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium configuration
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- Enable parsing filter
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- Only parse change events for specified tables.
    ...
)

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium configuration
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # Enable parsing filter
  scan.only.deserialize.captured.tables.changelog.enabled: true

The fully managed MySQL connector can consume binary logs at up to 85 MB/s, approximately twice the throughput of the Apache Flink connector for MySQL. If the binlog generation rate exceeds 85 MB/s (a 512 MB file every 6 seconds), job latency increases. Latency decreases when the generation rate drops. Large transactions may briefly increase processing latency.
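
The figures in the preceding paragraph can be checked with simple arithmetic. The sketch below converts a binlog file rotation rate into MB/s and estimates how much unread binlog accumulates while generation outpaces consumption. It is a back-of-the-envelope model with hypothetical names; the 100 MB/s generation rate is an invented example value, and real latency also depends on factors such as transaction size.

```java
final class BinlogThroughput {

    // Generation rate in MB/s when one file of the given size is produced every N seconds.
    static double rateMbPerSecond(double fileSizeMb, double secondsPerFile) {
        return fileSizeMb / secondsPerFile;
    }

    // Unread binlog (MB) accumulated after the given time; never negative.
    static double backlogMb(double generationRate, double consumeRate, double seconds) {
        return Math.max(0, (generationRate - consumeRate) * seconds);
    }

    public static void main(String[] args) {
        // One 512 MB binlog file every 6 seconds is roughly 85.3 MB/s.
        System.out.println(rateMbPerSecond(512, 6));

        // Example: generating 100 MB/s against 85 MB/s of consumption for one minute.
        System.out.println(backlogMb(100, 85, 60)); // 900.0
    }
}
```

In this model the backlog only shrinks again once the generation rate drops below the consumption rate, which matches the latency behavior described above.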

DataStream API

Important

To read and write data using DataStream API, use the corresponding DataStream connector. For setup instructions, see Integrate DataStream connectors.

Create a DataStream API program and use MySqlSource. The following shows the code and POM dependency:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(3306) // replace with your MySQL port
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

When building MySqlSource, specify the following parameters:

Parameter

Description

hostname

The IP address or hostname.

port

The port number.

databaseList

The MySQL database name.

Note

The database name supports regular expressions to read data from multiple databases. You can use .* to match all databases.

username

The database username.

password

The database password.

deserializer

The deserializer converts SourceRecord type records to the specified type. Valid values:

  • RowDataDebeziumDeserializeSchema: Converts SourceRecord to Flink Table or SQL internal data structure RowData.

  • JsonDebeziumDeserializationSchema: Converts SourceRecord to a JSON-formatted String.

Required pom parameters:

Parameter

Description

${vvr.version}

The engine version of Realtime Compute for Apache Flink. Example: 1.17-vvr-8.0.4-3.

Note

Refer to the version displayed on Maven, as hotfix versions may be released periodically without other notifications.

${flink.version}

The Apache Flink version. Example: 1.17.2.

Important

Ensure ${flink.version} is consistent with ${vvr.version} to avoid incompatibility issues during job execution. For version mapping, see Engines.

Troubleshooting & FAQ

See Flink CDC FAQ.