全部产品
Search
文档中心

云原生数据库 PolarDB:自动将PolarDB MySQL版数据向量化并同步至PolarSearch

更新时间:Mar 27, 2026

当您需要构建基于文本内容的语义搜索、智能问答或推荐系统等AI应用时,通常需要将数据库中的业务数据(如商品描述、用户评论)转换为向量(Embedding),并存储到专门的向量数据库中。PolarDB MySQL版提供的AutoETL功能,结合PolarSearch的数据接入管道(ingestion pipeline),可以自动化完成数据提取-向量化-存入索引的全过程,无需您手动搭建和维护复杂的数据同步链路。本文将指导您如何配置这一自动化流程,实现将PolarDB MySQL版表中的文本数据,通过调用外部Embedding模型进行向量化,并最终将原始文本和生成的向量一并同步至PolarSearch索引中。

工作原理

整个自动化数据流程涉及PolarDB MySQL版PolarSearch以及Embedding模型服务,其核心工作流如下:

  1. 数据源:您的原始文本数据存储在PolarDB MySQL版的表中。

  2. 触发同步:您通过在PolarDB MySQL版中调用dbms_etl.sync_by_map函数,创建并启动一个从MySQL到PolarSearch的数据同步任务(AutoETL)。

  3. 数据写入:AutoETL任务将表中的数据变更(增、删、改)实时同步到PolarSearch的指定索引中。

  4. 管道处理:在数据写入PolarSearch索引之前,会触发预设的数据接入管道(Ingestion Pipeline)。

  5. 调用模型:管道中的text_embedding处理器会读取指定文本字段的内容,通过连接器(Connector)调用外部Embedding模型服务,将文本转换为向量。

  6. 向量存储:模型返回的向量会与原始数据一同写入PolarSearch索引的相应字段中,完成向量索引的构建。

image

适用范围

在使用此功能前,请确保您的环境满足以下条件:

  • 集群版本

    • MySQL 8.0.1,且修订版本需为8.0.1.1.52或以上。

    • MySQL 8.0.2,且修订版本需为8.0.2.2.33或以上。

  • 网络环境:用于文本向量化的Embedding模型服务需部署在与PolarDB MySQL版集群相同的专有网络内。PolarSearch节点将作为客户端主动访问该模型服务。

准备Embedding模型服务

在开始配置PolarDB MySQL版之前,您需要一个可以通过HTTP访问的Embedding模型服务,并将其部署在PolarSearch中。该服务接收文本输入,并返回对应的向量。详细部署说明,请参见集成外部模型服务

配置访问凭证和环境变量

在开始操作前,请先准备好以下信息,并设置为环境变量。这将有效简化后续的curl命令,避免重复修改。统一管理所有配置和凭证,方便后续命令的复制和执行。

变量名

说明

示例值

POLARSEARCH_HOST_PORT

PolarSearch节点的获取连接地址

pc-xxx.polardbsearch.rds.aliyuncs.com:3001

USER_PASSWORD

PolarSearch节点的管理员账号

polarsearch_user:your_password

YOUR_API_KEY

外部模型服务的API Key。

说明
  • 使用阿里云大模型服务平台百炼时,请参见获取API Key

  • 自建模型服务无身份验证时,可忽略YOUR_API_KEY变量的设置。

sk-xxxxxxxxxxxxxxxxxxxxxxxx

操作步骤: 在您的终端中执行以下命令,将示例值替换为您的真实信息。

# 设置 PolarSearch 访问地址和端口
export POLARSEARCH_HOST_PORT="pc-xxx.polardbsearch.rds.aliyuncs.com:3001"

# 设置 PolarSearch 管理员密码
export USER_PASSWORD="polarsearch_user:your_password"

# 设置您的阿里云大模型服务平台百炼 API Key
export YOUR_API_KEY="sk-xxxxxxxxxxxxxxxxxxxxxxxx"

创建数据接入管道

创建一个数据接入管道(ingestion pipeline),它会在数据写入索引前自动执行向量化操作。定义一个名为text_to_vec_pipeline的管道,并在其中使用text_embedding处理器。

参数说明

  • model_id:为您在集成外部模型服务中部署的Embedding模型ID。

  • field_map:定义了输入和输出字段的映射关系。以下示例表示:读取my_text字段的文本内容,调用模型生成向量,然后将向量结果存入my_vector字段。

