This topic describes how to create an ApsaraDB RDS result table in Realtime Compute for Apache Flink. It also describes the parameters in the WITH clause and data type mapping used when you create an ApsaraDB RDS result table.

Notice: Realtime Compute for Apache Flink does not support the storage registration method for ApsaraDB RDS for MySQL V8.0. To use ApsaraDB RDS for MySQL V8.0, we recommend that you configure a plaintext AccessKey pair. For more information about the storage registration method, see Overview.

Introduction to ApsaraDB RDS

ApsaraDB RDS is a stable, reliable, and scalable online database service. It is built on Apsara Distributed File System and high-performance storage services, and it supports a wide range of database engines, such as MySQL, SQL Server, PostgreSQL, and Postgres Plus Advanced Server (PPAS). ApsaraDB RDS provides comprehensive solutions for database operations and maintenance (O&M), such as disaster recovery, data backup, data restoration, monitoring, and data migration.
Note: ApsaraDB RDS, Distributed Relational Database Service (DRDS), and PolarDB use the same parameters in the WITH clause. If you want to use an ApsaraDB RDS, DRDS, or PolarDB table as a result table, make sure that the physical table already exists in the database.

DDL syntax

The following sample code shows how to create an ApsaraDB RDS or DRDS result table. Among the ApsaraDB RDS database engines, only ApsaraDB RDS for MySQL is supported.
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
Note
  • In Realtime Compute for Apache Flink, each row of output data is converted to one SQL statement, which is then executed in the destination database. To write multiple rows of output data at a time, add ?rewriteBatchedStatements=true to the end of the URL. This improves write performance.
  • You can define an auto-increment primary key for the ApsaraDB RDS for MySQL table that stores the results. If you want to use the auto-increment primary key, do not declare the auto-increment field in the DDL statement. For example, if ID is an auto-increment field, do not declare the ID field in the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, the value of the auto-increment field is automatically filled.
  • We recommend that you use the storage registration method to connect to the database, except for ApsaraDB RDS for MySQL V8.0, which does not support this method. For more information, see Register an ApsaraDB for RDS instance.
  • You must declare at least one non-primary key column in the DDL statement. Otherwise, an error is returned.
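
The first two points in the preceding note can be combined into a minimal sketch. It assumes a physical MySQL table named rds_auto_output (a hypothetical name) whose id column is an auto-increment primary key. The result table omits id and appends rewriteBatchedStatements=true to the URL. Because no primary key is declared in this DDL statement, rows would be written with a plain INSERT INTO statement.

```sql
-- Assumed physical table in ApsaraDB RDS for MySQL (hypothetical name and column sizes):
--   CREATE TABLE rds_auto_output (
--     id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
--     len INT,
--     content VARCHAR(255)
--   );

-- Result table in Realtime Compute for Apache Flink.
-- The auto-increment column id is intentionally not declared.
CREATE TABLE rds_auto_output(
   len INT,
   content VARCHAR
) WITH (
   type='rds',
   -- rewriteBatchedStatements=true lets multiple rows be written at a time.
   url='jdbc:mysql://<Internal endpoint>/<databaseName>?rewriteBatchedStatements=true',
   tableName='rds_auto_output',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
```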

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| type | The type of the result table. | Yes | Set the value to rds. |
| url | The Java Database Connectivity (JDBC) URL of the database. | Yes | Set the value in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace <Internal endpoint> with the internal endpoint of your database instance and <databaseName> with the name of your database. |
| tableName | The name of the table. | Yes | N/A. |
| userName | The username that is used to access the database. | Yes | N/A. |
| password | The password that is used to access the database. | Yes | N/A. |
| maxRetryTimes | The maximum number of retries that are allowed when data is written to the table. | No | Default value: 10. |
| batchSize | The number of data records that are written at a time. | No | Default value: 4096. |
| bufferSize | The maximum number of data records that can be stored in the buffer before data deduplication is triggered. | No | Default value: 10000. |
| flushIntervalMs | The interval at which buffered data is flushed to the result table. | No | Default value: 2000. Unit: milliseconds. With the default value, if the number of input data records does not reach the value of the bufferSize parameter within 2,000 milliseconds, all buffered data is written to the result table. |
| excludeUpdateColumns | The columns that are excluded from updates when a record with the same primary key is updated. | No | This parameter is empty by default. If it is empty, only the primary key columns are excluded from updates. |
| ignoreDelete | Specifies whether to skip delete operations. | No | Default value: false. This value indicates that delete operations are performed on the result table. |
| partitionBy | The columns based on which Realtime Compute for Apache Flink performs hash partitioning. | No | This parameter is empty by default. Before Realtime Compute for Apache Flink writes data to the sink node, it performs hash partitioning based on the value of this parameter. The data then flows to the relevant hash node. |
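
As an illustration of the optional parameters, the following sketch extends the result table from the DDL syntax section. The values of maxRetryTimes, batchSize, bufferSize, and flushIntervalMs are the documented defaults, written out explicitly. The values of ignoreDelete and partitionBy are chosen only for the example, and passing every value as a quoted string is an assumption based on how the required parameters are written.

```sql
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>',
   maxRetryTimes='10',       -- documented default
   batchSize='4096',         -- documented default
   bufferSize='10000',       -- documented default
   flushIntervalMs='2000',   -- documented default, in milliseconds
   ignoreDelete='true',      -- example value: skip delete operations
   partitionBy='id'          -- example value: hash-partition by id before writing
);
```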

Data type mapping

| Data type of ApsaraDB RDS | Data type of Realtime Compute for Apache Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
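
Because the result table must map to a physical table that already exists, it can help to see the database side of the mapping. The following MySQL statement is a minimal sketch of a physical table that would match the rds_output result table used in this topic. The table name rds_output_table and the VARCHAR length are assumptions made for the example.

```sql
-- Run in the ApsaraDB RDS for MySQL database, not in the Flink job.
-- rds_output_table is a hypothetical name; use it as the tableName value in the WITH clause.
CREATE TABLE rds_output_table (
   id INT NOT NULL,          -- maps to INT in the result table
   len INT NOT NULL,         -- maps to INT in the result table
   content VARCHAR(255),     -- maps to VARCHAR in the result table; length is an assumption
   PRIMARY KEY (id, len)     -- same key columns as the result table
);
```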

JDBC parameters

| Parameter | Description | Default value | Since version (JDBC driver) |
| --- | --- | --- | --- |
| useUnicode | Specifies whether to use the Unicode character set. If you want to set the characterEncoding parameter to GB2312 or GBK, you must set this parameter to true. | false | 1.1g |
| characterEncoding | The character encoding format, such as GB2312 or GBK. If useUnicode is set to true, you must specify a character encoding format. | false | 1.1g |
| autoReconnect | Specifies whether to automatically re-establish a connection when the connection to the database is unexpectedly interrupted. | false | 1.1 |
| autoReconnectForPools | Specifies whether to use the reconnection policy for a database connection pool. | false | 3.1.3 |
| failOverReadOnly | Specifies whether the connection is set to read-only after it is automatically re-established. | true | 3.0.12 |
| maxReconnects | The maximum number of reconnection attempts allowed if the autoReconnect parameter is set to true. | 3 | 1.1 |
| initialTimeout | The interval between two reconnection attempts if the autoReconnect parameter is set to true. Unit: seconds. | 2 | 1.1 |
| connectTimeout | The timeout period for establishing a socket connection to the database server. Unit: milliseconds. The value 0 indicates that the connection never times out. This parameter applies to JDK V1.4 and later. | 0 | 3.0.1 |
| socketTimeout | The timeout period for a socket-based read or write operation. Unit: milliseconds. The value 0 indicates that the read or write operation never times out. | 0 | 3.0.1 |
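
JDBC parameters are typically appended to the URL as a query string, in the same way as rewriteBatchedStatements=true. The following sketch shows one possible combination. The chosen parameters and the timeout values are illustrative assumptions, not recommended settings.

```sql
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   -- The first parameter follows "?", and each additional parameter follows "&".
   -- Example values: GBK encoding, automatic reconnection,
   -- a 10-second connect timeout, and a 60-second socket timeout.
   url='jdbc:mysql://<Internal endpoint>/<databaseName>?useUnicode=true&characterEncoding=GBK&autoReconnect=true&connectTimeout=10000&socketTimeout=60000',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
```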

Sample code

The following sample code shows how to create an ApsaraDB RDS result table in a Realtime Compute for Apache Flink job.
CREATE TABLE source (
   id INT,
   len INT,
   content VARCHAR
) WITH (
   type = 'random'
);

CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

INSERT INTO rds_output
SELECT id, len, content FROM source;
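
Because rds_output declares the primary key (id, len), each output row of this job is written as an upsert. As a rough illustration, a row with the hypothetical values id=1, len=5, and content='hello' would result in a MySQL statement of the following shape. The physical table name rds_output_table is an assumption.

```sql
-- Statement shape executed in the ApsaraDB RDS for MySQL database for one output row.
-- If a record with id=1 and len=5 already exists, it is updated. Otherwise, a new record is inserted.
INSERT INTO rds_output_table(id, len, content)
VALUES (1, 5, 'hello')
ON DUPLICATE KEY UPDATE id=1, len=5, content='hello';
```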

FAQ

  • Q: When output data is written to an ApsaraDB RDS result table, is a new data record inserted into the table or is the result table updated based on the primary key value?

    A: If a primary key is defined in the DDL statement, output data is written by using a statement in the following form: INSERT INTO tablename(field1, field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1, field2=value2, field3=value3, ...; If a record with the same primary key value already exists in the table, that record is updated. Otherwise, the output data is inserted as a new record. If no primary key is defined in the DDL statement, a plain INSERT INTO statement is executed and each output row is inserted as a new record.

  • Q: How do I perform GROUP BY operations by using the unique index of an ApsaraDB RDS result table?
    A: Take note of the following points:
    • Declare the columns of the unique index in the GROUP BY clause of your Realtime Compute for Apache Flink job.
    • An ApsaraDB RDS table can have only one auto-increment primary key, and this key cannot be declared as the primary key in a Realtime Compute for Apache Flink job.
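
The two points in the last answer can be sketched as follows. All table and column names are hypothetical. The sketch assumes a physical table whose auto-increment primary key is id and whose unique index covers user_id. Declaring user_id as the primary key of the result table is also an assumption, made so that rows with the same user_id are updated instead of inserted again.

```sql
-- Assumed physical table in ApsaraDB RDS for MySQL:
--   CREATE TABLE user_cnt (
--     id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
--     user_id INT NOT NULL,
--     cnt BIGINT,
--     UNIQUE KEY uk_user_id (user_id)
--   );

CREATE TABLE user_source (
   user_id INT
) WITH (
   type = 'random'
);

-- The auto-increment column id is not declared.
-- The unique index column user_id is declared as the primary key (assumption).
CREATE TABLE rds_group_output(
   user_id INT,
   cnt BIGINT,
   PRIMARY KEY (user_id)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='user_cnt',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

-- The unique index column appears in the GROUP BY clause.
INSERT INTO rds_group_output
SELECT user_id, COUNT(*) AS cnt
FROM user_source
GROUP BY user_id;
```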