
AnalyticDB: Use Flink to subscribe to binary logs

Last Updated: Jul 29, 2025

Realtime Compute for Apache Flink can subscribe to AnalyticDB for MySQL to capture and process database change data in real time. This enables efficient data synchronization and stream computing. This topic describes how to use Flink to subscribe to binary logs from AnalyticDB for MySQL.

Prerequisites

  • The AnalyticDB for MySQL clusters are of the Enterprise Edition, Basic Edition, Data Lakehouse Edition, or Data Warehouse Edition in elastic mode.

  • The minor versions of the AnalyticDB for MySQL clusters are 3.2.1.0 or later.

  • The Flink real-time computing engine is VVR 8.0.4 or later.

  • The AnalyticDB for MySQL cluster and the fully managed Flink workspace are in the same VPC.

  • You have added the CIDR block of the Flink workspace to the whitelist of AnalyticDB for MySQL.

Limits

  • The binary logging feature cannot be enabled for XUANWU_V2 tables. Therefore, you cannot use binary log subscriptions to perform data synchronization or stream computing on XUANWU_V2 tables in AnalyticDB for MySQL clusters.

  • In binary logs from AnalyticDB for MySQL, Flink can process only basic data types and the JSON complex data type.

  • Flink does not process records in AnalyticDB for MySQL binary logs that are related to DDL operations or automatic partition deletion operations on partitioned tables.

Step 1: Enable the binary logging feature

  1. Enable the binary logging feature for a table in the source AnalyticDB for MySQL cluster. In this example, a table named source_table is used.

    Note

    You can enable the binary logging feature only at the table level in AnalyticDB for MySQL.

    Enable the binary logging feature when you create a table

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    ) DISTRIBUTED BY HASH (id) BINLOG=true;

    Enable the binary logging feature for an existing table

    ALTER TABLE source_table BINLOG=true;
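
    To verify that the feature is enabled, you can view the table definition; the binlog settings appear in the returned DDL:

    SHOW CREATE TABLE source_table;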
  2. (Optional) Change the retention period of binary logs.

    You can modify the binlog_ttl parameter to change the retention period of binary logs. The default value of the parameter is 6h. Execute the following statement to change the retention period of binary logs to 1 day for the source_table table:

    ALTER TABLE source_table binlog_ttl='1d';

    The binlog_ttl parameter supports values in the following formats:

    • Milliseconds: a number without a unit. For example, 60 specifies 60 milliseconds.

    • Seconds: a number followed by s. For example, 30s specifies 30 seconds.

    • Hours: a number followed by h. For example, 2h specifies 2 hours.

    • Days: a number followed by d. For example, 1d specifies 1 day.

    Note
    • We recommend that you set the retention period of binary logs to a value that is greater than or equal to the default value of the binlog_ttl parameter. If you set the retention period to an excessively small value, binary logs may be deleted before they are consumed, which causes data synchronization to fail.

    • To query the current retention period of binary logs, execute the SHOW CREATE TABLE source_table; statement.

Step 2: Upload the AnalyticDB for MySQL connector to Flink

  1. Download the connector.

  2. Log on to the Realtime Compute for Apache Flink console.

  3. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  4. In the left navigation pane, click Connectors.

  5. On the Connectors page, click Create Custom Connector.

  6. Upload the connector package that you downloaded in substep 1 and click Next.

  7. Click Finish. The custom connector appears in the connector list.

Step 3: Subscribe to binary logs

  1. Log on to the Realtime Compute for Apache Flink console and create an SQL job.

  2. Create a source table to connect to AnalyticDB for MySQL and read binary log data from a specified table (source_table).

    Note
    • The primary key defined in the Flink DDL statement must be identical to the primary key in the physical table of the AnalyticDB for MySQL cluster, including the key name. If they are not identical, data correctness is affected.

    • The data types in Flink must be compatible with those in AnalyticDB for MySQL. For more information about the mappings, see Type mappings.

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    The following table describes the parameters in the WITH clause.

    | Parameter | Required | Default value | Data type | Description |
    | --- | --- | --- | --- | --- |
    | connector | Yes | None | STRING | The connector to use. Set the value to adb-mysql-cdc. |
    | hostname | Yes | None | STRING | The VPC endpoint of the AnalyticDB for MySQL cluster. |
    | username | Yes | None | STRING | The AnalyticDB for MySQL database account. |
    | password | Yes | None | STRING | The password of the AnalyticDB for MySQL database account. |
    | database-name | Yes | None | STRING | The name of the AnalyticDB for MySQL database. Because AnalyticDB for MySQL implements table-level binary logging, you can specify only one database. |
    | table-name | Yes | None | STRING | The name of the table in the AnalyticDB for MySQL database. Because AnalyticDB for MySQL implements table-level binary logging, you can specify only one table. |
    | port | No | 3306 | INTEGER | The port number of the database server. |
    | scan.incremental.snapshot.enabled | No | true | BOOLEAN | Specifies whether to enable incremental snapshot reading. This feature is enabled by default. Incremental snapshot is a newer mechanism for reading table snapshots. Compared with the legacy snapshot mechanism, it supports concurrent reads during the snapshot phase, supports checkpoints at the chunk granularity, and does not require database lock permissions before the snapshot read. |
    | scan.incremental.snapshot.chunk.size | No | 8096 | INTEGER | The size of a table snapshot chunk, in rows. When incremental snapshot reading is enabled, the table is split into multiple chunks to be read. |
    | scan.snapshot.fetch.size | No | 1024 | INTEGER | The maximum number of rows that can be fetched per read during a table snapshot. |
    | scan.startup.mode | No | initial | STRING | The startup mode for data consumption. Valid values: initial (default) scans all historical data when the job starts for the first time and then reads the latest binary log data; earliest-offset skips the historical scan and reads from the earliest available binary log; specific-offset skips the historical full scan and starts from the binary log file and position that you specify with the scan.startup.specific-offset.file and scan.startup.specific-offset.pos parameters; latest-offset skips the historical scan and reads from the end of the binary log, so the job reads only the changes that occur after the connector starts; timestamp skips the historical full scan and reads the binary log from the timestamp, in milliseconds, that the scan.startup.timestamp-millis parameter specifies. Important: in the earliest-offset, specific-offset, and timestamp modes, make sure that the table schema remains unchanged from the specified binary log consumption position until the job starts, to prevent job failures caused by schema evolution. |
    | scan.startup.specific-offset.file | No | None | STRING | The name of the binary log file at the start offset in specific-offset startup mode. To obtain the most recent binary log file name, execute the SHOW MASTER STATUS FOR table_name; statement. |
    | scan.startup.specific-offset.pos | No | None | LONG | The position in the binary log file at the start offset in specific-offset startup mode. To obtain the latest binary log position, execute the SHOW MASTER STATUS FOR table_name; statement. |
    | scan.startup.specific-offset.skip-events | No | None | LONG | The number of events to skip after the specified start offset. |
    | scan.startup.specific-offset.skip-rows | No | None | LONG | The number of data rows to skip after the specified start offset. |
    | scan.startup.timestamp-millis | No | None | LONG | The start offset, in milliseconds, for jobs that start in timestamp mode. When you use this parameter, scan.startup.mode must be set to timestamp. |
    | server-time-zone | No | None | STRING | The session time zone of the database server, for example, "Asia/Shanghai". This parameter controls how the TIMESTAMP type in AnalyticDB for MySQL is converted to the STRING type. If this parameter is not set, ZoneId.systemDefault() is used to determine the server time zone. |
    | debezium.min.row.count.to.stream.result | No | 1000 | INTEGER | If the number of rows in a table is greater than this value, the connector streams the results. If you set this parameter to 0, all table size checks are skipped, and all results are always streamed during the snapshot. |
    | connect.timeout | No | 30s | DURATION | The maximum amount of time that the connector waits for a connection to the database server to be established before the attempt times out and is retried. The default unit is seconds (s). |
    | connect.max-retries | No | 3 | INTEGER | The maximum number of retries after a connection attempt to the database server fails. |
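
    For example, to skip the historical full scan and start reading the binary log from a specific point in time, you can combine the startup parameters as follows. This is a minimal sketch: the endpoint, credentials, and timestamp are placeholder values, and the table name reuses source_table from this topic.

    CREATE TEMPORARY TABLE adb_source_from_ts (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table',
      -- read the binary log from this timestamp (in milliseconds); no historical scan is performed
      'scan.startup.mode' = 'timestamp',
      'scan.startup.timestamp-millis' = '1722240000000'
    );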

  3. Create a destination table to store the processed data. This example uses AnalyticDB for MySQL as the destination. For more information about the connectors that Flink supports, see Supported connectors.

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    );
  4. Create a sink table to connect to the destination table that you created in the previous step. The sink table writes the processed data to the specified table in AnalyticDB for MySQL.

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    For more information about the WITH parameters and type mappings for the sink table, see AnalyticDB for MySQL V3.0 connector.

  5. Synchronize the captured source data changes to the sink table. The sink table then writes the data to the destination.

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
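
    The preceding statement mirrors the source table one-to-one. If you need to transform the change stream instead, you can apply ordinary Flink SQL logic in the same job. The following variant is a hypothetical sketch, not part of the original job; the filter condition is for illustration only.

    INSERT INTO adb_sink
    SELECT `id`, `num`   -- project the columns that adb_sink defines
    FROM adb_source
    WHERE `num` > 0;     -- hypothetical filter applied to the change stream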
  6. Click Save.

  7. Click Validate.

    The validation feature checks the job's SQL semantics, network connectivity, and the metadata of the tables it uses. You can also click SQL Optimization in the results area to view SQL risk alerts and optimization suggestions.

  8. (Optional) Click Debug.

    You can use the job debugging feature to simulate a job run, check the output results, and verify the business logic of SELECT or INSERT statements. This improves development efficiency and reduces data quality risks.

  9. Click Deploy.

    After you develop and validate the job, you can deploy the job to publish the data to the production environment. After the job is deployed, you can go to the O&M page to start the job.

  10. (Optional) View information about binary logs.

    Note

    If you have only enabled the binary logging feature but have not yet subscribed to binary logs, the following SQL statements return 0. Log information is displayed only after you subscribe to the binary logs.

    • To query the file names and locations of the most recent binary logs, execute the following statement:

      SHOW MASTER STATUS FOR source_table;
    • To query all the uncleared historical binary logs and their sizes, execute the following statement:

      SHOW BINARY LOGS FOR source_table;

Type mappings

The following table describes the data type mappings between AnalyticDB for MySQL and Flink.

| AnalyticDB for MySQL field type | Flink field type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p,s) or NUMERIC(p,s) | DECIMAL(p,s) |
| VARCHAR | STRING |
| BINARY | BYTES |
| DATE | DATE |
| TIME | TIME |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| POINT | STRING |
| JSON | STRING |
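
As an illustration of these mappings, the following hypothetical table named metrics uses the DATETIME and JSON types in AnalyticDB for MySQL. In the corresponding Flink DDL, those columns are declared as TIMESTAMP and STRING; the endpoint and credentials are placeholders reused from the earlier examples.

-- AnalyticDB for MySQL table (hypothetical example)
CREATE TABLE metrics (
  `id` INT,
  `created_at` DATETIME,
  `attributes` JSON,
  PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (id) BINLOG=true;

-- Corresponding Flink source table: DATETIME maps to TIMESTAMP, JSON maps to STRING
CREATE TEMPORARY TABLE metrics_source (
  `id` INT,
  `created_at` TIMESTAMP,
  `attributes` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'metrics'
);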