命令行

curl -XPUT "http://${POLARSEARCH_HOST_PORT}/_ingest/pipeline/text_to_vec_pipeline" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
  "description": "A text embedding pipeline",
  "processors": [
    {
      "text_embedding": {
        "model_id": "<部署的Embedding模型ID>",
        "field_map": {
          "my_text": "my_vector"
        }
      }
    }
  ]
}'

Dashboard

PUT _ingest/pipeline/text_to_vec_pipeline
{
  "description": "A text embedding pipeline",
  "processors": [
    {
      "text_embedding": {
        "model_id": "<部署的Embedding模型ID>",
        "field_map": {
          "my_text": "my_vector"
        }
      }
    }
  ]
}

创建数据同步链路

  1. 准备测试数据:登录PolarDB MySQL版集群,创建一个数据库和表,并插入一些测试数据。

    CREATE DATABASE IF NOT EXISTS db;
    CREATE TABLE IF NOT EXISTS db.test_table (
        id INT PRIMARY KEY, 
        t1 INT, 
        t2 TEXT
    );
    INSERT INTO db.test_table(id, t1, t2) VALUES 
    (1, 11, 'aaa'), 
    (2, 22, 'bbb'), 
    (3, 33, 'ccc');
  2. 创建索引:创建一个用于存储原始数据和向量的PolarSearch索引。

    • default_pipeline:将此索引的默认管道设置为上一步创建的text_to_vec_pipeline。这样,任何写入该索引的数据都会自动经过向量化处理。

    • my_vector.type:向量字段的数据类型需为knn_vector

    • my_vector.dimension:向量维度需与模型实际输出的维度(1024)完全一致。

    命令行

    curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index" \
    --user "${USER_PASSWORD}" \
    -H 'Content-Type: application/json' \
    -d '{
      "settings": {
        "index": {
          "knn": true,
          "default_pipeline": "text_to_vec_pipeline"
        }
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "integer"
          },
          "my_text": {
            "type": "text"
          },
          "my_vector": {
            "type": "knn_vector",
            "dimension": 1024,
            "method": {
              "engine": "faiss",
              "name": "hnsw"
            }
          }
        }
      }
    }'

    Dashboard

    PUT /test_index
    {
      "settings": {
        "index": {
          "knn": true,
          "default_pipeline": "text_to_vec_pipeline"
        }
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "integer"
          },
          "my_text": {
            "type": "text"
          },
          "my_vector": {
            "type": "knn_vector",
            "dimension": 1024,
            "method": {
              "engine": "faiss",
              "name": "hnsw"
            }
          }
        }
      }
    }
  3. 创建并启动AutoETL同步链路:在PolarDB MySQL版集群中,调用dbms_etl.sync_by_map存储过程。

    该命令会建立一个从db.test_tabletest_index的映射关系。当db.test_table中的数据发生变化时,AutoETL会将idt2字段的值分别同步到test_indexidmy_text 字段。由于test_index设置了默认管道,数据在写入时会自动触发向量化流程,最终将生成的向量存入my_vector字段。

    call dbms_etl.sync_by_map(
      "search",
      "test_index.my_text(db.test_table.t2),test_index.id(db.test_table.id)",
      "test_index.id=db.test_table.id",
      "",
      ""
    );
    

验证数据同步

PolarSearch中查询test_index,确认数据已写入,并且my_vector字段包含了向量数据。

命令行

curl -XPUT "http://${POLARSEARCH_HOST_PORT}/test_index/_search" \
--user "${USER_PASSWORD}" \
-H 'Content-Type: application/json' \
-d '{
  "query": {
    "match": {
      "my_text": "aaa"
    }
  }
}'

Dashboard

POST /test_index/_search
{
  "query": {
    "match": {
      "my_text": "aaa"
    }
  }
}

预期返回结果如下所示(部分展示):

"hits": [
      {
        "_index": "test_index",
        "_id": "1",
        "_score": 0.44583148,
        "_source": {
          "my_text": "aaa",
          "id": 1,
          "my_vector": [
            -0.013453668,
            0.009771001,
            -0.00977745,
            ...
          ]
        }
      }
    ]