全部產品
Search
文件中心

PolarDB:自動將PolarDB MySQL版資料向量化並同步至PolarSearch

更新時間:Mar 28, 2026

當您需要構建基於常值內容的語義搜尋、智能問答或推薦系統等AI應用時,通常需要將資料庫中的業務資料(如商品描述、使用者評論)轉換為向量(Embedding),並儲存到專門的向量資料庫中。PolarDB MySQL版提供的AutoETL功能,結合PolarSearch的資料接入管道(ingestion pipeline),可以自動化完成資料提取-向量化-存入索引的全過程,無需您手動搭建和維護複雜的資料同步鏈路。本文將指導您如何配置這一自動化流程,實現將PolarDB MySQL版表中的文本資料,通過調用外部Embedding模型進行向量化,並最終將原始文本和產生的向量一併同步至PolarSearch索引中。

工作原理

整個自動化資料流程涉及PolarDB MySQL版PolarSearch以及Embedding模型服務,其核心工作流程如下:

  1. 資料來源:您的原始文本資料存放區在PolarDB MySQL版的表中。

  2. 觸發同步:您通過在PolarDB MySQL版中調用dbms_etl.sync_by_map函數,建立並啟動一個從MySQL到PolarSearch的資料同步任務(AutoETL)。

  3. 資料寫入:AutoETL任務將表中的資料變更(增、刪、改)即時同步到PolarSearch的指定索引中。

  4. 管道處理:在資料寫入PolarSearch索引之前,會觸發預設的資料接入管道(Ingestion Pipeline)。

  5. 調用模型:管道中的text_embedding處理器會讀取指定文字欄位的內容,通過連接器(Connector)調用外部Embedding模型服務,將文本轉換為向量。

  6. 向量儲存:模型返回的向量會與未經處理資料一同寫入PolarSearch索引的相應欄位中,完成向量索引的構建。

image

適用範圍

在使用此功能前,請確保您的環境滿足以下條件:

  • 叢集版本

    • MySQL 8.0.1,且修訂版本需為8.0.1.1.52或以上。

    • MySQL 8.0.2,且修訂版本需為8.0.2.2.33或以上。

  • 網路環境:用於文本向量化的Embedding模型服務需部署在與PolarDB MySQL版叢集相同的專用網路內。PolarSearch節點將作為用戶端主動訪問該模型服務。

準備Embedding模型服務

在開始配置PolarDB MySQL版之前,您需要一個可以通過HTTP訪問的Embedding模型服務,並將其部署在PolarSearch中。該服務接收文本輸入,並返回對應的向量。詳細部署說明,請參見整合外部模型服務

建立資料接入管道

建立一個資料接入管道(ingestion pipeline),它會在資料寫入索引前自動執行向量化操作。定義一個名為text_to_vec_pipeline的管道,並在其中使用text_embedding處理器。

參數說明

  • model_id:為您在整合外部模型服務中部署的Embedding模型ID。

  • field_map:定義了輸入和輸出欄位的映射關係。以下樣本表示:讀取my_text欄位的常值內容,調用模型產生向量,然後將向量結果存入my_vector欄位。

命令列

curl -XPUT "http://${POLARSEARCH_HOST_PORT}/_ingest/pipeline/text_to_vec_pipeline" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
  "description": "A text embedding pipeline",
  "processors": [
    {
      "text_embedding": {
        "model_id": "<部署的Embedding模型ID>",
        "field_map": {
          "my_text": "my_vector"
        }
      }
    }
  ]
}'

Dashboard

PUT _ingest/pipeline/text_to_vec_pipeline
{
  "description": "A text embedding pipeline",
  "processors": [
    {
      "text_embedding": {
        "model_id": "<部署的Embedding模型ID>",
        "field_map": {
          "my_text": "my_vector"
        }
      }
    }
  ]
}

建立資料同步鏈路

  1. 準備測試資料:登入PolarDB MySQL版叢集,建立一個資料庫和表,並插入一些測試資料。

    CREATE DATABASE IF NOT EXISTS db;
    CREATE TABLE IF NOT EXISTS db.test_table (
        id INT PRIMARY KEY, 
        t1 INT, 
        t2 TEXT
    );
    INSERT INTO db.test_table(id, t1, t2) VALUES 
    (1, 11, 'aaa'), 
    (2, 22, 'bbb'), 
    (3, 33, 'ccc');
  2. 建立索引:建立一個用於儲存未經處理資料和向量的PolarSearch索引。

    • default_pipeline:將此索引的預設管道設定為上一步建立的text_to_vec_pipeline。這樣,任何寫入該索引的資料都會自動經過向量化處理。

    • my_vector.type:向量欄位的資料類型需為knn_vector

    • my_vector.dimension:向量維度需與模型實際輸出的維度(1024)完全一致。

    命令列

    curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index" \
    --user "${USER_PASSWORD}" \
    -H 'Content-Type: application/json' \
    -d '{
      "settings": {
        "index": {
          "knn": true,
          "default_pipeline": "text_to_vec_pipeline"
        }
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "integer"
          },
          "my_text": {
            "type": "text"
          },
          "my_vector": {
            "type": "knn_vector",
            "dimension": 1024,
            "method": {
              "engine": "faiss",
              "name": "hnsw"
            }
          }
        }
      }
    }'

    Dashboard

    PUT /test_index
    {
      "settings": {
        "index": {
          "knn": true,
          "default_pipeline": "text_to_vec_pipeline"
        }
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "integer"
          },
          "my_text": {
            "type": "text"
          },
          "my_vector": {
            "type": "knn_vector",
            "dimension": 1024,
            "method": {
              "engine": "faiss",
              "name": "hnsw"
            }
          }
        }
      }
    }
  3. 建立並啟動AutoETL同步鏈路:在PolarDB MySQL版叢集中,調用dbms_etl.sync_by_map預存程序。

    該命令會建立一個從db.test_tabletest_index的映射關係。當db.test_table中的資料發生變化時,AutoETL會將idt2欄位的值分別同步到test_indexidmy_text 欄位。由於test_index設定了預設管道,資料在寫入時會自動觸發向量化流程,最終將產生的向量存入my_vector欄位。

    call dbms_etl.sync_by_map(
      "search",
      "test_index.my_text(db.test_table.t2),test_index.id(db.test_table.id)",
      "test_index.id=db.test_table.id",
      "",
      ""
    );
    

驗證資料同步

PolarSearch中查詢test_index,確認資料已寫入,並且my_vector欄位包含了向量資料。

命令列

curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index/_search" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
  "query": {
    "match": {
      "my_text": "aaa"
    }
  }
}'

Dashboard

POST /test_index/_search
{
  "query": {
    "match": {
      "my_text": "aaa"
    }
  }
}

預期返回結果如下所示(部分展示):

"hits": [
      {
        "_index": "test_index",
        "_id": "1",
        "_score": 0.44583148,
        "_source": {
          "my_text": "aaa",
          "id": 1,
          "my_vector": [
            -0.013453668,
            0.009771001,
            -0.00977745,
            ...
          ]
        }
      }
    ]