AnalyticDB for PostgreSQL: Use Realtime Compute for Apache Flink to import vector data

Last Updated: Dec 12, 2023

AnalyticDB for PostgreSQL allows you to import vector data by using flink-adbpg-connector. This topic describes how to use Realtime Compute for Apache Flink to import vector data into AnalyticDB for PostgreSQL. In this example, the source data is read from an ApsaraMQ for Kafka topic.

Prerequisites

  • An AnalyticDB for PostgreSQL instance is created. For more information, see Create an instance.

  • A fully managed Flink workspace is created. The Flink workspace resides in the same virtual private cloud (VPC) as the AnalyticDB for PostgreSQL instance. For more information, see Activate fully managed Flink.

  • The vector retrieval extension FastANN is installed in the AnalyticDB for PostgreSQL database.

    You can run the \dx fastann command on the psql client to check whether the FastANN extension is installed.

    • If relevant information about the extension is returned, the extension is installed.

    • If no information is returned, submit a ticket to have the extension installed.

  • An ApsaraMQ for Kafka instance is purchased and deployed. The instance resides in the same VPC as the AnalyticDB for PostgreSQL instance. For more information, see Purchase and deploy an Internet- and VPC-connected instance.

  • The CIDR blocks of the Flink workspace and the ApsaraMQ for Kafka instance are added to an IP address whitelist of the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.
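The FastANN check described above can also be scripted. The following is a minimal sketch, assuming that psql is on the PATH and that connection settings are supplied through the standard libpq environment variables (PGHOST, PGPORT, PGUSER, PGPASSWORD, and PGDATABASE). The function name fastann_installed is hypothetical, and the extension name fastann is taken from the \dx fastann command.

```shell
# Hypothetical helper: succeeds if an extension named "fastann" is registered
# in the current database. Connection settings are assumed to come from the
# standard libpq environment variables (PGHOST, PGPORT, PGUSER, PGPASSWORD,
# PGDATABASE).
fastann_installed() {
  # -A: unaligned output, -t: rows only, -c: run a single command and exit.
  count=$(psql -Atc "SELECT count(*) FROM pg_extension WHERE extname = 'fastann';") || return 2
  [ "$count" = "1" ]
}
```

For example, `fastann_installed && echo "FastANN is installed"` prints the message only when the catalog query finds the extension. A return code of 2 indicates that psql itself failed, for example because the connection was refused.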

Test data

To facilitate your test, AnalyticDB for PostgreSQL provides a test data file named vector_sample_data.csv.

The following table describes the schema of the file.

| Field | Type | Description |
| --- | --- | --- |
| id | bigint | The serial number of the car. |
| market_time | timestamp | The time when the car is launched to the market. |
| color | varchar(10) | The color of the car. |
| price | int | The price of the car. |
| feature | float4[] | The feature vectors of the car image. |

On Linux, you can run the following command to download the test data:

wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.cs
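Before you send the file to Kafka, it can help to confirm that every row has the five fields listed in the schema. The following is a minimal sketch, assuming that the file is tab-delimited (inferred from the '\t' field delimiter that the Kafka mapping table in this topic uses) and that the downloaded file is saved as vector_sample_data.csv. The function name count_malformed_rows is hypothetical.

```shell
# Hypothetical helper: prints the number of lines in a file whose
# tab-separated field count differs from the expected value (default: 5,
# matching id, market_time, color, price, and feature).
count_malformed_rows() {
  file=$1
  expected=${2:-5}
  # NF is the number of fields on the current line; empty lines count as malformed.
  awk -F '\t' -v n="$expected" 'NF != n { bad++ } END { print bad + 0 }' "$file"
}
```

For example, `count_malformed_rows vector_sample_data.csv` prints 0 if every line has exactly five tab-separated fields under these assumptions.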

Procedure

  1. Create structured indexes and a vector index.

  2. Write the vector test data to an ApsaraMQ for Kafka topic.

  3. Create mapping tables and import data.

Create structured indexes and a vector index

  1. Connect to the AnalyticDB for PostgreSQL database. In this example, the psql client is used to connect to the instance. For more information, see the "psql" section of the Use client tools to connect to an instance topic.

  2. Create and switch to a test database.

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. Create a destination table.

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. Create structured indexes and a vector index.

    -- Change the storage format of the vector column to PLAIN. 
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- Create structured indexes. 
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- Create a vector index. 
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');
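To confirm that the indexes exist, you can query the standard PostgreSQL pg_indexes view. The following is a minimal sketch, assuming that psql is on the PATH and that the host, user, and password are supplied through the libpq environment variables (PGHOST, PGPORT, PGUSER, and PGPASSWORD). The function name list_car_info_indexes is hypothetical.

```shell
# Hypothetical helper: lists the indexes defined on vector_test.car_info
# in the adbpg_test database, one "name|definition" pair per line.
list_car_info_indexes() {
  psql -d adbpg_test -Atc "SELECT indexname, indexdef FROM pg_indexes WHERE schemaname = 'vector_test' AND tablename = 'car_info' ORDER BY indexname;"
}
```

The output is expected to include the three structured indexes and the vector index, in addition to the index that backs the primary key.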

Write the vector test data to an ApsaraMQ for Kafka topic

  1. Create an ApsaraMQ for Kafka topic.

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 \
      --bootstrap-server <your_broker_list>
  2. Write the vector test data to the ApsaraMQ for Kafka topic.

    bin/kafka-console-producer.sh \
      --bootstrap-server <your_broker_list> \
      --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>: the endpoint of the ApsaraMQ for Kafka instance. You can go to the ApsaraMQ for Kafka console and view the endpoint of the instance in the Endpoint Information section of the Instance Details page.
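To check that the messages reached the topic, you can read a few of them back with the console consumer that ships with Apache Kafka. The following is a minimal sketch. The environment variables KAFKA_HOME (the Kafka client directory that contains bin/) and KAFKA_BROKERS (the value used for <your_broker_list>) and the function name peek_vector_ingest are assumptions made for illustration.

```shell
# Hypothetical helper: prints the first few messages of the vector_ingest
# topic and then exits.
# KAFKA_HOME (assumed): Kafka client directory that contains bin/.
# KAFKA_BROKERS (assumed): the endpoint used for <your_broker_list>.
peek_vector_ingest() {
  max=${1:-5}
  "${KAFKA_HOME:-.}/bin/kafka-console-consumer.sh" \
    --bootstrap-server "$KAFKA_BROKERS" \
    --topic vector_ingest \
    --from-beginning \
    --max-messages "$max"
}
```

For example, `peek_vector_ingest 3` reads three messages from the start of the topic. Each message should correspond to one line of the test data file.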

Create mapping tables and import data

  1. Create a Flink draft.

    1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    2. In the left-side navigation pane, click SQL Editor. In the upper-left corner of the SQL Editor page, click New. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and click Next.

    3. In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.

      | Parameter | Description | Example |
      | --- | --- | --- |
      | Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | adbpg-test |
      | Location | The folder in which the code file of the draft is stored. You can click the New Folder icon to the right of a folder to create a subfolder. | Draft |
      | Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.6-flink-1.15 |

  2. Create an AnalyticDB for PostgreSQL mapping table.

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    For more information about the parameters, see Use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL.

  3. Create an ApsaraMQ for Kafka mapping table.

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    The following table describes the parameters.

    | Parameter | Required | Description |
    | --- | --- | --- |
    | connector | Yes | The name of the connector. Set the value to kafka. |
    | properties.bootstrap.servers | Yes | The endpoint of the ApsaraMQ for Kafka instance. You can go to the ApsaraMQ for Kafka console and view the endpoint of the instance in the Endpoint Information section of the Instance Details page. |
    | topic | Yes | The name of the topic that contains ApsaraMQ for Kafka messages. |
    | format | Yes | The format of the value fields of ApsaraMQ for Kafka messages. Valid values: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. |
    | csv.field-delimiter | Yes | The delimiter of CSV fields. In this example, the tab character (\t) is used. |
    | scan.startup.mode | Yes | The start offset from which data is read from the ApsaraMQ for Kafka instance. Valid values: earliest-offset (data is read from the earliest offset of each partition) and latest-offset (data is read from the latest offset of each partition). |

  4. Create an import task.

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;
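
After the import task starts, you can check the destination table from the command line. The following is a minimal sketch, assuming that psql is on the PATH and that the host, user, and password are supplied through the libpq environment variables (PGHOST, PGPORT, PGUSER, and PGPASSWORD). The function name summarize_import is hypothetical. Because the vector index in this topic is created with dim='10', every imported row is expected to report a dimension of 10.

```shell
# Hypothetical helper: prints one line per vector dimension found in
# vector_test.car_info, in the form "<dimension>|<row count>".
summarize_import() {
  # array_length(feature, 1) returns the number of elements in the
  # one-dimensional float4[] column.
  psql -d adbpg_test -Atc "SELECT array_length(feature, 1) AS dim, count(*) FROM vector_test.car_info GROUP BY 1 ORDER BY 1;"
}
```

If the output contains more than one line, or a dimension other than 10, some rows may not match the vector index definition and are worth inspecting.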