The AnalyticDB for MySQL V3.0 connector lets you read from and write to AnalyticDB for MySQL V3.0 clusters using Flink SQL. AnalyticDB for MySQL is a cloud-native data warehousing service that supports high-throughput real-time writes, low-latency analytics, and complex extract, transform, and load (ETL) operations.
The connector supports the following table types and capabilities:
| Item | Description |
|---|---|
| Table types | Source table, dimension table, and sink table. Note: Source tables require Ververica Runtime (VVR) 8.0.4 or later. For source table parameters, see Use Flink to subscribe to binary logs. |
| Running modes | Streaming mode and batch mode |
| Data format | N/A |
| Metrics | N/A |
| API type | SQL API |
| Data update and deletion in sink tables | Supported |
## Prerequisites

Before you begin, make sure that you have:

- An AnalyticDB for MySQL cluster and a table. See Create a cluster and CREATE TABLE.
- An IP address whitelist configured for the cluster. See Configure an IP address whitelist.
## Syntax

```sql
CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
```
The primary key defined in the Flink DDL statement must match the primary key of the physical table in AnalyticDB for MySQL: the same fields with the same names must be present in both tables. Mismatched primary keys can cause data corruption.
## Key behaviors

### Primary key and write mode
When a primary key is defined in the DDL, the connector operates in upsert mode, where duplicate primary keys trigger an update rather than a new insert. The exact SQL behavior depends on the `replaceMode` parameter:

- `replace`: uses `REPLACE INTO`. A duplicate primary key overwrites the entire existing row.
- `upsert`: uses `INSERT INTO ... ON DUPLICATE KEY UPDATE`. Only the specified fields are updated; other fields retain their current values.
- `insert`: uses `INSERT IGNORE INTO`. Duplicate primary keys are silently ignored; the existing row is kept.

When no primary key is defined, the connector always uses `INSERT IGNORE INTO`.
`replaceMode` requires AnalyticDB for MySQL V3.1.3.5 or later, and the string values (`replace`, `upsert`, `insert`) require VVR 11.2 or later. Earlier VVR versions use `true` (equivalent to `replace`) and `false` (equivalent to `upsert`). VVR 11.2 and later remain compatible with `true` and `false`.
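As a sketch, a sink DDL that combines these write-mode options might look like the following. The endpoint, credentials, and table and column names are placeholders, and the string-valued `replaceMode` assumes AnalyticDB for MySQL V3.1.3.5 or later with VVR 11.2 or later:

```sql
-- Hypothetical sink table: a duplicate primary key updates only `num`,
-- while `create_time` keeps its original value (see excludeUpdateColumns).
CREATE TEMPORARY TABLE adb_upsert_sink (
  `id` INT,
  `num` BIGINT,
  `create_time` TIMESTAMP,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'replaceMode' = 'upsert',
  'excludeUpdateColumns' = 'create_time'
);
```

On VVR versions earlier than 11.2, `'replaceMode' = 'false'` would express the same upsert behavior.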
### Write buffering
The connector buffers records in memory and flushes them in batches. A flush is triggered when either of the following conditions is met:

- The number of buffered records reaches `batchSize` (default: 1,000) or `bufferSize` (default: 1,000)
- The time since the last flush reaches `flushIntervalMs` (default: 3,000 ms)

Both `batchSize` and `bufferSize` take effect only when a primary key is defined.
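For example, the flush thresholds can be tuned in the WITH clause. The values below are illustrative, not recommendations, and the connection details are placeholders:

```sql
-- Sketch: flush after 2,000 buffered records or 1 second, whichever comes first.
-- batchSize and bufferSize take effect only because a primary key is defined.
CREATE TEMPORARY TABLE adb_tuned_sink (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'batchSize' = '2000',
  'bufferSize' = '2000',
  'flushIntervalMs' = '1000'
);
```

Larger batches generally improve write throughput at the cost of higher end-to-end latency.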
### Dimension table caching
The connector supports three cache policies for dimension tables. Caching reduces lookups against the physical table but requires extra memory.
| Cache policy | Behavior | Use when |
|---|---|---|
| `ALL` (default) | All dimension table data is loaded into memory before the job starts. Subsequent lookups hit the cache only. Data is reloaded after `cacheTTLMs` expires. | The dimension table is small and missing keys are common. |
| `LRU` | Frequently accessed rows are cached. On a cache miss, the connector queries the physical table and updates the cache. Entries expire after `cacheTTLMs`. | The dimension table is large but access is skewed to a subset of keys. |
| `None` | No caching. Every lookup queries the physical table directly. | Low lookup volume or strict data freshness requirements. |
With ALL caching, the connector asynchronously loads the full dimension table into memory. To avoid out of memory (OOM) errors, increase the memory of the node used for joining tables by at least twice the size of the remote table, and monitor node memory usage during the job.
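As a sketch, a dimension table that uses LRU caching might be declared as follows; the cache sizing values are illustrative and the connection details are placeholders:

```sql
-- Sketch: LRU cache holding up to 50,000 rows, each entry valid for 10 minutes.
CREATE TEMPORARY TABLE adb_lru_dim (
  `id` INT,
  `name` VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'cache' = 'LRU',
  'cacheSize' = '50000',
  'cacheTTLMs' = '600000'
);
```

A shorter `cacheTTLMs` trades lookup throughput for fresher data; `None` removes that trade-off entirely at the cost of a database query per lookup.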
## Parameters in the WITH clause

### Common parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector` | String | Yes | — | Set to `adb3.0`. |
| `url` | String | Yes | — | Java Database Connectivity (JDBC) URL of the database, in the format `jdbc:mysql://<endpoint>:<port>/<databaseName>`. Get the endpoint and port from the Network Information section of your cluster page in the AnalyticDB for MySQL console. |
| `userName` | String | Yes | — | Username for database access. |
| `password` | String | Yes | — | Password for database access. |
| `tableName` | String | Yes | — | Name of the target table in the database. |
| `maxRetryTimes` | Integer | No | 10 | Maximum number of retries on read or write failure. |
### Sink table parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `batchSize` | Integer | No | 1000 | Number of records written per batch. Takes effect only when a primary key is defined. |
| `bufferSize` | Integer | No | 1000 | Maximum number of records buffered in memory before a flush is triggered. Takes effect only when a primary key is defined. |
| `flushIntervalMs` | Integer | No | 3000 | Maximum time (in milliseconds) between flushes. When this interval elapses, all buffered records are written regardless of buffer size. |
| `ignoreDelete` | Boolean | No | false | Whether to ignore delete operations. Set to true to skip deletes; set to false to apply them. |
| `replaceMode` | Boolean or String | No | true | Write mode when a primary key is defined. VVR 11.2 and later: `replace`, `upsert`, or `insert`. Earlier versions: `true` (same as `replace`) or `false` (same as `upsert`). Requires AnalyticDB for MySQL V3.1.3.5 or later. Takes effect only when a primary key is defined in the DDL; otherwise, `INSERT IGNORE INTO` is always used. |
| `excludeUpdateColumns` | String | No | (empty) | Comma-separated list of columns to exclude from updates when `replaceMode` is `upsert` (or `false`). When the primary key already exists, only the remaining columns are updated and the excluded columns keep their existing values. When the primary key does not already exist, all columns are inserted. Example: `excludeUpdateColumns='column1,column2'`. The value must be on a single line with no line breaks. |
| `connectionMaxActive` | Integer | No | 40 | Maximum number of concurrent database connections in the thread pool. |
### Dimension table parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `cache` | String | No | ALL | Cache policy: `None`, `LRU`, or `ALL`. See Dimension table caching. |
| `cacheSize` | Integer | No | 100000 | Maximum number of cached rows. Required when `cache` is `LRU`. |
| `cacheTTLMs` | Integer | No | Long.MAX_VALUE | Cache entry lifetime in milliseconds. For `LRU`: entries expire after this period (default: never expire). For `ALL`: the full cache is reloaded after this period (default: never reloaded). Not used when `cache` is `None`. |
| `maxJoinRows` | Integer | No | 1024 | Maximum number of dimension table rows matched per input record. If each input record matches at most n dimension table rows, set this parameter to n to optimize join performance in Realtime Compute for Apache Flink. |
## Data type mappings
| AnalyticDB for MySQL V3.0 | Realtime Compute for Apache Flink |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(p, s) or NUMERIC(p, s) | DECIMAL(p, s) |
| VARCHAR | STRING |
| BINARY | BYTES |
| DATE | DATE |
| TIME | TIME |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| POINT | STRING |
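For instance, given a hypothetical AnalyticDB for MySQL table with columns `id INT`, `price DECIMAL(10, 2)`, `created DATETIME`, and `location POINT`, the Flink DDL would declare the mapped types from the table above; the connection details are placeholders:

```sql
-- Sketch: Flink-side declaration for a hypothetical AnalyticDB table.
CREATE TEMPORARY TABLE adb_typed_table (
  `id` INT,                 -- INT maps to INT
  `price` DECIMAL(10, 2),   -- DECIMAL(p, s) maps to DECIMAL(p, s)
  `created` TIMESTAMP,      -- DATETIME maps to TIMESTAMP
  `location` STRING,        -- POINT maps to STRING
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
```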
## Examples

### Sink table
The following example reads from a datagen source and writes the data to an AnalyticDB for MySQL sink table.
```sql
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO adb_sink
SELECT * FROM datagen_source;
```
### Dimension table
The following example joins a datagen source with an AnalyticDB for MySQL dimension table using a temporal join. Results are written to a blackhole sink.
```sql
CREATE TEMPORARY TABLE datagen_source (
  `a` INT,
  `b` VARCHAR,
  `c` STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_dim (
  `a` INT,
  `b` VARCHAR,
  `c` VARCHAR
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `a` INT,
  `b` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
```