This topic describes how to read data from AnalyticDB for PostgreSQL in real time by using Realtime Compute for Apache Flink, which is built on the Ververica Platform (VVP) compute engine.

Precautions

This feature is not supported by AnalyticDB for PostgreSQL instances in Serverless mode.

Prerequisites

Configure a Realtime Compute for Apache Flink cluster

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. On the Connectors tab, click the + icon. In the Create Connector dialog box, upload the JAR package of a custom Flink connector of AnalyticDB for PostgreSQL.
    Note
    • For more information about how to obtain the JAR package of a custom Flink connector of AnalyticDB for PostgreSQL, visit GitHub.
    • The version of the JAR package must match the version of the Flink engine that is used by the Realtime Compute for Apache Flink cluster.
  5. After you upload the JAR package, click Continue.
  6. Click Finish.

Configure an AnalyticDB for PostgreSQL instance

  1. Log on to the AnalyticDB for PostgreSQL console.
  2. Add the CIDR block of the Realtime Compute for Apache Flink cluster to the IP address whitelist of an AnalyticDB for PostgreSQL instance. For more information about how to configure an IP address whitelist, see Configure an IP address whitelist.
  3. Click Log On to Database. For more information about how to connect to a database, see Use client tools to connect to an instance.
  4. Create a table on the AnalyticDB for PostgreSQL instance and insert test data into the table.
    Sample statements:
    CREATE TABLE test_dim_table (
        id int,
        username text,
        PRIMARY KEY (id)
    );

    INSERT INTO test_dim_table (id, username)
    SELECT i, 'username' || i::text
    FROM generate_series(1, 1000) AS t(i);
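
    To verify the inserted data, you can run an optional check. Because generate_series(1, 1000) produces 1,000 values, the table is expected to contain 1,000 rows:

    -- Optional check: the result should be 1000.
    SELECT count(*) FROM test_dim_table;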

Read data from AnalyticDB for PostgreSQL

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. Create a source table named datagen_source, a corresponding AnalyticDB for PostgreSQL dimension table named dim_adbpg, and a result table named print_table.
    1. On the Draft Editor page, click New.
    2. In the New Draft dialog box, specify Name, retain the default settings for other fields, and then click OK.
    3. Copy the following job code to the code editor:
      DROP TABLE IF EXISTS datagen_source;
      CREATE TABLE datagen_source (
          f_sequence INT,
          f_random INT,
          f_random_str STRING
      ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '5',
          'fields.f_sequence.kind' = 'sequence',
          'fields.f_sequence.start' = '1',
          'fields.f_sequence.end' = '1000',
          'fields.f_random.min' = '1',
          'fields.f_random.max' = '1000',
          'fields.f_random_str.length' = '10'
      );

      DROP TABLE IF EXISTS dim_adbpg;
      CREATE TABLE dim_adbpg (
          id int,
          username varchar,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'adbpg-nightly-1.13',
          'url' = 'jdbc:postgresql://gp-2z***************-master.gpdb.rds.aliyuncs.com:5432/why_test',
          'tablename' = 'test_dim_table',
          'username' = 'testuser',
          'password' = '*******',
          'joinMaxRows' = '100',
          'maxRetryTimes' = '1',
          'connectionMaxActive' = '5',
          'retryWaitTime' = '100',
          'targetSchema' = 'public',
          'caseSensitive' = '0',
          'cache' = 'lru',
          'cacheSize' = '1000',
          'cacheTTLMs' = '10000'
      );

      DROP TABLE IF EXISTS print_table;
      CREATE TABLE print_table (
          id INT,
          username varchar,
          f_random_str VARCHAR
      ) WITH (
          'connector' = 'print',
          'logger' = 'true'
      );

      Retain the default parameter values for the datagen_source and print_table tables, and modify the parameters of the dim_adbpg table based on your business requirements. The following list describes the parameters of dim_adbpg.

      • connector: Required. The name of the connector, in the adbpg-nightly-<version> format. Example: adbpg-nightly-1.13.
      • url: Required. The JDBC URL that is used to connect to the AnalyticDB for PostgreSQL instance, in the jdbc:postgresql://<Internal endpoint>:<Port>/<Database name> format. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres.
      • tablename: Required. The name of the table that you created in the AnalyticDB for PostgreSQL instance.
      • username: Required. The name of the database account that is used to connect to the AnalyticDB for PostgreSQL instance.
      • password: Required. The password of the database account.
      • joinMaxRows: Optional. The maximum number of rows in the right table that can be joined with a single row in the left table. Default value: 1024. If a row in the left table joins a large number of rows in the right table, increase the cache size to maintain the performance of streaming tasks.
      • maxRetryTimes: Optional. The maximum number of retries that are allowed after a statement fails to be executed. Default value: 3.
      • connectionMaxActive: Optional. The maximum number of active connections that can be allocated in the connection pool of the dimension table. Default value: 5.
      • retryWaitTime: Optional. The wait interval between retries of a failed statement. Unit: milliseconds. Default value: 100.
      • targetSchema: Optional. The schema of the AnalyticDB for PostgreSQL instance. Default value: public.
      • caseSensitive: Optional. Specifies whether column and table names are case-sensitive. Default value: 0. Valid values:
        • 0: case-insensitive
        • 1: case-sensitive
      • cache: Optional. The policy that is used to cache data. Default value: none. Valid values (see the configuration sketch after this list):
        • none: No data is cached.
        • lru: Only part of the data in the dimension table is cached. Each time the system receives a record from the source table, it searches the cache first. If the record is not found in the cache, the system queries the physical dimension table. If you set this parameter to lru, you must also specify the cacheSize and cacheTTLMs parameters.
        • all: All data in the dimension table is cached. Before the Realtime Compute for Apache Flink job runs, the system loads all data of the dimension table into the cache, and all subsequent lookups on the dimension table are served from the cache. A key that is not found in the cache is treated as missing, and the entire cache is reloaded after the cache entries expire. Keys are missing when the dimension table does not contain the keys that are specified in the join conditions of the ON clause. If the remote table is small and a large number of keys are missing, we recommend that you set this parameter to all. If you set this parameter to all, you must also specify the cacheTTLMs parameter.
      • cacheSize: Optional. The maximum number of rows that can be cached when the cache parameter is set to lru. Default value: 10000.
      • cacheTTLMs: Optional. The interval at which the cache is refreshed. Unit: milliseconds. The system reloads the most recent data of the dimension table at this interval so that the source table can be joined with up-to-date data. By default, this parameter is left empty, which indicates that the data in the dimension table is not reloaded.
      • exceptionMode: Optional. The policy that is used to handle exceptions during data reading. Default value: ignore. Valid values:
        • ignore: If an exception occurs during data reading, the system ignores the exception and returns only the data that was read before the exception occurred.
        • strict: If an exception occurs during data reading, the system performs a failover and reports an error.
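
      If you want to cache the entire dimension table instead of using LRU caching, the WITH clause of dim_adbpg can be adapted as in the following sketch. The endpoint, database name, account, and password are placeholders, and the cacheTTLMs value of 60000 is only an illustrative choice; only the cache, cacheTTLMs, and exceptionMode settings differ from the example above.

      CREATE TABLE dim_adbpg (
          id int,
          username varchar,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'adbpg-nightly-1.13',
          'url' = 'jdbc:postgresql://<Internal endpoint>:<Port>/<Database name>',
          'tablename' = 'test_dim_table',
          'username' = '<database account>',
          'password' = '<password>',
          'targetSchema' = 'public',
          -- Cache all rows of the dimension table and reload them every 60 seconds.
          'cache' = 'all',
          'cacheTTLMs' = '60000',
          -- Trigger a failover and report an error if reading fails.
          'exceptionMode' = 'strict'
      );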
    4. Click Execute in the upper-right corner of the page.
  5. Read data from the AnalyticDB for PostgreSQL instance.
    1. Copy the following job code to the code editor:
      INSERT INTO print_table
      SELECT adbpg.id, adbpg.username, ds.f_random_str
      FROM datagen_source AS ds
      JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS adbpg
      ON ds.f_sequence = adbpg.id;
      Note When you join a dimension table with another table, you must include FOR SYSTEM_TIME AS OF PROCTIME() in the JOIN clause. For more information, see JOIN statements for dimension tables.
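
      As a variation that is not part of the original example, you can use a LEFT JOIN to keep source rows that have no match in the dimension table; for unmatched rows, the dimension columns are returned as NULL:

      INSERT INTO print_table
      SELECT adbpg.id, adbpg.username, ds.f_random_str
      FROM datagen_source AS ds
      LEFT JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS adbpg
      ON ds.f_sequence = adbpg.id;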
    2. Click Execute in the upper-right corner of the page.
      If no session cluster in the running state exists, the Session Cluster required dialog box appears. Perform the following operations:
      1. In the Session Cluster required dialog box, click OK.
      2. Enter a session cluster name in Name, turn on Use for SQL Editor Previews, and then retain the default settings for other parameters.
      3. Click Create Session Cluster.
      4. Click Start in the upper-right corner of the page.
      5. After the state of the session cluster changes to RUNNING, return to the Draft Editor page and click Execute in the upper-right corner.
    3. In the Mock Data Configuration dialog box, click Execute.
    4. If the data is read from the AnalyticDB for PostgreSQL instance as expected, the query result is displayed in the Results section of the Realtime Compute for Apache Flink console.

Data type mappings

The following table describes the data type mappings between Realtime Compute for Apache Flink and AnalyticDB for PostgreSQL.

Data type of Realtime Compute for Apache Flink    Data type of AnalyticDB for PostgreSQL
BOOLEAN                                           BOOLEAN
TINYINT                                           SMALLINT
SMALLINT                                          SMALLINT
INT                                               INT
BIGINT                                            BIGINT
DOUBLE                                            DOUBLE PRECISION
VARCHAR                                           TEXT
DATE                                              DATE
FLOAT                                             DOUBLE PRECISION
DECIMAL                                           DECIMAL
TIME                                              TIME
TIMESTAMP                                         TIMESTAMP
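
For illustration, the following hypothetical pair of DDL statements follows the mappings above. The table and column names are made up for this example.

  -- Table in AnalyticDB for PostgreSQL (hypothetical example).
  CREATE TABLE sample_types (
      id INT,
      flag BOOLEAN,
      dbl_v DOUBLE PRECISION,
      txt_v TEXT,
      ts_v TIMESTAMP,
      PRIMARY KEY (id)
  );

  -- Corresponding table definition in Realtime Compute for Apache Flink.
  CREATE TABLE flink_sample_types (
      id INT,
      flag BOOLEAN,
      dbl_v DOUBLE,      -- maps to DOUBLE PRECISION
      txt_v VARCHAR,     -- maps to TEXT
      ts_v TIMESTAMP,
      PRIMARY KEY (id) NOT ENFORCED
  );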