The ApsaraDB RDS for MySQL connector will not be supported in the future. Use the MySQL connector instead.
The ApsaraDB RDS for MySQL connector lets you write Flink SQL output to an ApsaraDB RDS for MySQL sink table or join a stream against an ApsaraDB RDS for MySQL dimension table.
Supported table types: Sink table · Dimension table
Supported running modes: Batch mode · Streaming mode
API type: SQL
Data updates and deletions in sink tables: Supported
Prerequisites
Before you begin, ensure that you have:
- An ApsaraDB RDS for MySQL database and table. See Create databases and accounts for an ApsaraDB RDS for MySQL instance.
- An IP address whitelist configured for the database. See Connect to an ApsaraDB RDS for MySQL instance using a database client or the CLI.
Limitations
- Requires Realtime Compute for Apache Flink with Ververica Runtime (VVR) 2.0.0 or later. For best performance and stability, use VVR 6.X or later.
- Only ApsaraDB RDS for MySQL databases are supported.
- The connector provides at-least-once semantics, so a record may be written more than once after a failover. If the sink table has a primary key, writes are idempotent and replayed records do not corrupt the data.
How it works
Sink write behavior
Each output row is converted to a SQL statement before being written to the sink table:
- No primary key: the connector runs `INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);`
- With primary key: the connector runs `INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...) ON DUPLICATE KEY UPDATE col1 = VALUES(col1), col2 = VALUES(col2), ...;`
Unique index conflicts: If the physical table has a unique index constraint in addition to the primary key, inserting two rows with different primary keys but the same unique index value causes the earlier row to be overwritten, resulting in data loss.
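The following MySQL statements sketch both the idempotent replay and the unique index conflict. They are run directly against the database rather than through Flink. The table `orders_demo` and its columns are hypothetical. The UPDATE list assumes that the connector updates every declared column, as in the statement template above.

```sql
-- Hypothetical table: primary key on id plus a separate unique index on order_no.
CREATE TABLE orders_demo (
  id       INT PRIMARY KEY,
  order_no VARCHAR(32) NOT NULL,
  amount   INT,
  UNIQUE KEY uk_order_no (order_no)
);

-- First row: no conflict, so a new row is inserted.
INSERT INTO orders_demo (id, order_no, amount) VALUES (1, 'A-100', 10)
ON DUPLICATE KEY UPDATE id = VALUES(id), order_no = VALUES(order_no), amount = VALUES(amount);

-- The same row replayed (at-least-once delivery): row 1 is rewritten with identical values.
INSERT INTO orders_demo (id, order_no, amount) VALUES (1, 'A-100', 10)
ON DUPLICATE KEY UPDATE id = VALUES(id), order_no = VALUES(order_no), amount = VALUES(amount);

-- Different primary key but the same unique index value: MySQL treats this as a
-- duplicate of row 1 and updates that row instead of inserting a second one.
INSERT INTO orders_demo (id, order_no, amount) VALUES (2, 'A-100', 20)
ON DUPLICATE KEY UPDATE id = VALUES(id), order_no = VALUES(order_no), amount = VALUES(amount);

-- Only one row remains, (2, 'A-100', 20). The row written for id = 1 has been overwritten.
SELECT * FROM orders_demo;
```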
Auto-increment primary keys: Do not declare auto-increment fields in the Flink DDL. The database assigns these values automatically. The connector can write and delete rows with auto-increment fields, but cannot update them.
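As a sketch of the auto-increment rule, assume a hypothetical MySQL table `user_log` whose `id` column is AUTO_INCREMENT. The Flink DDL declares only the remaining columns, so the database assigns `id` on each insert. The table and column names are illustrative.

```sql
-- MySQL side (run in the database): id is assigned by the database.
CREATE TABLE user_log (
  id      BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
  user_id VARCHAR(64),
  action  VARCHAR(32)
);

-- Flink side: the auto-increment column id is intentionally not declared.
CREATE TEMPORARY TABLE user_log_sink (
  user_id VARCHAR,
  action  VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = 'user_log',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
```

The Flink DDL declares no primary key, so the connector issues plain INSERT statements for this table.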
Dimension table cache policies
The connector supports three cache policies for dimension table lookups:
| Policy | Behavior | When to use |
|---|---|---|
| NONE | No caching. Every lookup queries the database directly. | Dimension data changes frequently and each lookup must return the latest values |
| LRU | Caches a fixed number of recently used rows per task manager. On a cache miss, the connector queries the database. | Frequently accessed subsets of large tables |
| ALL | Loads the entire table into memory and reloads it at the interval set by cacheTTLMs | Small, static reference tables |
Syntax
Sink table
```sql
CREATE TABLE rds_sink (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
```
For sink tables, append ?rewriteBatchedStatements=true to the url value. This MySQL JDBC driver option rewrites batched inserts into multi-row INSERT statements, which improves write throughput.
Dimension table
```sql
CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache' = 'NONE'
);
```
Parameters in the WITH clause
Common parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | STRING | Yes | No default | Set to `rds`. |
| tableName | STRING | Yes | No default | Name of the physical table in ApsaraDB RDS for MySQL. |
| userName | STRING | Yes | No default | Database username. |
| password | STRING | Yes | No default | Database password. |
| url | STRING | Yes | No default | Virtual private cloud (VPC) endpoint of the database, in the format `jdbc:mysql://<internal-endpoint>:<port>/<database-name>`. For sink tables, append `?rewriteBatchedStatements=true`. For endpoint details, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. |
| maxRetryTimes | INTEGER | No | 10 (VVR 4.0.7+), 3 (VVR 4.0.6 and earlier) | Maximum number of retries for failed dimension table lookups or sink writes. |
Sink table parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| batchSize | INTEGER | No | 4096 (VVR 4.0.7+), 5000 (VVR 4.0.0–4.0.6), 100 (VVR 3.x and earlier) | Number of rows written per batch. |
| bufferSize | INTEGER | No | 10000 | Maximum number of rows buffered in memory before a write is triggered. Supported in VVR 4.0.7 and later. Takes effect only when a primary key is defined. |
| flushIntervalMs | INTEGER | No | 2000 (VVR 4.0.7+), 0 (VVR 4.0.0–4.0.6), 1000 (VVR 3.x and earlier) | Interval, in milliseconds, at which buffered data is flushed to the sink table, whether or not the batchSize or bufferSize threshold has been reached. If set to 0 (the default for VVR 4.0.0–4.0.6), no timed flush occurs, so small amounts of buffered data may never be written. Upgrade to a later VVR version to avoid this. |
| ignoreDelete | BOOLEAN | No | false | Set to true to skip delete operations. This is useful when multiple operators update different fields of the same row. Without this setting, a delete issued by one operator followed by a partial update from another leaves the fields that were not updated as null or their default values. |
| connectionMaxActive | INTEGER | No | 40 | Connection pool size. Supported in VVR 4.0.7 and later. Increase this value if connection pool timeouts occur; decrease it if the database limits the number of concurrent connections. |
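The following sketch sets the sink parameters explicitly to show the syntax. The values match the VVR 4.0.7+ defaults listed above, except ignoreDelete. The table declares a primary key because bufferSize takes effect only when one is defined. The sketch assumes VVR 4.0.7 or later, where bufferSize and connectionMaxActive are supported.

```sql
CREATE TEMPORARY TABLE rds_sink_tuned (
  id  INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED   -- needed for bufferSize to take effect
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true',
  'batchSize' = '4096',           -- rows per batch write
  'bufferSize' = '10000',         -- rows buffered in memory before a write
  'flushIntervalMs' = '2000',     -- flush buffered data at least every 2 seconds
  'ignoreDelete' = 'true',        -- skip delete operations
  'connectionMaxActive' = '40',   -- connection pool size
  'maxRetryTimes' = '10'          -- retries for failed writes
);
```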
Dimension table parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| cache | STRING | No | NONE (VVR earlier than 4.0.6), ALL (VVR 4.0.6+) | Cache policy. Valid values: NONE, LRU, ALL. See Dimension table cache policies. |
| cacheSize | INTEGER | No | 100000 | Maximum number of rows to cache. Takes effect only when cache is set to LRU; ignored for NONE and ALL. |
| cacheTTLMs | LONG | No | No expiry (LRU), no reload (ALL) | Cache time-to-live in milliseconds. For LRU, cached rows expire after this period. For ALL, the entire cache is reloaded at this interval. |
| maxJoinRows | INTEGER | No | 1024 | Maximum number of dimension table rows matched per input row. Set this to the largest number of dimension rows you expect a single input row to match, to avoid unnecessary scanning. |
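As a sketch, the following dimension table loads the full table into memory and reloads it every 10 minutes. It lowers maxJoinRows on the assumption that each input row matches at most one dimension row. If that assumption does not hold, additional matches beyond the limit are not returned. All values are illustrative.

```sql
CREATE TEMPORARY TABLE rds_dim_all (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache' = 'ALL',
  'cacheTTLMs' = '600000',  -- reload the whole table every 10 minutes
  'maxJoinRows' = '1'       -- assumes at most one matching row per input row
);
```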
Metrics
The sink table exposes the following metrics. Dimension tables have no metrics.
| Metric | Description |
|---|---|
| numRecordsOut | Total rows written |
| numRecordsOutPerSecond | Rows written per second |
| numBytesOut | Total bytes written |
| numBytesOutPerSecond | Bytes written per second |
| currentSendTime | Current write latency |
| numRecordsOutErrors | Total write errors |
For metric definitions, see Metrics.
Data type mappings
| Flink type | ApsaraDB RDS for MySQL type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| BOOLEAN (dimension tables only) | TINYINT(1) |
| SMALLINT | SMALLINT |
| SMALLINT | TINYINT UNSIGNED |
| INT | INT |
| INT | SMALLINT UNSIGNED |
| BIGINT | BIGINT |
| BIGINT | INT UNSIGNED |
| DECIMAL(20, 0) | BIGINT UNSIGNED |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
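To illustrate the unsigned mappings, the following sketch pairs a hypothetical MySQL table with Flink DDL whose column types follow the mapping table above. The names are illustrative. The MySQL TINYINT(1) column is read as BOOLEAN, which is listed for dimension tables only, so the Flink table is meant to be used as a dimension table.

```sql
-- MySQL side (hypothetical table).
CREATE TABLE product_dim (
  product_id BIGINT UNSIGNED PRIMARY KEY,
  stock      INT UNSIGNED,
  rating     TINYINT UNSIGNED,
  is_active  TINYINT(1),
  price      DECIMAL(10, 2),
  name       VARCHAR(128)
);

-- Flink side: each unsigned MySQL type maps to a wider signed Flink type.
CREATE TEMPORARY TABLE product_dim_flink (
  product_id DECIMAL(20, 0),  -- BIGINT UNSIGNED
  stock      BIGINT,          -- INT UNSIGNED
  rating     SMALLINT,        -- TINYINT UNSIGNED
  is_active  BOOLEAN,         -- TINYINT(1), dimension tables only
  price      DECIMAL(10, 2),  -- DECIMAL
  name       VARCHAR          -- VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = 'product_dim',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);
```

Join this table with FOR SYSTEM_TIME AS OF, as in the dimension table example.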
Examples
Sink table example
The following example reads from a DataGen source and writes to an ApsaraDB RDS for MySQL sink table.
```sql
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);

INSERT INTO rds_sink
SELECT * FROM datagen_source;
```
Dimension table example
The following example joins a stream against an ApsaraDB RDS for MySQL dimension table using a temporal join.
```sql
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName' = '<your-user-name>',
  'password' = '<your-password>',
  'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
```
What's next
- MySQL connector: the recommended replacement for this connector
- ApsaraDB RDS for MySQL: product overview and feature documentation
- Metrics: definitions for all connector metrics