
Realtime Compute for Apache Flink: ApsaraDB RDS for MySQL

Last Updated: Mar 26, 2026
Important

The ApsaraDB RDS for MySQL connector will not be supported in the future. Use the MySQL connector instead.

The ApsaraDB RDS for MySQL connector lets you write Flink SQL output to an ApsaraDB RDS for MySQL sink table or join a stream against an ApsaraDB RDS for MySQL dimension table.

Supported table types: Sink table · Dimension table

Supported running modes: Batch mode · Streaming mode

API type: SQL

Data updates and deletions in sink tables: Supported

Prerequisites

Before you begin, ensure that you have:

Limitations

  • Requires Realtime Compute for Apache Flink using Ververica Runtime (VVR) 2.0.0 or later. For best performance and stability, use VVR 6.X or later.

  • Only ApsaraDB RDS for MySQL databases are supported.

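  • The connector provides at-least-once semantics, so some rows may be written more than once after a failover. If the sink table has a primary key, each write is an idempotent upsert, and replayed rows do not change the final result.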

How it works

Sink write behavior

Each output row is converted to a SQL statement before being written to the sink table:

  • No primary key — runs INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);

  • With primary key — runs INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...) ON DUPLICATE KEY UPDATE col1 = VALUES(col1), col2 = VALUES(col2), ...;
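As an illustration, assume a hypothetical physical table named rds_sink with columns id and num, and an output row with id = 1 and num = 100. Following the two patterns above, the connector would issue statements roughly like these. The exact SQL text it generates may differ; this is a sketch, not captured connector output.

```sql
-- Hypothetical physical table `rds_sink` with columns id and num.

-- Flink DDL without a primary key: a plain INSERT.
INSERT INTO rds_sink (id, num) VALUES (1, 100);

-- Flink DDL with PRIMARY KEY (id) NOT ENFORCED: an upsert.
-- Executing it twice leaves the same single row (id = 1, num = 100),
-- which is why replays under at-least-once delivery are harmless.
INSERT INTO rds_sink (id, num) VALUES (1, 100)
  ON DUPLICATE KEY UPDATE id = VALUES(id), num = VALUES(num);
```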

Unique index conflicts: If the physical table has a unique index in addition to the primary key, ON DUPLICATE KEY UPDATE fires on a unique index conflict as well as on a primary key conflict. Writing two rows with different primary keys but the same unique index value therefore updates the earlier row in place instead of inserting a second row, and the earlier row is lost.
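The following MySQL statements, run directly against a hypothetical table orders_sink, sketch how this overwrite happens with the upsert pattern described above. The table name, columns, and values are illustrative assumptions.

```sql
-- Hypothetical physical table: primary key on id plus a unique index on code.
CREATE TABLE orders_sink (
  id   INT PRIMARY KEY,
  code VARCHAR(16) NOT NULL,
  num  BIGINT,
  UNIQUE KEY uk_code (code)
);

-- First row: no conflict, so it is inserted.
INSERT INTO orders_sink (id, code, num) VALUES (1, 'A', 10)
  ON DUPLICATE KEY UPDATE id = VALUES(id), code = VALUES(code), num = VALUES(num);

-- Second row: different primary key, same unique value 'A'.
-- The unique index conflict triggers the UPDATE branch on the existing row.
INSERT INTO orders_sink (id, code, num) VALUES (2, 'A', 20)
  ON DUPLICATE KEY UPDATE id = VALUES(id), code = VALUES(code), num = VALUES(num);

-- Only one row remains: (2, 'A', 20). The row with id = 1 has been overwritten.
SELECT * FROM orders_sink;
```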

Auto-increment primary keys: Do not declare auto-increment fields in the Flink DDL. The database assigns these values automatically. The connector can write and delete rows with auto-increment fields, but cannot update them.
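A minimal sketch of this rule, assuming a hypothetical physical table user_profile whose id column is auto-incremented: the Flink DDL simply leaves id out, so the database assigns it on each insert.

```sql
-- In ApsaraDB RDS for MySQL (hypothetical table):
CREATE TABLE user_profile (
  id   BIGINT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(64),
  age  INT
);

-- In Flink SQL: the auto-increment id column is not declared.
CREATE TEMPORARY TABLE user_profile_sink (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'rds',
  'tableName' = 'user_profile',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
```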

Dimension table cache policies

The connector supports three cache policies for dimension table lookups:

| Policy | Behavior | When to use |
| --- | --- | --- |
| NONE | No caching. Every lookup queries the database directly. | Frequently changing data where every lookup must see the latest values |
| LRU | Caches a fixed number of recently used rows (cacheSize) per task manager | Frequently accessed subsets of large tables |
| ALL | Loads the entire table into memory and reloads it at the interval set by cacheTTLMs | Small, static reference tables |
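For example, a dimension table using the LRU policy might be declared as follows. This is a sketch: the table name rds_dim_lru and the cacheSize and cacheTTLMs values are illustrative assumptions, and the parameters are described under Dimension table parameters.

```sql
-- LRU cache: keep up to 10,000 recently used rows and expire
-- cached rows after 10 minutes (600,000 ms). Values are illustrative.
CREATE TEMPORARY TABLE rds_dim_lru (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector'  = 'rds',
  'tableName'  = '<your-table-name>',
  'userName'   = '<your-user-name>',
  'password'   = '<your-password>',
  'url'        = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache'      = 'LRU',
  'cacheSize'  = '10000',
  'cacheTTLMs' = '600000'
);
```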

Syntax

Sink table

CREATE TABLE rds_sink (
  id  INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'rds',
  'tableName'  = '<your-table-name>',
  'userName'   = '<your-user-name>',
  'password'   = '<your-password>',
  'url'        = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
Note

Append ?rewriteBatchedStatements=true to the url value for sink tables to improve write throughput.

Dimension table

CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache'     = 'NONE'
);

Parameters in the WITH clause

Common parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | STRING | Yes | - | Set to rds |
| tableName | STRING | Yes | - | Name of the physical table in ApsaraDB RDS for MySQL |
| userName | STRING | Yes | - | Database username |
| password | STRING | Yes | - | Database password |
| url | STRING | Yes | - | Virtual private cloud (VPC) endpoint of the database, in the format jdbc:mysql://<internal-endpoint>:<port>/<database-name>. For sink tables, append ?rewriteBatchedStatements=true. For endpoint details, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance |
| maxRetryTimes | INTEGER | No | 10 (VVR 4.0.7+), 3 (VVR 4.0.6 and earlier) | Maximum retries for failed dimension table lookups or sink writes |

Sink table parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| batchSize | INTEGER | No | 4096 (VVR 4.0.7+), 5000 (VVR 4.0.0–4.0.6), 100 (VVR 3.x and earlier) | Number of rows written per batch |
| bufferSize | INTEGER | No | 10000 | Maximum number of rows buffered in memory before a write is triggered. Supported in VVR 4.0.7 and later. Takes effect only when a primary key is defined |
| flushIntervalMs | INTEGER | No | 2000 (VVR 4.0.7+), 0 (VVR 4.0.0–4.0.6), 1000 (VVR 3.x and earlier) | Interval in milliseconds at which the buffer is flushed to the sink table, even if the batchSize or bufferSize threshold has not been reached. If this is 0 (the default in VVR 4.0.0–4.0.6), a small amount of buffered data may never be written; upgrade to VVR 4.0.7 or later to avoid this |
| ignoreDelete | BOOLEAN | No | false | Set to true to skip delete operations. Useful when multiple operators update different fields of the same row: without this setting, a delete from one operator followed by a partial update from another leaves the fields that were not updated as null or at their default values |
| connectionMaxActive | INTEGER | No | 40 | Connection pool size. Supported in VVR 4.0.7 and later. Increase this value if connection pool timeouts occur; decrease it if the database limits the number of concurrent connections |
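The sketch below shows how these options might be combined on a sink table with a primary key. The table name rds_sink_tuned and every value are illustrative assumptions rather than recommendations; bufferSize and connectionMaxActive require VVR 4.0.7 or later.

```sql
-- Sink table with explicit write tuning. Values are illustrative.
CREATE TEMPORARY TABLE rds_sink_tuned (
  id  INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED               -- enables upserts and bufferSize
) WITH (
  'connector'           = 'rds',
  'tableName'           = '<your-table-name>',
  'userName'            = '<your-user-name>',
  'password'            = '<your-password>',
  'url'                 = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true',
  'batchSize'           = '4096',             -- rows per batch write
  'bufferSize'          = '10000',            -- rows buffered before a write is triggered
  'flushIntervalMs'     = '1000',             -- flush at least once per second
  'connectionMaxActive' = '20',               -- smaller pool for a database with a low connection limit
  'maxRetryTimes'       = '10',               -- retries for failed writes
  'ignoreDelete'        = 'true'              -- skip deletes when several operators update different fields
);
```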

Dimension table parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| cache | STRING | No | NONE (VVR earlier than 4.0.6), ALL (VVR 4.0.6+) | Cache policy. Valid values: NONE, LRU, ALL. See Dimension table cache policies |
| cacheSize | INTEGER | No | 100000 | Maximum number of rows to cache. Applies only when cache is set to LRU; ignored for NONE and ALL |
| cacheTTLMs | LONG | No | Not set (LRU rows never expire; the ALL cache is never reloaded) | Cache time-to-live in milliseconds. For LRU, cached rows expire after this period. For ALL, the entire table is reloaded at this interval. Ignored for NONE |
| maxJoinRows | INTEGER | No | 1024 | Maximum number of dimension table rows matched per input row. Set it to the largest number of dimension rows you expect a single input row to match to avoid unnecessary scanning |
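As a further sketch, a small reference table could use the ALL policy with a periodic reload. The table name rds_dim_all, the one-hour reload interval, and the assumption that each input row matches at most one dimension row (hence maxJoinRows = 1) are illustrative.

```sql
-- ALL cache: load the whole table into memory and reload it every hour.
-- maxJoinRows = 1 assumes each input row matches at most one dimension row.
CREATE TEMPORARY TABLE rds_dim_all (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'   = 'rds',
  'tableName'   = '<your-table-name>',
  'userName'    = '<your-user-name>',
  'password'    = '<your-password>',
  'url'         = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache'       = 'ALL',
  'cacheTTLMs'  = '3600000',
  'maxJoinRows' = '1'
);
```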

Metrics

The sink table exposes the following metrics. Dimension tables have no metrics.

| Metric | Description |
| --- | --- |
| numRecordsOut | Total rows written |
| numRecordsOutPerSecond | Rows written per second |
| numBytesOut | Total bytes written |
| numBytesOutPerSecond | Bytes written per second |
| currentSendTime | Current write latency |
| numRecordsOutErrors | Total write errors |

For metric definitions, see Metrics.

Data type mappings

| Flink type | ApsaraDB RDS for MySQL type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| BOOLEAN | TINYINT(1) (dimension tables only) |
| SMALLINT | SMALLINT |
| SMALLINT | TINYINT UNSIGNED |
| INT | INT |
| INT | SMALLINT UNSIGNED |
| BIGINT | BIGINT |
| BIGINT | INT UNSIGNED |
| DECIMAL(20, 0) | BIGINT UNSIGNED |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
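To show how unsigned MySQL columns translate into the Flink DDL, here is a sketch using a hypothetical physical table device_info and a dimension table device_dim. The MySQL TINYINT(1) column is declared as BOOLEAN in Flink, which this mapping allows only for dimension tables; each unsigned type is widened to the next larger Flink type.

```sql
-- In ApsaraDB RDS for MySQL (hypothetical table):
CREATE TABLE device_info (
  device_id   BIGINT UNSIGNED PRIMARY KEY,
  is_active   TINYINT(1),
  tier        TINYINT UNSIGNED,
  listen_port SMALLINT UNSIGNED,
  visit_cnt   INT UNSIGNED
);

-- In Flink SQL: declare each column with its mapped Flink type.
CREATE TEMPORARY TABLE device_dim (
  device_id   DECIMAL(20, 0),  -- BIGINT UNSIGNED
  is_active   BOOLEAN,         -- TINYINT(1), dimension tables only
  tier        SMALLINT,        -- TINYINT UNSIGNED
  listen_port INT,             -- SMALLINT UNSIGNED
  visit_cnt   BIGINT           -- INT UNSIGNED
) WITH (
  'connector' = 'rds',
  'tableName' = 'device_info',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);
```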

Examples

Sink table example

The following example reads from a DataGen source and writes to an ApsaraDB RDS for MySQL sink table.

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);

INSERT INTO rds_sink
SELECT * FROM datagen_source;

Dimension table example

The following example joins a stream against an ApsaraDB RDS for MySQL dimension table using a temporal join.

CREATE TEMPORARY TABLE datagen_source (
  a          INT,
  b          BIGINT,
  c          STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
