
OpenSearch: Build a RAG-based conversational search application

Last Updated: Aug 05, 2025

AI Search Open Platform provides a Retrieval-Augmented Generation (RAG)-based solution for conversational search over knowledge bases. The solution consists of three modules: data preprocessing, data retrieval, and response generation. The platform exposes the services used by each module, such as document parsing, re-ranking, and text generation, as selectable components that you call through APIs. To quickly build a RAG-based conversational search application, download the generated code to your machine and configure information such as the API key, the API endpoint, and your local knowledge base.

How it works

RAG is an AI method that combines retrieval and generation services to improve the relevance, accuracy, and diversity of the content generated by large language models (LLMs). During generation, RAG first retrieves the information most relevant to the input from a large amount of external data or a knowledge base. It then passes the retrieved information, together with the original input, to an LLM as a prompt or context to generate a more precise and informative output. In addition to its internal parameters and training data, the LLM can therefore reference the latest external data or domain-specific information to improve the accuracy of its output.
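
The following minimal sketch illustrates this flow. The retrieve and generate functions here are hypothetical placeholders, not AI Search Open Platform APIs; the offline.py and online.py files later in this topic implement the real services.

    # A minimal, runnable sketch of the RAG flow described above.
    # retrieve() and generate() are hypothetical placeholders for the
    # retrieval and text generation services implemented later in this topic.
    from typing import List

    KNOWLEDGE_BASE = [
        "AI Search Open Platform provides document parsing, chunking, embedding, "
        "re-ranking, and text generation services.",
    ]

    def retrieve(query: str, top_k: int = 5) -> List[str]:
        # Placeholder retrieval: a real implementation performs a vector search.
        return KNOWLEDGE_BASE[:top_k]

    def generate(prompt: str) -> str:
        # Placeholder generation: a real implementation calls an LLM service.
        return "[answer generated from]\n" + prompt

    def answer(query: str) -> str:
        chunks = retrieve(query)  # Step 1: retrieve the most relevant chunks.
        prompt = "Context:\n" + "\n".join(chunks) + "\n\nQuestion: " + query
        return generate(prompt)   # Step 2: generate an answer grounded in the context.

    print(answer("What features does AI Search Open Platform provide?"))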

Scenarios

Conversational search based on knowledge bases is applicable to multiple scenarios, such as retrieval and summarization over private knowledge bases, or conversational search in industry verticals. AI Search Open Platform combines RAG and LLMs to understand and respond to complex natural language queries based on the domain-specific knowledge bases of enterprises. The RAG-based solution helps enterprises quickly retrieve the required information from PDF and Word files, tables, and images by using natural language.

Prerequisites
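
  • Python 3.7 or later is installed.

  • An Elasticsearch cluster of V8.9 or later is created, and the IP address whitelist of the cluster is configured. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.

  • An API key for AI Search Open Platform is obtained. For more information, see Manage API key.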

Build a RAG-based development solution

Note

To facilitate user access, AI Search Open Platform offers the following development frameworks:

  • SDK for Java

  • SDK for Python

  • LangChain: if your business already uses LangChain, select LangChain as the development framework.

  • LlamaIndex: if your business already uses LlamaIndex, select LlamaIndex as the development framework.
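
If you use the SDK for Python, as in this topic, install the dependencies that the sample code requires before you run it:

    pip install alibabacloud_searchplat20240529
    pip install elasticsearch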

Step 1: Select services and download the code

Select the algorithm services and the development framework to be used in the RAG-based solution for your knowledge base. In this example, SDK for Python is used.

  1. Log on to the AI Search Open Platform console.

  2. Select the Germany (Frankfurt) region, switch to AI Search Open Platform, and then select the target workspace.

  3. In the left-side navigation pane, click Scene Center. Click Enter in the RAG Scene-Knowledge Base Online Q & A section.


  4. Select the services that you want to use from the drop-down lists. Then, on the Service Details tab, view the service details.

    Note
    • To call an algorithm service in the RAG-based solution through the API, specify the service ID in the service_id parameter. For example, the ID of the document content parsing service is ops-document-analyze-001.

    • After you select a service, the service_id parameter in the generated code is updated accordingly. After you download the code to your local environment, you can modify the service_id parameter in the code to call other services, as shown in the example after this procedure.

    Service

    Description

    Document content parsing

    The document content parsing service (ops-document-analyze-001) is a general-purpose parsing service that extracts logical hierarchical structures, such as titles and paragraphs, as well as text, tables, images, and other information from unstructured documents, and outputs the results in a structured format.

    Image content parsing

    • The image text recognition service (ops-image-analyze-ocr-001) allows you to use the OCR feature to recognize the text in an image and extract the text information for image retrieval and conversational search.

    • The image content recognition service (ops-image-analyze-vlm-001) allows you to parse the content of an image based on multi-modal LLMs. You can also use the service to parse the text in the image and use the parsed text for image retrieval and conversational search.

    Document chunking

    The document chunking service (ops-document-split-001) provides general-purpose text chunking. You can use this service to chunk structured data in the HTML, Markdown, and TXT formats based on paragraphs, semantics, or specific rules. You can also extract code, images, and tables from rich text.

    Text embedding

    • OpenSearch text vectorization service 001 (ops-text-embedding-001): provides a text vectorization service that supports more than 40 languages. The maximum length of the input text is 300 tokens, and the dimension of the generated vectors is 1,536.

    • OpenSearch text vectorization service 002 (ops-text-embedding-002): provides a text vectorization service that supports more than 100 languages. The maximum length of the input text is 8,192 tokens, and the dimension of the generated vectors is 1,024.

    • OpenSearch Chinese text vectorization service 001 (ops-text-embedding-zh-001): provides a text vectorization service for Chinese text. The maximum length of the input text is 1,024 tokens, and the dimension of the generated vectors is 768.

    • OpenSearch English text vectorization service 001 (ops-text-embedding-en-001): provides a text vectorization service for English text. The maximum length of the input text is 512 tokens, and the dimension of the generated vectors is 768.

    Sparse text embedding

    Text sparse vectorization converts text into sparse vectors that occupy less storage space. Sparse vectors are well suited to representing keywords and term-frequency information. You can perform a hybrid search by using sparse and dense vectors together to improve retrieval performance.

    OpenSearch general-purpose text sparse vectorization service (ops-text-sparse-embedding-001): provides a text sparse vectorization service that supports more than 100 languages. The maximum length of the input text is 8,192 tokens.

    Query analysis

    Query analysis service 001 (ops-query-analyze-001): provides a general-purpose query analysis service based on LLMs to understand user intent and generate similar questions.

    Search engine

    • Alibaba Cloud Elasticsearch is a fully managed, out-of-the-box cloud service developed based on open source Elasticsearch. It is fully compatible with open source features and supports the pay-as-you-go billing method.

      Note

      If you select Alibaba Cloud Elasticsearch as a search engine, the text sparse vectorization service is unavailable due to compatibility issues. In this case, we recommend that you use a text vectorization service.

    • OpenSearch Vector Search Edition is a large-scale distributed vector search engine developed by Alibaba Group. OpenSearch Vector Search Edition supports multiple vector search algorithms and performs well in high-accuracy retrieval. You can use OpenSearch Vector Search Edition for large-scale index building and retrieval in a cost-effective manner. The following features are supported: horizontal scaling and merging of indexes, pipeline creation of indexes, real-time query upon creation, and dynamic update of real-time data.

      Note

      To use OpenSearch Vector Search Edition, replace the engine configurations and code in the solution.

    Re-ranking

    BGE re-ranking model (ops-bge-reranker-larger): provides a general-purpose document scoring service. You can use this service to score documents based on their relevance to a query, sort them in descending order of score, and return the scores.

    LLM

    • OpenSearch-Qwen-Turbo: uses Qwen-Turbo as the base model and adds supervised fine-tuning, enhanced retrieval augmentation, and reduced harmfulness.

    • Qwen-Turbo: the fastest and most cost-effective model in the Qwen series, suitable for simple tasks. For more information, see Qwen LLMs.

    • Qwen-Plus: a balanced model whose reasoning effectiveness, cost, and speed are between those of Qwen-Max and Qwen-Turbo. It is suitable for moderately complex tasks. For more information, see Qwen LLMs.

    • Qwen-Max (qwen-max): an ultra-large-scale Qwen language model with hundreds of billions of parameters that supports input in multiple languages, such as Chinese and English. For more information, see Qwen LLMs.

  5. After you select the services and the configuration is complete, go to Code Query to view and download the code.

    The code consists of two parts based on the offline document processing and online conversational search processes of the RAG-based solution.


    Offline document processing

    This process consists of document parsing, image extraction, document chunking, text embedding, and writing the processing results to an Elasticsearch index.

    Use the main function document_pipeline_execute to perform the following steps. Pass the document to be processed by using a document URL or Base64-encoded file.

    1. Parse the document. For more information, see Document content parsing.

      • Call the asynchronous operation of document parsing to extract the content from the URL or decode the content from the Base64-encoded file.

      • Create a parsing task by calling the create_document_analyze_task_async operation and poll the task completion status with the poll_task_result function.

    2. Extract the image. For more information, see Image content extraction.

      • Call the asynchronous operation of image parsing to extract the image content from the URL or decode the image content from the Base64-encoded file.

      • Create an image parsing task with the create_image_analyze_task function and poll the task completion status with the get_image_analyze_task_status function.

    3. Chunk the document. For more information, see Document chunking.

      • Call the document chunking operation to chunk the parsed document based on a specific policy.

      • Use the get_document_split_async operation to chunk the document. This process includes text chunking and rich text extraction.

    4. Embed the text chunks. For more information, see Text embedding.

      • Call the text embedding operation to convert the chunked text into dense vectors.

      • Use the get_text_embedding_async operation to generate an embedding vector for each chunk.

    5. Write the processing results to an Elasticsearch index. For more information, see Use the kNN search feature of Elasticsearch.

      • Create an Elasticsearch index whose mappings include the emb, content, and source_doc fields.

        Important

        When you create an Elasticsearch index, the sample code deletes any existing index that has the same name. To prevent an index from being deleted unintentionally, change the index name in the code.

      • Call the helpers.async_bulk function to write the embedded results to the Elasticsearch index.

    Online conversational search

    This process consists of generating query vectors, performing query analysis, retrieving relevant document chunks, re-ranking search results, and generating answers based on search results.

    Use the main function query_pipeline_execute to process a user query and generate an answer.

    1. Perform text embedding on the query. For more information, see Text embedding.

      • Call the text vectorization operation to convert the query into a dense vector.

      • Use the get_text_embedding_async operation to generate a query vector.

    2. Call the query analysis service. For more information, see Query analysis.

      Call the query analysis operation to identify user intents and generate similar questions by analyzing conversation history.

    3. Retrieve embedded chunks. For more information, see Use the kNN search feature of Elasticsearch.

      • Use the Elasticsearch index to retrieve the embedded chunks that are similar to the query vector.

      • Use the search operation of AsyncElasticsearch and the k-nearest neighbor (kNN) search feature to retrieve results based on the vector similarity.

    4. Re-rank the results. For more information, see Re-rank service.

      • Call the re-ranking operation to score the retrieved chunks and sort the results based on scores.

      • Call the get_document_rank_async operation to score and re-rank the documents based on their relevance to the query.

    5. Call the text generation operation to generate an answer. For more information, see API details.

      Use the text generation service by calling the get_text_generation_async operation to generate an answer based on the retrieved results and the user query.

    Select Document processing flow and Online Q & A Process under Code Query. In the code editor, click Copy Code or Download File.
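
For example, the following sketch (based on the offline.py file shown later in this topic) switches the text embedding service to ops-text-embedding-002. Because that service outputs 1,024-dimensional vectors, the dims value in the Elasticsearch index mapping must be changed to match:

    # Sketch: swapping the text embedding service in the downloaded offline.py.
    # ops-text-embedding-002 outputs 1,024-dimensional vectors, so the index
    # mapping dimension must match the new model.
    service_id_config = {"extract": "ops-document-analyze-001",
                         "split": "ops-document-split-001",
                         "text_embedding": "ops-text-embedding-002",  # was ops-text-embedding-001
                         "text_sparse_embedding": "ops-text-sparse-embedding-001",
                         "image_analyze": "ops-image-analyze-ocr-001"}

    embedding_dims = 1024  # 1,536 for ops-text-embedding-001; 1,024 for ops-text-embedding-002

    index_mappings = {
        "mappings": {
            "properties": {
                "emb": {"type": "dense_vector", "index": True,
                        "similarity": "cosine", "dims": embedding_dims},
                "content": {"type": "text"},
                "source_doc": {"type": "keyword"}
            }
        }
    }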

Step 2: Test the code in a local environment

After you download the code files to your device, you must specify the parameters in the code. In this example, the offline.py and online.py files are downloaded: the offline.py file implements offline document processing, and the online.py file implements online conversational search. Configure the following parameters.

AI Search Open Platform parameters

  • api_key: The API key. For more information about how to obtain the API key, see Manage API key.

  • aisearch_endpoint: The API endpoint. For more information about how to obtain the API endpoint, see Query service endpoint.

    Note

    You must remove the http:// prefix from the endpoint. You can call API operations over the Internet or a VPC.

  • workspace_name: The name of the workspace in AI Search Open Platform.

  • service_id: The service ID. To facilitate code development, the offline.py and online.py files each configure the IDs of all selected services in a single service_id_config parameter.

Elasticsearch search engine parameters

  • es_host: The endpoint of the Elasticsearch cluster. If you want to access the Elasticsearch cluster over the Internet or a VPC, add the IP address of your device to a public or private IP address whitelist of the cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.

  • es_auth: The username and password that are used to access the Elasticsearch cluster. The username is elastic, and the password is the one that you set when you created the cluster. If you forget the password, you can reset it. For more information, see Reset the access password for an Elasticsearch cluster.

Other parameters

You do not need to modify other parameters if you use the sample code.

After you specify the parameters, separately run the code in the offline.py and online.py files in Python 3.7 or later to check whether the generated answer is correct.
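
For example, run the offline pipeline first so that the Elasticsearch index exists before the online pipeline queries it:

    python offline.py   # parse, chunk, embed, and write the document to Elasticsearch
    python online.py    # analyze the query, retrieve, re-rank, and generate the answer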

For example, the document What is AI Search Open Platform? is used as the knowledge base. The following question is asked based on the document: What features does AI Search Open Platform provide?

The following figures show the results that are returned.

  • Offline document processing results


  • Online conversational search results


  • Source code files

    offline.py
    # RAG-based offline pipeline - Elasticsearch engine
    
    # The following environment requirements are met:
    # Python version 3.7 or later is installed.
    # An Elasticsearch cluster of V8.9 or later is created. If you want to create an Alibaba Cloud Elasticsearch cluster, you must activate Alibaba Cloud Elasticsearch in advance and configure the IP address whitelist of the cluster. For more information, visit https://www.alibabacloud.com/help/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster.
    
    # The following dependencies are installed:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # AI Search Open Platform configurations
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    api_key = "OS-xxx"
    workspace_name = "default"
    service_id_config = {"extract": "ops-document-analyze-001",
                         "split": "ops-document-split-001",
                         "text_embedding": "ops-text-embedding-001",
                         "text_sparse_embedding": "ops-text-sparse-embedding-001",
                         "image_analyze": "ops-image-analyze-ocr-001"}
    
    # Elasticsearch configurations
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    
    # Specify the document URL. In this example, the OpenSearch product description document is used.
    document_url = "https://www.alibabacloud.com/help/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"
    
    import asyncio
    from typing import List
    from elasticsearch import AsyncElasticsearch
    from elasticsearch import helpers
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
        CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
        GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
        GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
        CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskRequest, CreateImageAnalyzeTaskResponse, \
        GetImageAnalyzeTaskStatusRequest, GetImageAnalyzeTaskStatusResponse
    
    
    async def poll_task_result(ops_client, task_id, service_id, interval=5):
        while True:
            request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
            response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(interval)
            elif status == "SUCCESS":
                return response
            else:
                raise Exception("document analyze task failed")
    
    
    def is_analyzable_url(url:str):
        if not url:
            return False
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
        return url.lower().endswith(tuple(image_extensions))
    
    
    async def image_analyze(ops_client, url):
        try:
            print("image analyze :" + url)
            if url.startswith("//"):
                url = "https:" + url
            if not is_analyzable_url(url):
                print(url + " is not analyzable.")
                return url
            image_analyze_service_id = service_id_config["image_analyze"]
            document = CreateImageAnalyzeTaskRequestDocument(
                url=url,
            )
            request = CreateImageAnalyzeTaskRequest(document=document)
            response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
            task_id = response.body.result.task_id
            while True:
                request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
                response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
                status = response.body.result.status
                if status == "PENDING":
                    await asyncio.sleep(5)
                elif status == "SUCCESS":
                    return url + response.body.result.data.content
                else:
                    print("image analyze error: " + response.body.result.error)
                    return url
        except Exception as e:
            print(f"image analyze Exception : {e}")
            return url  # Fall back to the original URL if image analysis fails.
    
    
    def chunk_list(lst, chunk_size):
        for i in range(0, len(lst), chunk_size):
            yield lst[i:i + chunk_size]
    
    
    async def write_to_es(doc_list):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,  # SSL certificate verification is not used.
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
    
        # Delete the existing index.
        if await es.indices.exists(index=index_name):
            await es.indices.delete(index=index_name)
    
        # Create the vector index: the emb field stores the generated dense vectors,
        # the content field stores the chunk text, and the source_doc field stores
        # the source document URL as a keyword.
        index_mappings = {
            "mappings": {
                "properties": {
                    "emb": {
                        "type": "dense_vector",
                        "index": True,
                        "similarity": "cosine",
                        "dims": 1536  # Modify the dimension based on the output of the embedding model.
                    },
                    "content": {
                        "type": "text"
                    },
                    "source_doc": {
                        "type": "keyword"
                    }
                }
            }
        }
        await es.indices.create(index=index_name, body=index_mappings)
    
        # Write the vectorized results to the index that you created in the previous step.
        actions = []
        for i, doc in enumerate(doc_list):
            action = {
                "_index": index_name,
                "_id": doc['id'],
                "_source": {
                    "emb": doc['embedding'],
                    "content": doc['content'],
                    "source_doc": document_url
                }
            }
            actions.append(action)
    
        try:
            await helpers.async_bulk(es, actions)
        except helpers.BulkIndexError as e:
            # Print the per-document errors reported by the bulk write.
            for error in e.errors:
                print(error)
    
        # Check whether the vectorized results are written to the index.
        await asyncio.sleep(2)
        query = {
            "query": {
                "ids": {
                    "values": [doc_list[0]["id"]]
                }
            }
        }
        res = await es.search(index=index_name, body=query)
        if len(res['hits']['hits']) > 0:
            print("ES write success")
        await es.close()
    
    
    async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
    
        # Generate an AI Search Open Platform client.
        config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
        ops_client = Client(config=config)
    
        # Step 1: Parse the document.
        document_analyze_request = CreateDocumentAnalyzeTaskRequest(
            document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,
                                                              file_name=file_name, file_type='html'))
        document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,
                                                                                        service_id=service_id_config[
                                                                                            "extract"],
                                                                                        request=document_analyze_request)
        print("document-analyze task_id:" + document_analyze_response.body.result.task_id)
        extraction_result = await poll_task_result(ops_client, document_analyze_response.body.result.task_id,
                                                   service_id_config["extract"])
        print("document-analyze done")
        document_content = extraction_result.body.result.data.content
        content_type = extraction_result.body.result.data.content_type
        # Step 2: Split the document.
        document_split_request = GetDocumentSplitRequest(
            GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
        document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                          document_split_request)
        print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
              + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))
    
        # Step 3: Perform text vectorization.
        # Collect the chunking results. Image chunks are sent to the image parsing
        # service to extract their text.
        doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                     document_split_result.body.result.chunks]
                    + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                    + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                    )
    
        chunk_size = 32  # A maximum of 32 vectors can be generated at a time.
        all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                 GetTextEmbeddingRequest(chunk))
            all_text_embeddings.extend(response.body.result.embeddings)
    
        all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_sparse_embedding_async(workspace_name,
                                                                        service_id_config["text_sparse_embedding"],
                                                                        GetTextSparseEmbeddingRequest(chunk,
                                                                                                      input_type="document",
                                                                                                      return_token=True))
            all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
    
        for i in range(len(doc_list)):
            doc_list[i]["embedding"] = all_text_embeddings[i].embedding
            doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding
    
        print("text-embedding done")
    
        # Step 4: Write the processing results to an Elasticsearch engine.
        await write_to_es(doc_list)
    
    
    if __name__ == "__main__":
        # Run the asynchronous task.
        #    import nest_asyncio # If you run the code in Jupyter Notebook, uncomment this line of code.
        #    nest_asyncio.apply() # If you run the code in Jupyter Notebook, uncomment this line of code.
        asyncio.run(document_pipeline_execute(document_url))
        # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #You can also use a Base64-encoded file to specify the document to be processed.
    
    online.py
    # RAG-based online pipeline - Elasticsearch engine
    
    # The following environment requirements are met:
    # Python version 3.7 or later is installed.
    # An Elasticsearch cluster of V8.9 or later is created. If you want to create an Alibaba Cloud Elasticsearch cluster, you must activate Alibaba Cloud Elasticsearch in advance and configure the IP address whitelist of the cluster. For more information, visit https://www.alibabacloud.com/help/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster.
    
    # The following dependencies are installed:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # AI Search Open Platform configurations
    api_key = "OS-xxx"
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    workspace_name = "default"
    service_id_config = {
        "rank": "ops-bge-reranker-larger",
        "text_embedding": "ops-text-embedding-001",
        "text_sparse_embedding": "ops-text-sparse-embedding-001",
        "llm": "ops-qwen-turbo",
        "query_analyze": "ops-query-analyze-001"
    
    }
    
    # Elasticsearch configurations
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    # Enter the user query
    user_query = "What features does AI Search Open Platform provide?"
    
    
    import asyncio
    from elasticsearch import AsyncElasticsearch
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetTextEmbeddingRequest,  \
        GetDocumentRankRequest, GetTextGenerationRequest, GetTextGenerationRequestMessages, \
        GetQueryAnalysisRequest
    
    # Generate an AI Search Open Platform client.
    config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
    ops_client = Client(config=config)
    
    
    async def es_retrieve(query):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
        # Perform query vectorization.
        query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                     GetTextEmbeddingRequest(input=[query],
                                                                                             input_type="query"))
        query_emb = query_emb_result.body.result.embeddings[0].embedding
        query = {
            "field": "emb",
            "query_vector": query_emb,
            "k": 5,  # Generate the number of document segments.
            "num_candidates": 100  # This parameter works in the same way as the efsearch parameter in a hierarchical navigable small world (HNSW)-based search.
        }
    
        res = await es.search(index=index_name, knn=query)
        search_results = [item['_source']['content'] for item in res['hits']['hits']]
        await es.close()
        return search_results
    
    
    # Perform a conversational search. The input is the user query.
    async def query_pipeline_execute():
    
        # Step 1: Perform query analysis.
        query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config['query_analyze'],
                                                               GetQueryAnalysisRequest(query=user_query))
        print("query analysis rewrite result:" + query_analyze_response.body.result.query)
    
        # Step 2: Retrieve the document.
        all_query_results = []
        user_query_results = await es_retrieve(user_query)
        all_query_results.extend(user_query_results)
        rewrite_query_results = await es_retrieve(query_analyze_response.body.result.query)
        all_query_results.extend(rewrite_query_results)
        for extend_query in query_analyze_response.body.result.queries:
            extend_query_result = await es_retrieve(extend_query)
            all_query_results.extend(extend_query_result)
        # Deduplicate all retrieved segments.
        remove_duplicate_results = list(set(all_query_results))
    
        # Step 3: Call the re-ranking operation to sort the retrieved chunks.
        rerank_top_k = 8
        score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"],
                                                                 GetDocumentRankRequest(remove_duplicate_results, user_query))
        rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    
        # Step 4: Call the LLM to generate an answer.
        docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
        messages = [
            GetTextGenerationRequestMessages(role="system", content="You are a helpful assistant."),
            GetTextGenerationRequestMessages(role="user",
                                             content=f"""The context contains multiple independent documents, each of which is placed between the <article> and </article> tags. Context:\n'''{docs}'''
                                             \n\nAnswer the question in a detailed and organized manner based on the preceding context. Make sure that the question is adequately answered based on the context. If the information provided by the context is not sufficient to answer the question, return the following message: The question cannot be answered based on the context. Do not use content that is not included in the context to generate answers. Make sure that each statement in the answer is supported by the corresponding content in the context. Answer the question in Chinese.
                                             \nQuestion:'''{user_query}'''""")
        ]
        response = await ops_client.get_text_generation_async(workspace_name, service_id_config["llm"],
                                                              GetTextGenerationRequest(messages=messages))
        print("Final answer: ", response.body.result.text)
    
    
    if __name__ == "__main__":
        #    import nest_asyncio # If you run the code in Jupyter Notebook, uncomment this line of code.
        #    nest_asyncio.apply() # If you run the code in Jupyter Notebook, uncomment this line of code.
        asyncio.run(query_pipeline_execute())