AnalyticDB:Use Flink to subscribe to binary logs

Last Updated:Mar 28, 2026

Realtime Compute for Apache Flink can subscribe to AnalyticDB for MySQL binary logs to capture and process database changes in real time. This enables continuous data synchronization and stream computing.

This topic walks you through the full setup: enabling binary logging on a source table, uploading the connector to Flink, and creating a Flink SQL job to subscribe to and replicate changes.

Prerequisites

Before you begin, ensure that you have:

  • An AnalyticDB for MySQL cluster of Enterprise Edition, Basic Edition, Data Lakehouse Edition, or Data Warehouse Edition in elastic mode

  • An AnalyticDB for MySQL cluster running minor version 3.2.1.0 or later

    To view and update the minor version, log on to the AnalyticDB for MySQL console and go to the Configuration Information section of the Cluster Information page. For details, see Update the minor version of a cluster.
  • A Flink workspace running Ververica Runtime (VVR) 8.0.4 or later

  • The AnalyticDB for MySQL cluster and the fully managed Flink workspace in the same VPC

  • The CIDR block of the Flink workspace added to the AnalyticDB for MySQL whitelist

Limitations

  • Binary logging cannot be enabled for XUANWU_V2 tables, so binary log subscriptions cannot be used for data synchronization or stream computing on XUANWU_V2 tables.

  • Flink can process binary logs from AnalyticDB for MySQL only for basic data types. Among complex data types, only JSON is supported.

  • Flink does not process binary log records related to DDL operations or automatic partition deletion on partitioned tables.

Step 1: Enable binary logging

Enable binary logging on the source AnalyticDB for MySQL table. The examples below use a table named source_table.

Binary logging is enabled at the table level in AnalyticDB for MySQL. Enable it only on the specific table you want to subscribe to.

When creating a new table:

CREATE TABLE source_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (id) BINLOG=true;

For an existing table:

ALTER TABLE source_table BINLOG=true;

(Optional) Set the binary log retention period

The default retention period is 6 hours. To change it, use the binlog_ttl parameter:

ALTER TABLE source_table binlog_ttl='1d';

binlog_ttl supports the following formats:

| Format | Example | Meaning |
| --- | --- | --- |
| Pure number | 60 | 60 milliseconds |
| Number + s | 30s | 30 seconds |
| Number + h | 2h | 2 hours |
| Number + d | 1d | 1 day |
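For instance, any of the formats above can be passed to binlog_ttl. The statement below sets a 12-hour retention period on the same source_table; the value is illustrative, not a recommendation:

```sql
-- Keep binary logs for 12 hours (illustrative value; see the note below
-- about not going under the 6-hour default)
ALTER TABLE source_table binlog_ttl='12h';
```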
Important

Set the retention period to at least the default value (6 hours). A short retention period may cause binary logs to be deleted before they are consumed, resulting in data synchronization failures.

To check the current retention period, run:

SHOW CREATE TABLE source_table;

Step 2: Upload the AnalyticDB for MySQL connector to Flink

  1. Download the connector JAR file.

  2. Log on to the Realtime Compute for Apache Flink console.

  3. On the Streaming Compute Flink tab, find the workspace and click Console in the Actions column.

  4. In the left navigation pane, click Connectors.

  5. On the Connectors page, click Create Custom Connector.

  6. Upload the connector JAR file and click Next.

  7. Click Finish. The custom connector appears in the connector list.

Step 3: Subscribe to binary logs

Create a Flink SQL job

Log on to the Realtime Compute for Apache Flink console and create an SQL job.

Create a source table

Define a source table in Flink that connects to AnalyticDB for MySQL and reads binary log data from source_table:

CREATE TEMPORARY TABLE adb_source (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table'
);
The primary key defined in the Flink DDL statement must be identical to the primary key in the AnalyticDB for MySQL table, including the key name. Mismatched primary keys affect data correctness.

Required parameters

| Parameter | Type | Description |
| --- | --- | --- |
| connector | STRING | The connector type. Set to adb-mysql-cdc. |
| hostname | STRING | The VPC endpoint of AnalyticDB for MySQL. |
| username | STRING | The AnalyticDB for MySQL database account. |
| password | STRING | The password for the database account. |
| database-name | STRING | The AnalyticDB for MySQL database name. Because binary logging is table-level, only one database can be specified. |
| table-name | STRING | The table name in the AnalyticDB for MySQL database. Because binary logging is table-level, only one table can be specified. |

Optional parameters

| Parameter | Default | Type | Description |
| --- | --- | --- | --- |
| port | 3306 | INTEGER | The port number. |
| scan.incremental.snapshot.enabled | true | BOOLEAN | Enables incremental snapshot reading. When enabled, the table is split into chunks that can be read concurrently, checkpoints are supported at chunk granularity, and no database lock is required before reading. |
| scan.incremental.snapshot.chunk.size | 8096 | INTEGER | The number of rows per snapshot chunk when incremental snapshot reading is enabled. |
| scan.snapshot.fetch.size | 1024 | INTEGER | The maximum number of rows fetched per snapshot read. |
| scan.startup.mode | initial | STRING | The startup mode for data consumption. See Choose a startup mode below. |
| scan.startup.specific-offset.file | None | STRING | In specific-offset mode, the binary log file name at the start offset. Run SHOW MASTER STATUS FOR table_name; to get the latest file name. |
| scan.startup.specific-offset.pos | None | LONG | In specific-offset mode, the position in the binary log file at the start offset. Run SHOW MASTER STATUS FOR table_name; to get the latest position. |
| scan.startup.specific-offset.skip-events | None | LONG | The number of events to skip after the specified start offset. |
| scan.startup.specific-offset.skip-rows | None | LONG | The number of rows to skip after the specified start offset. |
| scan.startup.timestamp-millis | None | LONG | In timestamp mode, the start offset in milliseconds. Requires scan.startup.mode to be set to timestamp. |
| server-time-zone | None | STRING | The session time zone on the database server (for example, Asia/Shanghai). Controls how TIMESTAMP values are converted to STRING. If not set, ZoneId.systemDefault() is used. |
| debezium.min.row.count.to.stream.result | 1000 | INTEGER | If the row count in a table exceeds this value, results are streamed. Set to 0 to always stream regardless of table size. |
| connect.timeout | 30s | DURATION | The maximum wait time for a database connection before the system retries. |
| connect.max-retries | 3 | INTEGER | The maximum number of connection retries after a failure. |
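As a sketch of how the optional parameters fit into a source table definition, the DDL below tunes the time zone, chunk size, and retry behavior. The endpoint, credentials, and parameter values are placeholders for illustration, not values prescribed by this topic:

```sql
CREATE TEMPORARY TABLE adb_source_tuned (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-****.ads.aliyuncs.com',        -- placeholder VPC endpoint
  'username' = 'testUser',
  'password' = '****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  'server-time-zone' = 'Asia/Shanghai',            -- match the database session time zone
  'scan.incremental.snapshot.chunk.size' = '4096', -- smaller chunks, lower per-read memory
  'connect.max-retries' = '5'                      -- more retries on an unstable network
);
```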

Choose a startup mode

The scan.startup.mode parameter controls where Flink starts reading binary log data:

| Mode | Behavior | Use when |
| --- | --- | --- |
| initial (default) | Reads all historical data on first startup, then continues reading from the latest binary log. | Starting a new pipeline that needs full historical data plus ongoing changes. |
| earliest-offset | Skips the snapshot phase and starts from the earliest available binary log. | Reading all available binary log history without a full table scan. |
| latest-offset | Skips the snapshot phase and starts from the end of the binary log (changes after the connector starts). | Capturing only new changes going forward. |
| specific-offset | Skips the snapshot phase and starts from a specific binary log file and position, configured with scan.startup.specific-offset.file and scan.startup.specific-offset.pos. | Resuming from a known checkpoint position. |
| timestamp | Skips the snapshot phase and starts from a specified timestamp (in milliseconds), configured with scan.startup.timestamp-millis. | Replaying changes from a specific point in time. |
Important

When using earliest-offset, specific-offset, or timestamp mode, the schema of the target table must remain unchanged from the specified start position to the time when the job starts. Schema changes between the start position and job startup cause job failures.
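For example, a source table that resumes from a known binary log position might be declared as follows. The file name and position are illustrative; obtain real values with SHOW MASTER STATUS FOR source_table;, and the endpoint and credentials are placeholders:

```sql
CREATE TEMPORARY TABLE adb_source_from_offset (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-****.ads.aliyuncs.com',  -- placeholder VPC endpoint
  'username' = 'testUser',
  'password' = '****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  'scan.startup.mode' = 'specific-offset',
  'scan.startup.specific-offset.file' = 'mysql-bin.000003',  -- illustrative file name
  'scan.startup.specific-offset.pos' = '4'                   -- illustrative position
);
```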

Create a destination table

Create the physical table in AnalyticDB for MySQL where the processed data will be stored:

CREATE TABLE target_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (id);

Create a sink table

Define a Flink sink table that writes processed data to the destination table in AnalyticDB for MySQL:

CREATE TEMPORARY TABLE adb_sink (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
  'userName' = 'testUser',
  'password' = 'Test12****',
  'tableName' = 'target_table'
);

For sink table WITH parameters and type mappings, see AnalyticDB for MySQL V3.0 connector.

Sync changes from source to sink

Write a query that reads from the source table and writes to the sink table:

INSERT INTO adb_sink
SELECT * FROM adb_source;
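The INSERT statement can also filter or transform records on the way through rather than copying them verbatim. A minimal sketch, assuming the same adb_source and adb_sink schemas defined above (the predicate is illustrative):

```sql
-- Replicate only rows whose num value is positive
INSERT INTO adb_sink
SELECT `id`, `num`
FROM adb_source
WHERE `num` > 0;
```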

Validate and deploy the job

  1. Click Save.

  2. Click Validate. Validation checks the job's SQL semantics, network connectivity, and table metadata. In the results area, click SQL Advice to view risk alerts and optimization suggestions.

  3. (Optional) Click Debug to simulate the job, check output results, and verify the business logic of SELECT or INSERT statements. For details, see Debug a deployment.

  4. Click Deploy. After deployment, go to the O&M page to start the job. For details on deploying, see Create a deployment.

Verify binary log status

Use the following SQL statements to check binary log status. These statements return meaningful results only after you have both enabled binary logging and started a subscription — enabling binary logging alone returns 0.

| Query | When to use |
| --- | --- |
| SHOW MASTER STATUS FOR source_table; | Check the latest binary log file name and position after starting the subscription. Also use this to retrieve values for scan.startup.specific-offset.file and scan.startup.specific-offset.pos. |
| SHOW BINARY LOGS FOR source_table; | List all uncleared historical binary logs and their sizes. |
| SHOW CREATE TABLE source_table; | Check the current binary log retention period (binlog_ttl). |

Type mappings

The following table describes the data type mappings between AnalyticDB for MySQL and Flink.

| AnalyticDB for MySQL type | Flink type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p,s) or NUMERIC(p,s) | DECIMAL(p,s) |
| VARCHAR | STRING |
| BINARY | BYTES |
| DATE | DATE |
| TIME | TIME |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| POINT | STRING |
| JSON | STRING |

What's next