全部產品
Search
文件中心

OpenSearch:多模態資料解析及向量化

更新時間:Aug 06, 2025

本文介紹在AI搜尋開放平台進行多模態資料預先處理流程。

應用情境

多模態資料預先處理情境提供非結構化文檔及圖片處理方案,全鏈路分別由文檔解析服務、圖片解析服務、文檔切分服務、文本向量化服務、文本稀疏向量化服務組成,您將體驗到完整的資料處理流程。以上服務均需使用AI搜尋開放平台的API服務,將按照實際調用產生費用。

前提條件

  • 開通AI搜尋開放平台服務,詳情請參見開通服務

  • 擷取服務調用地址和身份鑒權資訊,詳情請參見擷取服務接入地址擷取API-KEY

    AI搜尋開放平台支援通過公網和VPC地址調用服務,且可通過VPC實現跨地區調用服務。目前支援上海、杭州、深圳、北京、張家口、青島地區的使用者,通過VPC地址調用AI搜尋開放平台的服務。

多模態資料預先處理鏈路搭建

說明

為方便使用者使用,AI搜尋開放平台提供四種類型的開發架構:

  • Java SDK。

  • Python SDK。

  • 如果業務已經使用LangChain開發架構,在開發架構中選擇LangChain。

  • 如果業務已經使用LlamaIndex開發架構,在開發架構中選擇LlamaIndex。

步驟一:完成服務選型和代碼下載

本文以Python SDK開發架構為例介紹如何搭建多模態資料預先處理鏈路。

  1. 登入AI搜尋開放平台控制台

  2. 選擇上海地區,切換到AI搜尋開放平台,切換到目標空間。

    說明
    • 目前僅支援在上海開通AI搜尋開放平台功能。

    • 支援杭州、深圳、北京、張家口、青島地區的使用者,通過VPC地址跨地區調用AI搜尋開放平台的服務。

  3. 在左側導覽列選擇情境中心,選擇多模態資料預先處理情境-資料解析和向量化右側的進入

  4. 根據服務資訊結合業務特點,從下拉式清單中選擇所需服務,服務詳情頁面可查看服務詳細資料。

    說明
    • 通過API調用多模態資料預先處理鏈路中的演算法服務時,需要提供服務ID(service_id),如文檔內容解析服務的ID為ops-document-analyze-001。

    • 從服務列表中切換服務後,產生代碼中的service_id會同步更新。當代碼下載到本地環境後,您仍可以更改service_id,調用對應服務。

    環節

    服務說明

    文檔內容解析

    文檔內容解析服務(ops-document-analyze-001):提供通用文檔解析服務,支援從非結構化文檔(文本、表格、圖片等)中提取標題、分段等邏輯層級結構,以結構化格式輸出。

    圖片內容解析

    • 圖片內容理解服務(ops-image-analyze-vlm-001):可基於多模態大模型對圖片內容進行解析理解以及文字識別,解析後的文本可用於圖片檢索問答情境。

    • 圖片文本識別服務(ops-image-analyze-ocr-001):使用OCR能力進行圖片文字識別,解析後的文本可用於圖片檢索問答情境。

    文檔切片

    文檔切片服務(ops-document-split-001):提供通用文本切片服務,支援基於文檔段落、文本語義、指定規則,對HTML、Markdown、txt格式的結構化資料進行拆分,同時支援以富文本形式提取文檔中的代碼、圖片以及表格。

    文本向量化

    • OpenSearch文本向量化服務-001(ops-text-embedding-001):提供多語言(40+)文本向量化服務,輸入文本最大長度300,輸出向量維度1536維。

    • OpenSearch通用文本向量化服務-002(ops-text-embedding-002):提供多語言(100+)文本向量化服務,輸入文本最大長度8192,輸出向量維度1024維。

    • OpenSearch文本向量化服務-中文-001(ops-text-embedding-zh-001):提供中文文本向量化服務,輸入文本最大長度1024,輸出向量維度768維。

    • OpenSearch文本向量化服務-英文-001(ops-text-embedding-en-001):提供英文文本向量化服務,輸入文本最大長度512,輸出向量維度768維。

    文本稀疏向量化

    提供將文本資料轉化為稀疏向量形式表達的服務,稀疏向量儲存空間更小,常用於表達關鍵詞和詞頻資訊,可與稠密向量搭配進行混合檢索,提升檢索效果。

    OpenSearch文本稀疏向量化服務(ops-text-sparse-embedding-001):提供多語言(100+)文本向量化服務,輸入文本最大長度8192。

完成服務選型後,單擊配置完成,進入代碼查詢查看和下載代碼,按照應用調用資料預先處理鏈路時的運行流程:

作用

說明

負責文檔處理,包含文檔解析/圖片解析、文檔切片、文本向量化。

