This topic describes how to create an ApsaraDB RDS result table in Realtime Compute for Apache Flink. It also describes the parameters in the WITH clause and data type mapping used when you create such a result table.

Notice Realtime Compute for Apache Flink cannot use ApsaraDB RDS for SQL Server as a data store.

Introduction to ApsaraDB RDS

Alibaba Cloud ApsaraDB RDS is a stable, reliable, and scalable online database service. Built on the distributed file system and high-performance storage services of Alibaba Cloud, ApsaraDB RDS supports a variety of database engines such as MySQL, SQL Server, PostgreSQL, and Postgres Plus Advanced Server (PPAS). PPAS is compatible with Oracle databases. ApsaraDB RDS provides a wide range of solutions for database operations and maintenance (O&M), including disaster recovery, backup, restoration, monitoring, and migration.
Note ApsaraDB RDS, Distributed Relational Database Service (DRDS), and PolarDB use the same parameters in the WITH clause. When you use an ApsaraDB RDS, DRDS, or PolarDB table as a result table, make sure that the table already exists in the database.

DDL definition

The following sample code demonstrates how to create an ApsaraDB RDS or DRDS result table. Among the ApsaraDB RDS database engines, only ApsaraDB RDS for MySQL is supported.
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
Note
  • In Realtime Compute for Apache Flink, each row of output data is converted into an SQL statement, which is then executed in the destination database. To write multiple rows of output data at a time, append ?rewriteBatchedStatements=true to the end of the URL. If the URL already contains parameters, use & instead of ?. This improves write efficiency.
  • You can define an auto-increment primary key in the ApsaraDB RDS for MySQL table that stores the result data. If you use an auto-increment primary key, do not declare the auto-increment column in the DDL statement. For example, if ID is an auto-increment column, do not declare the ID column in the DDL statement. When a row of output data is written to the ApsaraDB RDS for MySQL database, the value of the auto-increment column is automatically filled in.
  • We recommend that you use the storage registration method to connect to the database. For more information, see Register ApsaraDB for RDS resources.
  • You must declare at least one non-primary-key column in the DDL statement. Otherwise, an error is returned.
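The following sketch applies the notes above. It assumes a hypothetical ApsaraDB RDS for MySQL table named rds_result with an auto-increment column named seq and a unique index on id and len; these names are illustrative only. The auto-increment column is left out of the Flink DDL, content serves as the required non-primary-key column, and ?rewriteBatchedStatements=true is appended to the URL to enable batched writes.

```sql
-- Executed in ApsaraDB RDS for MySQL (hypothetical table definition).
CREATE TABLE rds_result (
  seq BIGINT NOT NULL AUTO_INCREMENT,  -- filled in automatically by MySQL
  id INT NOT NULL,
  len INT NOT NULL,
  content VARCHAR(255),
  PRIMARY KEY (seq),
  UNIQUE KEY uk_id_len (id, len)
);

-- Executed in Realtime Compute for Apache Flink.
CREATE TABLE rds_output(
   -- seq is intentionally not declared: MySQL fills it in on insert.
   id INT,
   len INT,
   content VARCHAR,           -- the required non-primary-key column
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   -- rewriteBatchedStatements=true lets the JDBC driver batch the generated statements.
   url='jdbc:mysql://<yourInternalEndpoint>/<yourDatabaseName>?rewriteBatchedStatements=true',
   tableName='rds_result',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
```

Because MySQL treats a conflict on a unique index like a primary key conflict for upserts, declaring the unique index columns as the Flink primary key keeps one row per (id, len) pair.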

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| type | The type of the result table. | Yes | Set the value to rds. |
| url | The Java Database Connectivity (JDBC) URL of the database. | Yes | Set this parameter in the jdbc:mysql://<Internal endpoint>/<databaseName> format. Replace <Internal endpoint> with the internal endpoint of your ApsaraDB RDS instance and <databaseName> with the name of your database. |
| tableName | The name of the table. | Yes | None. |
| userName | The username that is used to log on to the database. | Yes | None. |
| password | The password that is used to log on to the database. | Yes | None. |
| maxRetryTimes | The maximum number of retries for writing data. | No | Default value: 10. |
| batchSize | The number of data records that are written at a time. | No | Default value: 4096. |
| bufferSize | The maximum number of data records that can be stored in the buffer before deduplication is triggered. | No | Default value: 10000. |
| flushIntervalMs | The interval at which the buffer is flushed. | No | Default value: 2000. Unit: milliseconds. If the number of buffered records does not reach the value of bufferSize within this interval, all buffered data is written to the result table when the interval elapses. |
| excludeUpdateColumns | The columns that are not updated. | No | This parameter is empty by default, which means that only the primary key columns are excluded from updates. When a record with an existing primary key value is updated, the specified columns keep their original values. |
| ignoreDelete | Specifies whether to skip delete operations. | No | Default value: false. This value indicates that delete operations are performed. |
| partitionBy | The columns based on which Realtime Compute for Apache Flink performs hash partitioning. | No | This parameter is empty by default. Before data is written to the sink node, Realtime Compute for Apache Flink performs hash partitioning based on the specified columns, and each record flows to the corresponding node. |
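The following sketch shows where the optional parameters go in the WITH clause. The values are illustrative, not tuning recommendations, and writing them as quoted strings is an assumption based on the style of the required parameters.

```sql
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>',
   maxRetryTimes='3',               -- retry a failed write up to 3 times
   batchSize='1000',                -- write up to 1,000 records at a time
   bufferSize='5000',               -- deduplicate within a buffer of up to 5,000 records
   flushIntervalMs='1000',          -- flush the buffer at least once per second
   excludeUpdateColumns='content',  -- keep the existing content value when a primary key already exists
   ignoreDelete='true',             -- skip delete operations
   partitionBy='id'                 -- hash-partition records by id before the sink
);
```

A smaller flushIntervalMs lowers the delay before buffered data reaches the result table, at the cost of more frequent and smaller writes.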

Field type mapping

| ApsaraDB RDS data type | Data type of Realtime Compute for Apache Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
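The following sketch pairs a hypothetical ApsaraDB RDS for MySQL table, type_demo, with a Flink result table that uses the mapped types. The table and column names are assumptions for illustration. Length limits such as VARCHAR(64) are defined only on the MySQL side.

```sql
-- Executed in ApsaraDB RDS for MySQL (hypothetical table definition).
CREATE TABLE type_demo (
  id BIGINT NOT NULL,
  flag BOOLEAN,             -- MySQL stores BOOLEAN as TINYINT(1)
  score DOUBLE,
  birthday DATE,
  updated_at TIMESTAMP NULL,
  name VARCHAR(64),
  payload VARBINARY(256),
  PRIMARY KEY (id)
);

-- Executed in Realtime Compute for Apache Flink: each column uses the mapped type.
CREATE TABLE type_demo_output(
   id BIGINT,
   flag BOOLEAN,
   score DOUBLE,
   birthday DATE,
   updated_at TIMESTAMP,
   name VARCHAR,
   payload VARBINARY,
   PRIMARY KEY (id)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='type_demo',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
```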

JDBC parameters

| Parameter | Description | Default value | Minimum JDBC driver version |
| --- | --- | --- | --- |
| useUnicode | Specifies whether to use the Unicode character set. If you want to set characterEncoding to GB2312 or GBK, you must set this parameter to true. | false | 1.1g |
| characterEncoding | The character encoding, such as GB2312 or GBK. If useUnicode is set to true, you must specify a character encoding. | None | 1.1g |
| autoReconnect | Specifies whether to automatically reconnect if the connection to the database is unexpectedly interrupted. | false | 1.1 |
| autoReconnectForPools | Specifies whether to apply the reconnection policy to database connection pools. | false | 3.1.3 |
| failOverReadOnly | Specifies whether the connection is set to read-only after an automatic reconnection. | true | 3.0.12 |
| maxReconnects | The maximum number of reconnection attempts. This parameter takes effect only if autoReconnect is set to true. | 3 | 1.1 |
| initialTimeout | The interval between two reconnection attempts. Unit: seconds. This parameter takes effect only if autoReconnect is set to true. | 2 | 1.1 |
| connectTimeout | The timeout period for establishing a socket connection to the database server. Unit: milliseconds. The value 0 indicates that the connection never times out. This parameter applies to JDK 1.4 and later. | 0 | 3.0.1 |
| socketTimeout | The timeout period for socket read and write operations. Unit: milliseconds. The value 0 indicates that read and write operations never time out. | 0 | 3.0.1 |
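JDBC parameters are appended to the url value as a query string: the first parameter follows ? and each additional parameter follows &. The following sketch uses illustrative values, and the endpoint and database name are placeholders.

```sql
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   -- Unicode with UTF-8, automatic reconnection (up to 5 attempts, 2 seconds apart),
   -- a 10-second connect timeout, and a 60-second socket read/write timeout.
   url='jdbc:mysql://<yourInternalEndpoint>/<yourDatabaseName>?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&maxReconnects=5&initialTimeout=2&connectTimeout=10000&socketTimeout=60000',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);
```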

Sample code

The following sample code demonstrates how to create an ApsaraDB RDS result table in a Realtime Compute for Apache Flink job.
CREATE TABLE source (
   id INT,
   len INT,
   content VARCHAR
) WITH (
   type = 'random'
);

CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

INSERT INTO rds_output
SELECT id, len, content FROM source;

FAQ

  • Q: When output data is written to an ApsaraDB RDS result table, is a new data record inserted in the table or is the result table updated based on the primary key value?

    A: If a primary key is defined in the DDL statement, output data is written by executing an INSERT INTO tablename(field1, field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1, field2=value2, field3=value3, ... statement. If the primary key value in the output data already exists in the table, the matching record is updated. Otherwise, the output data is inserted as a new record. If no primary key is defined in the DDL statement, a plain INSERT INTO statement is executed to insert the output data.
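
    For illustration, assume that tableName is set to rds_output (a hypothetical name) and that the job emits the row (1, 2, 'hello'). The statements sent to ApsaraDB RDS for MySQL would have roughly the following form; the exact text generated by the connector may differ.

```sql
-- Primary key declared: insert the row, or update the row whose key already exists.
INSERT INTO rds_output(id, len, content) VALUES(1, 2, 'hello')
  ON DUPLICATE KEY UPDATE id=1, len=2, content='hello';

-- No primary key declared: always insert a new row.
INSERT INTO rds_output(id, len, content) VALUES(1, 2, 'hello');
```
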

  • Q: How do I perform GROUP BY operations based on the unique index of an ApsaraDB RDS result table?
    A:
    • You must declare the columns of the unique index in the GROUP BY clause of your Realtime Compute for Apache Flink job.
    • An ApsaraDB RDS table can have only one auto-increment primary key, and this column cannot be declared as a primary key in the Realtime Compute for Apache Flink job.
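
A minimal sketch of this pattern, assuming the source table from the sample code and a hypothetical ApsaraDB RDS for MySQL table named id_count whose auto-increment primary key is pk and whose unique index is on id. In the Flink DDL, the unique index column id is declared as the primary key, the auto-increment column is left out, and the job groups by id.

```sql
-- Executed in ApsaraDB RDS for MySQL (hypothetical table definition).
CREATE TABLE id_count (
  pk BIGINT NOT NULL AUTO_INCREMENT,
  id INT NOT NULL,
  cnt BIGINT,
  PRIMARY KEY (pk),
  UNIQUE KEY uk_id (id)
);

-- Executed in Realtime Compute for Apache Flink.
CREATE TABLE id_count_output(
   id INT,            -- column of the unique index, declared as the Flink primary key
   cnt BIGINT,
   PRIMARY KEY (id)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='id_count',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

-- Group by the unique index column so that each id maps to exactly one row in MySQL.
INSERT INTO id_count_output
SELECT id, COUNT(*) AS cnt
FROM source
GROUP BY id;
```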