All Products
Search
Document Center

AnalyticDB for PostgreSQL:Use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL

Last Updated:Oct 18, 2023

This topic describes how to use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL.

Limits

  • Realtime Compute for Apache Flink cannot read data from AnalyticDB for PostgreSQL in serverless mode.

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.

  • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.

    Note

    If you use a custom connector, perform operations by following the instructions that are described in Manage custom connectors.

Prerequisites

  • A fully managed Flink workspace is created. For more information, see Activate fully managed Flink.

  • An AnalyticDB for PostgreSQL instance is created. For more information, see Create an instance.

  • The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace reside in the same virtual private cloud (VPC).

Configure an AnalyticDB for PostgreSQL instance

  1. Log on to the AnalyticDB for PostgreSQL console.
  2. Add the CIDR block of the fully managed Flink workspace to an IP address whitelist of the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.

  3. Click Log On to Database. For more information about how to connect to a database, see Client connection.

  4. Create a table on the AnalyticDB for PostgreSQL instance.

    Sample statement:

    CREATE TABLE test_adbpg_table(
    b1 int,
    b2 int,
    b3 text,
    PRIMARY KEY(b1)
    );

Configure a fully managed Flink workspace

  1. Log on to the Realtime Compute for Apache Flink console.

  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, click Connectors.

  4. On the Connectors page, click Create Custom Connector.

  5. Upload the JAR file of the custom connector that you want to create.

    Note
    • For information about how to obtain the JAR file of the custom connector of AnalyticDB for PostgreSQL, visit GitHub.

    • The JAR file must be in the same version as the Flink engine of Realtime Compute for Apache Flink.

  6. After you upload the JAR file, click Next.

    The system parses the content of the JAR file that you uploaded. If file parsing is successful, proceed to the next step. If file parsing fails, check whether the code of your custom connector complies with the standards that are defined by the Apache Flink community.

  7. Click Finish.

    The custom connector that you create appears in the connector list.

Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  2. In the left-side navigation pane, click SQL Editor. In the upper-left corner of the SQL Editor page, click New. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and click Next.

  3. In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.

    Parameter

    Description

    Example

    Name

    The name of the draft that you want to create.

    Note

    The draft name must be unique in the current project.

    adbpg-test

    Location

    The folder in which the code file of the draft is saved.

    You can also click the 新建文件夹 icon to the right of an existing folder to create a subfolder.

    Draft

    Engine Version

    The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version.

    vvr-6.0.7-flink-1.15

  4. Click Create.

Write data to AnalyticDB for PostgreSQL

  1. Write deployment code.

    Create a source table named datagen_source and an AnalyticDB for PostgreSQL table named test_adbpg_table. Copy the following code to the code editor of the deployment.

    CREATE TABLE datagen_source (
     f_sequence INT,
     f_random INT,
     f_random_str STRING
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_sequence.kind'='sequence',
     'fields.f_sequence.start'='1',
     'fields.f_sequence.end'='1000',
     'fields.f_random.min'='1',
     'fields.f_random.max'='1000',
     'fields.f_random_str.length'='10'
    );
    
    CREATE TABLE test_adbpg_table (
        `B1` bigint   ,
        `B2` bigint  ,
        `B3` VARCHAR ,
        `B4` VARCHAR,
         PRIMARY KEY(B1) not ENFORCED
    ) with (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'xxx',
       'tablename' = 'test_adbpg_table',
       'username' = 'xxxx',
       'url' = 'jdbc:postgresql://url:5432/schema',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    In the preceding code, retain the values of parameters related to the datagen_source table and modify the values of parameters related to the test_adbpg_table table based on your business requirements. The following table describes the parameters of the test_adbpg_table table.

    Parameter

    Required

    Description

    connector

    Yes

    The name of the connector. It is in the adbpg-nightly-Version number format. Example: adbpg-nightly-1.13.

    url

    Yes

    The Java Database Connectivity (JDBC) URL that is used to connect to the AnalyticDB for PostgreSQL instance. The URL is in the jdbc:postgresql://<Internal endpoint>:<Port number>/<Database name> format. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres.

    tablename

    Yes

    The name of the AnalyticDB for PostgreSQL table.

    username

    Yes

    The name of the database account that is used to connect to the AnalyticDB for PostgreSQL database.

    password

    Yes

    The password of the AnalyticDB for PostgreSQL database account.

    maxretrytimes

    No

    The maximum number of retries that are allowed to write data to the table if a data writing attempt fails. Default value: 3.

    batchsize

    No

    The maximum number of rows that can be written to the table at a time. Default value: 50000.

    exceptionmode

    No

    The policy that is used to handle exceptions during data writing. Valid values:

    • ignore (default): The system ignores the data that is written during an exception.

    • strict: If an exception occurs during data writing, the system performs a failover and reports an error.

    conflictmode

    No

    The policy that is used to handle primary key or unique index conflicts. Valid values:

    • ignore: If a primary key conflict occurs, the system ignores the primary key conflicts and retains the existing data.

    • strict: If a primary key conflict occurs, the system performs a failover and reports an error.

    • update: If a primary key conflict occurs, the system updates data.

    • upsert (default): If a primary key conflict occurs, the system performs UPSERT operations to write data.

      AnalyticDB for PostgreSQL uses a combination of INSERT ON CONFLICT and COPY ON CONFLICT statements to perform UPSERT operations. If the destination table is a partitioned table, the minor version must be V6.3.6.1 or later. For information about how to update the minor version, see Update the minor engine version.

    targetschema

    No

    The schema of the AnalyticDB for PostgreSQL database. Default value: public.

    writemode

    No

    The method that is used to write data. Valid values:

    • 0: The BATCH INSERT statement is used to write data.

    • 1 (default): The COPY API is used to write data.

    • 2: The BATCH UPSERT statement is used to write data.

    verbose

    No

    Specifies whether to display connector logs. Valid values:

    • 0 (default): does not display connector logs.

    • 1: displays connector logs.

    retrywaittime

    No

    The interval between retries when an exception occurs. Unit: milliseconds. Default value: 100.

    batchwritetimeoutms

    No

    The timeout period of batch data writes. After this period ends, the data batch is written. Unit: milliseconds. Default value: 50000.

    connectionmaxactive

    No

    The maximum number of active connections that can be allocated in a connection pool at the same time for a single task manager. Default value: 5.

    casesensitive

    No

    Specifies whether column and table names are case-sensitive. Valid values:

    • 0 (default): case-insensitive.

    • 1: case-sensitive.

    Note

    For more information about the supported parameters and data type mappings, see AnalyticDB for PostgreSQL connector.

  2. Start the deployment.

    1. In the upper-right corner of the SQL Editor page, click Deploy. In the message that appears, click OK.

      Note

      Session clusters are suitable for development and test environments in non-production environments. You can debug deployments in a session cluster to improve the resource utilization of the JobManager and accelerate the deployment startup. To ensure your business stability, we recommend that you do not publish a deployment to a session cluster. For more information, see Debug a deployment.

    2. On the Deployments page, find the deployment that you want to start and click Start in the Actions column.

    3. In the Start Job message, click Start.

Verify the synchronization result

  1. Connect to the AnalyticDB for PostgreSQL database. For more information, see Client connection.

  2. Execute the following statement to query the test_adbpg_table table:

    SELECT * FROM test_adbpg_table;

    Data is written to the AnalyticDB for PostgreSQL database. The following figure shows the returned result.

    adbpg2.png

References