Blink 3.6.0 and later allow you to use Blink connectors to write data to an AnalyticDB for PostgreSQL instance. This topic describes the related prerequisites, procedures, data type mappings, and parameters.
Prerequisites
The Realtime Compute for Apache Flink cluster and the AnalyticDB for PostgreSQL instance reside within the same virtual private cloud (VPC). The CIDR block of the cluster is added to the IP address whitelist of the AnalyticDB for PostgreSQL instance.
Create a Realtime Compute for Apache Flink cluster
To create a Realtime Compute for Apache Flink cluster in Blink 3.6.0 or later, perform the following steps:
- Activate Realtime Compute for Apache Flink and create a project. For more information, see Activate Realtime Compute for Apache Flink and create a project. Note The created Realtime Compute for Apache Flink cluster and the destination AnalyticDB for PostgreSQL instance must reside within the same VPC.
- Install Blink 3.6.0 or later for the Realtime Compute for Apache Flink cluster. For more information, see Manage Blink versions of a Realtime Compute for Apache Flink cluster deployed in exclusive mode.
Configure an AnalyticDB for PostgreSQL V6.0 instance
- Create an instance. Note The created AnalyticDB for PostgreSQL instance and the Realtime Compute for Apache Flink cluster must reside within the same VPC.
- Configure an IP address whitelist for the AnalyticDB for PostgreSQL instance.
- In the VPC console, find the CIDR block of the cluster.
- In the AnalyticDB for PostgreSQL console, find the destination AnalyticDB for PostgreSQL instance and click its ID. In the left-side navigation pane, click Security Controls. On the page that appears, click Create Whitelist.
- Enter the CIDR block of the cluster in the IP address whitelist of the AnalyticDB for PostgreSQL instance. Click OK.
- Create a destination table in the AnalyticDB for PostgreSQL instance.
create table test15 (
    b1 bigint,
    b2 smallint,
    b3 smallint,
    b4 int,
    b5 boolean,
    b6 real,
    b7 double precision,
    b8 double precision,
    b9 date,
    b10 time with time zone,
    b11 timestamp with time zone,
    b12 text,
    b15 json
);
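If you want to confirm that the table was created as expected, you can run a query similar to the following. This is a minimal check that assumes the table was created in the default public schema:
-- List the columns of the destination table from the system catalog.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'test15'
ORDER BY ordinal_position;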
Create a file in which to write data
A random data source is used in this section. In actual scenarios, replace it with your own data source.
- Log on to the Realtime Compute Development Platform, find the destination project, and click its name.
- In the left-side navigation pane, click Development. In the top navigation bar, click Create File to create a Flink SQL file in which to write data.
--SQL
--********************************************************************--
--Author: sqream_test
--CreateTime: 2020-04-27 19:13:44
--********************************************************************--
CREATE TABLE s_member_cart
(
a1 bigint ,
a2 tinyint ,
a3 smallint ,
a4 int ,
a5 boolean ,
a6 FLOAT ,
a7 DECIMAL ,
a8 double ,
a9 date ,
a10 time ,
a11 timestamp ,
a12 tinyint
) WITH (
type='random'
);
-- ads sink.
CREATE TABLE adsSink (
`B1` bigint ,
`B2` tinyint ,
`B3` smallint ,
`B4` int ,
`B5` boolean,
`B6` FLOAT ,
`B7` FLOAT ,
`B8` double ,
`B9` date ,
`B10` time ,
`B11` timestamp ,
`B12` varchar,
`B15` varchar
--PRIMARY KEY(b1)
) with (
--type='print'
type='adbpg',
version='1.1',
url='jdbc:postgresql://gp-xxxx:3432/testblink',
tableName='test',
userName='xxxx',
password='xxxxxx',
timeZone='Asia/Shanghai',
useCopy='0'
);
INSERT INTO adsSink
SELECT a1, a2, a3, a4, a5, a6, a6, a8, a9, a10, a11,
       case when a12 > 0 then 'value1' else 'value2' end as b12,
       '{ "customer": "value", "items": {"product": "Beer","qty": 6}}'
from s_member_cart;
--insert into adsSink2 select a2, sum(a4) from s_member_cart group by a2;
Parameters
Parameter | Description | Required | Constraint |
---|---|---|---|
type | The type of the result table. | Yes | Set the value to adbpg. |
url | The JDBC URL. | Yes | The JDBC URL that is used to connect to the AnalyticDB for PostgreSQL instance, in the format 'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'. yourNetworkAddress indicates the internal endpoint of the database. PortId indicates the port that is used to connect to the database. yourDatabaseName indicates the name of the database. Example: url='jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres'. |
tableName | The name of the table. | Yes | None |
userName | The username that is used to connect to the database. | Yes | None |
password | The password that is used to connect to the database. | Yes | None |
maxRetryTimes | The maximum number of retries for writing data to the table. | No | Default value: 3. |
useCopy | Specifies whether to use the COPY statement to write data. | No | Valid values: 0: Data is written by using the BATCH INSERT statement. 1: Data is written by using the COPY statement. |
batchSize | The number of data records that can be written in one operation. | No | Default value: 5000. |
exceptionMode | The policy that is used to handle exceptions when data is written to the table. | No | Default value: ignore. Valid values: ignore: Data that causes an exception is ignored. strict: A failover is triggered when an exception occurs. |
conflictMode | The policy that is used to handle primary key or unique index conflicts. | No | Default value: ignore. Valid values: ignore: The conflicting data is ignored and the existing data is retained. strict: A failover is triggered when a conflict occurs. update: The existing data is updated when a conflict occurs. upsert: The INSERT ON CONFLICT statement is used to resolve conflicts. |
targetSchema | The name of the schema. | No | Default value: public. |
writeMode | The fine-grained write policy that refines the useCopy parameter. | No | This parameter is supported in Blink 3.6.4 and later and can be specified when you set the useCopy parameter. Default value: 1. Valid values: 0: Data is written by using the BATCH INSERT statement. 1: Data is written by using the COPY statement. 2: Data is written by using the BATCH UPSERT statement. |
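The following result table definition is a sketch that shows how the optional parameters can be combined. The table name adsSinkWithOptions and the connection values (URL, database, account, and password) are placeholders and must be replaced with your own values.
CREATE TABLE adsSinkWithOptions (
  `B1` bigint,
  `B12` varchar
) with (
  type='adbpg',
  version='1.1',
  url='jdbc:postgresql://gp-xxxx:3432/testblink',
  tableName='test',
  userName='xxxx',
  password='xxxxxx',
  targetSchema='public',    -- write to the public schema
  maxRetryTimes='3',        -- retry a failed write up to three times
  batchSize='5000',         -- write up to 5,000 records per batch
  exceptionMode='ignore',   -- ignore records that cause write exceptions
  conflictMode='ignore'     -- ignore primary key conflicts
);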
Data type mappings
Realtime Compute for Apache Flink data type | AnalyticDB for PostgreSQL data type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | SMALLINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DOUBLE | DOUBLE PRECISION |
VARCHAR | TEXT |
DATETIME | TIMESTAMP |
DATE | DATE |
FLOAT | REAL |
DECIMAL | DOUBLE PRECISION |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
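The CREATE TABLE statements earlier in this topic follow these mappings. As a minimal sketch, the following hypothetical pair shows how several Flink column types line up with the column types of the destination table in AnalyticDB for PostgreSQL:
-- Flink (Blink) side, hypothetical table:
-- CREATE TABLE demo_sink (c1 TINYINT, c2 DECIMAL, c3 VARCHAR, c4 DATETIME) WITH (type='adbpg', ...);
-- Matching destination table in AnalyticDB for PostgreSQL:
create table demo_sink (
    c1 smallint,          -- TINYINT maps to SMALLINT
    c2 double precision,  -- DECIMAL maps to DOUBLE PRECISION
    c3 text,              -- VARCHAR maps to TEXT
    c4 timestamp          -- DATETIME maps to TIMESTAMP
);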
Start a job
- In the lower-right corner of the Development tab, check whether Blink 3.6.0 or later is used. If not, change the version.
- After you create a file, click Save and Publish in sequence to publish the job.
- Click the Administration tab. Click Start in the Actions column corresponding to the job to write data.
Connect to the AnalyticDB for PostgreSQL instance and check whether the data is written to the destination table.
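For example, you can connect to the testblink database and run queries similar to the following. Replace test with the value of the tableName parameter in your sink definition:
-- Check that the job is writing rows to the destination table.
SELECT count(*) FROM test;
-- Sample a few of the written rows.
SELECT * FROM test LIMIT 10;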
Data writing policies in different Blink versions
Blink 3.6.4:
- By default, the BATCH COPY statement is used to write data. The BATCH COPY statement provides higher write performance than the BATCH INSERT statement.
- The writeMode parameter is supported.
- In Blink versions later than 3.6.4, if the useCopy parameter is set to 1, the BATCH COPY statement is used to write data regardless of the writeMode value.
- For example, if you want to use the BATCH INSERT statement to write data to the AnalyticDB for PostgreSQL instance, set the useCopy parameter to 0 and the writeMode parameter to 0. If you want to use the BATCH UPSERT statement, set the useCopy parameter to 0 and the writeMode parameter to 2, as shown in the sketch after this list.
- In later updates, the useCopy parameter will be removed. We recommend that you use the writeMode parameter to configure the data writing policy.
- You can set the conflictMode parameter to upsert. This handles primary key conflicts by using the INSERT ON CONFLICT statement.
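The following sink definition is a sketch of the BATCH UPSERT configuration described above. The table name adsUpsertSink and the connection values are placeholders, and the primary key must match a primary key or unique index on the destination table.
CREATE TABLE adsUpsertSink (
  `B1` bigint,
  `B12` varchar,
  PRIMARY KEY (`B1`)
) with (
  type='adbpg',
  version='1.1',
  url='jdbc:postgresql://gp-xxxx:3432/testblink',
  tableName='test',
  userName='xxxx',
  password='xxxxxx',
  useCopy='0',            -- do not write data by using the COPY statement
  writeMode='2',          -- write data by using the BATCH UPSERT statement
  conflictMode='upsert'   -- resolve primary key conflicts by using INSERT ON CONFLICT
);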
Blink 3.6.0:
Blink 3.6.0 supports result tables of AnalyticDB for PostgreSQL V6.0.