Realtime Compute for Apache Flink can subscribe to AnalyticDB for MySQL binary logs to capture and process database changes in real time. This enables continuous data synchronization and stream computing.
This topic walks you through the full setup: enabling binary logging on a source table, uploading the connector to Flink, and creating a Flink SQL job to subscribe to and replicate changes.
Prerequisites
Before you begin, ensure that you have:
An AnalyticDB for MySQL cluster of Enterprise Edition, Basic Edition, Data Lakehouse Edition, or Data Warehouse Edition in elastic mode
An AnalyticDB for MySQL cluster running minor version 3.2.1.0 or later
To view and update the minor version, log on to the AnalyticDB for MySQL console and go to the Configuration Information section of the Cluster Information page. For details, see Update the minor version of a cluster.
A Flink workspace running Ververica Runtime (VVR) 8.0.4 or later
The AnalyticDB for MySQL cluster and the fully managed Flink workspace in the same VPC
The CIDR block of the Flink workspace added to the AnalyticDB for MySQL whitelist
Limitations
Binary logging cannot be enabled for XUANWU_V2 tables, so binary log subscriptions cannot be used for data synchronization or stream computing on XUANWU_V2 tables.
Flink can process binary logs from AnalyticDB for MySQL only for basic data types and the complex JSON data type.
Flink does not process binary log records related to DDL operations or automatic partition deletion on partitioned tables.
Step 1: Enable binary logging
Enable binary logging on the source AnalyticDB for MySQL table. The examples below use a table named source_table.
Binary logging is enabled at the table level in AnalyticDB for MySQL. Enable it only on the specific table you want to subscribe to.
When creating a new table:
CREATE TABLE source_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`)
) DISTRIBUTED BY HASH (id) BINLOG=true;
For an existing table:
ALTER TABLE source_table BINLOG=true;
(Optional) Set the binary log retention period
The default retention period is 6 hours. To change it, use the binlog_ttl parameter:
ALTER TABLE source_table binlog_ttl='1d';
binlog_ttl supports the following formats:
| Format | Example | Meaning |
|---|---|---|
| Pure number | 60 | 60 milliseconds |
| Number + s | 30s | 30 seconds |
| Number + h | 2h | 2 hours |
| Number + d | 1d | 1 day |
Set the retention period to at least the default value (6 hours). A short retention period may cause binary logs to be deleted before they are consumed, resulting in data synchronization failures.
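For example, to keep binary logs for 12 hours (a value chosen here purely for illustration; size the retention to cover the longest downtime you expect for the consuming job), run:
ALTER TABLE source_table binlog_ttl='12h';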
To check the current retention period, run:
SHOW CREATE TABLE source_table;
Step 2: Upload the AnalyticDB for MySQL connector to Flink
Download the connector JAR file.
Log on to the Realtime Compute for Apache Flink console.
On the Streaming Compute Flink tab, find the workspace and click Console in the Actions column.
In the left navigation pane, click Connectors.
On the Connectors page, click Create Custom Connector.
Upload the connector JAR file and click Next.
Click Finish. The custom connector appears in the connector list.
Step 3: Subscribe to binary logs
Create a Flink SQL job
Log on to the Realtime Compute for Apache Flink console and create an SQL job.
Create a source table
Define a source table in Flink that connects to AnalyticDB for MySQL and reads binary log data from source_table:
CREATE TEMPORARY TABLE adb_source (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb-mysql-cdc',
'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
'username' = 'testUser',
'password' = 'Test12****',
'database-name' = 'binlog',
'table-name' = 'source_table'
);
The primary key defined in the Flink DDL statement must be identical to the primary key in the AnalyticDB for MySQL table, including the key name. Mismatched primary keys affect data correctness.
Required parameters
| Parameter | Type | Description |
|---|---|---|
| connector | STRING | The connector type. Set to adb-mysql-cdc. |
| hostname | STRING | The VPC endpoint of AnalyticDB for MySQL. |
| username | STRING | The AnalyticDB for MySQL database account. |
| password | STRING | The password for the database account. |
| database-name | STRING | The AnalyticDB for MySQL database name. Because binary logging is table-level, only one database can be specified. |
| table-name | STRING | The table name in the AnalyticDB for MySQL database. Because binary logging is table-level, only one table can be specified. |
Optional parameters
| Parameter | Default | Type | Description |
|---|---|---|---|
| port | 3306 | INTEGER | The port number. |
| scan.incremental.snapshot.enabled | true | BOOLEAN | Enables incremental snapshot reading. When enabled, the table is split into chunks that can be read concurrently, checkpoints are supported at chunk granularity, and no database lock is required before reading. |
| scan.incremental.snapshot.chunk.size | 8096 | INTEGER | The number of rows per snapshot chunk when incremental snapshot reading is enabled. |
| scan.snapshot.fetch.size | 1024 | INTEGER | The maximum number of rows fetched per snapshot read. |
| scan.startup.mode | initial | STRING | The startup mode for data consumption. See Choose a startup mode below. |
| scan.startup.specific-offset.file | None | STRING | In specific-offset mode, the binary log file name at the start offset. Run SHOW MASTER STATUS FOR table_name; to get the latest file name. |
| scan.startup.specific-offset.pos | None | LONG | In specific-offset mode, the position in the binary log file at the start offset. Run SHOW MASTER STATUS FOR table_name; to get the latest position. |
| scan.startup.specific-offset.skip-events | None | LONG | The number of events to skip after the specified start offset. |
| scan.startup.specific-offset.skip-rows | None | LONG | The number of rows to skip after the specified start offset. |
| scan.startup.timestamp-millis | None | LONG | In timestamp mode, the start offset in milliseconds. Requires scan.startup.mode to be set to timestamp. |
| server-time-zone | None | STRING | The session time zone on the database server (for example, Asia/Shanghai). Controls how TIMESTAMP values are converted to STRING. If not set, ZoneId.systemDefault() is used. |
| debezium.min.row.count.to.stream.result | 1000 | INTEGER | If the row count in a table exceeds this value, results are streamed. Set to 0 to always stream regardless of table size. |
| connect.timeout | 30s | DURATION | The maximum wait time for a database connection before the system retries. |
| connect.max-retries | 3 | INTEGER | The maximum number of connection retries after a failure. |
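A minimal sketch of how optional parameters join the WITH clause; the endpoint, credentials, and tuning values below are placeholders chosen for illustration, not recommendations:
CREATE TEMPORARY TABLE adb_source_tuned (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'port' = '3306',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  -- illustrative tuning values; the defaults are usually sufficient
  'scan.incremental.snapshot.chunk.size' = '4096',
  'server-time-zone' = 'Asia/Shanghai'
);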
Choose a startup mode
The scan.startup.mode parameter controls where Flink starts reading binary log data:
| Mode | Behavior | Use when |
|---|---|---|
| initial (default) | Reads all historical data on first startup, then continues reading from the latest binary log. | Starting a new pipeline that needs full historical data plus ongoing changes. |
| earliest-offset | Skips the snapshot phase and starts from the earliest available binary log. | Reading all available binary log history without a full table scan. |
| latest-offset | Skips the snapshot phase and starts from the end of the binary log (changes after the connector starts). | Capturing only new changes going forward. |
| specific-offset | Skips the snapshot phase and starts from a specific binary log file and position, configured with scan.startup.specific-offset.file and scan.startup.specific-offset.pos. | Resuming from a known checkpoint position. |
| timestamp | Skips the snapshot phase and starts from a specified timestamp (in milliseconds), configured with scan.startup.timestamp-millis. | Replaying changes from a specific point in time. |
When using earliest-offset, specific-offset, or timestamp mode, the schema of the subscribed source table must remain unchanged from the specified start position to the time the job starts. Schema changes in that interval cause job failures.
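For example, a minimal sketch of a source table that resumes from a specific offset. The file name and position below are placeholders; obtain the actual values with SHOW MASTER STATUS FOR source_table;
CREATE TEMPORARY TABLE adb_source_from_offset (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'source_table',
  'scan.startup.mode' = 'specific-offset',
  -- placeholder offset values; read the real ones from SHOW MASTER STATUS
  'scan.startup.specific-offset.file' = 'mysql-bin.000003',
  'scan.startup.specific-offset.pos' = '4'
);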
Create a destination table
Create the physical table in AnalyticDB for MySQL where the processed data will be stored:
CREATE TABLE target_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`)
);
Create a sink table
Define a Flink sink table that writes processed data to the destination table in AnalyticDB for MySQL:
CREATE TEMPORARY TABLE adb_sink (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
'userName' = 'testUser',
'password' = 'Test12****',
'tableName' = 'target_table'
);
For sink table WITH parameters and type mappings, see AnalyticDB for MySQL V3.0 connector.
Sync changes from source to sink
Write a query that reads from the source table and writes to the sink table:
INSERT INTO adb_sink
SELECT * FROM adb_source;
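The query can also filter or transform the change stream before it reaches the sink. A minimal sketch, assuming for illustration that only rows with a positive num should be replicated:
INSERT INTO adb_sink
SELECT `id`, `num`
FROM adb_source
WHERE `num` > 0;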
Validate and deploy the job
Click Save.
Click Validate. Validation checks the job's SQL semantics, network connectivity, and table metadata. In the results area, click SQL Advice to view risk alerts and optimization suggestions.
(Optional) Click Debug to simulate the job, check output results, and verify the business logic of SELECT or INSERT statements. For details, see Debug a deployment.
Click Deploy. After deployment, go to the O&M page to start the job. For details on deploying, see Create a deployment.
Verify binary log status
Use the following SQL statements to check binary log status. These statements return meaningful results only after you have both enabled binary logging and started a subscription; enabling binary logging alone returns 0.
| Query | When to use |
|---|---|
| SHOW MASTER STATUS FOR source_table; | Check the latest binary log file name and position after starting the subscription. Also use this to retrieve values for scan.startup.specific-offset.file and scan.startup.specific-offset.pos. |
| SHOW BINARY LOGS FOR source_table; | List all uncleared historical binary logs and their sizes. |
| SHOW CREATE TABLE source_table; | Check the current binary log retention period (binlog_ttl). |
Type mappings
The following table describes the data type mappings between AnalyticDB for MySQL and Flink.
| AnalyticDB for MySQL type | Flink type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p,s) or NUMERIC(p,s) | DECIMAL(p,s) |
| VARCHAR | STRING |
| BINARY | BYTES |
| DATE | DATE |
| TIME | TIME |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| POINT | STRING |
| JSON | STRING |
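To illustrate these mappings, the sketch below declares a Flink source table for a hypothetical AnalyticDB for MySQL table (the table and column names are invented for this example) whose columns use DECIMAL(10,2), DATETIME, and JSON:
CREATE TEMPORARY TABLE adb_typed_source (
  `id` INT,
  `price` DECIMAL(10, 2),   -- AnalyticDB for MySQL DECIMAL(10,2)
  `updated_at` TIMESTAMP,   -- AnalyticDB for MySQL DATETIME
  `attributes` STRING,      -- AnalyticDB for MySQL JSON
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
  'username' = 'testUser',
  'password' = 'Test12****',
  'database-name' = 'binlog',
  'table-name' = 'typed_table'
);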
What's next
Supported connectors: Browse all connectors available in Realtime Compute for Apache Flink, including options for using a different destination.
AnalyticDB for MySQL V3.0 connector: Full reference for the sink table connector parameters and type mappings.