使用主函數document_pipeline_execute完成以下流程,可通過文檔URL或Base64編碼輸入待處理文檔。

  1. 文檔解析、圖片解析,服務調用請參見文檔解析API圖片內容提取API

    • 調用非同步文檔解析介面,從文檔URL地址中提取文檔內容,或者從Base64編碼檔案中進行解碼。

    • 調用非同步圖片解析介面,從圖片URL地址中提取圖片內容,或者從Base64編碼檔案中進行解碼。

  2. 文檔切片,服務調用請參見文檔切片API

    • 調用文檔切片介面,將解析後的文檔按指定策略切片。

    • 通過document_split函數進行文檔切片,包含文檔切片和富常值內容解析兩部分。

  3. 文本向量化、文本稀疏向量化,服務調用請參見文本向量化API文本稀疏向量化

    • 調用文本向量化介面,將切分後的資料轉化為稠密向量。

    • 調用文本稀疏向量化介面,將切分後的資料轉化為稀疏向量。後續如需進行內容檢索,可將向量資料寫入搜尋引擎。

選擇代碼查詢下的文檔解析與向量化,單擊複製代碼或者下載檔案,將代碼下載到本地。

步驟二:本地環境適配和測試資料預先處理開發鏈路

將代碼下載到本地檔案後,需要配置代碼中的關鍵參數。

類別

參數

說明

AI搜尋開放平台

api_key

API調用密鑰,擷取方式請參見管理API Key

aisearch_endpoint

API調用地址,擷取方式請參見擷取服務接入地址

說明

注意需要去掉“http://”。

支援通過公網和VPC兩種方式調用API。

workspace_name

AI搜尋開放平台中的空間名稱。

service_id

服務ID,為操作方便,可以通過service_id_config配置各項服務以及ID。

image

完成參數配置後即可在Python 3.8.1及以上版本環境中運行代碼,測試結果是否正確。

如在代碼中對AI搜尋開放平台介紹進行資料預先處理,運行結果如下:

image

文檔解析與向量化檔案:

# 多模態資料處理鏈路

# 環境需求:
# Python版本:3.7及以上


# 包需求:
# pip install alibabacloud_searchplat20240529


# AI搜尋開放平台配置
aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
api_key = "OS-xxx"
workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
                     "split": "ops-document-split-001",
                     "text_embedding": "ops-text-embedding-001",
                     "text_sparse_embedding": "ops-text-sparse-embedding-001",
                     "image_analyze": "ops-image-analyze-ocr-001"}

# 輸入文檔url,範例文件為opensearch產品說明文檔
document_url = "https://www.alibabacloud.com/help/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"

import asyncio
from operator import attrgetter
from typing import List
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
    CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
    GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
    GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
    GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
    CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse


async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
    while True:
        request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
        response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
        status = response.body.result.status
        if status == "PENDING":
            await asyncio.sleep(interval)
        elif status == "SUCCESS":
            return response
        else:
            print("error: " + response.body.result.error)
            raise Exception("document analyze task failed")


def is_analyzable_url(url:str):
    if not url:
        return False
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    return url.lower().endswith(tuple(image_extensions))


async def image_analyze(ops_client, url):
    try:
        print("image analyze :" + url)
        if url.startswith("//"):
            url = "https:" + url
        if not is_analyzable_url(url):
            print(url + " is  unanalysable.")
            return url
        image_analyze_service_id = service_id_config["image_analyze"]
        document = CreateImageAnalyzeTaskRequestDocument(
            url=url,
        )
        request = CreateImageAnalyzeTaskRequest(document=document)
        response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
        task_id = response.body.result.task_id
        while True:
            request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
            response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(5)
            elif status == "SUCCESS":
                return url + response.body.result.data.content
            else:
                print("image analyze error: " + response.body.result.error)
                return url
    except Exception as e:
        print(f"image analyze Exception : {e}")


def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]


async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):

    # 產生opensearch開發平台client
    config = Config(bearer_token=api_key,endpoint=aisearch_endpoint,protocol="http")
    ops_client = Client(config=config)

    # Step 1: 文檔解析/圖片解析
    document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,file_name=file_name, file_type='html'))
    document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,service_id=service_id_config["document_analyze"],request=document_analyze_request)
    print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
    extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
    print("document_analyze done")
    document_content = extraction_result.body.result.data.content
    content_type = extraction_result.body.result.data.content_type

    # Step 2: 文檔切片
    document_split_request = GetDocumentSplitRequest(
        GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
    document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                      document_split_request)
    print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
          + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))

    # Step 3: 文本向量化
    # 提取切片結果。圖片切片會通過圖片解析服務提取出常值內容
    doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
                + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client,chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                )
    chunk_size = 32  # 一次最多允許計算32個embedding
    all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_embedding_async(workspace_name,service_id_config["text_embedding"],GetTextEmbeddingRequest(chunk))
        all_text_embeddings.extend(response.body.result.embeddings)

    all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_sparse_embedding_async(workspace_name,service_id_config["text_sparse_embedding"],GetTextSparseEmbeddingRequest(chunk,input_type="document",return_token=True))
        all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)

    for i in range(len(doc_list)):
        doc_list[i]["embedding"] = all_text_embeddings[i].embedding
        doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding

    print("text-embedding done.")


if __name__ == "__main__":
    # 運行非同步任務
    #    import nest_asyncio # 如果在Jupyter notebook中運行,反注釋這兩行
    #    nest_asyncio.apply() # 如果在Jupyter notebook中運行,反注釋這兩行
    asyncio.run(document_pipeline_execute(document_url))
    # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #另外一種調用方式