This topic describes how to create an AnalyticDB for MySQL V3.0 result table in Realtime Compute for Apache Flink. This topic also describes the parameters in the WITH clause used when you create an AnalyticDB for MySQL V3.0 result table.

Notice
  • You cannot register AnalyticDB for MySQL V3.0 storage resources in the Realtime Compute for Apache Flink console to store result tables.
  • This topic applies only to Blink 3.3.0 and later.
  • For more information about how to create an AnalyticDB for MySQL V2.0 result table, see Create an AnalyticDB for MySQL V2.0 result table.
  • You are allowed to define an auto-increment primary key for an AnalyticDB for MySQL V3.0 database. If you want to use the auto-increment primary key, do not declare the auto-increment field in a data definition language (DDL) statement. For example, if you use ID as an auto-increment field, do not declare the ID field in the DDL statement. When a row of output data is written to the AnalyticDB for MySQL V3.0 database, the value for the auto-increment field is automatically filled.

DDL syntax

In Realtime Compute for Apache Flink, you can use AnalyticDB for MySQL V3.0 to store output data. The following sample code shows how to create an AnalyticDB for MySQL V3.0 result table.
CREATE TABLE rds_output (
id INT,
len INT,
content VARCHAR, 
PRIMARY KEY(id,len)
) WITH (
type='ADB30',
url='jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);

Principles

Realtime Compute for Apache Flink writes data to an AnalyticDB for MySQL V3.0 result table in two steps:
  1. Converts each row of output data to a line of SQL statement.
  2. Writes and executes the SQL statement in the destination database.

Parameters in the WITH clause

Parameter Description Required Remarks
type The type of the connector. Yes Set the value to ADB30.
url The Java Database Connectivity (JDBC) URL of the database. Yes The URL of the AnalyticDB for MySQL database. Example: url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'.
Note
  • For more information about how to access an AnalyticDB for MySQL database, see Query a URL.
  • databaseName is the name of the AnalyticDB for MySQL database.
tableName The name of the table. Yes N/A
username The username that is used to access the database. Yes N/A
password The password that is used to access the AnalyticDB for MySQL database. Yes N/A
maxRetryTimes The maximum number of retries for writing data. No Default value: 3.
bufferSize The maximum number of data records that can be stored in the buffer before data deduplication is triggered. No Default value: 1000. This value indicates that duplicates are removed when the number of input data records reaches 1,000.
Note This parameter is valid only after you specify the primary key.
batchSize The number of data records that is written at a time. No Default value: 1000.
Note This parameter is valid only after you specify the primary key.
flushIntervalMs The time interval at which the cache is cleared. No Default value: 3000. Unit: milliseconds. This value indicates that all the cached data is written to the result table if the number of input data records does not reach the batchSize value within 3,000 milliseconds.
ignoreDelete Specifies whether to ignore delete operations. No Default value: false. This value indicates that the delete operations are supported.
replaceMode Specifies whether to use the REPLACE INTO statement to insert data into the table. No Valid values:
  • true: The REPLACE INTO statement is used to insert data into the table. This is the default value.
  • false: The INSERT INTO ON DUPLICATE KEY statement is used to insert data into the table.
Note This parameter is valid only when the following conditions are met:
  • The Blink version is 3.X or later.
  • The AnalyticDB for MySQL version is 3.1.3.5 or later.
excludeUpdateColumns Specifies the columns that are not updated when data with the same primary key is updated. No If you specify multiple columns that are not updated, separate them with commas (,). Example: excludeUpdateColumns=column1,column2.
reserveMilliSecond Specifies whether to reserve the millisecond component in a value of the TIMESTAMP data type. No Default value: false. This indicates that the millisecond component is not reserved.