アプリケーションがレコメンデーションエンジンや画像処理パイプラインなどからベクトル埋め込みを継続的に生成する場合、バッチ処理の遅延なくそれらのベクトルを AnalyticDB for PostgreSQL に格納する方法が必要です。このガイドでは、flink-adbpg-connector を使用して、ApsaraMQ for Kafka から AnalyticDB for PostgreSQL にベクトルデータをストリーミングする方法を説明します。これにより、新しいベクトルは生成後数秒でクエリ可能になります。
前提条件
開始する前に、以下が準備できていることを確認してください。
AnalyticDB for PostgreSQL インスタンス。セットアップ手順については、「インスタンスの作成」をご参照ください。
AnalyticDB for PostgreSQL インスタンスと同じ VPC 内にある Flink 完全管理ワークスペース。セットアップ手順については、「Flink 完全管理のアクティベーション」をご参照ください。
オープンソースの自己管理 Flink を使用する場合は、
$FLINK_HOME/libディレクトリに flink-adbpg-connector をインストールしてください。Flink 完全管理を使用する場合、追加のインストールは不要です。
AnalyticDB for PostgreSQL データベースに FastANN 拡張がインストールされていること。psql クライアントで
\dx fastannを実行して確認します。出力が返されない場合は、チケットを起票してインストールを依頼してください。AnalyticDB for PostgreSQL インスタンスと同じ VPC 内にある ApsaraMQ for Kafka インスタンス。 詳細については、「インターネットおよび VPC 接続インスタンスを購入してデプロイする」をご参照ください。
Flink ワークスペースと ApsaraMQ for Kafka インスタンスの CIDR ブロックが、AnalyticDB for PostgreSQL インスタンスの IP アドレスホワイトリストに追加されていること。手順については、「IP アドレスホワイトリストの設定」をご参照ください。
テストデータ
AnalyticDB for PostgreSQL では、手順に沿って操作するためのサンプルデータファイル、vector_sample_data.csv を提供しています。このファイルは以下のスキーマを使用します:
| フィールド | 型 | 説明 |
|---|---|---|
| id | bigint | 車のシリアル番号 |
| market_time | timestamp | 車が市場に投入された時間 |
| color | varchar(10) | 車の色 |
| price | int | 車の価格 |
| feature | float4[] | 車の画像の特徴ベクトル |
Linux でファイルをダウンロードするには:
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.csv仕組み
AnalyticDB for PostgreSQL に、構造化インデックスとベクトルインデックスを持つ送信先テーブルを作成します。
ベクトルテストデータを ApsaraMQ for Kafka トピックに発行します。
Kafka ソースと AnalyticDB for PostgreSQL シンクの両方に対して Flink マッピングテーブルを作成し、INSERT 文を実行してストリーミングインポートを開始します。
ステップ 1:送信先テーブルとインデックスの作成
psql クライアントを使用して AnalyticDB for PostgreSQL データベースに接続します。手順については、「psql」セクションのクライアント接続に関するトピックをご参照ください。
テストデータベースを作成して切り替えます。
CREATE DATABASE adbpg_test; \c adbpg_test送信先テーブルを作成します。
CREATE SCHEMA IF NOT EXISTS vector_test; CREATE TABLE IF NOT EXISTS vector_test.car_info ( id bigint NOT NULL, market_time timestamp, color varchar(10), price int, feature float4[], PRIMARY KEY(id) ) DISTRIBUTED BY(id);構造化インデックスとベクトルインデックスを作成します。
-- ベクトル列のストレージフォーマットを PLAIN に設定 ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN; -- フィルタリング可能な列に構造化インデックスを作成 CREATE INDEX ON vector_test.car_info(market_time); CREATE INDEX ON vector_test.car_info(color); CREATE INDEX ON vector_test.car_info(price); -- 近似最近傍 (ANN) 検索を使用してベクトルインデックスを作成 CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
ステップ 2:ベクトルテストデータを Kafka トピックに発行
vector_ingestという名前の Kafka トピックを作成します。bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 \ --bootstrap-server <your_broker_list>テストデータをトピックに発行します。
bin/kafka-console-producer.sh \ --bootstrap-server <your_broker_list> \ --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list> を、ApsaraMQ for Kafka インスタンスのエンドポイントに置き換えてください。このエンドポイントは、ApsaraMQ for Kafka コンソールの[インスタンスの詳細]ページにある[エンドポイント情報]セクションで確認できます。
ステップ 3:マッピングテーブルの作成とインポートの開始
Flink ドラフトの作成
Realtime Compute for Apache Flink コンソールにログインします。[完全マネージド Flink] タブで、使用するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[SQL エディター] をクリックします。左上隅で、[新規] をクリックします。
[New Draft] ダイアログボックスで、[Blank Stream Draft] を [SQL Scripts] タブでクリックし、次に [Next] をクリックします。
ドラフトパラメーターを設定します。
パラメーター 説明 例 Name ドラフトの名前です。現在のプロジェクト内で一意である必要があります。 adbpg-test Location ドラフトを保存するフォルダです。フォルダ横のアイコンをクリックすると、サブフォルダを作成できます。 Draft Engine Version このドラフトで使用する Flink エンジンのバージョンです。バージョンの詳細については、「Engine version」をご参照ください。 vvr-6.0.6-flink-1.15
AnalyticDB for PostgreSQL マッピングテーブルの作成
Flink で結果テーブルを定義します。feature 列は VARCHAR を使用します。これは、Flink がベクトルデータを文字列としてコネクタに渡し、コネクタがそれを AnalyticDB for PostgreSQL の float4[] 列に書き込むためです。
CREATE TABLE vector_ingest (
id INT,
market_time TIMESTAMP,
color VARCHAR(10),
price int,
feature VARCHAR
) WITH (
'connector' = 'adbpg-nightly-1.13',
'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
'tablename' = 'car_info',
'username' = '<your_username>',
'password' = '<your_password>',
'targetschema' = 'vector_test',
'maxretrytimes' = '2',
'batchsize' = '3000',
'batchwritetimeoutms' = '10000',
'connectionmaxactive' = '20',
'conflictmode' = 'ignore',
'exceptionmode' = 'ignore',
'casesensitive' = '0',
'writemode' = '1',
'retrywaittime' = '200'
);次のプレースホルダーを置き換えます。
| プレースホルダー | 説明 |
|---|---|
<your_instance_url> | AnalyticDB for PostgreSQL インスタンスの接続エンドポイント |
<your_username> | データベースのユーザー名 |
<your_password> | データベースのパスワード |
次の表に、主要なシンクコネクタのパラメーターを示します。
| パラメーター | 型 | デフォルト | 説明 |
|---|---|---|---|
connector | String | (なし) | コネクタのタイプ。adbpg-nightly-1.13 に設定します。 |
url | String | (なし) | AnalyticDB for PostgreSQL インスタンスの JDBC 接続 URL。フォーマット: jdbc:postgresql://<host>:5432/<database>。 |
tablename | String | (なし) | 送信先テーブル名。 |
targetschema | String | (なし) | 送信先テーブルのスキーマ。 |
username | String | (なし) | データベースのユーザー名。 |
password | String | (なし) | データベースのパスワード。 |
batchsize | Integer | 3000 | データベースにフラッシュする前にバッファリングする行数。 |
batchwritetimeoutms | Integer | 10000 | 部分的なバッチをフラッシュするまでの最大待機時間 (ミリ秒)。 |
connectionmaxactive | Integer | 20 | 接続プール内のアクティブなデータベース接続の最大数。 |
maxretrytimes | Integer | 2 | 書き込み失敗時のリトライ回数。 |
retrywaittime | Integer | 200 | リトライ試行間の待機時間 (ミリ秒)。 |
writemode | Integer | 1 | 書き込みモード。1 = 追加。プライマリキーが重複する行は conflictmode によって処理されます。 |
conflictmode | String | ignore | プライマリキーの競合時の動作。ignore = 重複行をサイレントに破棄します。 |
exceptionmode | String | ignore | その他の書き込み例外時の動作。ignore = 失敗した行をスキップします。 |
casesensitive | Integer | 0 | 列名の一致で大文字と小文字を区別するかどうか。0 = 大文字と小文字を区別しません。 |
完全なパラメーターリファレンスについては、「Realtime Compute for Apache Flink を使用して AnalyticDB for PostgreSQL にデータを書き込む」をご参照ください。
ApsaraMQ for Kafka マッピングテーブルの作成
Flink でソーステーブルを定義します。
CREATE TABLE vector_kafka (
id INT,
market_time TIMESTAMP,
color VARCHAR(10),
price int,
feature string
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<your_broker_list>',
'topic' = 'vector_ingest',
'format' = 'csv',
'csv.field-delimiter' = '\t',
'scan.startup.mode' = 'earliest-offset'
);次の表に、Kafka ソースコネクタのパラメーターを示します。
| パラメーター | 型 | 必須 | デフォルト | 説明 |
|---|---|---|---|---|
connector | String | はい | (なし) | コネクタのタイプ。kafka に設定します。 |
properties.bootstrap.servers | String | はい | (なし) | ApsaraMQ for Kafka インスタンスのエンドポイントです。[エンドポイント情報] セクションで確認できます。これは、[インスタンスの詳細] ページの ApsaraMQ for Kafka コンソール にあります。 |
topic | String | はい | (なし) | 読み取り元の Kafka トピックの名前。 |
format | String | はい | (なし) | メッセージフォーマット。有効な値: csv、json、avro、debezium-json、canal-json、maxwell-json、avro-confluent、raw。 |
csv.field-delimiter | String | はい | (なし) | CSV フォーマットのメッセージのフィールドデリミタ。 |
scan.startup.mode | String | はい | (なし) | 読み取りを開始するオフセット。earliest-offset はトピックの先頭から読み取り、latest-offset は新しいメッセージのみを読み取ります。 |
インポートの開始
次の SQL 文を実行して、Kafka から AnalyticDB for PostgreSQL へのデータストリーミングを開始します。
INSERT INTO vector_ingest SELECT * FROM vector_kafka;Flink はこれを継続的なストリーミングジョブとして送信します。メッセージが到着すると、データは Kafka トピックから vector_test.car_info テーブルに流れます。
次のステップ
Realtime Compute for Apache Flink を使用して AnalyticDB for PostgreSQL にデータを書き込む — コネクタ パラメーターの完全なリファレンス
IP アドレスホワイトリストの設定 — インスタンスのネットワークアクセスを管理