When your application continuously generates vector embeddings — for example, from a recommendation engine or an image-processing pipeline — you need a way to land those vectors in AnalyticDB for PostgreSQL without batch delays. This guide shows you how to stream vector data from ApsaraMQ for Kafka into AnalyticDB for PostgreSQL using flink-adbpg-connector, so new vectors are queryable within seconds of being produced.
Prerequisites
Before you begin, ensure that you have:
- An AnalyticDB for PostgreSQL instance. For setup instructions, see Create an instance.
- A fully managed Flink workspace in the same virtual private cloud (VPC) as the AnalyticDB for PostgreSQL instance. For setup instructions, see Activate fully managed Flink.
  - If you use open source self-managed Flink, install flink-adbpg-connector in the `$FLINK_HOME/lib` directory.
  - If you use fully managed Flink, no additional installation is required.
- The FastANN extension installed in the AnalyticDB for PostgreSQL database. Run `\dx fastann` on the psql client to check. If no output is returned, submit a ticket to have it installed.
- An ApsaraMQ for Kafka instance in the same VPC as the AnalyticDB for PostgreSQL instance. For setup instructions, see Purchase and deploy an Internet- and VPC-connected instance.
- The CIDR blocks of the Flink workspace and the ApsaraMQ for Kafka instance added to the IP address whitelist of the AnalyticDB for PostgreSQL instance. For instructions, see Configure an IP address whitelist.
Test data
AnalyticDB for PostgreSQL provides a sample data file, vector_sample_data.csv, to help you follow along. The file uses the following schema:
| Field | Type | Description |
|---|---|---|
| id | bigint | The serial number of the car |
| market_time | timestamp | The time when the car was launched on the market |
| color | varchar(10) | The color of the car |
| price | int | The price of the car |
| feature | float4[] | The feature vector of the car image |
To download the file on Linux:

```shell
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.csv
```
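If you want to produce extra rows in the same shape as the sample file, the following Python sketch serializes one record per the schema above. The tab delimiter matches the `csv.field-delimiter` that the Flink source table declares in Step 3; the `{...}` array-literal rendering of the vector is an assumption for illustration, not taken from the sample file.

```python
from datetime import datetime

def make_row(car_id, market_time, color, price, feature):
    # The ANN index in Step 1 is created with dim='10'.
    assert len(feature) == 10, "feature must have 10 dimensions"
    # Render the feature vector as a PostgreSQL-style array literal,
    # e.g. {0.1,0.2,...} (assumed format, for illustration only).
    vec = "{" + ",".join(f"{x:g}" for x in feature) + "}"
    # Tab-delimited, matching csv.field-delimiter = '\t' in Step 3.
    return "\t".join([str(car_id),
                      market_time.strftime("%Y-%m-%d %H:%M:%S"),
                      color,
                      str(price),
                      vec])

row = make_row(1, datetime(2023, 6, 6, 12, 0), "red", 150000, [0.1] * 10)
print(row)
```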
How it works
1. Create a destination table in AnalyticDB for PostgreSQL with structured indexes and a vector index.
2. Publish the vector test data to an ApsaraMQ for Kafka topic.
3. Create Flink mapping tables for both the Kafka source and the AnalyticDB for PostgreSQL sink, then run an INSERT statement to start the streaming import.
Step 1: Create the destination table and indexes
1. Connect to the AnalyticDB for PostgreSQL database using the psql client. For instructions, see the psql section of the client connection topic.

2. Create and switch to a test database:

   ```sql
   CREATE DATABASE adbpg_test;
   \c adbpg_test
   ```

3. Create the destination table:

   ```sql
   CREATE SCHEMA IF NOT EXISTS vector_test;
   CREATE TABLE IF NOT EXISTS vector_test.car_info (
       id bigint NOT NULL,
       market_time timestamp,
       color varchar(10),
       price int,
       feature float4[],
       PRIMARY KEY(id)
   ) DISTRIBUTED BY(id);
   ```

4. Create structured indexes and a vector index:

   ```sql
   -- Set the storage format of the vector column to PLAIN.
   ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
   -- Create structured indexes on filterable columns.
   CREATE INDEX ON vector_test.car_info(market_time);
   CREATE INDEX ON vector_test.car_info(color);
   CREATE INDEX ON vector_test.car_info(price);
   -- Create a vector index for approximate nearest neighbor (ANN) search.
   CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
   ```
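The `ann` index accelerates approximate nearest-neighbor search over the `feature` column. Conceptually, a nearest-neighbor query ranks rows by a vector distance; the sketch below uses squared Euclidean distance purely to illustrate that ordering. FastANN's actual metric and query syntax depend on how the extension is configured, so treat this as a model of the ranking, not as the extension's API.

```python
def sq_euclidean(a, b):
    # Squared Euclidean distance between two equal-length vectors.
    return sum((x - y) ** 2 for x, y in zip(a, b))

# Three toy 10-dimensional feature vectors keyed by car id (illustrative data).
features = {
    1: [0.1] * 10,
    2: [0.5] * 10,
    3: [0.9] * 10,
}
query = [0.45] * 10

# Rank car ids from nearest to farthest -- the ordering an ANN index
# approximates without scanning every row.
ranked = sorted(features, key=lambda cid: sq_euclidean(features[cid], query))
print(ranked)
```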
Step 2: Publish vector test data to a Kafka topic
1. Create a Kafka topic named `vector_ingest`:

   ```shell
   bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 \
     --bootstrap-server <your_broker_list>
   ```

2. Publish the test data to the topic:

   ```shell
   bin/kafka-console-producer.sh \
     --bootstrap-server <your_broker_list> \
     --topic vector_ingest < ../vector_sample_data.csv
   ```
Replace `<your_broker_list>` with the endpoint of your ApsaraMQ for Kafka instance. Find the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
Step 3: Create mapping tables and start the import
Create a Flink draft
1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace you want to use and click Console in the Actions column.
2. In the left-side navigation pane, click SQL Editor. In the upper-left corner, click New.
3. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab, then click Next.
4. Configure the draft parameters:

   | Parameter | Description | Example |
   |---|---|---|
   | Name | The name of the draft. Must be unique in the current project. | adbpg-test |
   | Location | The folder where the draft is stored. Click the icon next to a folder to create a subfolder. | Draft |
   | Engine Version | The Flink engine version for this draft. For version details, see Engine version. | vvr-6.0.6-flink-1.15 |
Create an AnalyticDB for PostgreSQL mapping table
Define the sink table in Flink. The feature column uses VARCHAR because Flink passes the vector data as a string to the connector, which then writes it to the float4[] column in AnalyticDB for PostgreSQL.
```sql
CREATE TABLE vector_ingest (
    id INT,
    market_time TIMESTAMP,
    color VARCHAR(10),
    price INT,
    feature VARCHAR
) WITH (
    'connector' = 'adbpg-nightly-1.13',
    'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
    'tablename' = 'car_info',
    'username' = '<your_username>',
    'password' = '<your_password>',
    'targetschema' = 'vector_test',
    'maxretrytimes' = '2',
    'batchsize' = '3000',
    'batchwritetimeoutms' = '10000',
    'connectionmaxactive' = '20',
    'conflictmode' = 'ignore',
    'exceptionmode' = 'ignore',
    'casesensitive' = '0',
    'writemode' = '1',
    'retrywaittime' = '200'
);
```
Replace the following placeholders:
| Placeholder | Description |
|---|---|
| `<your_instance_url>` | The connection endpoint of the AnalyticDB for PostgreSQL instance |
| `<your_username>` | The database username |
| `<your_password>` | The database password |
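With `batchsize = '3000'` and `batchwritetimeoutms = '10000'`, the sink buffers incoming rows and flushes them either when the batch fills or when the timeout elapses, so low-traffic periods still land data within seconds. The toy buffer below models that flush policy only; it is an assumption for illustration, not the connector's implementation.

```python
import time

class BatchBuffer:
    """Flush when batch_size rows accumulate or timeout_ms elapses."""

    def __init__(self, batch_size=3000, timeout_ms=10000, flush=print):
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.flush_fn = flush
        self.rows = []
        self.first_row_at = None

    def add(self, row):
        if not self.rows:
            self.first_row_at = time.monotonic()
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def poll(self):
        # Called periodically; flushes a partial batch once the timeout expires.
        if self.rows and (time.monotonic() - self.first_row_at) * 1000 >= self.timeout_ms:
            self.flush()

    def flush(self):
        if self.rows:
            self.flush_fn(self.rows)
            self.rows = []

flushed = []
buf = BatchBuffer(batch_size=3, timeout_ms=10000, flush=flushed.append)
for r in ["r1", "r2", "r3", "r4"]:
    buf.add(r)
print(flushed)  # the first three rows flushed as one full batch; "r4" still buffered
```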
The following table describes the key sink connector parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `connector` | String | (none) | The connector type. Set to `adbpg-nightly-1.13`. |
| `url` | String | (none) | The JDBC connection URL of the AnalyticDB for PostgreSQL instance. Format: `jdbc:postgresql://<host>:5432/<database>`. |
| `tablename` | String | (none) | The destination table name. |
| `targetschema` | String | (none) | The schema of the destination table. |
| `username` | String | (none) | The database username. |
| `password` | String | (none) | The database password. |
| `batchsize` | Integer | 3000 | The number of rows to buffer before flushing to the database. |
| `batchwritetimeoutms` | Integer | 10000 | The maximum time in milliseconds to wait before flushing a partial batch. |
| `connectionmaxactive` | Integer | 20 | The maximum number of active database connections in the pool. |
| `maxretrytimes` | Integer | 2 | The number of retry attempts on write failure. |
| `retrywaittime` | Integer | 200 | The wait time in milliseconds between retry attempts. |
| `writemode` | Integer | 1 | The write mode. 1 = append; rows with duplicate primary keys are handled by `conflictmode`. |
| `conflictmode` | String | ignore | The behavior on primary key conflict. `ignore` = drop the duplicate row silently. |
| `exceptionmode` | String | ignore | The behavior on other write exceptions. `ignore` = skip the failed row. |
| `casesensitive` | Integer | 0 | Whether column name matching is case-sensitive. 0 = case-insensitive. |
For the full parameter reference, see Use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL.
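Taken together, `maxretrytimes` and `retrywaittime` describe a retry-with-fixed-wait loop around each batch write. A minimal sketch of that pattern (an illustration of the configured behavior, not the connector's actual code):

```python
import time

def write_with_retry(write, batch, max_retry_times=2, retry_wait_ms=200):
    """Attempt write(batch) once, then up to max_retry_times retries,
    sleeping retry_wait_ms between attempts (mirrors the defaults above)."""
    attempts = 0
    while True:
        try:
            return write(batch)
        except Exception:
            attempts += 1
            if attempts > max_retry_times:
                raise  # exhausted retries; surface the failure
            time.sleep(retry_wait_ms / 1000.0)

calls = []
def flaky(batch):
    calls.append(batch)
    if len(calls) < 2:  # fail on the first attempt only
        raise OSError("transient write failure")
    return len(batch)

result = write_with_retry(flaky, ["row"], retry_wait_ms=1)
print(result)
```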
Create an ApsaraMQ for Kafka mapping table
Define the source table in Flink:
```sql
CREATE TABLE vector_kafka (
    id INT,
    market_time TIMESTAMP,
    color VARCHAR(10),
    price INT,
    feature STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '<your_broker_list>',
    'topic' = 'vector_ingest',
    'format' = 'csv',
    'csv.field-delimiter' = '\t',
    'scan.startup.mode' = 'earliest-offset'
);
```
The following table describes the Kafka source connector parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector` | String | Yes | (none) | The connector type. Set to `kafka`. |
| `properties.bootstrap.servers` | String | Yes | (none) | The endpoint of the ApsaraMQ for Kafka instance. Find it in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
| `topic` | String | Yes | (none) | The name of the Kafka topic to read from. |
| `format` | String | Yes | (none) | The message format. Valid values: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw. |
| `csv.field-delimiter` | String | Yes | (none) | The field delimiter for CSV-formatted messages. |
| `scan.startup.mode` | String | Yes | (none) | The offset from which to start reading. earliest-offset reads from the beginning of the topic; latest-offset reads only new messages. |
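With `'format' = 'csv'` and `'csv.field-delimiter' = '\t'`, each Kafka message is split on tabs into the five declared columns; commas inside the vector literal do not break the split because the delimiter is a tab. A quick sketch of that parsing with Python's standard `csv` module (the sample line and column names are illustrative):

```python
import csv
import io

# One message in the shape the source table expects (illustrative values).
line = "1\t2023-06-06 12:00:00\tred\t150000\t{0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0}"

# Split on tabs, mirroring csv.field-delimiter = '\t'.
reader = csv.reader(io.StringIO(line), delimiter="\t")
record = next(reader)

columns = ["id", "market_time", "color", "price", "feature"]
row = dict(zip(columns, record))
print(row["id"], row["color"], row["feature"])
```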
Start the import
Run the following SQL statement to start streaming data from Kafka into AnalyticDB for PostgreSQL:
```sql
INSERT INTO vector_ingest SELECT * FROM vector_kafka;
```
Flink submits this as a continuous streaming job. Data flows from the Kafka topic into the vector_test.car_info table as messages arrive.
What's next
- Use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL: the full connector parameter reference.
- Configure an IP address whitelist: manage network access for your instance.