This topic describes the DDL syntax used to create an AnalyticDB for MySQL V3.0 result table, the parameters in the WITH clause, and the data type mappings.
What is AnalyticDB for MySQL?
AnalyticDB for MySQL is a cloud-native enterprise-class data warehousing service that integrates database and big data technologies. AnalyticDB for MySQL supports high-throughput real-time data addition, removal, and modification, low-latency real-time analysis, and complex extract, transform, load (ETL) operations. AnalyticDB for MySQL is compatible with upstream and downstream ecosystem tools and can be used to build enterprise-class report systems, data warehouses, and data service engines.
Prerequisites
- An AnalyticDB for MySQL cluster and an AnalyticDB for MySQL table are created. For more information, see Create a cluster and CREATE TABLE.
- A whitelist is configured for the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist.
Limits
Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports AnalyticDB for MySQL V3.0 connectors.
DDL syntax
CREATE TABLE adb_sink (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'
);
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the result table. | Yes | Set the value to adb3.0. |
tableName | The name of the table. | Yes | N/A. |
userName | The username that is used to access the AnalyticDB for MySQL database. | Yes | N/A. |
password | The password that is used to access the AnalyticDB for MySQL database. | Yes | N/A. |
url | The Java Database Connectivity (JDBC) URL of the database. | Yes | The URL of the AnalyticDB for MySQL database. Example: 'url' = 'jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'. |
maxRetryTimes | The maximum number of retries after a write to the table fails. | No | The default value varies based on the VVR version of Flink. |
batchSize | The number of data records that are written in a single batch. | No | The default value varies based on the VVR version of Flink. |
bufferSize | The maximum number of data records that can be cached in memory. A write operation is triggered when the number of cached records reaches the value of the batchSize or bufferSize parameter. | No | Default value: 1000. |
flushIntervalMs | The interval at which the cache is flushed. If the number of input data records does not reach the value of the batchSize parameter within this interval, all cached data is written to the result table. | No | Unit: milliseconds. The default value varies based on the VVR version of Flink. |
ignoreDelete | Specifies whether to ignore delete operations. | No | Valid values: true and false. Note: Only Flink that uses VVR 4.0.10 or later supports this parameter. |
replaceMode | Specifies whether to use the REPLACE INTO statement to insert data into the table. | No | Valid values: true and false. |
excludeUpdateColumns | The fields that are not updated when data that has the same primary key is updated. | No | Separate multiple fields with commas (,). Example: 'excludeUpdateColumns' = 'column1,column2'. Note: This parameter takes effect only when replaceMode is set to false. If replaceMode is set to true, the values of the fields specified by this parameter are changed to null. |
connectionMaxActive | The maximum size of the thread pool. | No | Default value: 40. Note: Only Flink that uses VVR 4.0.10 or later supports this parameter. |
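As a rough illustration of how the optional parameters combine, the following sketch extends the DDL from this topic with batching and update settings. The table name adb_sink_tuned, the column created_by, and all numeric values other than the documented bufferSize default are assumptions chosen for illustration, not recommended settings. Because excludeUpdateColumns takes effect only when replaceMode is set to false, the sketch sets replaceMode explicitly.

```sql
-- Illustrative sketch: the option values below are assumptions, not documented defaults
-- (except bufferSize, which uses the documented default of 1000).
CREATE TEMPORARY TABLE adb_sink_tuned (
  id INT,
  num BIGINT,
  created_by VARCHAR,  -- hypothetical column that updates should leave untouched
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  'maxRetryTimes' = '3',         -- retry a failed write up to 3 times
  'batchSize' = '500',           -- write once 500 records are cached
  'bufferSize' = '1000',         -- cache at most 1000 records in memory
  'flushIntervalMs' = '2000',    -- otherwise flush the cache every 2000 ms
  'replaceMode' = 'false',       -- required for excludeUpdateColumns to take effect
  'excludeUpdateColumns' = 'created_by'
);
```

With these values, a write is triggered as soon as 500 records are cached; if fewer records arrive, the cached data is written after 2,000 milliseconds.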
Data type mappings
Data type of AnalyticDB for MySQL V3.0 | Data type of Flink |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | INT |
SMALLINT | INT |
INT | INT |
BIGINT | BIGINT |
DOUBLE | DOUBLE |
VARCHAR | VARCHAR |
DATETIME | TIMESTAMP |
DATE | DATE |
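The mappings above can be read as a guide for declaring the Flink-side schema. The following sketch declares one Flink column for each AnalyticDB for MySQL type in the table. The table name adb_typed_sink and all column names are hypothetical, and each comment shows the assumed type of the matching column in the AnalyticDB for MySQL table.

```sql
-- Illustrative sketch: table and column names are hypothetical.
CREATE TEMPORARY TABLE adb_typed_sink (
  is_active BOOLEAN,     -- AnalyticDB for MySQL column type: BOOLEAN
  tiny_level INT,        -- TINYINT
  small_count INT,       -- SMALLINT
  id INT,                -- INT
  total_amount BIGINT,   -- BIGINT
  score DOUBLE,          -- DOUBLE
  user_name VARCHAR,     -- VARCHAR
  created_at TIMESTAMP,  -- DATETIME
  birth_date DATE,       -- DATE
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'
);
```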
Sample code
-- Source table: generates random test records.
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

-- Result table: maps to an existing AnalyticDB for MySQL V3.0 table.
CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

-- Write the generated records to the result table.
INSERT INTO adb_sink
SELECT * FROM datagen_source;
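
When you try the sample code, it can help to limit how much test data is generated. The following sketch assumes that the datagen connector in your VVR version accepts the open source Apache Flink datagen options rows-per-second, number-of-rows, fields.#.length, fields.#.min, and fields.#.max. The table name datagen_source_bounded and all values are arbitrary choices for illustration.

```sql
-- Illustrative sketch: option support depends on your VVR version (assumption).
CREATE TEMPORARY TABLE datagen_source_bounded (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',    -- emit 10 records per second
  'number-of-rows' = '100',    -- stop after 100 records in total
  'fields.name.length' = '8',  -- random strings of 8 characters
  'fields.age.min' = '1',      -- random integers from 1 ...
  'fields.age.max' = '100'     -- ... to 100
);

-- Reuses the adb_sink result table from the sample code.
INSERT INTO adb_sink
SELECT * FROM datagen_source_bounded;
```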