
AnalyticDB:Integrate vector data using Realtime Compute for Apache Flink

Last Updated:Mar 30, 2026

When your application continuously generates vector embeddings — for example, from a recommendation engine or an image-processing pipeline — you need a way to land those vectors in AnalyticDB for PostgreSQL without batch delays. This guide shows you how to stream vector data from ApsaraMQ for Kafka into AnalyticDB for PostgreSQL using flink-adbpg-connector, so new vectors are queryable within seconds of being produced.

Prerequisites

Before you begin, ensure that you have:

  • An AnalyticDB for PostgreSQL instance. For setup instructions, see Create an instance.

  • A fully managed Flink workspace in the same virtual private cloud (VPC) as the AnalyticDB for PostgreSQL instance. For setup instructions, see Activate fully managed Flink.

    • If you use open source self-managed Flink, install flink-adbpg-connector in the $FLINK_HOME/lib directory.

    • If you use fully managed Flink, no additional installation is required.

  • The FastANN extension installed in the AnalyticDB for PostgreSQL database. Run \dx fastann in the psql client to check whether it is installed. If no output is returned, Submit a ticket to have it installed.

  • An ApsaraMQ for Kafka instance in the same VPC as the AnalyticDB for PostgreSQL instance. For setup instructions, see Purchase and deploy an Internet- and VPC-connected instance.

  • The CIDR blocks of the Flink workspace and the ApsaraMQ for Kafka instance added to the IP address whitelist of the AnalyticDB for PostgreSQL instance. For instructions, see Configure an IP address whitelist.

Test data

AnalyticDB for PostgreSQL provides a sample data file, vector_sample_data.csv, to help you follow along. The file uses the following schema:

Field Type Description
id bigint The serial number of the car
market_time timestamp The time when the car was launched on the market
color varchar(10) The color of the car
price int The price of the car
feature float4[] The feature vector of the car image

To download the file on Linux:

wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.csv

How it works

  1. Create a destination table in AnalyticDB for PostgreSQL with structured indexes and a vector index.

  2. Publish the vector test data to an ApsaraMQ for Kafka topic.

  3. Create Flink mapping tables for both the Kafka source and the AnalyticDB for PostgreSQL sink, then run an INSERT statement to start the streaming import.

Step 1: Create the destination table and indexes

  1. Connect to the AnalyticDB for PostgreSQL database using the psql client. For instructions, see the psql section of the client connection topic.

  2. Create and switch to a test database:

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. Create the destination table:

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. Create structured indexes and a vector index:

    -- Set the storage format of the vector column to PLAIN
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- Create structured indexes on filterable columns
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- Create a vector index using approximate nearest neighbor (ANN) search
    CREATE INDEX ON vector_test.car_info USING ann(feature)
    WITH (dim='10', pq_enable='0');
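
After the table is populated (Step 3), you can verify that the vector index works with a similarity query. The following is a sketch that assumes FastANN exposes the <-> L2-distance operator on float4[] columns; the query vector here is a made-up 10-dimensional example, so substitute a real embedding:

```sql
-- Find the 10 red cars whose image embeddings are closest to the query vector.
-- The ARRAY literal is a hypothetical placeholder, not real data.
SELECT id, color, price
FROM vector_test.car_info
WHERE color = 'red'
ORDER BY feature <-> ARRAY[0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]::float4[]
LIMIT 10;
```

Combining a structured filter (color = 'red') with the vector ORDER BY is exactly the access pattern the structured indexes and the ANN index were created to serve.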

Step 2: Publish vector test data to a Kafka topic

  1. Create a Kafka topic named vector_ingest:

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 \
      --bootstrap-server <your_broker_list>
  2. Publish the test data to the topic:

    bin/kafka-console-producer.sh \
      --bootstrap-server <your_broker_list> \
      --topic vector_ingest < ../vector_sample_data.csv

Replace <your_broker_list> with the endpoint of your ApsaraMQ for Kafka instance. Find the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

Step 3: Create mapping tables and start the import

Create a Flink draft

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace you want to use and click Console in the Actions column.

  2. In the left-side navigation pane, click SQL Editor. In the upper-left corner, click New.

  3. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab, then click Next.

  4. Configure the draft parameters:

    Parameter Description Example
    Name The name of the draft. Must be unique in the current project. adbpg-test
    Location The folder where the draft is stored. Click the icon next to a folder to create a subfolder. Draft
    Engine Version The Flink engine version for this draft. For version details, see Engine version. vvr-6.0.6-flink-1.15

Create an AnalyticDB for PostgreSQL mapping table

Define the sink table in Flink. The feature column uses VARCHAR because Flink passes the vector data as a string to the connector, which then writes it to the float4[] column in AnalyticDB for PostgreSQL.

CREATE TABLE vector_ingest (
  id INT,
  market_time TIMESTAMP,
  color VARCHAR(10),
  price INT,
  feature VARCHAR
) WITH (
  'connector' = 'adbpg-nightly-1.13',
  'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
  'tablename' = 'car_info',
  'username' = '<your_username>',
  'password' = '<your_password>',
  'targetschema' = 'vector_test',
  'maxretrytimes' = '2',
  'batchsize' = '3000',
  'batchwritetimeoutms' = '10000',
  'connectionmaxactive' = '20',
  'conflictmode' = 'ignore',
  'exceptionmode' = 'ignore',
  'casesensitive' = '0',
  'writemode' = '1',
  'retrywaittime' = '200'
);

Replace the following placeholders:

Placeholder Description
<your_instance_url> The connection endpoint of the AnalyticDB for PostgreSQL instance
<your_username> The database username
<your_password> The database password

The following table describes the key sink connector parameters:

Parameter Type Default Description
connector String (none) The connector type. Set to adbpg-nightly-1.13.
url String (none) The JDBC connection URL of the AnalyticDB for PostgreSQL instance. Format: jdbc:postgresql://<host>:5432/<database>.
tablename String (none) The destination table name.
targetschema String (none) The schema of the destination table.
username String (none) The database username.
password String (none) The database password.
batchsize Integer 3000 The number of rows to buffer before flushing to the database.
batchwritetimeoutms Integer 10000 The maximum time in milliseconds to wait before flushing a partial batch.
connectionmaxactive Integer 20 The maximum number of active database connections in the pool.
maxretrytimes Integer 2 The number of retry attempts on write failure.
retrywaittime Integer 200 The wait time in milliseconds between retry attempts.
writemode Integer 1 The write mode. 1 = append; rows with duplicate primary keys are handled by conflictmode.
conflictmode String ignore The behavior on primary key conflict. ignore = drop the duplicate row silently.
exceptionmode String ignore The behavior on other write exceptions. ignore = skip the failed row.
casesensitive Integer 0 Whether column name matching is case-sensitive. 0 = case-insensitive.

For the full parameter reference, see Use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL.

Create an ApsaraMQ for Kafka mapping table

Define the source table in Flink:

CREATE TABLE vector_kafka (
  id INT,
  market_time TIMESTAMP,
  color VARCHAR(10),
  price INT,
  feature STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<your_broker_list>',
  'topic' = 'vector_ingest',
  'format' = 'csv',
  'csv.field-delimiter' = '\t',
  'scan.startup.mode' = 'earliest-offset'
);

The following table describes the Kafka source connector parameters:

Parameter Type Required Default Description
connector String Yes (none) The connector type. Set to kafka.
properties.bootstrap.servers String Yes (none) The endpoint of the ApsaraMQ for Kafka instance. Find it in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
topic String Yes (none) The name of the Kafka topic to read from.
format String Yes (none) The message format. Valid values: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw.
csv.field-delimiter String No , The field delimiter for CSV-formatted messages. This example uses a tab character (\t) to match the sample data file.
scan.startup.mode String No group-offsets The offset from which to start reading. earliest-offset reads from the beginning of the topic; latest-offset reads only new messages.
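
If your producer publishes JSON messages instead of CSV, the source table can be declared with the json format instead. The following is a sketch under the assumption that each message is a JSON object whose keys match the column names:

```sql
CREATE TABLE vector_kafka_json (
  id INT,
  market_time TIMESTAMP,
  color VARCHAR(10),
  price INT,
  feature STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<your_broker_list>',
  'topic' = 'vector_ingest',
  'format' = 'json',
  -- Skip malformed messages instead of failing the job.
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'earliest-offset'
);
```

The sink table definition and the INSERT statement in the next section are unchanged; only the deserialization of the source messages differs.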

Start the import

Run the following SQL statement to start streaming data from Kafka into AnalyticDB for PostgreSQL:

INSERT INTO vector_ingest SELECT * FROM vector_kafka;

Flink submits this as a continuous streaming job. Data flows from the Kafka topic into the vector_test.car_info table as messages arrive.
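
To confirm that rows are arriving, query the destination table from psql while the job runs. The expected row count depends on how many rows the sample file contains:

```sql
-- The count should grow as the Flink job consumes the topic.
SELECT COUNT(*) FROM vector_test.car_info;

-- Spot-check a few rows, including the ingested vectors.
SELECT id, market_time, color, price, feature
FROM vector_test.car_info
ORDER BY id
LIMIT 5;
```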

What's next