This topic provides the DDL syntax that is used to create an AnalyticDB for MySQL V3.0 result table, describes the parameters in the WITH clause, and provides data type mappings.

What is AnalyticDB for MySQL?

AnalyticDB for MySQL is a cloud-native enterprise-class data warehousing service that integrates database and big data technologies. AnalyticDB for MySQL supports high-throughput real-time data addition, removal, and modification, low-latency real-time analysis, and complex extract, transform, load (ETL) operations. AnalyticDB for MySQL is compatible with upstream and downstream ecosystem tools and can be used to build enterprise-class report systems, data warehouses, and data service engines.

Prerequisites

  • An AnalyticDB for MySQL cluster and an AnalyticDB for MySQL table are created. For more information, see Create a cluster and CREATE TABLE.
  • A whitelist is configured for the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports AnalyticDB for MySQL V3.0 connectors.

DDL syntax

CREATE TABLE adb_sink (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'
);
Note The primary key that is defined in the DDL statement must be the same as the primary key that is defined in the table in the AnalyticDB for MySQL database. You must check whether the primary key exists both in the DDL statement and the table in the AnalyticDB for MySQL database. If the primary key exists, you must check whether the field name of the primary key is the same.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to adb3.0.
tableName The name of the table. Yes N/A.
username The username that is used to access the AnalyticDB for MySQL database. Yes N/A.
password The password that is used to access the AnalyticDB for MySQL database. Yes N/A.
url The Java Database Connectivity (JDBC) URL of the database. Yes The URL of the AnalyticDB for MySQL database, such as url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'.
Note
  • For more information about how to access an AnalyticDB for MySQL database, see Query a URL.
  • databaseName is the name of the AnalyticDB for MySQL database.
maxRetryTimes The maximum number of retries to write data to the table after data fails to be written. No The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 3.X or earlier, the default value is 3.
  • If the VVR version is 4.0.10 or later, the default value is 10.
batchSize The number of data records that can be written at a time. No The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 3.X or earlier, the default value is 100.
  • If the VVR version is 4.0.10 or later, the default value is 1000.
bufferSize The maximum number of data records that can be cached in the memory. Write operations are triggered if the value of the batchSize or bufferSize parameter reaches the specified threshold. No Default value: 1000.
Note
  • Only Flink that uses VVR 4.0.10 or later supports this parameter.
  • This parameter takes effect only after you specify the primary key.
flushIntervalMs The interval at which the cache is cleared. This value indicates that if the number of input data records does not reach the value specified by the batchSize parameter within the specified time, all cached data is written to the result table. No Unit: milliseconds. The default value of this parameter varies based on the VVR version of Flink:
  • If the VVR version is 3.X or earlier, the default value is 1000.
  • If the VVR version is 4.0.10 or later, the default value is 3000.
ignoreDelete Specifies whether to ignore delete operations. No Valid values:
  • true: The delete operations are ignored.
  • false: The delete operations are not ignored. This is the default value.
Note Only Flink that uses VVR 4.0.10 or later supports this parameter.
replaceMode Specifies whether to use the REPLACE INTO statement to insert data into the table. No Valid values:
  • true: The REPLACE INTO statement is used to insert data into the table. This is the default value.
  • false: The INSERT INTO ON DUPLICATE KEY statement is used to insert data into the table.
Note
  • Only Flink that uses VVR 4.0.10 or later supports this parameter.
  • Only AnalyticDB for MySQL 3.1.3.5 or later supports this parameter.
excludeUpdateColumns The fields that are not updated when data that has the same primary key is updated. No If you specify multiple such fields, use commas (,) to separate these fields. Example: excludeUpdateColumns=column1,column2.
Note This parameter takes effect only when replaceMode is set to false. If replaceMode is set to true, the values of the fields specified by this parameter are changed to null.
connectionMaxActive The maximum size of the thread pool. No Default value: 40.
Note Only Flink that uses VVR 4.0.10 or later supports this parameter.

Data type mappings

Data type of AnalyticDB for MySQL V3.0 Data type of Flink
BOOLEAN BOOLEAN
TINYINT INT
SMALLINT INT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE
VARCHAR VARCHAR
DATETIME TIMESTAMP
DATE DATE
Note To ensure data precision, the Java Database Connectivity (JDBC) driver of MySQL converts the received data into a different data type.

Sample code

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO adb_sink
SELECT  * FROM datagen_source;