Blink 3.6.0 and later allow you to use Blink connectors to write data to an AnalyticDB for PostgreSQL instance. This topic describes the related prerequisites, procedures, field mappings, and parameters.

Prerequisites

  • The Realtime Compute cluster and the AnalyticDB for PostgreSQL instance must reside within the same Virtual Private Cloud (VPC), and the CIDR block of the cluster must be added to the whitelist of the AnalyticDB for PostgreSQL instance.
  • The Blink version of the Realtime Compute cluster must be 3.6.0 or later. The following steps describe how to create the cluster:
    1. Activate Realtime Compute and create a project. For more information, see Activate Realtime Compute and create a project.
      Note The activated Realtime Compute cluster and the destination AnalyticDB for PostgreSQL instance must reside within the same VPC.
    2. Install Blink 3.6.0 or later for the Realtime Compute cluster. For more information, see Manage Blink versions of a Realtime Compute cluster in exclusive mode.
  • Configure an AnalyticDB for PostgreSQL V6.0 instance.
    1. Create an instance.
      Note The created AnalyticDB for PostgreSQL instance and the Realtime Compute cluster must reside within the same VPC.
    2. Configure the whitelist of the AnalyticDB for PostgreSQL instance.
      1. In the VPC console, find the CIDR block of the cluster.
      2. In the AnalyticDB for PostgreSQL console, find the AnalyticDB for PostgreSQL instance and click its ID. In the left-side navigation pane, click Security Controls. On the Security Controls page, click Create Whitelist.
      3. Set Whitelist Name and IP Addresses, and click OK. The specified CIDR block in the VPC is then allowed to access the AnalyticDB for PostgreSQL instance.
    3. Create a destination table in the AnalyticDB for PostgreSQL instance.
      create table test15 (
          b1 bigint,
          b2 smallint,
          b3 smallint,
          b4 int,
          b5 boolean,
          b6 real,
          b7 double precision,
          b8 double precision,
          b9 date,
          b10 time with time zone,
          b11 timestamp with time zone,
          b12 text,
          b15 json
      );
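
After you create the destination table, you can confirm its definition from any client connected to the instance. The following query is a minimal sketch; it assumes the table was created in the default public schema:

```sql
-- List the columns of the destination table to confirm that it was created
-- as expected. Assumes the table resides in the default "public" schema.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'test15'
ORDER BY ordinal_position;
```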

Create a file in which to write data

This section uses a random data source as an example. In actual scenarios, replace it with your own data source.

  1. In the Realtime Compute console, choose Project Management > Projects. Find the target project and click its name.
  2. In the left-side navigation pane, click Development. In the top navigation bar, click Create File to create a Flink SQL file to which data is written.
The following sample SQL statements show how to write data to the AnalyticDB for PostgreSQL instance:
--SQL
--********************************************************************--
--Author: sqream_test
--CreateTime: 2020-04-27 19:13:44
--Comment: comments for businesses
--********************************************************************--

CREATE TABLE s_member_cart (
    a1 bigint,
    a2 tinyint,
    a3 smallint,
    a4 int,
    a5 boolean,
    a6 float,
    a7 decimal,
    a8 double,
    a9 date,
    a10 time,
    a11 timestamp,
    a12 tinyint
) WITH (
    type='random'
);

-- ads sink.
CREATE TABLE adsSink (
    `B1` bigint,
    `B2` tinyint,
    `B3` smallint,
    `B4` int,
    `B5` boolean,
    `B6` float,
    `B7` float,
    `B8` double,
    `B9` date,
    `B10` time,
    `B11` timestamp,
    `B12` varchar,
    `B15` varchar
    --PRIMARY KEY(b1)
) WITH (
    --type='print'
    type='adbpg',
    version='1.1',
    url='jdbc:postgresql://gp-xxxx:3432/testblink',
    tableName='test',
    userName='xxxx',
    password='xxxxxx',
    timeZone='Asia/Shanghai',
    useCopy='0'
);



INSERT INTO adsSink
SELECT a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11,
       CASE WHEN a12 > 0 THEN 'letter1' ELSE 'letter3' END AS b12,
       '{ "customer": "letter56", "items": {"product": "Beer","qty": 6}}'
FROM s_member_cart;

--insert into adsSink2 select a2, sum(a4) from s_member_cart group by a2;

Parameters

Parameter Description Required Constraint
type The type of the source table. Yes Set the value to adbpg.
url The JDBC URL. Yes The JDBC URL used to access the AnalyticDB for PostgreSQL database. The URL is in the format of jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>. In the format, yourNetworkAddress indicates the internal endpoint of the database. PortId indicates the port number used to connect to the database. yourDatabaseName indicates the name of the database to be accessed. Example: url='jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres'
tableName The name of the table. Yes None
userName The username that is used to access the database. Yes None
password The password that is used to access the database. Yes None
maxRetryTimes The maximum number of retries for data writing. No Default value: 3.
useCopy Specifies whether to use the copy API to write data. No Valid values: 0 and 1. Default value: 0. A value of 0 indicates that the INSERT statement is used to write data to the AnalyticDB for PostgreSQL instance. A value of 1 indicates that the copy API is used to write data to the AnalyticDB for PostgreSQL instance.
batchSize The number of data records that can be written at a time. No Default value: 5000.
exceptionMode The policy that is used to handle exceptions during data writing. No Valid values: ignore and strict. A value of ignore indicates that data that fails to be written is ignored. A value of strict indicates that write exceptions cause failovers. Default value: ignore.
conflictMode The policy that is used to handle primary key conflicts or unique index conflicts. No Valid values: ignore, strict, and update. A value of ignore indicates that primary key conflicts are ignored and the original data is retained. A value of strict indicates that primary key conflicts cause failovers. A value of update indicates that conflicting rows are updated with the new data.
targetSchema The name of the schema. No Default value: public.
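
The following sink declaration is a minimal sketch that shows where the optional parameters fit into the WITH clause. The connection values (URL, username, and password) are placeholders, and the single column is illustrative only:

```sql
CREATE TABLE adsSinkTuned (
    `B1` bigint
) WITH (
    type='adbpg',
    version='1.1',
    url='jdbc:postgresql://gp-xxxx:3432/testblink',  -- placeholder URL
    tableName='test',
    userName='xxxx',                                 -- placeholder credentials
    password='xxxxxx',
    useCopy='1',             -- write through the copy API instead of INSERT
    batchSize='1000',        -- flush every 1,000 records
    maxRetryTimes='3',       -- retry failed writes up to 3 times
    exceptionMode='strict',  -- fail over on write exceptions
    conflictMode='update',   -- update rows on primary key conflicts
    targetSchema='public'    -- write to the public schema
);
```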

Field type mapping

Realtime Compute field type AnalyticDB for PostgreSQL field type
BOOLEAN BOOLEAN
TINYINT SMALLINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE PRECISION
VARCHAR TEXT
DATETIME TIMESTAMP
DATE DATE
FLOAT REAL
DECIMAL DOUBLE PRECISION
TIME TIME
TIMESTAMP TIMESTAMP
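
As an illustration of the mapping, a TINYINT column on the Realtime Compute side must be received by a SMALLINT column on the AnalyticDB for PostgreSQL side, because PostgreSQL has no single-byte integer type. The following sketch uses hypothetical table and column names:

```sql
-- Realtime Compute (Blink SQL) side: hypothetical source declaration
-- CREATE TABLE src (c1 TINYINT, c2 DECIMAL, c3 VARCHAR) WITH (type='random');

-- AnalyticDB for PostgreSQL side: the matching destination columns
CREATE TABLE dst (
    c1 smallint,          -- TINYINT maps to SMALLINT
    c2 double precision,  -- DECIMAL maps to DOUBLE PRECISION
    c3 text               -- VARCHAR maps to TEXT
);
```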

Start a job

  1. In the lower-right corner of the Development tab, check whether the Blink version is 3.6.0 or later. If an earlier version is used, change the version.
  2. After you create the file, click Save and then Publish to publish the job.
  3. On the Administration tab, find the job and click Start in the Actions column to start the job and write data.
Connect to the AnalyticDB for PostgreSQL instance and check whether the data is written to the destination table.
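
For example, assuming the sink was configured with tableName='test' as in the sample job, a query such as the following can confirm that records are arriving:

```sql
-- Count the rows written by the job. The table name matches the
-- tableName parameter in the sink definition.
SELECT count(*) FROM test;
```

Run the query a few times while the job is running; an increasing count indicates that data is being written continuously.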