This topic provides the DDL syntax that is used to create an AnalyticDB for MySQL V3.0 result table, describes the parameters in the WITH clause, and provides data type mappings.

What is AnalyticDB for MySQL?

AnalyticDB for MySQL is a cloud-native, enterprise-class data warehousing service that integrates database and big data technologies. It supports high-throughput real-time inserts, deletes, and updates, low-latency real-time analysis, and complex extract, transform, load (ETL) operations. AnalyticDB for MySQL is compatible with upstream and downstream ecosystem tools and can be used to build enterprise-class report systems, data warehouses, and data service engines.

Prerequisites

  • An AnalyticDB for MySQL cluster and an AnalyticDB for MySQL table are created. For more information, see Create a cluster and CREATE TABLE.
  • A whitelist is configured for the AnalyticDB for MySQL cluster. For more information, see Configure a whitelist.

Limits

The AnalyticDB for MySQL V3.0 connector is supported only by Flink that uses Ververica Runtime (VVR) 2.0.0 or later.

DDL syntax

CREATE TABLE adb_sink (
  id INT,
  num BIGINT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  'userName' = '<yourUsername>'
);

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the result table. | Yes | Set the value to adb3.0.
password | The password that is used to access the AnalyticDB for MySQL database. | Yes | N/A.
tableName | The name of the table. | Yes | N/A.
url | The Java Database Connectivity (JDBC) URL of the database. | Yes | The URL of the AnalyticDB for MySQL database, such as 'jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName', where databaseName is the name of the AnalyticDB for MySQL database. For more information about how to access an AnalyticDB for MySQL database, see Query a URL.
userName | The username that is used to access the AnalyticDB for MySQL database. | Yes | N/A.
maxRetryTimes | The maximum number of retries after a write to the table fails. | No | Default value: 3.
batchSize | The number of data records that are written at a time. | No | Default value: 5000.
flushIntervalMs | The interval at which cached data is flushed to the result table. | No | Unit: milliseconds. Default value: 0, which indicates that interval-based cache flushing is disabled. If this parameter is set to 2000, all cached data is written to the result table when the number of input data records does not reach the value specified by the bufferSize parameter within 2,000 milliseconds.
replaceMode | Specifies whether to use the REPLACE INTO statement to insert data into the table. | No | Valid values: true (default), which uses the REPLACE INTO statement; false, which uses the INSERT INTO ... ON DUPLICATE KEY UPDATE statement. This parameter is supported only by Flink that uses VVR 4.x or later and by AnalyticDB for MySQL 3.1.3.5 or later.
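The optional parameters go in the same WITH clause as the required ones. The following is a minimal sketch, assuming that optional values are passed as quoted strings like the required parameters. The table name adb_sink_tuned and the chosen values are illustrative only, not recommendations.

```sql
CREATE TEMPORARY TABLE adb_sink_tuned (
  id INT,
  num BIGINT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  'userName' = '<yourUsername>',
  -- Optional parameters below; the values are assumptions for illustration.
  'maxRetryTimes' = '5',       -- retry a failed write up to 5 times (default: 3)
  'batchSize' = '1000',        -- write 1,000 records at a time (default: 5000)
  'flushIntervalMs' = '2000',  -- write cached data if the buffer does not fill within 2,000 ms (default: 0, disabled)
  'replaceMode' = 'false'      -- insert with ON DUPLICATE KEY handling instead of REPLACE INTO
);
```

Because this sketch sets replaceMode, the VVR and AnalyticDB for MySQL version requirements listed for that parameter apply.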

Data type mapping

Data type of AnalyticDB for MySQL V3.0 | Data type of Flink
BOOLEAN | BOOLEAN
TINYINT | INT
SMALLINT | INT
INT | INT
BIGINT | BIGINT
DOUBLE | DOUBLE
VARCHAR | VARCHAR
DATETIME | TIMESTAMP
DATE | DATE
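
To illustrate the mapping, the following sketch declares a Flink result table for a hypothetical AnalyticDB for MySQL table that has one column of each listed type. The table name and column names are assumptions made for this example. Each comment names the AnalyticDB for MySQL column type that the Flink type is meant to match.

```sql
CREATE TEMPORARY TABLE adb_typed_sink (
  is_active   BOOLEAN,    -- AnalyticDB for MySQL column type: BOOLEAN
  status_code INT,        -- AnalyticDB for MySQL column type: TINYINT
  region_id   INT,        -- AnalyticDB for MySQL column type: SMALLINT
  user_id     INT,        -- AnalyticDB for MySQL column type: INT
  order_count BIGINT,     -- AnalyticDB for MySQL column type: BIGINT
  amount      DOUBLE,     -- AnalyticDB for MySQL column type: DOUBLE
  user_name   VARCHAR,    -- AnalyticDB for MySQL column type: VARCHAR
  created_at  TIMESTAMP,  -- AnalyticDB for MySQL column type: DATETIME
  birthday    DATE        -- AnalyticDB for MySQL column type: DATE
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  'userName' = '<yourUsername>'
);
```

Because TINYINT and SMALLINT columns are declared as INT on the Flink side, it is presumably up to the job to keep written values within the range of the narrower AnalyticDB for MySQL column.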

Sample code

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO adb_sink
SELECT * FROM datagen_source;
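
For a quick end-to-end check, it can help to bound and constrain the generated data. The sketch below assumes that the datagen connector in your VVR version accepts the same options as the datagen connector of open-source Apache Flink ('rows-per-second', 'number-of-rows', 'fields.<column>.length', 'fields.<column>.min', and 'fields.<column>.max'). Verify this against your version before you rely on it. The table name datagen_bounded_source and all option values are hypothetical.

```sql
-- Source that emits 100 rows in total at about 10 rows per second.
CREATE TEMPORARY TABLE datagen_bounded_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'number-of-rows' = '100',
  'fields.name.length' = '8',  -- random strings of 8 characters
  'fields.age.min' = '0',      -- random integers from 0 ...
  'fields.age.max' = '120'     -- ... to 120
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

-- Listing the columns explicitly keeps the statement valid if the source schema grows.
INSERT INTO adb_sink
SELECT `name`, `age` FROM datagen_bounded_source;
```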