セマンティック検索、インテリジェント Q&A、レコメンデーションシステムなどの AI アプリケーションを構築する際、通常、プロダクトの説明やユーザーのコメントといったビジネスデータをデータベースからベクトル埋め込みに変換し、専用のベクトルデータベースに格納する必要があります。 PolarDB for MySQL の AutoETL 機能と PolarSearch の取り込みパイプラインを組み合わせることで、データ抽出、ベクトル化、インデックス作成の全プロセスが自動化されます。 これにより、複雑なデータ同期パイプラインを手動で構築・維持する必要がなくなります。このトピックでは、この自動化されたワークフローを設定して、外部の埋め込みモデルを呼び出して PolarDB for MySQL テーブルのテキストデータをベクトル埋め込みに変換し、元のテキストと生成されたベクトルの両方を PolarSearch インデックスに同期する方法を説明します。
仕組み
自動化されたデータフローには、PolarDB for MySQL、PolarSearch、および埋め込みモデルサービスが関わります。中心となるワークフローは以下の通りです。
データソース:元のテキストデータは PolarDB for MySQL テーブルに格納されます。
同期のトリガー:PolarDB for MySQL の
dbms_etl.sync_by_mapストアドプロシージャを呼び出すことで、MySQL から PolarSearch へのデータ同期タスク (AutoETL) を作成し、開始します。データ書き込み:AutoETL タスクは、ソーステーブルでのデータの変更 (挿入、削除、更新) を、指定された PolarSearch インデックスにリアルタイムで同期します。
パイプライン処理:データが PolarSearch インデックスに書き込まれる前に、事前に設定された取り込みパイプラインが実行されます。
モデルの呼び出し:パイプライン内の
text_embeddingプロセッサが、指定されたテキストフィールドのコンテンツを読み取り、コネクタを介して外部の埋め込みモデルサービスを呼び出し、テキストをベクトルに変換します。ベクトルストレージ:パイプラインは、モデルから返されたベクトルを元のデータとともに PolarSearch インデックスの対応するフィールドに書き込み、ベクトルインデックスを構築します。

