AnalyticDB for PostgreSQLでは、flink-adbpg-connectorを使用してベクトルデータをインポートできます。 このトピックでは、ベクトルデータをAnalyticDB for PostgreSQLにインポートする方法について説明します。 この例では、ApsaraMQ for Kafkaデータが使用されます。
前提条件
AnalyticDB for PostgreSQLインスタンスが作成されました。 詳細については、「インスタンスの作成」をご参照ください。
フルマネージドFlinkワークスペースが作成されます。 Flinkワークスペースは、AnalyticDB for PostgreSQLインスタンスと同じ仮想プライベートクラウド (VPC) にあります。 詳細については、「フルマネージドFlinkの有効化」をご参照ください。
オープンソースのセルフマネージドFlinkを使用する場合は、flink-adbpg-connectorが
$FLINK_HOME/lib
ディレクトリにインストールされていることを確認してください。フルマネージドFlinkを使用する場合、操作は必要ありません。
ベクトル検索拡張機能FastANNは、AnalyticDB for PostgreSQLデータベースにインストールされます。
psqlクライアントで
\dx fastann
コマンドを実行して、FastANN拡張機能がインストールされているかどうかを確認できます。拡張機能に関する関連情報が返された場合、拡張機能がインストールされます。
情報が返されない場合、 拡張機能をインストールするには、チケットを起票してください。
ApsaraMQ for Kafkaインスタンスが購入され、デプロイされます。 インスタンスは、AnalyticDB for PostgreSQLインスタンスと同じVPCにあります。 詳細については、「インターネットおよびVPC接続インスタンスの購入とデプロイ」をご参照ください。
FlinkワークスペースとApsaraMQ for KafkaインスタンスのCIDRブロックが、AnalyticDB for PostgreSQLインスタンスのIPアドレスホワイトリストに追加されます。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。
テストデータ
テストを容易にするために、AnalyticDB for PostgreSQLはvector_sample_data.csvという名前のテストデータファイルを提供します。
次の表に、ファイルのスキーマを示します。
項目 | データ型 | 説明 |
id | bigint | 車のシリアル番号。 |
market_time | timestamp | 車が市場に投入される時間。 |
color | varchar (10) | 車の色。 |
price | int | 車の価格。 |
特徴 | float4[] | 車の画像の特徴ベクトル。 |
Linuxシステムでは、コマンドを実行してテストデータをダウンロードできます。 サンプルコマンド:
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.cs
手順
構造化インデックスとベクトルインデックスの作成
AnalyticDB for PostgreSQLデータベースに接続します。 この例では、psqlクライアントを使用してインスタンスに接続します。 詳細については、「クライアントツールを使用してインスタンスに接続する」トピックの「psql」セクションを参照してください。
テストデータベースを作成して切り替えます。
CREATE DATABASE adbpg_test; \c adbpg_test
宛先テーブルを作成します。
vector_testが存在しない場合は
CREATE SCHEMA IF NOT EXISTS vector_test; CREATE TABLE IF NOT EXISTS vector_test.car_info ( id bigint NOT NULL, market_time timestamp, color varchar(10), price int, feature float4[], PRIMARY KEY(id) ) DISTRIBUTED BY(id);
構造化インデックスとベクトルインデックスを作成します。
-- Change the storage format of the vector column to PLAIN. ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN; -- Create structured indexes. CREATE INDEX ON vector_test.car_info(market_time); CREATE INDEX ON vector_test.car_info(color); CREATE INDEX ON vector_test.car_info(price); -- Create a vector index. CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
ApsaraMQ for Kafkaトピックへのベクターテストデータの書き込み
ApsaraMQ for Kafkaトピックを作成します。
bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 --bootstrap-server <your_broker_list>
ApsaraMQ for Kafkaトピックにベクターテストデータを書き込みます。
bin/kafka-console-producer.sh --bootstrap-server <your_broker_list> --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list>
: ApsaraMQ for Kafkaインスタンスのエンドポイント。 ApsaraMQ for Kafkaコンソールに移動し、[インスタンスの詳細] ページの [エンドポイント情報] セクションでインスタンスのエンドポイントを表示できます。
マッピングテーブルの作成とデータのインポート
Flinkドラフトを作成します。
Realtime Compute for Apache Flinkコンソールにログインします。 [完全管理Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、
をクリックします。 SQLエディターページの左上隅で、[新規作成] をクリックします。 [新しいドラフト] ダイアログボックスで、[SQLスクリプト] タブの [空白のストリームドラフト] をクリックし、[次へ] をクリックします。[新しいドラフト] ダイアログボックスで、ドラフトのパラメーターを設定します。 下表に、各パラメーターを説明します。
パラメーター
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は、現在のプロジェクトで一意である必要があります。
adbpg-test
場所
ドラフトのコードファイルが格納されているフォルダ。
フォルダの右側にある
アイコンをクリックすると、サブフォルダを作成できます。
ドラフト
エンジン版
ドラフトで使用されるFlinkのエンジンバージョン。 エンジンのバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。
vvr-6.0.6-flink-1.15
AnalyticDB for PostgreSQLマッピングテーブルを作成します。
CREATE TABLE vector_ingest ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature VARCHAR )WITH ( 'connector' = 'adbpg-nightly-1.13', 'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test', 'tablename' = 'car_info', 'username' = '<your_username>', 'password' = '<your_password>', 'targetschema' = 'vector_test', 'maxretrytimes' = '2', 'batchsize' = '3000', 'batchwritetimeoutms' = '10000', 'connectionmaxactive' = '20', 'conflictmode' = 'ignore', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );
パラメーターの詳細については、「Realtime Compute For Apache Flinkを使用してAnalyticDB for PostgreSQLにデータを書き込む」をご参照ください。
ApsaraMQ for Kafkaマッピングテーブルを作成します。
CREATE TABLE vector_kafka ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature string ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<your_broker_list>', 'topic' = 'vector_ingest', 'format' = 'csv', 'csv.field-delimiter' = '\t', 'scan.startup.mode' = 'earliest-offset' );
下表に、各パラメーターを説明します。
パラメーター
必須
説明
コネクター
継続する
コネクタの名前。 値をkafkaに設定します。
properties.bootstrap.servers
継続する
ApsaraMQ for Kafkaインスタンスのエンドポイント。 ApsaraMQ for Kafkaコンソールに移動し、[インスタンスの詳細] ページの [エンドポイント情報] セクションでインスタンスのエンドポイントを表示できます。
topic
継続する
ApsaraMQ for Kafkaメッセージを含むトピックの名前。
フォーマット
継続する
ApsaraMQ for Kafkaメッセージの値フィールドの書き込みに使用される形式。 有効な値:
csv
JSON
avro
debezium-json
運河-json
maxwell-json
avro-confluent
生
csv.field-区切り文字
継続する
CSVフィールドの区切り文字。
scan.startup.mode
継続する
ApsaraMQ for Kafkaインスタンスからデータを読み取る開始オフセット。 有効な値:
bearest-offset: ApsaraMQ for Kafkaインスタンスの最も早いパーティションからデータが読み取られます。
latest-offset: ApsaraMQ for Kafkaインスタンスの最新パーティションからデータが読み取られます。
インポートタスクを作成します。
INSERT INTO vector_ingest SELECT * FROM vector_kafka;