
OpenSearch: Build RAG-based conversational search

Last Updated: Mar 12, 2026

For online knowledge base Q&A scenarios, AI Search Open Platform provides a complete RAG development pipeline consisting of three main modules: data pre-processing, retrieval service, and Q&A summary generation. The platform componentizes the algorithm services available in each module, so you can flexibly select services for each module, such as document parsing, ranking, and Q&A summarization, and quickly generate development code. The services are exposed as APIs. Following the steps in this topic, you can download the generated code, replace information such as the API key, API endpoint, and local knowledge base, and quickly build a knowledge base Q&A application based on the RAG development pipeline.

Technical principles

Retrieval-Augmented Generation (RAG) is an artificial intelligence method that combines retrieval and generation technologies. It aims to improve the relevance, accuracy, and diversity of content generated by models. When processing generation tasks, RAG first retrieves the most relevant segments for the input from large amounts of external data or knowledge bases. It then feeds the retrieved information along with the original input into a Large Language Model (LLM) as a prompt or context. This guides the model to generate more precise and rich answers. This method allows the model to rely not only on its internal parameters and training data when generating responses, but also to use external, up-to-date, or domain-specific information to improve answer accuracy.
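The retrieve-then-generate flow described above can be sketched in a few lines of Python. The word-overlap scorer and prompt template below are illustrative stand-ins for the platform's embedding retrieval and LLM services, not their actual APIs:

```python
from typing import List

# Toy knowledge base; in the real pipeline these would be document segments.
knowledge_base = [
    "AI Search Open Platform provides document parsing, segmentation, and embedding services.",
    "Elasticsearch 8.x supports kNN vector search over dense_vector fields.",
    "Qwen-Turbo is a large language model supporting Chinese and English input.",
]

def retrieve(query: str, docs: List[str], top_k: int = 2) -> List[str]:
    # Score each document by word overlap with the query (a crude stand-in
    # for vector-similarity retrieval).
    q_terms = set(query.lower().split())
    scored = sorted(docs, key=lambda d: len(q_terms & set(d.lower().split())), reverse=True)
    return scored[:top_k]

def build_prompt(query: str, contexts: List[str]) -> str:
    # Feed the retrieved segments to the LLM as context, as described above.
    joined = "\n".join(f"<article>{c}</article>" for c in contexts)
    return f"Known information:\n{joined}\n\nAnswer based only on the known information.\nQuestion: {query}"

contexts = retrieve("What does AI Search Open Platform provide?", knowledge_base)
print(build_prompt("What does AI Search Open Platform provide?", contexts))
```

The real pipeline replaces the scorer with dense/sparse embeddings plus a vector index, and sends the assembled prompt to an LLM service.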

(Figure: RAG-based intelligent Q&A implementation flowchart)

Scenarios

Online knowledge base Q&A is common in business scenarios such as internal enterprise knowledge base retrieval and summarization, and online Q&A in vertical domains. Leveraging a customer's professional knowledge base documents, Retrieval-Augmented Generation (RAG) technology and Large Language Models (LLMs) can understand and respond to complex natural language queries, helping enterprise customers quickly retrieve information from PDF, Word, table, and image documents using natural language.


Prerequisites

  • Activate AI Search Open Platform. For more information, see Activate Service.

  • Obtain the service endpoint and authentication information. For more information, see Obtain Service Endpoint and Obtain API Key.

    AI Search Open Platform supports service invocation via public network and VPC endpoints. VPC enables cross-region service invocation. Currently, users in Shanghai, Hangzhou, Shenzhen, Beijing, Zhangjiakou, and Qingdao regions can invoke AI Search Open Platform services via VPC endpoints.


  • Create an Alibaba Cloud Elasticsearch (ES) instance. ES 8.5 or later is required. For more information, see Create an Alibaba Cloud Elasticsearch Instance. When accessing an Alibaba Cloud ES instance via a public or private network, add the IP address of the device to the instance's access whitelist. For more information, see Configure a Public or Private Network Access Whitelist for an Instance.

  • Use Python 3.7 or later. In your development environment, install the Python package dependencies `aiohttp 3.8.6` and `elasticsearch 8.14`.
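The environment prerequisites above can be checked with a short script. Note that `importlib.metadata` requires Python 3.8 or later (on 3.7, the `importlib-metadata` backport provides the same API):

```python
import sys
from importlib import metadata  # Python 3.8+; on 3.7 use the importlib-metadata backport

def check_env() -> bool:
    # Check the interpreter version required by the sample code.
    ok = sys.version_info >= (3, 7)
    print(f"Python {sys.version_info.major}.{sys.version_info.minor}: {'OK' if ok else 'too old'}")
    # Report whether the required packages are installed; their versions are
    # printed so you can compare against the versions listed above.
    for pkg in ("aiohttp", "elasticsearch"):
        try:
            print(f"{pkg} {metadata.version(pkg)} is installed")
        except metadata.PackageNotFoundError:
            print(f"{pkg} is not installed -- run: pip install {pkg}")
            ok = False
    return ok

if __name__ == "__main__":
    check_env()
```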

Build the RAG development pipeline

Note

For ease of use, AI Search Open Platform provides four types of development frameworks:

  • Java SDK.

  • Python SDK.

  • LangChain: select this framework if your business already uses LangChain.

  • LlamaIndex: select this framework if your business already uses LlamaIndex.

Step 1: Select services and download code

Based on your knowledge base and business needs, select the algorithm services and development framework required in the RAG pipeline. This topic uses the Python SDK development framework as an example to describe how to build the RAG pipeline.

  1. Log on to the AI Search Open Platform console.

  2. Select the Shanghai region, switch to AI Search Open Platform, and switch to the target workspace.

    Note
    • Currently, AI Search Open Platform is available only in the China (Shanghai) and Germany (Frankfurt) regions.

    • Users in the China (Hangzhou), China (Shenzhen), China (Beijing), China (Zhangjiakou), and China (Qingdao) regions can invoke AI Search Open Platform services across regions using VPC endpoints.

  3. In the navigation pane on the left, select Scenario Center. To the right of RAG Scenario - Knowledge Base Online Q&A, click Enter.

  4. Based on the service information and business requirements, select the required service from the drop-down list. The Service Details page displays detailed information about the service.

    Note
    • When invoking algorithm services in the RAG pipeline via API, provide the service ID (service_id). For example, the ID for the document content parsing service is ops-document-analyze-001.

    • After switching services from the service list, the service_id in the generated code updates synchronously. After downloading the code to your local environment, you can still change the service_id to invoke the corresponding service.

    Stage

    Service Description

    Document Content Parsing

    Document Content Parsing Service (ops-document-analyze-001): Provides a general document parsing service. It supports extracting logical hierarchical structures such as titles and paragraphs from unstructured documents (text, tables, images, etc.) and outputs them in a structured format.

    Image Content Parsing

    • Image Content Recognition Service (ops-image-analyze-vlm-001): Parses and understands image content and recognizes text based on multimodal Large Language Models. The parsed text can be used for image retrieval Q&A scenarios.

    • Image Text Recognition Service (ops-image-analyze-ocr-001): Uses OCR capabilities for image text recognition. The parsed text can be used for image retrieval Q&A scenarios.

    Document Segmentation

    Document Segmentation Service (ops-document-split-001): Provides a general text segmentation service. It supports splitting structured data in HTML, Markdown, and TXT formats based on document paragraphs, text semantics, and specified rules. It also supports extracting code, images, and tables from documents as rich text.

    Text Embedding

    • OpenSearch Text Embedding Service-001 (ops-text-embedding-001): Provides multilingual (40+) text embedding services. The maximum input text length is 300, and the output vector dimension is 1536.

    • OpenSearch General Text Embedding Service-002 (ops-text-embedding-002): Provides multilingual (100+) text embedding services. The maximum input text length is 8192, and the output vector dimension is 1024.

    • OpenSearch Text Embedding Service-Chinese-001 (ops-text-embedding-zh-001): Provides Chinese text embedding services. The maximum input text length is 1024, and the output vector dimension is 768.

    • OpenSearch Text Embedding Service-English-001 (ops-text-embedding-en-001): Provides English text embedding services. The maximum input text length is 512, and the output vector dimension is 768.

    Text Sparse Embedding

    Provides a service to convert text data into sparse vector representation. Sparse vectors require less storage space and are often used to express keywords and term frequency information. Combine them with dense vectors for hybrid retrieval to improve retrieval effectiveness.

    OpenSearch Text Sparse Embedding Service (ops-text-sparse-embedding-001): Provides multilingual (100+) text embedding services. The maximum input text length is 8192.

    Query Analysis

    Query Analysis Service 001 (ops-query-analyze-001) provides a general query analysis service. It uses Large Language Models to understand the intent of user queries and expand similar questions.

    Search Engine

    • Alibaba Cloud Elasticsearch: A fully managed cloud service built on open-source Elasticsearch. It is 100% compatible with open-source features and supports out-of-the-box use and pay-as-you-go billing.

      Note

      If you select Alibaba Cloud Elasticsearch as the search engine service, the text sparse embedding service is unavailable due to compatibility issues. Use the text embedding services instead.

    • OpenSearch Vector Search Edition is a large-scale distributed vector search engine independently developed by Alibaba. It supports various vector retrieval algorithms, offers excellent performance with high accuracy, and enables cost-effective large-scale index building and retrieval. Additionally, the index supports horizontal scaling and merging, stream-based index building, real-time indexing and querying, and dynamic data updates.

      Note

      To use the OpenSearch Vector Search Edition engine solution, replace the engine configuration and code in the RAG pipeline.

    Sorting Service

    BGE Rerank Model (ops-bge-reranker-larger): A general document scoring service. It sorts documents by relevance to the query, from highest to lowest score, and outputs the scoring results.

    Large Language Model

    • OpenSearch-Qwen-Turbo (ops-qwen-turbo): Uses the Qwen-Turbo large language model as its base, with supervised model fine-tuning to enhance retrieval augmentation and reduce harmfulness.

    • Qwen-Turbo (qwen-turbo): A Qwen ultra-large language model that supports input in various languages, including Chinese and English. For more information, see Introduction to Qwen Large Language Models.

    • Qwen-Plus (qwen-plus): An enhanced version of the Qwen ultra-large language model that supports input in various languages, including Chinese and English. For more information, see Introduction to Qwen Large Language Models.

    • Qwen-Max (qwen-max): A Qwen hundred-billion-parameter ultra-large language model that supports input in various languages, including Chinese and English. For more information, see Introduction to Qwen Large Language Models.

  5. Click Configuration Complete, View Code to view and download the code.

    The code is divided into two parts: offline document processing and online user Q&A processing. This division aligns with the application's runtime flow when invoking the RAG pipeline:

    Process

    Function

    Description

    Offline Document Processing

    Handles document processing, including document parsing, image extraction, segmentation, embedding, and writing document processing results to an ES index.

    Use the main function `document_pipeline_execute` to complete the following process. You can input documents to be processed via document URL or Base64 encoding.

    1. Document parsing. For service invocation, see Document Parsing API.

      • Invoke the asynchronous document parsing API to extract document content from a document URL or decode it from a Base64 encoded file.

      • Construct a parsing task using the `create_async_extraction_task` function and poll the task completion status using the `poll_task_result` function.

    2. Image extraction. For service invocation, see Image Content Extraction API.

      • Invoke the asynchronous image parsing API to extract image content from an image URL or decode it from a Base64 encoded file.

      • Create an image parsing task using the `create_image_analyze_task` function and get the image parsing task status using the `get_image_analyze_task_status` function.

    3. Document segmentation. For service invocation, see Document Segmentation API.

      • Invoke the document segmentation API to segment the parsed document according to a specified strategy.

      • Perform document segmentation using the `document_split` function, which includes document segmentation and rich text content parsing.

    4. Text embedding. For service invocation, see Text Embedding API.

      • Invoke the text dense vector representation API to embed the segmented text.

      • Calculate the embedding vector for each segment using the `text_embedding` function.

    5. Write to ES. For service invocation, see Use the kNN Feature of Elasticsearch for Vector Nearest Neighbor Search.

      • Create ES index configuration, including specifying the vector field `embedding` and document content field `content`.

        Important

        When creating an ES index, the sample code deletes any existing index with the same name. To avoid accidentally deleting an existing index, change the index name in the code.

      • Batch write embedding results to the ES index using the `helpers.async_bulk` function.

    Online Q&A Processing

    Handles user online queries, including generating query vectors, query analysis, retrieving relevant document segments, sorting retrieval results, and generating answers based on retrieval results.

    Use the main function `query_pipeline_execute` to complete the following process, which processes user queries and outputs answers.

    1. Query embedding. For service invocation, see Text Embedding API.

      • Invoke the text dense vector representation API to convert user queries into vectors.

      • Generate query vectors using the `text_embedding` function.

    2. Invoke the query analysis service. For more information, see Query Analysis API.

      Invoke the query analysis API to identify user query intent and generate similar questions by analyzing historical messages.

    3. Search embedding segments. For service invocation, see Use the kNN Feature of Elasticsearch for Vector Nearest Neighbor Search.

      • Use ES to retrieve document segments in the index that are similar to the query vector.

      • Perform similarity retrieval using the `AsyncElasticsearch` search API in conjunction with kNN queries.

    4. Invoke the sorting service. For more information, see Sorting API.

      • Invoke the sorting service API to score and sort the retrieved relevant segments.

      • Score and sort document content based on user queries using the `documents_ranking` function.

    5. Invoke the Large Language Model to generate answers. For service invocation, see Answer Generation API.

      Invoke the Large Language Model service to generate the final answer using retrieval results and user queries via the `llm_call` function.

    Select the Document Processing Flow and Online Q&A Flow under Code Query, and click Copy Code or Download File to download the code to your local machine.
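As background on the Text Sparse Embedding stage in the service table above: a dense embedding stores a float for every dimension, while a sparse embedding stores only non-zero token weights. A toy illustration (the token IDs and weights below are made up, not real service output):

```python
# Dense embedding: one float per dimension, zeros included.
dense = [0.12, 0.0, 0.0, 0.87, 0.0, 0.33, 0.0, 0.0]  # 8 dimensions for illustration

# Sparse embedding: only the non-zero entries, keyed by token ID. This is why
# sparse vectors need less storage, map naturally to keyword/term-frequency
# information, and combine well with dense vectors for hybrid retrieval.
doc_sparse = {0: 0.12, 3: 0.87, 5: 0.33}
query_sparse = {3: 0.5, 5: 0.2, 9: 0.7}

def sparse_dot(a: dict, b: dict) -> float:
    # Similarity between sparse vectors only touches the shared token IDs.
    return sum(w * b[t] for t, w in a.items() if t in b)

print(sparse_dot(doc_sparse, query_sparse))  # 0.87*0.5 + 0.33*0.2
```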

Step 2: Adapt to local environment and test the RAG development pipeline

Save the downloaded code to two local files, for example `offline.py` and `online.py`, and then configure the key parameters in the code.

Category

Parameter

Description

AI Search Open Platform

api_key

API key. For more information, see Manage API Key.

aisearch_endpoint

API endpoint. For more information, see Obtain Service Endpoint.

Note

Remove the `http://` prefix from the endpoint.

API invocation is supported via both public network and VPC endpoints.

workspace_name

AI Search Open Platform workspace name. The sample code uses the default workspace, `default`.

service_id

Service ID. For convenience, services and their IDs are configured using `service_id_config` in both the offline document processing (`offline.py`) and online Q&A processing (`online.py`) code.


ES Search Engine

es_host

Elasticsearch (ES) instance endpoint. When accessing an Alibaba Cloud ES instance via public or private network, add the IP address of the device to be accessed to the instance's access whitelist. For more information, see Configure a Public or Private Network Access Whitelist for an Instance.

es_auth

Username and password for accessing the Elasticsearch instance. The username is `elastic`, and the password is the one you set when creating the instance. If you forget the password, reset it. For specific operations, see Reset Instance Access Password.

Other Parameters

No modification is needed if you use sample data.

After configuring the parameters, in a Python 3.7 or later environment, run the `offline.py` offline document processing file and the `online.py` online Q&A processing file sequentially to test whether the results are correct.

For example, if the knowledge base document is Introduction to AI Search Open Platform, ask the document: What can AI Search Open Platform do?

The running results are as follows:

  • Offline document processing results


  • Online Q&A processing results


  • View source files

    offline.py
    # RAG Offline Pipeline - Elasticsearch Engine
    
    # Environment requirements:
    # Python version: 3.7 or later
    # ES cluster version: 8.9 or later. If using Alibaba Cloud ES, activate it and set up an access IP whitelist in advance: https://www.alibabacloud.com/help/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster
    
    # Package requirements:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # AI Search Open Platform configuration
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    api_key = "OS-xxx"
    workspace_name = "default"
    service_id_config = {"extract": "ops-document-analyze-001",
                         "split": "ops-document-split-001",
                         "text_embedding": "ops-text-embedding-001",
                         "text_sparse_embedding": "ops-text-sparse-embedding-001",
                         "image_analyze": "ops-image-analyze-ocr-001"}
    
    # ES configuration
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    
    # Input document URL. The sample document is the AI Search Open Platform product description.
    document_url = "https://www.alibabacloud.com/help/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"
    
    import asyncio
    from typing import List
    from elasticsearch import AsyncElasticsearch
    from elasticsearch import helpers
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
        CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
        GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
        GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
        CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskRequest, CreateImageAnalyzeTaskResponse, \
        GetImageAnalyzeTaskStatusRequest, GetImageAnalyzeTaskStatusResponse
    
    
    async def poll_task_result(ops_client, task_id, service_id, interval=5):
        while True:
            request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
            response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(interval)
            elif status == "SUCCESS":
                return response
            else:
                raise Exception("document analyze task failed")
    
    
    def is_analyzable_url(url:str):
        if not url:
            return False
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
        return url.lower().endswith(tuple(image_extensions))
    
    
    async def image_analyze(ops_client, url):
        try:
            print("image analyze: " + url)
            if url.startswith("//"):
                url = "https:" + url
            if not is_analyzable_url(url):
                print(url + " is not analyzable.")
                return url
            image_analyze_service_id = service_id_config["image_analyze"]
            document = CreateImageAnalyzeTaskRequestDocument(
                url=url,
            )
            request = CreateImageAnalyzeTaskRequest(document=document)
            response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
            task_id = response.body.result.task_id
            while True:
                request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
                response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
                status = response.body.result.status
                if status == "PENDING":
                    await asyncio.sleep(5)
                elif status == "SUCCESS":
                    return url + response.body.result.data.content
                else:
                    print("image analyze error: " + response.body.result.error)
                    return url
        except Exception as e:
            print(f"image analyze Exception : {e}")
            return url
    
    
    def chunk_list(lst, chunk_size):
        for i in range(0, len(lst), chunk_size):
            yield lst[i:i + chunk_size]
    
    
    async def write_to_es(doc_list):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,  # Do not use SSL certificate verification
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
    
        # Delete existing index
        if await es.indices.exists(index=index_name):
            await es.indices.delete(index=index_name)
    
        # Create vector index, specify embedding field as dense_vector, content field as text, source field as keyword
        index_mappings = {
            "mappings": {
                "properties": {
                    "emb": {
                        "type": "dense_vector",
                        "index": True,
                        "similarity": "cosine",
                        "dims": 1536  # Modify according to the embedding model output dimension
                    },
                    "content": {
                        "type": "text"
                    },
                    "source_doc": {
                        "type": "keyword"
                    }
                }
            }
        }
        await es.indices.create(index=index_name, body=index_mappings)
    
        # Upload embedding results to the index created in the previous step
        actions = []
        for i, doc in enumerate(doc_list):
            action = {
                "_index": index_name,
                "_id": doc['id'],
                "_source": {
                    "emb": doc['embedding'],
                    "content": doc['content'],
                    "source_doc": document_url
                }
            }
            actions.append(action)
    
        try:
            await helpers.async_bulk(es, actions)
        except Exception as e:
            for error in e.errors:
                print(error)
    
        # Confirm successful upload
        await asyncio.sleep(2)
        query = {
            "query": {
                "ids": {
                    "values": [doc_list[0]["id"]]
                }
            }
        }
        res = await es.search(index=index_name, body=query)
        if len(res['hits']['hits']) > 0:
            print("ES write success")
        await es.close()
    
    
    async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
    
        # Generate OpenSearch development platform client
        config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
        ops_client = Client(config=config)
    
        # Step 1: Document parsing
        document_analyze_request = CreateDocumentAnalyzeTaskRequest(
            document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,
                                                              file_name=file_name, file_type='html'))
        document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,
                                                                                        service_id=service_id_config[
                                                                                            "extract"],
                                                                                        request=document_analyze_request)
        print("document-analyze task_id: " + document_analyze_response.body.result.task_id)
        extraction_result = await poll_task_result(ops_client, document_analyze_response.body.result.task_id,
                                                   service_id_config["extract"])
        print("document-analyze done")
        document_content = extraction_result.body.result.data.content
        content_type = extraction_result.body.result.data.content_type
        # Step 2: Document segmentation
        document_split_request = GetDocumentSplitRequest(
            GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
        document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                          document_split_request)
        print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
              + " rich text count: " + str(len(document_split_result.body.result.rich_texts)))
    
        # Step 3: Text embedding
        # Extract segmentation results. Image segments extract text content via image parsing services.
        doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                     document_split_result.body.result.chunks]
                    + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                    + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                    )
    
        chunk_size = 32  # A maximum of 32 embeddings can be computed at once.
        all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                 GetTextEmbeddingRequest(chunk))
            all_text_embeddings.extend(response.body.result.embeddings)
    
        all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_sparse_embedding_async(workspace_name,
                                                                        service_id_config["text_sparse_embedding"],
                                                                        GetTextSparseEmbeddingRequest(chunk,
                                                                                                      input_type="document",
                                                                                                      return_token=True))
            all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
    
        for i in range(len(doc_list)):
            doc_list[i]["embedding"] = all_text_embeddings[i].embedding
            doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding
    
        print("text-embedding done")
    
        # Step 4: Write to Elasticsearch storage engine
        await write_to_es(doc_list)
    
    
    if __name__ == "__main__":
        # Run asynchronous task
        #    import nest_asyncio # Uncomment these two lines if running in a Jupyter notebook
        #    nest_asyncio.apply() # Uncomment these two lines if running in a Jupyter notebook
        asyncio.run(document_pipeline_execute(document_url))
        # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) # Another invocation method
    
    online.py
    # RAG Online Pipeline - Elasticsearch Engine
    
    # Environment requirements:
    # Python version: 3.7 or later
    # ES cluster version: 8.9 or later (If using Alibaba Cloud ES, activate it and set up an access IP whitelist in advance: https://www.alibabacloud.com/help/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster)
    
    # Package requirements:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # OpenSearch Search Development Workbench configuration
    api_key = "OS-xxx"
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    workspace_name = "default"
    service_id_config = {
        "rank": "ops-bge-reranker-larger",
        "text_embedding": "ops-text-embedding-001",
        "text_sparse_embedding": "ops-text-sparse-embedding-001",
        "llm": "ops-qwen-turbo",
        "query_analyze": "ops-query-analyze-001"
    
    }
    
    # ES configuration
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    # User query:
    user_query = "What can the OpenSearch Search Development Platform do?"
    
    
    import asyncio
    from elasticsearch import AsyncElasticsearch
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetTextEmbeddingRequest,  \
        GetDocumentRankRequest, GetTextGenerationRequest, GetTextGenerationRequestMessages, \
        GetQueryAnalysisRequest
    
    # Generate OpenSearch development platform client
    config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
    ops_client = Client(config=config)
    
    
    async def es_retrieve(query):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
        # Query embedding
        query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                     GetTextEmbeddingRequest(input=[query],
                                                                                             input_type="query"))
        query_emb = query_emb_result.body.result.embeddings[0].embedding
        query = {
            "field": "emb",
            "query_vector": query_emb,
            "k": 5,  # Number of document segments to return
            "num_candidates": 100  # HNSW search parameter efsearch
        }
    
        res = await es.search(index=index_name, knn=query)
        search_results = [item['_source']['content'] for item in res['hits']['hits']]
        await es.close()
        return search_results
    
    
    # Online Q&A pipeline, input is user question
    async def query_pipeline_execute():
    
        # Step 1: Query analysis
        query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config['query_analyze'],
                                                               GetQueryAnalysisRequest(query=user_query))
        print("query analysis rewrite result: " + query_analyze_response.body.result.query)
    
        # Step 2: Document retrieval
        all_query_results = []
        user_query_results = await es_retrieve(user_query)
        all_query_results.extend(user_query_results)
        rewrite_query_results = await es_retrieve(query_analyze_response.body.result.query)
        all_query_results.extend(rewrite_query_results)
        for extend_query in query_analyze_response.body.result.queries:
            extend_query_result = await es_retrieve(extend_query)
            all_query_results.extend(extend_query_result)
        # Deduplicate all retrieval results
        remove_duplicate_results = list(set(all_query_results))
    
        # Step 3: Rerank retrieved documents
        rerank_top_k = 8
        score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"],
                                                                 GetDocumentRankRequest(remove_duplicate_results, user_query))
        rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    
        # Step 4: Invoke Large Language Model to answer the question
        docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
        messages = [
            GetTextGenerationRequestMessages(role="system", content="You are a helpful assistant."),
            GetTextGenerationRequestMessages(role="user",
                                             content=f"""The known information contains multiple independent documents, each between <article> and </article>. The known information is as follows:\n'''{docs}'''
                                             \n\nBased on the known information above, answer the user's question in detail and systematically. Ensure the answer fully addresses the question and correctly uses the known information. If the information is insufficient to answer the question, state 'Cannot answer this question based on the known information.'
                                             Do not use content not present in the known information to generate the answer. Ensure every statement in the answer is supported by the known information above. Please provide the answer in Chinese.
                                             \nThe question is:'''{user_query}'''""")
        ]
        response = await ops_client.get_text_generation_async(workspace_name, service_id_config["llm"],
                                                              GetTextGenerationRequest(messages=messages))
        print("Final answer from Large Language Model: ", response.body.result.text)
    
    
    if __name__ == "__main__":
        #    import nest_asyncio # Uncomment these two lines if running in a Jupyter notebook
        #    nest_asyncio.apply() # Uncomment these two lines if running in a Jupyter notebook
        asyncio.run(query_pipeline_execute())
    

FAQ

During code execution, "Unclosed connector" warnings may appear because network resources are not released promptly. They are harmless, and no action is needed.
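If these warnings clutter the output, they can be silenced with Python's standard `warnings` module. The sketch below assumes the messages are emitted as `ResourceWarning`s (the category aiohttp uses for unclosed resources); verify this in your environment before relying on it:

```python
import warnings

# Suppress ResourceWarning reports such as "Unclosed connector". This hides
# the message only; for a clean shutdown, still close clients explicitly
# (for example, `await es.close()` as the sample code already does).
warnings.simplefilter("ignore", ResourceWarning)

# Demonstration: the warning below is swallowed instead of printed.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("ignore", ResourceWarning)
    warnings.warn("Unclosed connector", ResourceWarning)
print("suppressed:", len(caught) == 0)  # suppressed: True
```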