當您需要構建基於常值內容的語義搜尋、智能問答或推薦系統等AI應用時,通常需要將資料庫中的業務資料(如商品描述、使用者評論)轉換為向量(Embedding),並儲存到專門的向量資料庫中。PolarDB MySQL版提供的AutoETL功能,結合PolarSearch的資料接入管道(ingestion pipeline),可以自動化完成資料提取-向量化-存入索引的全過程,無需您手動搭建和維護複雜的資料同步鏈路。本文將指導您如何配置這一自動化流程,實現將PolarDB MySQL版表中的文本資料,通過調用外部Embedding模型進行向量化,並最終將原始文本和產生的向量一併同步至PolarSearch索引中。
工作原理
整個自動化資料流程涉及PolarDB MySQL版、PolarSearch以及Embedding模型服務,其核心工作流程如下:
資料來源:您的原始文本資料存放區在PolarDB MySQL版的表中。
觸發同步:您通過在PolarDB MySQL版中調用
dbms_etl.sync_by_map函數,建立並啟動一個從MySQL到PolarSearch的資料同步任務(AutoETL)。資料寫入:AutoETL任務將表中的資料變更(增、刪、改)即時同步到PolarSearch的指定索引中。
管道處理:在資料寫入PolarSearch索引之前,會觸發預設的資料接入管道(Ingestion Pipeline)。
調用模型:管道中的
text_embedding處理器會讀取指定文字欄位的內容,通過連接器(Connector)調用外部Embedding模型服務,將文本轉換為向量。向量儲存:模型返回的向量會與未經處理資料一同寫入PolarSearch索引的相應欄位中,完成向量索引的構建。

適用範圍
在使用此功能前,請確保您的環境滿足以下條件:
叢集版本:
MySQL 8.0.1,且修訂版本需為8.0.1.1.52或以上。
MySQL 8.0.2,且修訂版本需為8.0.2.2.33或以上。
網路環境:用於文本向量化的Embedding模型服務需部署在與PolarDB MySQL版叢集相同的專用網路內。PolarSearch節點將作為用戶端主動訪問該模型服務。
準備Embedding模型服務
在開始配置PolarDB MySQL版之前,您需要一個可以通過HTTP訪問的Embedding模型服務,並將其部署在PolarSearch中。該服務接收文本輸入,並返回對應的向量。詳細部署說明,請參見整合外部模型服務。
建立資料接入管道
建立一個資料接入管道(ingestion pipeline),它會在資料寫入索引前自動執行向量化操作。定義一個名為text_to_vec_pipeline的管道,並在其中使用text_embedding處理器。
參數說明
model_id:為您在整合外部模型服務中部署的Embedding模型ID。field_map:定義了輸入和輸出欄位的映射關係。以下樣本表示:讀取my_text欄位的常值內容,調用模型產生向量,然後將向量結果存入my_vector欄位。
命令列
curl -XPUT "http://${POLARSEARCH_HOST_PORT}/_ingest/pipeline/text_to_vec_pipeline" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
"description": "A text embedding pipeline",
"processors": [
{
"text_embedding": {
"model_id": "<部署的Embedding模型ID>",
"field_map": {
"my_text": "my_vector"
}
}
}
]
}'Dashboard
PUT _ingest/pipeline/text_to_vec_pipeline
{
"description": "A text embedding pipeline",
"processors": [
{
"text_embedding": {
"model_id": "<部署的Embedding模型ID>",
"field_map": {
"my_text": "my_vector"
}
}
}
]
}建立資料同步鏈路
準備測試資料:登入PolarDB MySQL版叢集,建立一個資料庫和表,並插入一些測試資料。
CREATE DATABASE IF NOT EXISTS db; CREATE TABLE IF NOT EXISTS db.test_table ( id INT PRIMARY KEY, t1 INT, t2 TEXT ); INSERT INTO db.test_table(id, t1, t2) VALUES (1, 11, 'aaa'), (2, 22, 'bbb'), (3, 33, 'ccc');建立索引:建立一個用於儲存未經處理資料和向量的PolarSearch索引。
default_pipeline:將此索引的預設管道設定為上一步建立的text_to_vec_pipeline。這樣,任何寫入該索引的資料都會自動經過向量化處理。my_vector.type:向量欄位的資料類型需為knn_vector。my_vector.dimension:向量維度需與模型實際輸出的維度(1024)完全一致。
命令列
curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index" \ --user "${USER_PASSWORD}" \ -H 'Content-Type: application/json' \ -d '{ "settings": { "index": { "knn": true, "default_pipeline": "text_to_vec_pipeline" } }, "mappings": { "properties": { "id": { "type": "integer" }, "my_text": { "type": "text" }, "my_vector": { "type": "knn_vector", "dimension": 1024, "method": { "engine": "faiss", "name": "hnsw" } } } } }'Dashboard
PUT /test_index { "settings": { "index": { "knn": true, "default_pipeline": "text_to_vec_pipeline" } }, "mappings": { "properties": { "id": { "type": "integer" }, "my_text": { "type": "text" }, "my_vector": { "type": "knn_vector", "dimension": 1024, "method": { "engine": "faiss", "name": "hnsw" } } } } }建立並啟動AutoETL同步鏈路:在PolarDB MySQL版叢集中,調用
dbms_etl.sync_by_map預存程序。該命令會建立一個從
db.test_table到test_index的映射關係。當db.test_table中的資料發生變化時,AutoETL會將id和t2欄位的值分別同步到test_index的id和my_text欄位。由於test_index設定了預設管道,資料在寫入時會自動觸發向量化流程,最終將產生的向量存入my_vector欄位。call dbms_etl.sync_by_map( "search", "test_index.my_text(db.test_table.t2),test_index.id(db.test_table.id)", "test_index.id=db.test_table.id", "", "" );
驗證資料同步
在PolarSearch中查詢test_index,確認資料已寫入,並且my_vector欄位包含了向量資料。
命令列
curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index/_search" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"match": {
"my_text": "aaa"
}
}
}'Dashboard
POST /test_index/_search
{
"query": {
"match": {
"my_text": "aaa"
}
}
}
預期返回結果如下所示(部分展示):
"hits": [
{
"_index": "test_index",
"_id": "1",
"_score": 0.44583148,
"_source": {
"my_text": "aaa",
"id": 1,
"my_vector": [
-0.013453668,
0.009771001,
-0.00977745,
...
]
}
}
]