
Realtime Compute for Apache Flink:AnalyticDB for MySQL V3.0

Last Updated: Mar 26, 2026

The AnalyticDB for MySQL V3.0 connector lets you read from and write to AnalyticDB for MySQL V3.0 clusters using Flink SQL. AnalyticDB for MySQL is a cloud-native data warehousing service that supports high-throughput real-time writes, low-latency analytics, and complex extract, transform, and load (ETL) operations.

The connector supports the following table types and capabilities:

  • Table types: Source table, dimension table, and sink table.

    Note: Source tables require Ververica Runtime (VVR) 8.0.4 or later. For source table parameters, see Use Flink to subscribe to binary logs.

  • Running modes: Streaming mode and batch mode.

  • Data format: N/A.

  • Metrics: N/A.

  • API type: SQL API.

  • Data update and deletion in sink tables: Supported.

Prerequisites

Before you begin, make sure that you have:

Syntax

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
Important

The primary key defined in the Flink DDL statement must match the primary key of the physical table in AnalyticDB for MySQL: the same fields, with the same names, must be declared in both. Mismatched primary keys can cause data corruption.
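As a sketch of what "matching" means (table and column names here are hypothetical, and the physical DDL is simplified), both statements declare `id` as the primary key:

```sql
-- Physical table in AnalyticDB for MySQL (simplified sketch).
CREATE TABLE orders (
  id  INT,
  num BIGINT,
  PRIMARY KEY (id)
);

-- Matching Flink DDL: the same primary key column, declared
-- NOT ENFORCED because Flink does not validate the constraint itself.
CREATE TEMPORARY TABLE adb_orders (
  `id`  INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url'       = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = 'orders'
);
```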

Key behaviors

Primary key and write mode

When a primary key is defined in the DDL, the connector operates in upsert mode, where duplicate primary keys trigger an update rather than a new insert. The exact SQL behavior depends on the replaceMode parameter:

  • replace — uses REPLACE INTO. A duplicate primary key overwrites the entire existing row.

  • upsert — uses INSERT INTO ... ON DUPLICATE KEY UPDATE. Only the specified fields are updated; other fields retain their current values.

  • insert — uses INSERT IGNORE INTO. Duplicate primary keys are silently ignored; the existing row is kept.

When no primary key is defined, the connector always uses INSERT IGNORE INTO.

The string values of replaceMode (replace, upsert, insert) require AnalyticDB for MySQL V3.1.3.5 or later and VVR 11.2 or later. In earlier VVR versions, use true (equivalent to replace) or false (equivalent to upsert). VVR 11.2 and later remain backward compatible with true and false.
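For example, a sink that updates only the written columns on duplicate keys (assuming VVR 11.2 or later; connection values are placeholders) would set replaceMode to upsert:

```sql
CREATE TEMPORARY TABLE adb_upsert_sink (
  `id`  INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector'   = 'adb3.0',
  'url'         = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'    = '<yourUsername>',
  'password'    = '<yourPassword>',
  'tableName'   = '<yourTablename>',
  -- Duplicate keys trigger INSERT INTO ... ON DUPLICATE KEY UPDATE.
  'replaceMode' = 'upsert'
);
```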

Write buffering

The connector buffers records in memory and flushes them in batches. A flush is triggered when either of the following conditions is met:

  • The number of buffered records reaches batchSize (default: 1,000) or bufferSize (default: 1,000)

  • The time since the last flush reaches flushIntervalMs (default: 3,000 ms)

Both batchSize and bufferSize take effect only when a primary key is defined.
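The buffering thresholds above map directly to WITH options. As a sketch (connection values are placeholders), the following sink flushes every 500 records or every second, whichever comes first:

```sql
CREATE TEMPORARY TABLE adb_buffered_sink (
  `id`  INT,
  `num` BIGINT,
  -- batchSize and bufferSize take effect only with a primary key.
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector'       = 'adb3.0',
  'url'             = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'        = '<yourUsername>',
  'password'        = '<yourPassword>',
  'tableName'       = '<yourTablename>',
  'batchSize'       = '500',   -- records written per batch
  'bufferSize'      = '500',   -- buffered records that trigger a flush
  'flushIntervalMs' = '1000'   -- flush at least once per second
);
```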

Dimension table caching

The connector supports three cache policies for dimension tables. Caching reduces lookups against the physical table but requires extra memory.

  • ALL (default): All dimension table data is loaded into memory before the job starts, and subsequent lookups hit only the cache. The full cache is reloaded after cacheTTLMs expires. Use when the dimension table is small and missing keys are common.

  • LRU: Frequently accessed rows are cached. On a cache miss, the connector queries the physical table and updates the cache. Entries expire after cacheTTLMs. Use when the dimension table is large but access is skewed to a subset of keys.

  • None: No caching. Every lookup queries the physical table directly. Use when lookup volume is low or strict data freshness is required.
Important

With ALL caching, the connector asynchronously loads the full dimension table into memory. To avoid out of memory (OOM) errors, increase the memory of the node used for joining tables by at least twice the size of the remote table. Monitor node memory usage during the job.
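As a sketch (connection values are placeholders), an LRU-cached dimension table that holds up to 50,000 rows, each for at most 10 minutes, would look like this:

```sql
CREATE TEMPORARY TABLE adb_dim_cached (
  `a` INT,
  `b` VARCHAR,
  `c` VARCHAR
) WITH (
  'connector'  = 'adb3.0',
  'url'        = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  'tableName'  = '<yourTablename>',
  'cache'      = 'LRU',     -- cache only frequently accessed rows
  'cacheSize'  = '50000',   -- maximum number of cached rows
  'cacheTTLMs' = '600000'   -- entries expire after 10 minutes
);
```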

Parameters in the WITH clause

Common parameters

  • connector: String, required. Set to adb3.0.

  • url: String, required. Java Database Connectivity (JDBC) URL of the database, in the format jdbc:mysql://<endpoint>:<port>/<databaseName>. Get the endpoint and port from the Network Information section of your cluster page in the AnalyticDB for MySQL console.

  • userName: String, required. Username for database access.

  • password: String, required. Password for database access.

  • tableName: String, required. Name of the target table in the database.

  • maxRetryTimes: Integer, optional. Default: 10. Maximum number of retries on read or write failure.

Sink table parameters

  • batchSize: Integer, optional. Default: 1000. Number of records written per batch. Takes effect only when a primary key is defined.

  • bufferSize: Integer, optional. Default: 1000. Maximum number of records buffered in memory before a flush is triggered. Takes effect only when a primary key is defined.

  • flushIntervalMs: Integer, optional. Default: 3000. Maximum time (in milliseconds) between flushes. When this interval elapses, all buffered records are written regardless of buffer size.

  • ignoreDelete: Boolean, optional. Default: false. Whether to ignore delete operations. Set to true to skip deletes; set to false to apply them.

  • replaceMode: String or Boolean, optional. Default: true. Write mode when a primary key is defined. VVR 11.2 and later: replace, upsert, or insert. Earlier versions: true (same as replace) or false (same as upsert). Requires AnalyticDB for MySQL V3.1.3.5 or later. Takes effect only when a primary key is defined in the DDL; otherwise, INSERT IGNORE INTO is always used.

  • excludeUpdateColumns: String, optional. Empty by default. Comma-separated list of columns to exclude from updates when replaceMode is upsert (or false). On a duplicate primary key, only the remaining columns are updated and the excluded columns keep their existing values. When the primary key is unique, all columns are inserted. Example: excludeUpdateColumns='column1,column2'. The value must be on a single line with no line breaks.

  • connectionMaxActive: Integer, optional. Default: 40. Maximum number of concurrent database connections in the thread pool.

Dimension table parameters

  • cache: String, optional. Default: ALL. Cache policy: None, LRU, or ALL. See Dimension table caching.

  • cacheSize: Integer, optional. Default: 100000. Maximum number of cached rows. Required when cache is LRU.

  • cacheTTLMs: Integer, optional. Default: Long.MAX_VALUE. Cache entry lifetime in milliseconds. For LRU, entries expire after this period (by default, never). For ALL, the full cache is reloaded after this period (by default, never). Not used when cache is None.

  • maxJoinRows: Integer, optional. Default: 1024. Maximum number of dimension table rows matched per input record. If each input record matches at most n dimension table rows, set this parameter to n to optimize join performance in Realtime Compute for Apache Flink.

Data type mappings

AnalyticDB for MySQL V3.0         Realtime Compute for Apache Flink
BOOLEAN                           BOOLEAN
TINYINT                           TINYINT
SMALLINT                          SMALLINT
INT                               INT
BIGINT                            BIGINT
FLOAT                             FLOAT
DOUBLE                            DOUBLE
DECIMAL(p, s) or NUMERIC(p, s)    DECIMAL(p, s)
VARCHAR                           STRING
BINARY                            BYTES
DATE                              DATE
TIME                              TIME
DATETIME                          TIMESTAMP
TIMESTAMP                         TIMESTAMP
POINT                             STRING
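For instance, a physical table with INT, DECIMAL, DATETIME, and POINT columns (hypothetical column names) would be declared in Flink DDL with the mapped types:

```sql
CREATE TEMPORARY TABLE adb_typed (
  `id`      INT,             -- INT            -> INT
  `price`   DECIMAL(10, 2),  -- DECIMAL(10, 2) -> DECIMAL(10, 2)
  `created` TIMESTAMP,       -- DATETIME       -> TIMESTAMP
  `geo`     STRING,          -- POINT          -> STRING
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url'       = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
```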

Examples

Sink table

The following example reads from a datagen source and writes the data to an AnalyticDB for MySQL sink table.

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'adb3.0',
  'url'       = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO adb_sink
SELECT * FROM datagen_source;

Dimension table

The following example joins a datagen source with an AnalyticDB for MySQL dimension table using a temporal join. Results are written to a blackhole sink.

CREATE TEMPORARY TABLE datagen_source (
  `a`       INT,
  `b`       VARCHAR,
  `c`       STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_dim (
  `a` INT,
  `b` VARCHAR,
  `c` VARCHAR
) WITH (
  'connector' = 'adb3.0',
  'url'       = 'jdbc:mysql://<endpoint>:<port>/<databaseName>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `a` INT,
  `b` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H
  ON T.a = H.a;
