This topic provides the DDL syntax that is used to create an ApsaraDB RDS for MySQL dimension table, describes the parameters in the WITH clause and cache parameters, and provides data type mappings and sample code.
What is ApsaraDB RDS for MySQL?
ApsaraDB RDS for MySQL is developed based on a branch of MySQL and has been proven in production, where it handled the high-concurrency traffic of Double 11. It provides basic features such as whitelist configuration, backup and restoration, Transparent Data Encryption (TDE), data migration, and management of instances, accounts, and databases. For more information about ApsaraDB RDS for MySQL, see Overview.
Prerequisites
- An ApsaraDB RDS for MySQL database and an ApsaraDB RDS for MySQL table are created. For more information, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
- An IP address whitelist is configured for the ApsaraDB RDS for MySQL database. For more information, see Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.
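As a minimal sketch of the first prerequisite, the following MySQL statements create a database and a table whose columns line up with the rds_dim example used in this topic. The database name demo_db, the VARCHAR length, and the primary key are assumptions made for illustration. On a real instance, the database and account are typically created as described in the linked topic.

```sql
-- Run on the ApsaraDB RDS for MySQL instance (MySQL dialect, not Flink SQL).
-- demo_db is a hypothetical database name.
CREATE DATABASE IF NOT EXISTS demo_db;

-- Physical table that the Flink dimension table will look up.
-- The primary key on id1 is an assumption; it keeps key lookups fast.
CREATE TABLE IF NOT EXISTS demo_db.rds_dim (
  id1 INT NOT NULL,
  id2 VARCHAR(255),
  PRIMARY KEY (id1)
);
```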
Limits
The ApsaraDB RDS for MySQL connector is supported only by Flink that uses Ververica Runtime (VVR) 2.0.0 or later.
DDL syntax
CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'cache' = 'ALL'
);
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the dimension table. | Yes | Set the value to rds. |
password | The password that is used to access the ApsaraDB RDS for MySQL database. | Yes | N/A. |
tableName | The name of the table. | Yes | N/A. |
url | The URL of the ApsaraDB RDS for MySQL database. | Yes | The virtual private cloud (VPC) endpoint of the ApsaraDB RDS for MySQL database. The value is an internal endpoint. For more information, see View and change the internal and public endpoints and port numbers of an ApsaraDB RDS for MySQL instance. Set the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace databaseName with the name of your database. |
userName | The username that is used to access the ApsaraDB RDS for MySQL database. | Yes | N/A. |
maxRetryTimes | The maximum number of retries to write data to the table after data writing fails. | No | Default value: 3. |
CACHE | The cache parameters. These parameters are used to configure the cache policy, cache size, and cache timeout period. | No | For more information, see Cache parameters. |
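The following sketch shows how the parameters above might fit together when the url value is written out in the jdbc:mysql://<Internal endpoint>/<databaseName> format. The endpoint rm-example.mysql.rds.aliyuncs.com, the port 3306 (the MySQL default), the database name demo_db, the table name rds_dim, and the maxRetryTimes value are all hypothetical. Substitute the values of your own instance.

```sql
CREATE TEMPORARY TABLE rds_dim_vpc (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  -- Hypothetical internal (VPC) endpoint, port, and database name.
  'url' = 'jdbc:mysql://rm-example.mysql.rds.aliyuncs.com:3306/demo_db',
  -- Hypothetical name of the physical table in demo_db.
  'tableName' = 'rds_dim',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Optional: overrides the default of 3 retries (illustrative value).
  'maxRetryTimes' = '5'
);
```

All option values in the WITH clause are written as string literals, including numeric ones such as maxRetryTimes.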
Cache parameters
Parameter | Description | Required | Remarks |
---|---|---|---|
cache | The cache policy. | No | Valid values include ALL, which the DDL syntax in this topic uses. |
cacheSize | The maximum number of data records that can be cached. | No | |
cacheTTLMs | The cache timeout period. Unit: milliseconds. | No | How the cacheTTLMs parameter takes effect depends on the value of the cache parameter. |
maxJoinRows | The maximum number of dimension table records that can be matched to each data record in the primary table. | No | Default value: 1024. If you can estimate that each data record in the primary table is mapped to a maximum of n data records in the dimension table, you can configure maxJoinRows='n' to ensure efficient matching in Realtime Compute for Apache Flink. Note: When you join the primary table with a dimension table, this parameter limits the number of results returned for each input data record in the primary table. |
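As an illustration of the cache parameters, the sketch below extends the DDL from this topic with the ALL policy, a cache timeout, and a tighter maxJoinRows. The timeout of one hour and the limit of 10 matches per key are assumed values chosen for the example, not recommendations. How cacheTTLMs is interpreted depends on the cache policy, as described in the table.

```sql
CREATE TEMPORARY TABLE rds_dim_cached (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Cache policy taken from the DDL syntax in this topic.
  'cache' = 'ALL',
  -- Illustrative timeout: 3,600,000 ms = 1 hour.
  'cacheTTLMs' = '3600000',
  -- Assumes each primary-table record matches at most 10 dimension records.
  'maxJoinRows' = '10'
);
```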
Data type mappings
Data type of ApsaraDB RDS for MySQL | Data type of Flink |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
TINYINT(1) | BOOLEAN |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | SMALLINT |
INT | INT |
SMALLINT UNSIGNED | INT |
BIGINT | BIGINT |
INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
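To make the mappings concrete, the following sketch pairs a hypothetical MySQL table with a Flink dimension table declared according to the table above. The table name user_dim and its columns are invented for illustration. Note how each unsigned MySQL type maps to a wider Flink type, and how TINYINT(1) maps to BOOLEAN.

```sql
-- MySQL side (run on the ApsaraDB RDS for MySQL instance).
CREATE TABLE user_dim (
  user_id   BIGINT UNSIGNED NOT NULL,
  age       TINYINT UNSIGNED,
  is_active TINYINT(1),
  visits    INT UNSIGNED,
  name      VARCHAR(64),
  PRIMARY KEY (user_id)
);

-- Flink side: column types follow the data type mappings.
CREATE TEMPORARY TABLE user_dim (
  user_id   DECIMAL(20, 0),  -- BIGINT UNSIGNED
  age       SMALLINT,        -- TINYINT UNSIGNED
  is_active BOOLEAN,         -- TINYINT(1)
  visits    BIGINT,          -- INT UNSIGNED
  name      VARCHAR          -- VARCHAR
) WITH (
  'connector' = 'rds',
  'url' = '<yourUrl>',
  'tableName' = 'user_dim',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```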
Sample code
-- Source table that generates random test data.
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()  -- processing-time attribute used by the dimension table join
) WITH (
  'connector' = 'datagen'
);

-- ApsaraDB RDS for MySQL dimension table.
CREATE TEMPORARY TABLE rds_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://xxx',
  'userName' = '<yourUsername>'
);

-- Sink table that discards all output.
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

-- Join each source record with the dimension table as of its processing time.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
  ON T.a = H.a;
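The sample above uses an inner join, so source records without a match in the dimension table are dropped. If those records should be kept, a left join is a possible variant. The sketch below reuses the tables from the sample and relies on standard Flink SQL lookup join syntax. It is an illustration, not part of the original sample.

```sql
-- Variant of the sample query: keep unmatched source records.
-- H.b is NULL when no row in rds_dim has the same value of a.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
LEFT JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
  ON T.a = H.a;
```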