Realtime Compute for Apache Flink: MySQL connector

Last Updated: Mar 13, 2024

This topic describes how to use the MySQL connector.

Background information

The MySQL connector supports all databases that are compatible with the MySQL protocol. The databases include ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases.

Important

We recommend that you use the MySQL connector instead of the ApsaraDB RDS for MySQL connector. The "ApsaraDB RDS for MySQL connector" topic will be removed from the Connectors documentation.

The following items describe the capabilities supported by the MySQL connector.

  • Table type: source table, dimension table, and result table.

  • Running mode: streaming mode.

  • Data format: N/A.

  • Metric:

    • Metrics for source tables:

      • currentFetchEventTimeLag: the interval from the time when data is generated to the time when the data is pulled to the source operator. The value of this metric is greater than 0 only in the binary log data reading phase and is fixed to 0 in the savepoint reading phase.

      • currentEmitEventTimeLag: the interval from the time when data is generated to the time when the data leaves the source operator. The value of this metric is greater than 0 only in the binary log data reading phase and is fixed to 0 in the savepoint reading phase.

      • sourceIdleTime: the period of time for which no new data has been generated in the source table.

    • Metrics for dimension tables and result tables: none.

    Note

    For more information about the metrics and how to view the metrics, see Report metrics of fully managed Flink to other platforms.

  • API type: DataStream API and SQL API.

  • Data update or deletion in a result table: supported.

Features

A MySQL Change Data Capture (CDC) source table is a streaming source table for MySQL databases. The MySQL CDC source connector first reads full historical data from a MySQL database and then reads binary log data. If an error occurs, the exactly-once semantics is used to ensure data accuracy. The MySQL CDC source connector can read full data in multiple subtasks at the same time, and uses the incremental savepoint algorithm to perform lock-free reading and resumable reading. For more information, see Basic concepts about MySQL CDC source tables.

The MySQL CDC source connector supports the following features:

  • Integrates stream processing and batch processing to support full and incremental data reading. This way, you do not need to separately implement stream processing and batch processing.

  • Supports full data reading in multiple subtasks at the same time. This way, data can be read in a more efficient manner.

  • Seamlessly switches between full and incremental data reading and supports automatic scale-in operations. This reduces the computing resources that are consumed.

  • Supports resumable reading during full data reading. This way, data can be read in a more stable manner.

  • Reads full data without locks. This way, online business is not affected.

Prerequisites

Limits

  • MySQL CDC source table

    • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports lock-free reading and parallel reading.

    • You must select a VVR version based on the MySQL version. The following list describes the mappings between VVR and MySQL versions. You can run the SELECT version() command to query the MySQL version.

      • VVR 4.0.8 to VVR 4.0.10: MySQL 5.7 and 8.0.X

      • VVR 4.0.11 and later: MySQL 5.6.X, 5.7.X, and 8.0.X

      Important

      The default value of the scan.incremental.snapshot.enabled parameter is true, which indicates that the incremental savepoint feature is enabled by default. This ensures that ApsaraDB RDS for MySQL 5.6.X runs as expected, and you cannot disable the incremental savepoint feature for ApsaraDB RDS for MySQL 5.6.X. This limit is removed in VVR 6.0.8 and VVR 8.0.1, so you can disable the incremental savepoint feature in those versions. However, we recommend that you do not disable the feature. If you disable it, the MySQL database is locked and the online business processing performance may be affected.

    • MySQL CDC source tables do not support watermarks.

    • Only MySQL users that are granted the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions can use the MySQL CDC source connector to read full data and incremental data from a MySQL database. For an example of how to grant these permissions, see the sketch after this list.

    • If you use the CREATE TABLE AS statement together with the CREATE DATABASE AS statement to perform data synchronization, specific schema changes of the MySQL CDC source table can be synchronized to the result table. For more information about the supported change types, see Synchronization policies for table schema changes. In other scenarios, the schema changes of the MySQL CDC source table cannot be synchronized to the result table.

    • MySQL CDC source tables do not support the synchronization of truncate operations.

    • We recommend that you do not read data from a secondary instance or a read-only secondary instance of ApsaraDB RDS for MySQL. By default, the binary log retention period of the secondary instance or read-only secondary instance of ApsaraDB RDS for MySQL is short. If the binary log file is deleted due to expiration, the deployment cannot consume the binary log data and an error occurs.

    • The MySQL CDC source table cannot read data from PolarDB for MySQL clusters of Multi-master Cluster (Database/Table) Edition whose version is 1.0.19 or earlier. For more information about Multi-master Cluster (Database/Table) Edition, see Overview. In binary log data of these versions, duplicate table IDs may be generated. This causes incorrect mapping of the schema of the MySQL CDC source table, and an error occurs when the binary log data is parsed. This issue is resolved in PolarDB for MySQL versions later than 1.0.19.

  • Dimension table and result table

    • Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later supports the MySQL connector.

    • The at-least-once semantics can be used. If an ApsaraDB RDS for MySQL result table contains a primary key, idempotence can be used to ensure data correctness.
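
The following statements are a minimal sketch of how a MySQL user might be created and granted the required permissions. The user name flinkuser, the host pattern %, and the password are placeholders; adjust them to your own environment and security requirements.

CREATE USER 'flinkuser'@'%' IDENTIFIED BY '<yourPassword>';
-- Grant the permissions that the MySQL CDC source connector needs to read full and incremental data.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';
FLUSH PRIVILEGES;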

Precautions

  • MySQL CDC source table

    • You must specify a unique server ID for each MySQL CDC data source.

      • Use of server IDs

        Each client that synchronizes data from a database has a unique server ID. The MySQL server maintains network connections and binary log file positions based on the server ID. If a large number of clients that have different server IDs connect to the MySQL server, the CPU utilization of the MySQL server may significantly increase. This affects the stability of online business.

        If multiple MySQL CDC data sources use the same server ID and the data sources cannot be merged as one source operator, the checkpoints for binary log positions may be out of order. As a result, the amount of data that is read may be greater than or less than expected. A conflict between server IDs may also occur. For more information, see FAQ about upstream and downstream storage. To prevent the preceding issue, we recommend that you specify a unique server ID for each MySQL CDC data source.

      • Configuration methods of server IDs

        You can specify server IDs in DDL statements or by using dynamic hints.

        We recommend that you use dynamic hints to configure server IDs instead of configuring server IDs in DDL statements. For more information about dynamic hints, see Dynamic hints.

      • Configuration of server IDs in different scenarios

        • Scenario 1: The incremental savepoint algorithm is disabled or the degree of parallelism is 1.

          If the incremental savepoint algorithm is disabled or the degree of parallelism is 1, you can specify a server ID.

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
        • Scenario 2: The incremental savepoint algorithm is enabled and the degree of parallelism is greater than 1.

          If the incremental savepoint algorithm is enabled and the degree of parallelism is greater than 1, you must specify a server ID range. Make sure that the number of available server IDs in the range is not less than the degree of parallelism. The following statement shows the configuration of server IDs when the degree of parallelism is 3.

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
        • Scenario 3: The CREATE TABLE AS statement is executed together with other statements to synchronize data.

          If you execute the CREATE TABLE AS statement together with other statements to synchronize data and the configurations of multiple MySQL CDC data sources are the same, the data sources are automatically merged as one source operator. In this case, you can specify the same server ID for multiple CDC data sources. For more information, see Example 4: execution of multiple CREATE TABLE AS statements.

        • Scenario 4: Multiple MySQL CDC source tables are used in a deployment in which the CREATE TABLE AS statement is not executed.

          If multiple MySQL CDC source tables are used in a deployment and the CREATE TABLE AS statement is not executed, the data sources cannot be merged as one source operator. You must specify a unique server ID for each MySQL CDC source table. Similarly, if the incremental savepoint algorithm is enabled and the degree of parallelism is greater than 1, you must specify the server ID range.

          SELECT * FROM
            source_table1 /*+ OPTIONS('server-id'='123456-123457') */
          LEFT JOIN
            source_table2 /*+ OPTIONS('server-id'='123458-123459') */
          ON source_table1.id = source_table2.id;
    • Only deployments that use VVR 4.0.8 or later support lock-free reading, parallel reading, and resumable reading during full data reading.

      If your deployment uses a VVR version that is earlier than 4.0.8, you must grant the RELOAD permission to the MySQL user to obtain the global read lock. This ensures data reading consistency. The global read lock may block write operations for a few seconds. This may affect online business.

      If your deployment uses a VVR version that is earlier than 4.0.8, checkpoints cannot be generated during full data reading. If your deployment fails in the process, the deployment reads full data again. This reduces the stability of data reading performance. Therefore, we recommend that you update the version of VVR that is used by your deployment to 4.0.8 or later.

  • Result table

    • ApsaraDB RDS for MySQL supports the auto-increment primary key. Therefore, you do not need to declare the auto-increment field in the DDL statement. For example, if you use ID as an auto-increment field, you do not need to declare the ID field in the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, a value is automatically filled for the auto-increment field.

    • You must declare at least one field that is not a primary key in the DDL statement. Otherwise, an error is returned.

    • NOT ENFORCED indicates that Flink does not perform mandatory verification on the primary key. You must ensure the correctness and integrity of the primary key.

      Flink does not fully support mandatory verification. Flink considers that the primary key is correct based on the assumption that the nullability of columns is aligned with the columns in the primary key. For more information, see Validity Check.

  • Dimension table

    If you want to perform queries by index when the MySQL connector is used for a dimension table, you must follow the leftmost prefix matching principle of MySQL to sort the columns that are specified in the JOIN operation. However, this cannot ensure that indexes are used for queries. Specific filter conditions may be rewritten or modified due to SQL optimization. As a result, the filter conditions that are obtained by the connector may not hit indexes. To determine whether the connector queries data by index, check the specific SELECT statements that are executed on the related database.
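
    For example, the following hypothetical join assumes that the MySQL dimension table dim_user has a composite index on (user_id, site_id). Listing user_id before site_id in the join condition follows the leftmost prefix principle, which gives the generated SELECT statement a chance to hit the index, but does not guarantee that the index is used.

    SELECT o.order_id, d.user_name
    FROM orders_source AS o
    JOIN dim_user FOR SYSTEM_TIME AS OF o.`proctime` AS d
    ON o.user_id = d.user_id AND o.site_id = d.site_id;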

Syntax

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

Note
  • When the connector writes data to a result table, the connector concatenates each received data record into an SQL statement and executes the SQL statement based on the following rules:

    • If the result table does not have a primary key, the INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); statement is executed.

    • If the result table has a primary key, the INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; statement is executed.

  • If an auto-increment primary key is defined in the MySQL database, do not declare the auto-increment field in the Flink DDL statement. When data is written, the database automatically fills in the auto-increment field. The connector can only write and delete data that has an auto-increment field; it cannot update such data. See the example below.
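
    For example, assume a hypothetical MySQL table user_scores whose primary key id is an auto-increment column. A sketch of the corresponding Flink result table omits the id column, and the connector writes each record with a plain INSERT statement because no primary key is declared on the Flink side.

    CREATE TEMPORARY TABLE user_scores_sink (
      name VARCHAR,
      score INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = 'user_scores'
    );

    -- For each output record the connector issues a statement of roughly this shape, and the
    -- database fills in the auto-increment id column automatically:
    -- INSERT INTO user_scores (name, score) VALUES ('Alice', 95);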

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    connector

    The type of the table.

    Yes

    STRING

    No default value

    When you use the MySQL connector for a source table, you can set this parameter to mysql-cdc or mysql. The two values are equivalent. When you use the MySQL connector for a dimension table or a result table, you must set this parameter to mysql.

    hostname

    The IP address or hostname that is used to access the MySQL database.

    Yes

    STRING

    N/A

    We recommend that you enter the IP address of a virtual private cloud (VPC).

    Note

    If the MySQL database and fully managed Flink are not deployed in the same VPC, you must establish a connection between the two VPCs or enable fully managed Flink to access the MySQL database over the Internet. For more information, see How does fully managed Flink access a service across VPCs? and How does fully managed Flink access the Internet?

    username

    The username that is used to access the MySQL database.

    Yes

    STRING

    No default value

    N/A.

    password

    The password that is used to access the MySQL database.

    Yes

    STRING

    No default value

    N/A.

    database-name

    The name of the MySQL database that you want to access.

    Yes

    STRING

    No default value

    • You can set this parameter to a regular expression to read data from multiple databases.

    • When you use a regular expression, we recommend that you do not use the caret symbol (^) and dollar sign ($) to match the beginning and end of the regular expression. For more information about the reason, see the Remarks column of table-name.

    table-name

    The name of the MySQL table.

    Yes

    STRING

    No default value

    • If you want to read data from multiple tables, you can set this parameter to a regular expression.

      When you use the MySQL connector to read data from multiple MySQL tables, you can commit multiple CREATE TABLE AS statements as one deployment to avoid enabling multiple Binlog listeners. This improves performance and efficiency. For more information, see Example 4: execution of multiple CREATE TABLE AS statements.

    • When you use a regular expression, we recommend that you do not use the caret symbol (^) and dollar sign ($) to match the beginning and end of the regular expression. For more information about the reason, see the following note.

    Note

    When the MySQL CDC source table matches a table name in a regular expression, the character string (\\.) is used to concatenate the values of database-name and table-name that you configure as a full-path regular expression. In Realtime Compute for Apache Flink whose VVR version is earlier than 8.0.1, a period (.) is used instead of the character string (\\.). Then, the regular expression and the fully qualified names of tables in the MySQL database are used to perform regular expression matching.

    For example, if you configure 'database-name' = 'db_.*' and 'table-name' = 'tb_.+', the connector uses the regular expression db_.*\\.tb_.+ to match the fully qualified names of tables to determine the tables that need to be read. In Realtime Compute for Apache Flink whose VVR version is earlier than 8.0.1, the regular expression is db_.*.tb_.+.

    port

    The port that is used to access the MySQL database.

    No

    INTEGER

    3306

    N/A.

  • Parameters only for source tables

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    server-id

    The ID that is allocated to a database client.

    No

    STRING

    By default, a value in the range of 5400 to 6400 is randomly generated.

    The ID must be unique in a MySQL cluster. We recommend that you configure a unique server ID for each deployment in the same database.

    This parameter can also be set to an ID range, such as 5400-5408. If the incremental savepoint algorithm is enabled, parallel reading is supported. In this case, we recommend that you set this parameter to an ID range so that each parallel subtask uses a unique ID.

    scan.incremental.snapshot.enabled

    Specifies whether to enable the incremental savepoint algorithm.

    No

    BOOLEAN

    true

    By default, the incremental savepoint algorithm is enabled. The incremental savepoint algorithm is a new mechanism that is used to read savepoints of full data. Compared with the original savepoint reading algorithm, the incremental savepoint algorithm has the following advantages:

    • When the MySQL CDC source connector reads full data, the connector can read data in multiple subtasks at the same time.

    • When the MySQL CDC source connector reads full data, the connector can generate checkpoints for chunks.

    • When the MySQL CDC source connector reads full data, the connector does not need to execute the FLUSH TABLES WITH READ LOCK statement to obtain a global read lock.

    If you want the MySQL CDC source connector to support parallel reading, a unique server ID must be allocated to each parallel reader. Therefore, you must set the server-id parameter to a range that is greater than or equal to the value of the Parallelism parameter. For example, you can set the server-id parameter to 5400-6400.

    scan.incremental.snapshot.chunk.size

    The chunk size of the table. The chunk size indicates the number of rows.

    No

    INTEGER

    8096

    If the incremental savepoint algorithm is enabled, the table is split into multiple chunks for data reading. Before data in a chunk is read, the data is cached in the memory. Therefore, if the chunk size is excessively large, an out of memory (OOM) error may occur. If the chunk size is small, a small number of data records are read after fault recovery. This reduces data throughput.

    scan.snapshot.fetch.size

    The maximum number of data records that can be extracted each time full data of a table is read.

    No

    INTEGER

    1024

    N/A.

    scan.startup.mode

    The startup mode when data is consumed.

    No

    STRING

    initial

    Valid values:

    • initial: When the MySQL CDC source connector starts for the first time, the connector scans all historical data and reads the most recent binary log data. This is the default value.

    • latest-offset: When the MySQL CDC source connector starts for the first time, the connector reads binary log data from the most recent offset, instead of scanning all historical data. This way, the connector reads only the most recent incremental data after the connector starts.

    • earliest-offset: The MySQL CDC source connector does not scan full historical data. The connector starts to read accessible binary log data from the earliest offset.

    • specific-offset: The MySQL CDC source connector does not scan full historical data. The connector starts to scan binary log data from the specified offset. You can configure the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters to specify the name of the binary log file and offset at which the scan starts. You can also configure the scan.startup.specific-offset.gtid-set parameter to specify a global transaction identifier (GTID) set at which the scan starts.

    • timestamp: The MySQL CDC source connector does not scan full historical data. The connector starts to read binary log data from the specified timestamp. The timestamp is specified by the scan.startup.timestamp-millis parameter. Unit: milliseconds.

    Important
    • You can set the scan.startup.mode parameter to earliest-offset, specific-offset, or timestamp in Realtime Compute for Apache Flink that uses VVR 6.0.4 or later.

    • If the scan.startup.mode parameter is set to earliest-offset, specific-offset, or timestamp for a deployment and the table schema at the startup time is different from the table schema at the specified start offset time, an error is reported in the deployment. If you use one of the three startup modes for a deployment, you must ensure that the table schema does not change between the specified binary log consumption offset time and the startup time of the deployment.

    scan.startup.specific-offset.file

    The name of the binary log file that is used to specify the startup offset when the scan.startup.mode parameter is set to specific-offset.

    No

    STRING

    No default value

    This parameter is required when the scan.startup.mode parameter is set to specific-offset. Example of a file name: mysql-bin.000003.

    scan.startup.specific-offset.pos

    The startup offset that is specified in the specified binary log file when the scan.startup.mode parameter is set to specific-offset.

    No

    INTEGER

    No default value

    This parameter is required when the scan.startup.mode parameter is set to specific-offset.

    scan.startup.specific-offset.gtid-set

    A GTID set that is used to specify the startup offset when the scan.startup.mode parameter is set to specific-offset.

    No

    STRING

    No default value

    This parameter is required when the scan.startup.mode parameter is set to specific-offset. Example of a GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    The timestamp in milliseconds that is used to specify the startup offset when the scan.startup.mode parameter is set to timestamp.

    No

    LONG

    No default value

    This parameter is required when the scan.startup.mode parameter is set to timestamp. The timestamp is measured in milliseconds.

    Important

    If you set the scan.startup.mode parameter to timestamp, the MySQL CDC source connector attempts to read the initial event of each binary log file to determine the timestamp of the file and finally locates the binary log file that corresponds to the specified timestamp. Make sure that the binary log file that corresponds to the specified timestamp is not deleted from the database and can be read.

    server-time-zone

    The time zone of the session that is used by the database.

    Required only in a VVR version that is earlier than 6.0.2

    STRING

    If you do not configure this parameter, the system uses the time zone of the environment in which the fully managed Flink deployment runs as the time zone of the database server. The time zone is the time zone of the zone that you selected.

    Example: Asia/Shanghai. This parameter determines how to convert data in the MySQL database from the TIMESTAMP type to the STRING type. For more information, see Debezium time types.

    debezium.min.row.count.to.stream.results

    The minimum number of data records allowed in a table to trigger the batch data read mode. If the number of data records in a table is greater than the value of this parameter, the batch data read mode is used.

    No

    INTEGER

    1000

    Fully managed Flink reads data from a MySQL source table in one of the following modes:

    • Full data read: reads all data from the table and writes the data to the memory. Data is read at a high speed, but memory space is consumed. If the source table contains a large amount of data, an OOM error may occur in this mode.

    • Batch data read: reads data of a specific number of rows at a specified point in time until all data is read. An OOM error is prevented even if the table contains a large amount of data. However, data is read at a low speed.

    connect.timeout

    The maximum duration that the connector waits for a connection to the MySQL database server to be established before the connection attempt times out.

    No

    DURATION

    30s

    N/A.

    connect.max-retries

    The maximum number of retries after the connection to the MySQL database server fails.

    No

    INTEGER

    3

    N/A.

    connection.pool.size

    The size of the database connection pool.

    No

    INTEGER

    20

    The database connection pool is used to reuse connections. This can reduce the number of database connections.

    jdbc.properties.*

    Custom Java Database Connectivity (JDBC) URL parameters.

    No

    STRING

    No default value

    You can pass custom JDBC parameters. For example, if you do not want to use the SSL protocol, you can set the jdbc.properties.useSSL parameter to false.

    For more information about the JDBC parameters, see MySQL Configuration Properties.

    debezium.*

    The custom Debezium parameter that is used to read binary log data.

    No

    STRING

    No default value

    You can pass custom Debezium parameters. For example, you can use 'debezium.event.deserialization.failure.handling.mode'='ignore' to specify the logic for handling parsing errors.

    heartbeat.interval

    The interval at which the source moves forward the binary log file position by using heartbeat events.

    No

    DURATION

    30s

    Heartbeat events are used to identify the latest position of the binary log file that is read from the source. For slowly updated tables in MySQL, the binary log file position cannot automatically move forward. The source can move forward the binary log file position by using heartbeat events. This can prevent the binary log file position from expiring. If the binary log file position expires, the deployment fails to run and cannot recover from the failure. If this occurs, you can only select Start without state for Start Method and specify Start Time for Reading Data to start the deployment.

    scan.incremental.snapshot.chunk.key-column

    Specifies columns for sharding in the savepoint phase.

    For more information, see the Remarks column.

    STRING

    No default value

    • This parameter is required for a table that does not have a primary key. The selected column must be a NOT NULL column.

    • This parameter is optional for a table that has a primary key. You can select only one column from the primary key of the table for sharding.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    rds.region-id

    The ID of the region to which the ApsaraDB RDS instance belongs.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    No default value

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • For more information about region IDs, see Regions and zones.

    rds.access-key-id

    The AccessKey ID of your Alibaba Cloud account.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important
    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    rds.access-key-secret

    The AccessKey secret of your Alibaba Cloud account.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    N/A

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important
    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    rds.db-instance-id

    The ID of the ApsaraDB RDS instance to which the MySQL database belongs.

    This parameter is required when you use the feature of reading OSS archived logs.

    STRING

    No default value

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    scan.incremental.close-idle-reader.enabled

    Specifies whether to close idle readers after the savepoint reading phase ends.

    No

    BOOLEAN

    false

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports this parameter.

    • To make the configuration of this parameter take effect, you must set the execution.checkpointing.checkpoints-after-tasks-finish.enabled parameter to true.

  • Parameters only for dimension tables

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    url

    The JDBC URL that is used to access the MySQL database.

    No

    STRING

    No default value

    The URL is in the jdbc:mysql://<Endpoint>:<Port number>/<Database name> format.

    lookup.max-retries

    The maximum number of retries to read data after the data fails to be read.

    No

    INTEGER

    3

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    lookup.cache.strategy

    The cache policy.

    No

    STRING

    None

    Valid values: None, LRU, and ALL. For more information about the valid values, see Background information.

    Note

    If you set this parameter to LRU, you must also configure the lookup.cache.max-rows parameter.

    lookup.cache.max-rows

    The maximum number of data records that can be cached.

    No

    INTEGER

    100000

    • You must configure this parameter if the lookup.cache.strategy parameter is set to LRU.

    • You do not need to configure this parameter if the lookup.cache.strategy parameter is set to ALL.

    lookup.cache.ttl

    The cache timeout period.

    No

    DURATION

    10s

    The configuration of the lookup.cache.ttl parameter varies based on the value of the lookup.cache.strategy parameter.

    • If you set the lookup.cache.strategy parameter to None, you do not need to configure the lookup.cache.ttl parameter. This indicates that cache entries do not expire.

    • If you set the lookup.cache.strategy parameter to LRU, the lookup.cache.ttl parameter indicates the cache timeout period. By default, cache entries do not expire.

    • If you set the lookup.cache.strategy parameter to ALL, the lookup.cache.ttl parameter indicates the cache refresh period. By default, the cache is not refreshed.

    The value of this parameter must be in the time format. For example, you can set this parameter to 1min or 10s.

    lookup.max-join-rows

    The maximum number of results that are returned each time a data record in the primary table is queried and matched with data records in the dimension table.

    No

    INTEGER

    1024

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

  • Parameters only for result tables

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    url

    The JDBC URL that is used to access the MySQL database.

    No

    STRING

    No default value

    The URL is in the jdbc:mysql://<Endpoint>:<Port number>/<Database name> format.

    sink.max-retries

    The maximum number of retries to write data to the table after data fails to be written.

    No

    INTEGER

    3

    N/A.

    sink.buffer-flush.batch-size

    The number of data records that can be written at a time.

    No

    INTEGER

    4096

    Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    sink.buffer-flush.max-rows

    The maximum number of data records that can be cached in the memory.

    No

    INTEGER

    • If Realtime Compute for Apache Flink uses a VVR version earlier than 6.0.7, the default value of this parameter is 100.

    • If Realtime Compute for Apache Flink uses VVR 6.0.7 or later, the default value of this parameter is 10000.

    This parameter takes effect only after you specify the primary key.

    sink.buffer-flush.interval

    The interval at which the cache is cleared. If the number of cached data records does not reach the value of the sink.buffer-flush.max-rows parameter within the specified interval, all cached data is written to the result table.

    No

    DURATION

    1s

    N/A.

    sink.ignore-delete

    Specifies whether to ignore delete operations.

    No

    BOOLEAN

    false

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • Delete operations may occur when you use Flink SQL. If multiple output operators update different fields in the same result table based on the primary key, the data result may be incorrect.

      For example, a data record is deleted in a task and then only some fields of the data record are updated in another task. In this case, the values of the fields that are not updated become null or default values because the record was deleted. To ignore delete operations, you can set the sink.ignore-delete parameter to true.

    sink.ignore-null-when-update

    Specifies whether to update the fields that are empty with null values during data updates.

    No

    BOOLEAN

    false

    Valid values:

    • true: The fields that are empty are skipped. This parameter can be set to true only after a primary key is specified for the result table. In this case, data is not written to the result table in batches.

    • false: The fields that are empty are updated with null values.

    Note

    This parameter is available only in Realtime Compute for Apache Flink that uses VVR 8.0.5 or later.
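
The following sketch shows how some of the preceding dimension table and result table parameters might be combined. All connection values and table names are placeholders, and the parameter values are examples rather than recommendations.

CREATE TEMPORARY TABLE mysql_dim (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourDimTableName>',
  'lookup.cache.strategy' = 'LRU',     -- cache recently used rows
  'lookup.cache.max-rows' = '100000',  -- required when the LRU strategy is used
  'lookup.cache.ttl' = '10min'         -- cached rows expire after 10 minutes
);

CREATE TEMPORARY TABLE mysql_sink (
  id INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourSinkTableName>',
  'sink.buffer-flush.max-rows' = '10000', -- takes effect only because a primary key is declared
  'sink.buffer-flush.interval' = '1s',
  'sink.ignore-delete' = 'true'           -- ignore delete operations from upstream operators
);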

Data type mappings

  • MySQL CDC source table

    Data type of MySQL CDC

    Data type of Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    Important

    We recommend that you use fields of the TINYINT(1) data type of MySQL to store only values 0 and 1. When the MySQL database uses the TINYINT(1) data type to store data of the BOOLEAN data type, the MySQL CDC source table maps the TINYINT(1) data type to the BOOLEAN data type of Flink. If you use fields of the TINYINT(1) data type of MySQL to store values other than 0 and 1, the data may be inaccurate.

  • Dimension table and result table

    Data type of MySQL

    Data type of Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    Note

    The value of p is less than or equal to 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Important

    Flink supports only MySQL binary large object (BLOB) records that are less than or equal to 2,147,483,647 bytes (2^31 - 1) in size.

    BLOB

    MEDIUMBLOB

    LONGBLOB
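
As an illustration of the preceding mappings, consider a hypothetical MySQL table that contains BIGINT UNSIGNED, DECIMAL, TINYINT(1), DATETIME, and TEXT columns. A Flink DDL that reads this table as a CDC source might declare the columns as follows; the table and column names are examples only.

-- Hypothetical MySQL table:
--   CREATE TABLE orders (
--     order_id BIGINT UNSIGNED PRIMARY KEY,
--     price    DECIMAL(10, 2),
--     is_paid  TINYINT(1),
--     created  DATETIME(3),
--     note     TEXT
--   );

CREATE TEMPORARY TABLE orders_source (
  order_id DECIMAL(20, 0),  -- BIGINT UNSIGNED maps to DECIMAL(20, 0)
  price DECIMAL(10, 2),     -- DECIMAL(p, s) maps to DECIMAL(p, s)
  is_paid BOOLEAN,          -- TINYINT(1) maps to BOOLEAN
  created TIMESTAMP(3),     -- DATETIME(p) maps to TIMESTAMP(p)
  note STRING,              -- TEXT maps to STRING
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = 'orders'
);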

Sample code

  • Sample code for a MySQL CDC source table

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • Sample code for a MySQL dimension table

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • Sample code for a MySQL result table

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;

Basic concepts about MySQL CDC source tables

  • Principles

    When the MySQL CDC source connector is started, the connector scans the whole table whose data needs to be read, splits the table into multiple chunks based on the primary key, and then records the binary log file position at this time. Then, the MySQL CDC source connector uses the incremental savepoint algorithm to read data from each chunk by using the SELECT statement. The deployment periodically generates checkpoints to record the chunks whose data is read. If a failover occurs, the MySQL CDC source connector needs to only continue reading data from the chunks whose data is not read. After the data of all chunks is read, incremental change records are read from the previous binary log file position. The deployment continues periodically generating checkpoints to record the binary log file position. If a failover occurs, the MySQL CDC source connector processes data from the previous binary log file position. This way, the exactly-once semantics is implemented.

    For more information about the incremental savepoint algorithm, see MySQL CDC Connector.

  • Metadata

    In most cases, access to metadata is required when you merge and synchronize tables in a sharded database. If you expect to identify data records by the source database names and table names after tables are merged, you can configure metadata columns in the data merging statement to read the source database name and table name of each data record. This way, you can identify the source of each data record after tables are merged.

    MySQL CDC source tables that are used by fully managed Flink of vvr-4.0.11-flink-1.13 or later support the metadata column syntax. You can access the following metadata by using metadata columns.

    • database_name (STRING NOT NULL): the name of the source database to which the current data record belongs.

    • table_name (STRING NOT NULL): the name of the source table to which the current data record belongs.

    • op_ts (TIMESTAMP_LTZ(3) NOT NULL): the time when the current data record changed in the database. If the data record is obtained from the historical data of the table instead of from the binary log file, the value is fixed to 0.

    The following sample code shows how to merge multiple orders tables in database shards of a MySQL instance into a MySQL table named mysql_orders and synchronize data from the MySQL table to a Hologres table named holo_orders.

    CREATE TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- Read the database name. 
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- Read the table name. 
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- Read the change time. 
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- Use a regular expression to match multiple database shards. 
      'table-name' = 'orders_.*'   -- Use a regular expression to match multiple tables in the sharded database. 
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;
  • Regular expressions

    MySQL CDC source tables allow you to use regular expressions in table names or database names to match multiple tables or databases. The following sample code provides an example on how to use a regular expression to specify multiple tables.

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- Use the regular expression to match multiple databases. 
      'table-name' = '(t[5-8]|tt)' -- Use the regular expression to match multiple tables. 
    );

    Elements in the preceding regular expressions:

    • In this example, ^(test).* is used to match a database whose name starts with the specified prefix. This expression can be used to match databases whose names start with test, such as test1 or test2.

    • In this example, .*[p$] is used to match a database whose name ends with the specified suffix. This expression can be used to match databases whose names end with p, such as cdcp or edcp.

    • In this example, txc is used to match the specified database name. This expression can be used to match a database that has a specific name, such as txc.

    When the MySQL CDC source connector matches a full-path table name, the connector uses the database name and table name to uniquely identify a table. This way, database-name.table-name is used as the pattern to match a table. For example, the (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) pattern can match the txc.tt and test2.test5 tables in the database.

    Important

    The values of the table-name and database-name parameters can also be separated by commas (,) to specify multiple tables or databases, such as 'table-name' = 'mytable1,mytable2'. However, this separator conflicts with commas (,) in regular expressions. If you use a regular expression that contains commas (,), you must rewrite it into an equivalent regular expression that uses vertical bars (|). For example, you must rewrite the regular expression mytable_\d{1,2} into the equivalent regular expression (mytable_\d{1}|mytable_\d{2}) to prevent comma conflicts.

  • Parallelism control

    The MySQL CDC source connector can read full data in multiple subtasks at the same time. This improves the efficiency of data loading. If you use the MySQL CDC source connector together with the Autopilot feature that is provided by the VVP of fully managed Flink, automatic scale-in can be performed during incremental data reading after parallel reading is complete. This helps reduce the computing resources that are consumed.

    You can configure the Parallelism parameter in Basic mode or Expert mode in the Deployment Starting Configuration - Streaming dialog box in the console of fully managed Flink. The setting of the Parallelism parameter varies based on the resource configuration mode.

    • In Basic mode, the Parallelism parameter specifies the global parallelism of all deployments.

    • In Expert mode, you can configure the Parallelism parameter for a specific vertex node based on your business requirements.

    For more information about resource configurations, see Configure a deployment.

    Important

    When you configure the Parallelism parameter for a deployment in Basic mode or Expert mode, make sure that the range of the server-id parameter declared in the table is greater than or equal to the value of the Parallelism parameter for the deployment. For example, if the range of the server-id parameter is 5404-5412, eight unique server IDs can be used. Therefore, you can configure a maximum of eight parallel subtasks. In addition, the range specified by the server-id parameter for the same MySQL instance in different deployments cannot overlap. Each deployment must be explicitly configured with a unique server ID.

  • Automatic scale-in by using Autopilot

    When full data is read, a large amount of historical data is accumulated. In most cases, fully managed Flink reads historical data in parallel to improve the efficiency of data reading. When incremental data is read, only a single subtask is run because the amount of binary log data is small and the global order must be ensured. The numbers of compute units (CUs) that are required during full data reading and incremental data reading are different. You can use the Autopilot feature to balance performance and resource consumption.

    Autopilot monitors the traffic for each task that is used by the MySQL CDC source table. If the binary log data is read in only one task and other tasks are idle during incremental data reading, Autopilot automatically reduces the number of CUs and the degree of parallelism. To enable Autopilot, you need only to set the Mode parameter of Autopilot to Active on the Deployments page.

    Note

    By default, the minimum interval at which the degree of parallelism is decreased is 24 hours. For more information about automatic tuning and related parameters, see Configure automatic tuning.

  • Startup mode

    You can configure the scan.startup.mode parameter to specify the mode in which the MySQL CDC source connector starts to read data from a MySQL database. Valid values:

    • initial: When the MySQL CDC source connector starts for the first time, the connector reads full data in a database table. After the full data reading is complete, the connector switches to the incremental reading mode to read binary log data. This is the default value.

    • earliest-offset: The MySQL CDC source connector skips the savepoint reading phase and starts to read accessible binary log data from the earliest offset.

    • latest-offset: The MySQL CDC source connector skips the savepoint reading phase and starts to read binary log data from the most recent offset. In this mode, the MySQL CDC source connector can read only data changes after the deployment is started.

    • specific-offset: The MySQL CDC source connector skips the savepoint reading phase and starts to read binary log data from the specified offset. You can specify the startup offset by using the name of a binary log file and the binary log file position or by using a GTID set.

    • timestamp: The MySQL CDC source connector skips the savepoint reading phase and starts to read binary log events from the specified timestamp.

    Sample code:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- Starts to read data from the earliest offset. 
        'scan.startup.mode' = 'latest-offset', -- Starts to read data from the most recent offset. 
        'scan.startup.mode' = 'specific-offset', -- Starts to read data from the specified offset. 
        'scan.startup.mode' = 'timestamp', -- Starts to read data from the specified timestamp. 
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- Specifies the name of the binlog log file when the scan.startup.mode parameter is set to specific-offset. 
        'scan.startup.specific-offset.pos' = '4', -- Specifies the binary log file position when the scan.startup.mode parameter is set to specific-offset. 
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- Specifies the GTID set when the scan.startup.mode parameter is set to specific-offset. 
        'scan.startup.timestamp-millis' = '1667232000000' -- Specifies the startup timestamp when the scan.startup.mode parameter is set to timestamp. 
        ...
    )
    Important
    • When a checkpoint is created, the MySQL CDC source connector records the current binary log offset in a log entry of the INFO level. The log entry is prefixed with Binlog offset on checkpoint {checkpoint-id}. This log entry can help you start the deployment from a specific checkpoint.

    • If the schema of a table is changed after the table is read, an error may occur when the scan.startup.mode parameter is set to earliest-offset, specific-offset, or timestamp. This is because the Debezium reader saves the latest table schema and earlier data that does not match the table schema cannot be correctly parsed.

  • MySQL CDC source tables that do not have a primary key

    • In Realtime Compute for Apache Flink that uses VVR 6.0.7 or later, you can use MySQL CDC source tables that do not have a primary key. To use a MySQL CDC source table that does not have a primary key, you must configure the scan.incremental.snapshot.chunk.key-column parameter and specify only non-null fields.

    • The processing semantics of a MySQL CDC source table that does not have a primary key is determined based on the behavior of the columns that are specified by the scan.incremental.snapshot.chunk.key-column parameter.

      • If no update operation is performed on the specified columns, the exactly-once semantics is ensured.

      • If the update operation is performed on the specified columns, only the at-least-once semantics is ensured. However, you can specify the downstream primary key and perform the idempotence operation to ensure data correctness.
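
    The following sketch shows how such a table might be declared. The column user_id is an example of a NOT NULL column that is specified for sharding; replace the connection values and names with your own.

    CREATE TEMPORARY TABLE mysqlcdc_source_no_pk (
      user_id INT NOT NULL,
      user_name STRING,
      login_time TIMESTAMP(0)
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>',
      'scan.incremental.snapshot.chunk.key-column' = 'user_id' -- a NOT NULL column used for sharding
    );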

MySQL CDC DataStream API

Important

If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.

Create a DataStream API program and use the MySqlSource class. Sample code:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
                

The following parameters must be configured for the MySqlSource class.

  • hostname: the IP address or hostname that is used to access the MySQL database.

  • port: the port that is used to access the MySQL database.

  • databaseList: the name of the MySQL database that you want to access.

    Note

    If you want to read data from multiple databases, you can set this parameter to a regular expression. You can use .* to match all databases.

  • username: the username that is used to access the MySQL database.

  • password: the password that is used to access the MySQL database.

  • deserializer: a deserializer, which deserializes SourceRecord objects into a specified type. Valid values:

    • RowDataDebeziumDeserializeSchema: deserializes SourceRecord objects into RowData, the internal data structure of the Flink Table API and SQL API.

    • JsonDebeziumDeserializationSchema: deserializes SourceRecord objects into JSON strings.

FAQ

For more information about issues that may occur when you use a MySQL CDC source table, see FAQ about CDC.