This topic describes how to create an ApsaraDB RDS for SQL Server result table in Realtime Compute for Apache Flink. This topic also describes the parameters in the WITH clause, data type mappings, and Java Database Connectivity (JDBC) parameters that are used when you create such a result table.

Notice
  • This topic applies only to Blink V3.2.0 and later.
  • Realtime Compute for Apache Flink cannot use ApsaraDB RDS for SQL Server as a data store.

DDL syntax

In Realtime Compute for Apache Flink, you can use ApsaraDB RDS for SQL Server to store output data. The following code shows an example:
create table ss_output(
 id INT,
 len INT,
 content VARCHAR,
 primary key(id,len)
) with (
 type='jdbc',
 url='jdbc:sqlserver://ip:port;database=****',
 tableName='<yourDatabaseTableName>',
 userName='<yourDatabaseUserName>',
 password='<yourDatabasePassword>'
);
Note
  • In Realtime Compute for Apache Flink, each row of output data is converted to a line of SQL statement and then written and executed in the destination database. If you want to write multiple rows of data to the result table at the same time, you must add ?rewriteBatchedStatements=true to the URL to improve system performance.
  • You can define an auto-increment primary key for an ApsaraDB RDS for SQL Server database. If you want to use the auto-increment primary key, do not declare the auto-increment field in the DDL statement. For example, if you use the ID field as an auto-increment field, do not declare the ID field in the DDL statement. When a row of output data is written into an ApsaraDB RDS for SQL Server database, a value is automatically filled for the auto-increment field.
  • If a DRDS result table has partitions, the shard key must be declared in primary key() of the DDL statement. Otherwise, you cannot write data into the partitioned table.
  • The fields that are declared in a DDL statement must include at least one non-primary key field. Otherwise, an error is returned.

Parameters in the WITH clause

Parameter Description Required Remarks
url The JDBC URL of the database. Yes For more information, see
tableName The name of the table. Yes N/A.
username The username that is used to access the database. Yes N/A.
password The password that is used to access the database. Yes N/A.
maxRetryTimes The maximum number of retries for writing data to the table. No Default value: 10.
bufferSize The maximum number of data records that can be stored in the buffer before data deduplication is triggered. No Default value: 10000. This value indicates that deduplication is triggered after the number of input data records reaches 10,000.
flushIntervalMs The interval at which the cache is cleared. No Unit: milliseconds. Default value: 2000. This value indicates that if the number of input data records does not reach the value specified by the bufferSize parameter within 2,000 milliseconds, all cached data is written into the result table.
excludeUpdateColumns Specifies whether to ignore the update of the specified field. No This parameter is optional. This parameter is empty by default, which indicates that the primary key field is ignored by default. When data with the same primary key is updated, the specified columns are not updated.
ignoreDelete Specifies whether to ignore delete operations. No Default value: False.

Data type mappings

Data type of ApsaraDB RDS for SQL Server Data type of Realtime Compute for Apache Flink
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

JDBC parameters

Parameter Description Default value Since version (JDBC driver)
useUnicode Specifies whether to use the Unicode character set. This parameter must be set to True if you set the characterEncoding parameter to GB2312 or GBK. False 1.1g
characterEncoding The character encoding format, such as GB2312 or GBK. If useUnicode is set to true, you must specify a character encoding format. False 1.1g
autoReconnect Specifies whether to automatically re-establish a connection when the connection to the database is unexpectedly interrupted. False 1.1
autoReconnectForPools Specifies whether to use the reconnection policy for a database connection pool. False 3.1.3
failOverReadOnly Specifies whether the database is read-only after it is automatically reconnected. True 3.0.12
maxReconnects The maximum number of reconnection attempts allowed. This parameter must be set if the autoReconnect parameter is set to True. 3 1.1
initialTimeout The interval between two reconnection attempts. Unit: seconds. This parameter must be set if the autoReconnect parameter is set to True. 2 1.1
connectTimeout The timeout period when you use a socket connection to access the database server. Unit: milliseconds. Default value: 0. This value indicates that the connection never times out. This parameter is provided in JDK V1.4 and later. 3.0.1
socketTimeout The timeout period for read and write operations on a socket connection. Unit: milliseconds. Default value: 0. This value indicates that the read or write operation never times out. 3.0.1

Sample code

The following example describes how to create an ApsaraDB RDS for SQL Server result table in a Realtime Compute for Apache Flink job.
CREATE TABLE source (
   id INT,
   len INT,
   content VARCHAR
) with (
   type = 'random'
);

CREATE TABLE rds_output(
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY (id,len)
) WITH (
 type='jdbc',
 url='<yourDatabaseURL>',
 tableName='<yourDatabaseTable>',
 userName='<yourDatabaseUserName>',
 password='<yourDatabasePassword>'
);

INSERT INTO rds_output
SELECT id, len, content FROM source;

FAQ

  • Q: When the output data of Realtime Compute for Apache Flink is written to an ApsaraDB RDS for SQL Server table, is the result table updated based on the primary key or is a new data record generated in the table?
    A: The processing method depends on whether the primary key is defined in the DDL statement.
    • If a primary key is defined in the DDL statement, the result table is updated by using insert into on duplicate key update. For a data record, if the primary key does not exist, the record is inserted into the table as a new row. If the value of the primary key field exists, the original row in the table is updated.
    • If no primary key is defined in the DDL statement, new data records are appended to the table by using insert into.
  • Q: What do I need to pay attention to when I perform GROUP BY operations based on the unique index of an ApsaraDB RDS for SQL Server table?
    A: Pay attention to the following points:
    • Declare the unique index in the primary key of your job.
    • An ApsaraDB RDS for SQL Server table has only one auto-increment primary key. Therefore, this auto-increment primary key cannot be declared as the primary key in a Realtime Compute for Apache Flink job.