前提条件
この機能を使用する前に、ご利用の環境が以下の要件を満たしていることを確認してください。
クラスターバージョン:
MySQL 8.0.1、リビジョンバージョン 8.0.1.1.52 以降。
MySQL 8.0.2、リビジョンバージョン 8.0.2.2.33 以降。
ネットワーク環境:テキストのベクトル化に使用する埋め込みモデルサービスは、PolarSearch ノードがアクセスできるように、ご利用の PolarDB for MySQL クラスターと同じ Virtual Private Cloud (VPC) 内にデプロイする必要があります。
埋め込みモデルサービスの準備
PolarDB for MySQL を設定する前に、PolarSearch が HTTP 経由でアクセスできる埋め込みモデルサービスが必要です。このサービスは、テキスト入力を受け取り、対応するベクトルを返します。詳細なデプロイメント手順については、「外部モデルサービスとの連携」をご参照ください。
アクセス認証情報と環境変数の設定
開始する前に、以下の情報を準備し、環境変数として設定します。これらの値を一元管理することで、後続の curl コマンドが簡素化され、コピーして実行しやすくなります。
パラメーター | 説明 | 値の例 |
| ご利用の PolarSearch ノードの接続アドレス。詳細については、「接続文字列の取得」をご参照ください。 |
|
| PolarSearch ノードのAdmin アカウント。 |
|
| 外部モデルサービスの API キー。 説明
|
|
手順:ターミナルで以下のコマンドを実行します。その際、値の例を実際の情報に置き換えてください。
# PolarSearch のホストとポートを設定
export POLARSEARCH_HOST_PORT="pc-xxx.polardbsearch.rds.aliyuncs.com:3001"
# PolarSearch の Admin ユーザー名とパスワードを設定
export USER_PASSWORD="polarsearch_user:your_password"
# Alibaba Cloud Model Studio (Bailian) の API キーを設定
export YOUR_API_KEY="sk-xxxxxxxxxxxxxxxxxxxxxxxx"取り込みパイプラインの作成
データがインデックスに書き込まれる前に自動的にベクトル化を実行する、取り込みパイプラインを作成します。text_to_vec_pipeline という名前のパイプラインを定義し、text_embedding プロセッサを含めます。
パラメーター
model_id:「外部モデルサービスとの連携」で説明したようにデプロイした埋め込みモデルの ID。field_map:入力フィールドと出力フィールド間のマッピングを定義します。以下の例では、my_textフィールドからテキストコンテンツを読み取り、モデルを呼び出してベクトルを生成し、結果のベクトルをmy_vectorフィールドに格納します。
CLI
curl -XPUT "http://${POLARSEARCH_HOST_PORT}/_ingest/pipeline/text_to_vec_pipeline" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
"description": "テキスト埋め込みパイプライン",
"processors": [
{
"text_embedding": {
"model_id": "<your_embedding_model_id>",
"field_map": {
"my_text": "my_vector"
}
}
}
]
}'ダッシュボード
PUT _ingest/pipeline/text_to_vec_pipeline
{
"description": "テキスト埋め込みパイプライン",
"processors": [
{
"text_embedding": {
"model_id": "<your_embedding_model_id>",
"field_map": {
"my_text": "my_vector"
}
}
}
]
}データ同期タスクの作成
テストデータの準備:ご利用の PolarDB for MySQL クラスターにログインし、データベースとテーブルを作成して、テストデータをいくつか挿入します。
CREATE DATABASE IF NOT EXISTS db; CREATE TABLE IF NOT EXISTS db.test_table ( id INT PRIMARY KEY, t1 INT, t2 TEXT ); INSERT INTO db.test_table(id, t1, t2) VALUES (1, 11, 'aaa'), (2, 22, 'bbb'), (3, 33, 'ccc');インデックスの作成:元のデータとベクトルを格納するための PolarSearch インデックスを作成します。
default_pipeline:このインデックスのデフォルトパイプラインを、前のステップで作成したtext_to_vec_pipelineに設定します。これにより、このインデックスに書き込まれるすべてのデータが自動的にベクトル化されます。my_vector.type:ベクトルフィールドのデータ型はknn_vectorである必要があります。my_vector.dimension:ベクトルディメンションは、モデルが出力するディメンション (1024) と一致する必要があります。
CLI
curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index" \ --user "${USER_PASSWORD}" \ -H 'Content-Type: application/json' \ -d '{ "settings": { "index": { "knn": true, "default_pipeline": "text_to_vec_pipeline" } }, "mappings": { "properties": { "id": { "type": "integer" }, "my_text": { "type": "text" }, "my_vector": { "type": "knn_vector", "dimension": 1024, "method": { "engine": "faiss", "name": "hnsw" } } } } }'ダッシュボード
PUT /test_index { "settings": { "index": { "knn": true, "default_pipeline": "text_to_vec_pipeline" } }, "mappings": { "properties": { "id": { "type": "integer" }, "my_text": { "type": "text" }, "my_vector": { "type": "knn_vector", "dimension": 1024, "method": { "engine": "faiss", "name": "hnsw" } } } } }AutoETL 同期タスクの作成と開始:ご利用の PolarDB for MySQL クラスターで、
dbms_etl.sync_by_mapストアドプロシージャを呼び出します。このコマンドは、
db.test_tableテーブルからtest_indexインデックスへのマッピングを確立します。db.test_tableのデータが変更されると、AutoETL はidフィールドとt2フィールドの値をtest_indexインデックスのidフィールドとmy_textフィールドに同期します。test_indexにはデフォルトパイプラインが設定されているため、パイプラインは書き込まれたデータを自動的にベクトル化します。結果のベクトルはmy_vectorフィールドに格納されます。call dbms_etl.sync_by_map( "search", "test_index.my_text(db.test_table.t2),test_index.id(db.test_table.id)", "test_index.id=db.test_table.id", "", "" );
データ同期の検証
PolarSearch で test_index をクエリして、データが書き込まれ、my_vector フィールドにベクトルが含まれていることを確認します。
CLI
curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index/_search" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"match": {
"my_text": "aaa"
}
}
}'ダッシュボード
POST /test_index/_search
{
"query": {
"match": {
"my_text": "aaa"
}
}
}
期待される応答は、以下の抜粋のようになります。
"hits": [
{
"_index": "test_index",
"_id": "1",
"_score": 0.44****48,
"_source": {
"my_text": "aaa",
"id": 1,
"my_vector": [
-0.013453668,
0.009771001,
-0.00977745,
...
]
}
}
]