All Products
Search
Document Center

OpenSearch:Bangun pencarian percakapan berbasis RAG

Last Updated:Jun 22, 2026

Untuk pencarian percakapan di atas basis pengetahuan, AI Search Open Platform menyediakan pipeline pengembangan lengkap guna membangun aplikasi Retrieval-Augmented Generation (RAG). Pipeline ini mencakup tiga modul utama: pra-pemrosesan data, pengambilan (retrieval), dan pembangkitan jawaban. AI Search Open Platform menyediakan kemampuan masing-masing modul sebagai layanan algoritma berbasis komponen yang dapat dikomposisikan—seperti parsing dokumen, reranking, dan pembangkitan jawaban—sehingga Anda dapat dengan cepat menghasilkan kode pengembangan. Layanan-layanan tersebut diekspos melalui API. Unduh kode yang disediakan dan ganti kunci API, titik akhir layanan, serta informasi basis pengetahuan lokal sesuai petunjuk dalam topik ini untuk segera membangun aplikasi pencarian percakapan berbasis RAG Anda.

Cara kerja

Retrieval-Augmented Generation (RAG) adalah teknik AI yang menggabungkan pengambilan informasi dengan model bahasa besar (LLM) untuk meningkatkan relevansi, akurasi, dan keragaman konten yang dihasilkan. Saat menangani kueri, sistem RAG pertama-tama mengambil cuplikan informasi paling relevan dari basis pengetahuan eksternal. Informasi tersebut, bersama dengan kueri asli, kemudian diberikan sebagai konteks kepada LLM. Pendekatan ini memungkinkan model menghasilkan jawaban yang lebih tepat dan informatif dengan merujuk pada data eksternal yang mutakhir atau spesifik domain, bukan hanya mengandalkan data pelatihan internalnya.

RAG-based intelligent Q&A implementation flowchart.jpg

Kasus penggunaan

Pencarian percakapan di atas basis pengetahuan sangat ideal untuk kasus penggunaan seperti pengambilan pengetahuan internal perusahaan dan tanya jawab spesialisasi di domain vertikal. Penerapan Retrieval-Augmented Generation (RAG) dan model bahasa besar (LLM) pada dokumen basis pengetahuan profesional Anda memungkinkan sistem memahami dan merespons kueri bahasa alami yang kompleks. Hal ini membantu pengguna perusahaan menemukan informasi yang dibutuhkan dengan cepat dalam berbagai format dokumen, termasuk PDF, Word, tabel, dan gambar.

Dalam antarmuka pencarian percakapan, pengguna mengajukan pertanyaan dalam bahasa alami. Sistem mengembalikan jawaban terstruktur dan memberikan rekomendasi pertanyaan lanjutan untuk eksplorasi lebih jauh.

Prasyarat

  • Layanan AI Search Open Platform telah diaktifkan. Untuk informasi selengkapnya, lihat Aktifkan layanan.

  • Dapatkan titik akhir layanan dan kredensial autentikasi. Untuk informasi selengkapnya, lihat Dapatkan titik akhir layanan dan Kelola kunci API.

    AI Search Open Platform mendukung panggilan layanan melalui titik akhir publik dan titik akhir VPC. Panggilan lintas wilayah didukung melalui titik akhir VPC. Saat ini, pengguna di wilayah China (Shanghai), China (Hangzhou), China (Shenzhen), China (Beijing), China (Zhangjiakou), dan China (Qingdao) dapat memanggil layanan AI Search Open Platform melalui titik akhir VPC.

    Pada halaman API Keys, pesan di bagian atas berbunyi, "Kunci API digunakan untuk izin pemanggilan layanan. Harap simpan dengan aman. Jika kunci API dikompromikan, segera nonaktifkan. Anda dapat mengaktifkan hingga 10 kunci API secara bersamaan." Bagian Access Domain menampilkan Public API Domain dan Private API Domain, keduanya mendukung akses HTTPS.

  • Buat kluster Alibaba Cloud Elasticsearch versi 8.5 atau lebih baru. Untuk informasi selengkapnya, lihat Buat kluster Alibaba Cloud Elasticsearch. Saat mengakses kluster melalui jaringan publik atau VPC, Anda harus menambahkan alamat IP perangkat Anda ke daftar putih alamat IP kluster. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih alamat IP publik atau privat untuk kluster Elasticsearch.

  • Anda memiliki lingkungan Python 3.7 atau lebih baru dengan paket aiohttp 3.8.6 dan elasticsearch 8.14 yang telah diinstal.

Bangun pipeline pengembangan RAG

Catatan

Untuk kenyamanan Anda, AI Search Open Platform menyediakan empat jenis framework pengembangan:

  • Java SDK

  • Python SDK

  • LangChain: Pilih opsi ini jika bisnis Anda sudah menggunakan framework LangChain.

  • LlamaIndex: Pilih opsi ini jika bisnis Anda sudah menggunakan framework LlamaIndex.

Langkah 1: Pilih layanan dan unduh kode

Berdasarkan basis pengetahuan dan kebutuhan bisnis Anda, pilih layanan algoritma dan framework pengembangan untuk pipeline RAG Anda. Topik ini menggunakan Python SDK sebagai contoh.

  1. Masuk ke Konsol AI Search Open Platform.

  2. Pilih wilayah China (Shanghai), beralih ke AI Search Open Platform, lalu pilih ruang kerja target Anda.

    Catatan
    • AI Search Open Platform hanya tersedia di wilayah China (Shanghai) dan Jerman (Frankfurt).

    • Pengguna di wilayah China (Hangzhou), China (Shenzhen), China (Beijing), China (Zhangjiakou), dan China (Qingdao) dapat menggunakan titik akhir VPC untuk memanggil layanan AI Search Open Platform lintas wilayah.

  3. Di panel navigasi sebelah kiri, klik Scene Center. Pada kartu RAG Scene-Knowledge Base Online Q & A., klik Enter.

  4. Dari daftar drop-down, pilih layanan yang Anda butuhkan sesuai kebutuhan bisnis. Anda dapat melihat informasi detail setiap layanan pada tab Service Details.

    Catatan
    • Saat memanggil layanan algoritma dalam pipeline RAG melalui API, Anda harus menyediakan ID layanan (service_id). Misalnya, ID untuk layanan parsing konten dokumen adalah ops-document-analyze-001.

    • Saat Anda mengganti layanan dalam daftar, service_id dalam kode yang dihasilkan akan diperbarui secara otomatis. Setelah mengunduh kode, Anda tetap dapat mengubah service_id untuk memanggil layanan berbeda.

    Tahap

    Deskripsi layanan

    Document content parsing

    Document content parsing service (ops-document-analyze-001): Layanan umum yang mengekstraksi struktur logis seperti judul dan paragraf dari dokumen tidak terstruktur (teks, tabel, dan gambar) serta mengeluarkannya dalam format terstruktur.

    Image content parsing

    • Image content understanding service (ops-image-analyze-vlm-001): Menggunakan model besar multimodal untuk mengurai, memahami, dan mengenali teks dari gambar. Teks yang diekstraksi dapat digunakan untuk skenario pengambilan dan tanya jawab berbasis gambar.

    • Image text recognition service (ops-image-analyze-ocr-001): Menggunakan OCR untuk mengenali teks dalam gambar. Teks yang diurai dapat digunakan untuk skenario pengambilan dan tanya jawab berbasis gambar.

    Document chunking

    Document chunking service (ops-document-split-001): Layanan chunking teks umum yang membagi data terstruktur dalam format HTML, Markdown, dan TXT berdasarkan paragraf dokumen, semantik teks, atau aturan tertentu. Layanan ini juga mendukung ekstraksi kode, gambar, dan tabel sebagai teks kaya dari dokumen.

    Text embedding

    • OpenSearch text embedding service-001 (ops-text-embedding-001): Menyediakan penyematan teks multibahasa (40+). Panjang input maksimum adalah 300 token, dan dimensi vektor output adalah 1536.

    • OpenSearch general text embedding service-002 (ops-text-embedding-002): Menyediakan penyematan teks multibahasa (100+). Panjang input maksimum adalah 8.192 token, dan dimensi vektor output adalah 1024.

    • OpenSearch text embedding service-Chinese-001 (ops-text-embedding-zh-001): Menyediakan penyematan teks bahasa Tionghoa. Panjang input maksimum adalah 1.024 token, dan dimensi vektor output adalah 768.

    • OpenSearch text embedding service-English-001 (ops-text-embedding-en-001): Menyediakan penyematan teks bahasa Inggris. Panjang input maksimum adalah 512 token, dan dimensi vektor output adalah 768.

    Sparse text embedding

    Mengonversi data teks menjadi representasi vektor jarang. Vektor jarang menggunakan lebih sedikit penyimpanan dan sering digunakan untuk merepresentasikan kata kunci dan frekuensi istilah. Vektor ini dapat dikombinasikan dengan vektor padat untuk pencarian hibrid guna meningkatkan kinerja pengambilan.

    OpenSearch sparse text embedding service (ops-text-sparse-embedding-001): Menyediakan penyematan teks jarang multibahasa (100+). Panjang input maksimum adalah 8.192 token.

    Query analysis

    Query analysis service 001 (ops-query-analyze-001): Layanan analisis kueri umum yang menggunakan model bahasa besar untuk memahami maksud kueri pengguna dan memperluasnya dengan pertanyaan serupa.

    Search engine

    • Alibaba Cloud Elasticsearch: Layanan cloud terkelola penuh yang dibangun di atas Elasticsearch open-source. Layanan ini 100% kompatibel dengan fitur open-source dan menawarkan pengalaman siap pakai dengan model bayar sesuai penggunaan.

      Catatan

      Jika Anda memilih Alibaba Cloud Elasticsearch sebagai mesin pencari, layanan sparse text embedding tidak tersedia karena masalah kompatibilitas. Kami menyarankan menggunakan layanan text embedding sebagai gantinya.

    • OpenSearch Vector Search Edition: Mesin pencarian vektor terdistribusi berskala besar yang dikembangkan oleh Alibaba. Layanan ini mendukung berbagai algoritma pencarian vektor, memberikan kinerja unggul dengan presisi tinggi, serta memungkinkan pengindeksan dan pengambilan hemat biaya dalam skala besar. Indeksnya mendukung skalabilitas horizontal dan penggabungan, pembuatan streaming, kueri real-time, serta pembaruan data dinamis.

      Catatan

      Jika Anda perlu menggunakan OpenSearch Vector Search Edition, Anda dapat mengganti konfigurasi mesin dan kode dalam pipeline RAG.

    Reranking service

    BGE reranker model (ops-bge-reranker-larger): Layanan penilaian dokumen umum. Layanan ini mengurutkan dokumen berdasarkan relevansi antara kueri dan konten dokumen, mengurutkannya dari skor tertinggi ke terendah, serta mengeluarkan hasil penilaian tersebut.

    Large language model

    • OpenSearch-Qwen-Turbo (ops-qwen-turbo): Dibangun di atas model bahasa besar Qwen-Turbo, layanan ini telah dilatih ulang dengan Supervised Learning untuk meningkatkan augmentasi pengambilan dan mengurangi respons berbahaya.

    • Qwen-Turbo (qwen-turbo): Model bahasa besar dari seri Qwen yang mendukung berbagai bahasa, termasuk Tionghoa dan Inggris. Untuk informasi selengkapnya, lihat Pengantar LLM seri Qwen.

    • Qwen-Plus (qwen-plus): Versi peningkatan dari model bahasa besar Qwen-Turbo yang mendukung berbagai bahasa, termasuk Tionghoa dan Inggris. Untuk informasi selengkapnya, lihat Pengantar LLM seri Qwen.

    • Qwen-Max (qwen-max): Model bahasa ultra-besar berskala triliunan parameter dari seri Qwen yang mendukung berbagai bahasa, termasuk Tionghoa dan Inggris. Untuk informasi selengkapnya, lihat Pengantar LLM seri Qwen.

  5. Setelah memilih layanan Anda, klik After the configuration is completed, enter the code query untuk melihat dan mengunduh kode.

    Kode tersebut terstruktur menjadi dua bagian yang mencerminkan alur waktu proses pipeline RAG: pemrosesan dokumen offline dan pencarian percakapan online.

    Proses

    Fungsi

    Deskripsi

    Offline document processing

    Memproses dokumen, termasuk parsing, ekstraksi gambar, chunking, embedding, dan menulis hasil ke indeks Elasticsearch.

    Fungsi utama document_pipeline_execute menyelesaikan alur kerja berikut. Anda dapat memasukkan dokumen melalui URL atau encoding Base64.

    1. Document parsing. Untuk detail API, lihat Document Parsing API.

      • Panggil API parsing dokumen asinkron untuk mengekstraksi konten dari URL dokumen atau mendekode konten dari file berkode Base64.

      • Gunakan fungsi create_async_extraction_task untuk membuat tugas parsing dan fungsi poll_task_result untuk memeriksa status penyelesaian tugas.

    2. Image extraction. Untuk detail API, lihat Image Content Extraction API.

      • Panggil API parsing gambar asinkron untuk mengekstraksi konten dari URL gambar atau mendekodenya dari file berkode Base64.

      • Gunakan fungsi create_image_analyze_task untuk membuat tugas parsing gambar dan fungsi get_image_analyze_task_status untuk mendapatkan statusnya.

    3. Document chunking. Untuk detail API, lihat Document Chunking API.

      • Panggil API chunking dokumen untuk membagi dokumen yang telah diurai sesuai strategi tertentu.

      • Gunakan fungsi document_split untuk chunking dokumen dan parsing konten teks kaya.

    4. Text embedding. Untuk detail API, lihat Text Embedding API.

      • Panggil API text embedding untuk membuat representasi vektor dari teks yang telah di-chunk.

      • Gunakan fungsi text_embedding untuk menghitung vektor embedding setiap chunk.

    5. Write to Elasticsearch. Untuk detail layanan, lihat Gunakan fitur pencarian k-nearest neighbor (kNN) Elasticsearch.

      • Buat konfigurasi indeks Elasticsearch yang menentukan bidang vektor embedding dan bidang konten dokumen content.

        Penting

        Saat Anda membuat indeks Elasticsearch, indeks yang sudah ada dengan nama yang sama akan dihapus. Untuk menghindari kehilangan data secara tidak sengaja, ubah nama indeks dalam kode.

      • Gunakan fungsi helpers.async_bulk untuk menulis secara massal hasil vektorisasi ke indeks Elasticsearch.

    Online conversational search

    Memproses kueri pengguna online, termasuk menghasilkan vektor kueri, melakukan analisis kueri, mengambil chunk dokumen relevan, menyusun ulang hasil pencarian, dan menghasilkan jawaban akhir.

    Fungsi utama query_pipeline_execute menyelesaikan alur kerja berikut untuk memproses kueri pengguna dan mengembalikan jawaban.

    1. Vectorize the query. Untuk detail API, lihat Text Embedding API.

      • Panggil API text embedding untuk mengonversi kueri pengguna menjadi vektor.

      • Gunakan fungsi text_embedding untuk menghasilkan vektor kueri.

    2. Panggil layanan analisis kueri. Untuk detailnya, lihat Query Analysis API.

      Layanan ini mengidentifikasi maksud pengguna dan menghasilkan pertanyaan serupa dengan menganalisis riwayat percakapan.

    3. Search for embedding chunks. Untuk detail layanan, lihat Gunakan fitur pencarian k-nearest neighbor (kNN) Elasticsearch.

      • Gunakan Elasticsearch untuk mengambil chunk dokumen dari indeks yang mirip dengan vektor kueri.

      • Gunakan API search dari AsyncElasticsearch yang dikombinasikan dengan kueri kNN untuk melakukan pencarian kemiripan.

    4. Panggil layanan reranking. Untuk detailnya, lihat Reranking API.

      • Panggil API layanan reranking untuk memberi skor dan mengurutkan chunk yang diambil.

      • Gunakan fungsi documents_ranking untuk memberi skor dan mengurutkan dokumen berdasarkan kueri pengguna.

    5. Generate an answer with the large language model. Untuk detail API, lihat Answer Generation API.

      Panggil layanan LLM, menggunakan fungsi llm_call dengan hasil pengambilan dan kueri pengguna untuk menghasilkan jawaban akhir.

    Di bawah Code Query, pilih Document processing flow dan Online Q & A Process, lalu klik Copy Code atau Download File untuk menyimpan kode secara lokal.

Langkah 2: Konfigurasikan dan uji pipeline

Setelah mengunduh kode ke dalam dua file lokal, seperti offline.py dan online.py, Anda perlu mengonfigurasi parameter kunci dalam kode.

Kategori

Parameter

Deskripsi

AI Search Open Platform

api_key

Kunci API untuk autentikasi. Untuk informasi selengkapnya, lihat Kelola kunci API.

aisearch_endpoint

Titik akhir layanan untuk panggilan API. Untuk informasi selengkapnya, lihat Dapatkan titik akhir layanan.

Catatan

Hapus awalan "http://" dari URL titik akhir.

Panggilan API didukung melalui titik akhir publik maupun VPC.

workspace_name

Nama ruang kerja Anda di AI Search Open Platform.

service_id

ID layanan. Untuk kenyamanan, Anda dapat mengonfigurasi ID layanan untuk berbagai layanan di kedua file offline.py dan online.py dengan menggunakan kamus service_id_config.

# Konfigurasi AI Search Open Platform
api_key = "xxx"
host = "http://xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
workspace_name = "default"
# Konfigurasi ID layanan
service_id_config = {"extract": "ops-document-analyze-001", "split": "ops-document-split-001", "emb": "ops-text-embedding-001"}

Elasticsearch search engine

es_host

Titik akhir kluster Elasticsearch. Saat mengakses kluster melalui jaringan publik atau VPC, Anda harus menambahkan alamat IP perangkat Anda ke daftar putih alamat IP kluster. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih alamat IP publik atau privat untuk kluster Elasticsearch.

es_auth

Username dan password untuk mengakses kluster Elasticsearch. Username-nya adalah elastic, dan password-nya adalah yang Anda tetapkan saat membuat kluster. Jika lupa password, Anda dapat mengatur ulang. Untuk informasi selengkapnya, lihat Atur ulang password akses instans.

Parameter lainnya

Tidak perlu modifikasi jika Anda menggunakan data sampel.

Setelah mengonfigurasi parameter, jalankan skrip offline.py terlebih dahulu, diikuti skrip online.py, dalam lingkungan Python 3.7 atau lebih baru untuk menguji hasilnya.

Jika dokumen basis pengetahuan adalah Pengantar AI Search Open Platform, ajukan pertanyaan berikut: What can AI Search Open Platform do?

Anda seharusnya melihat output berikut:

  • Hasil pemrosesan dokumen offline

    image analyze :https://img.alicdn.com/imgextra/i2/O1CN01bYc1m81RrcSAyOjMu_!!6000000002165-54-tps-60-60.apng
        https://img.alicdn.com/imgextra/i2/O1CN01bYc1m81RrcSAyOjMu_!!6000000002165-54-tps-60-60.apng is not analyzable.
        image analyze :https://help-static-aliyun-doc.aliyuncs.com/assets/img/zh-CN/3873436171/p802381.png
        image analyze :https://help-static-aliyun-doc.aliyuncs.com/assets/img/zh-CN/0650850271/p819277.png
        image analyze :https://help-static-aliyun-doc.aliyuncs.com/assets/img/zh-CN/0650850271/p819277.png
        image analyze ://gw.alicdn.com/tfs/TB1GxwdSXXXXXa.aXXXXXXXXXXX-65-70.gif
            https://gw.alicdn.com/tfs/TB1GxwdSXXXXXa.aXXXXXXXXXXX-65-70.gif is not analyzable.
        image analyze ://img.alicdn.com/tfs/TB1..50QpXXXX7XpXXXXXXXXXX-40-40.png
        image analyze :https://img.alicdn.com/tfs/TB1A0dINW6qK1RjSZFmXXX0PFXa-258-258.jpg
        image analyze ://gw.alicdn.com/tfs/TB1GxwdSXXXXXa.aXXXXXXXXXXX-65-70.gif
            https://gw.alicdn.com/tfs/TB1GxwdSXXXXXa.aXXXXXXXXXXX-65-70.gif is not analyzable.
        image analyze ://img.alicdn.com/tfs/TB1..50QpXXXX7XpXXXXXXXXXX-40-40.png
        image analyze ://gw.alicdn.com/tfs/TB1GxwdSXXXXXa.aXXXXXXXXXXX-65-70.gif
            https://gw.alicdn.com/tfs/TB1GxwdSXXXXXa.aXXXXXXXXXXX-65-70.gif is not analyzable.
        image analyze ://img.alicdn.com/tfs/TB1..50QpXXXX7XpXXXXXXXXXX-40-40.png
    text-embedding done
    OS write response:  {"status":"OK","code":200}
  • Hasil pencarian percakapan online

    /opt/miniconda3/envs/QA-pytest-base-lib1/bin/python /Users/liu/codeRepos/QA-pytest-base-lib/rag/case/SDK/python_sdk_es_zx.py
    query analysis rewrite result:What can the OpenSearch AI Search Open Platform do?
    Final answer from the large model:  AI Search Open Platform provides intelligent search services that power the core search functions for Alibaba's businesses, including Taobao and Tmall, and offers intelligent search solutions to external clients across various industries. It features industry-specific query semantic understanding and machine learning ranking algorithms to help developers build high-quality intelligent search services.
    The platform is suitable for a wide range of scenarios, including but not limited to:
    - E-commerce and retail intelligent search
    - Content and news search
    - Gaming industry search
    - Healthcare industry search
    - Financial industry search
    AI Search Open Platform focuses on intelligent search and Retrieval-Augmented Generation (RAG) scenarios, providing component-based services and flexible calling mechanisms. It has built-in services for document parsing, document chunking, text embedding, retrieval, reranking, and large language models, enabling a one-stop, flexible development experience for AI search applications.
    Process finished with exit code 0
  • File kode sumber

    offline.py
    # Pipeline offline RAG - Mesin Elasticsearch
    # Persyaratan lingkungan:
    # Python 3.7 atau lebih baru
    # Kluster Elasticsearch 8.5 atau lebih baru. Jika menggunakan Alibaba Cloud Elasticsearch, Anda harus mengaktifkan layanan dan mengonfigurasi daftar putih alamat IP terlebih dahulu. Lihat https://www.alibabacloud.com/help/en/elasticsearch/latest/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster
    
    # Persyaratan paket:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # Konfigurasi AI Search Open Platform
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    api_key = "OS-xxx"
    workspace_name = "default"
    service_id_config = {"extract": "ops-document-analyze-001",
                         "split": "ops-document-split-001",
                         "text_embedding": "ops-text-embedding-001",
                         "text_sparse_embedding": "ops-text-sparse-embedding-001",
                         "image_analyze": "ops-image-analyze-ocr-001"}
    
    # Konfigurasi Elasticsearch
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    # URL dokumen input. Contoh menggunakan dokumentasi produk untuk AI Search Open Platform.
    document_url = "https://www.alibabacloud.com/help/en/open-search/search-platform/product-overview/introduction-to-search-platform"
    
    import asyncio
    from typing import List
    from elasticsearch import AsyncElasticsearch
    from elasticsearch import helpers
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
        CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
        GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
        GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
        CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskRequest, CreateImageAnalyzeTaskResponse, \
        GetImageAnalyzeTaskStatusRequest, GetImageAnalyzeTaskStatusResponse
    
    
    async def poll_task_result(ops_client, task_id, service_id, interval=5):
        while True:
            request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
            response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(interval)
            elif status == "SUCCESS":
                return response
            else:
                raise Exception("document analyze task failed")
    
    
    def is_analyzable_url(url:str):
        if not url:
            return False
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
        return url.lower().endswith(tuple(image_extensions))
    
    
    async def image_analyze(ops_client, url):
        try:
            print("image analyze :" + url)
            if url.startswith("//"):
                url = "https:" + url
            if not is_analyzable_url(url):
                print(url + " is not analyzable.")
                return url
            image_analyze_service_id = service_id_config["image_analyze"]
            document = CreateImageAnalyzeTaskRequestDocument(
                url=url,
            )
            request = CreateImageAnalyzeTaskRequest(document=document)
            response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
            task_id = response.body.result.task_id
            while True:
                request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
                response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
                status = response.body.result.status
                if status == "PENDING":
                    await asyncio.sleep(5)
                elif status == "SUCCESS":
                    return url + response.body.result.data.content
                else:
                    print("image analyze error: " + response.body.result.error)
                    return url
        except Exception as e:
            print(f"image analyze Exception : {e}")
    
    
    def chunk_list(lst, chunk_size):
        for i in range(0, len(lst), chunk_size):
            yield lst[i:i + chunk_size]
    
    
    async def write_to_es(doc_list):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,  # Jangan verifikasi sertifikat SSL
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
    
        # Hapus indeks yang sudah ada jika ada.
        if await es.indices.exists(index=index_name):
            await es.indices.delete(index=index_name)
    
        # Buat indeks vektor. Tentukan bidang `emb` sebagai dense_vector, `content` sebagai text, dan `source_doc` sebagai keyword.
        index_mappings = {
            "mappings": {
                "properties": {
                    "emb": {
                        "type": "dense_vector",
                        "index": True,
                        "similarity": "cosine",
                        "dims": 1536  # Ubah dimensi berdasarkan output model text embedding.
                    },
                    "content": {
                        "type": "text"
                    },
                    "source_doc": {
                        "type": "keyword"
                    }
                }
            }
        }
        await es.indices.create(index=index_name, body=index_mappings)
    
        # Unggah massal hasil embedding ke indeks yang baru dibuat.
        actions = []
        for i, doc in enumerate(doc_list):
            action = {
                "_index": index_name,
                "_id": doc['id'],
                "_source": {
                    "emb": doc['embedding'],
                    "content": doc['content'],
                    "source_doc": document_url
                }
            }
            actions.append(action)
    
        try:
            await helpers.async_bulk(es, actions)
        except Exception as e:
            for error in e.errors:
                print(error)
    
        # Konfirmasi unggahan berhasil.
        await asyncio.sleep(2)
        query = {
            "query": {
                "ids": {
                    "values": [doc_list[0]["id"]]
                }
            }
        }
        res = await es.search(index=index_name, body=query)
        if len(res['hits']['hits']) > 0:
            print("ES write success")
        await es.close()
    
    
    async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
    
        # Inisialisasi klien AI Search Open Platform.
        config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
        ops_client = Client(config=config)
    
        # Langkah 1: Document parsing
        document_analyze_request = CreateDocumentAnalyzeTaskRequest(
            document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,
                                                              file_name=file_name, file_type='html'))
        document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,
                                                                                        service_id=service_id_config[
                                                                                            "extract"],
                                                                                        request=document_analyze_request)
        print("document-analyze task_id:" + document_analyze_response.body.result.task_id)
        extraction_result = await poll_task_result(ops_client, document_analyze_response.body.result.task_id,
                                                   service_id_config["extract"])
        print("document-analyze done")
        document_content = extraction_result.body.result.data.content
        content_type = extraction_result.body.result.data.content_type
        
        # Langkah 2: Document chunking
        document_split_request = GetDocumentSplitRequest(
            GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
        document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                          document_split_request)
        print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
              + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))
    
        # Langkah 3: Text embedding
        # Ekstrak hasil chunking. Untuk chunk gambar, konten teks diekstraksi menggunakan layanan analisis gambar.
        doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                     document_split_result.body.result.chunks]
                    + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                    + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                    )
    
        chunk_size = 32  # Maksimal 32 embedding dapat dihitung per permintaan.
        all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                 GetTextEmbeddingRequest(chunk))
            all_text_embeddings.extend(response.body.result.embeddings)
    
        all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_sparse_embedding_async(workspace_name,
                                                                        service_id_config["text_sparse_embedding"],
                                                                        GetTextSparseEmbeddingRequest(chunk,
                                                                                                      input_type="document",
                                                                                                      return_token=True))
            all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
    
        for i in range(len(doc_list)):
            doc_list[i]["embedding"] = all_text_embeddings[i].embedding
            doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding
    
        print("text-embedding done")
    
        # Langkah 4: Tulis ke mesin penyimpanan Elasticsearch.
        await write_to_es(doc_list)
    
    
    if __name__ == "__main__":
        # Jalankan tugas asinkron.
        #    import nest_asyncio # Jika menjalankan di notebook Jupyter, hapus komentar dua baris ini.
        #    nest_asyncio.apply() # Jika menjalankan di notebook Jupyter, hapus komentar dua baris ini.
        asyncio.run(document_pipeline_execute(document_url))
        # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) # Metode pemanggilan alternatif
              
    online.py
    # Pipeline online RAG - Mesin Elasticsearch
    # Persyaratan lingkungan:
    # Python 3.7 atau lebih baru
    # Kluster Elasticsearch 8.5 atau lebih baru. Jika menggunakan Alibaba Cloud Elasticsearch, Anda harus mengaktifkan layanan dan mengonfigurasi daftar putih alamat IP terlebih dahulu. Lihat https://www.alibabacloud.com/help/en/elasticsearch/latest/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster
    
    # Persyaratan paket:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # Konfigurasi AI Search Open Platform
    api_key = "OS-xxx"
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    workspace_name = "default"
    service_id_config = {
        "rank": "ops-bge-reranker-larger",
        "text_embedding": "ops-text-embedding-001",
        "text_sparse_embedding": "ops-text-sparse-embedding-001",
        "llm": "ops-qwen-turbo",
        "query_analyze": "ops-query-analyze-001"
    }
    
    # Konfigurasi Elasticsearch
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    # Kueri pengguna:
    user_query = "What can AI Search Open Platform do?"
    
    import asyncio
    from elasticsearch import AsyncElasticsearch
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetTextEmbeddingRequest,  \
        GetDocumentRankRequest, GetTextGenerationRequest, GetTextGenerationRequestMessages, \
        GetQueryAnalysisRequest
    
    # Inisialisasi klien AI Search Open Platform.
    config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
    ops_client = Client(config=config)
    
    
    async def es_retrieve(query):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
        # Vektorisasi kueri.
        query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                     GetTextEmbeddingRequest(input=[query],
                                                                                             input_type="query"))
        query_emb = query_emb_result.body.result.embeddings[0].embedding
        query = {
            "field": "emb",
            "query_vector": query_emb,
            "k": 5,  # Jumlah chunk dokumen yang dikembalikan
            "num_candidates": 100  # Parameter pencarian HNSW (ef_search)
        }
    
        res = await es.search(index=index_name, knn=query)
        search_results = [item['_source']['content'] for item in res['hits']['hits']]
        await es.close()
        return search_results
    
    
    # Pipeline pencarian percakapan online. Inputnya adalah pertanyaan pengguna.
    async def query_pipeline_execute():
    
        # Langkah 1: Analisis kueri
        query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config['query_analyze'],
                                                               GetQueryAnalysisRequest(query=user_query))
        print("query analysis rewrite result:" + query_analyze_response.body.result.query)
    
        # Langkah 2: Pengambilan dokumen
        all_query_results = []
        user_query_results = await es_retrieve(user_query)
        all_query_results.extend(user_query_results)
        rewrite_query_results = await es_retrieve(query_analyze_response.body.result.query)
        all_query_results.extend(rewrite_query_results)
        for extend_query in query_analyze_response.body.result.queries:
            extend_query_result = await es_retrieve(extend_query)
            all_query_results.extend(extend_query_result)
        # Hilangkan duplikasi semua hasil yang diambil.
        remove_duplicate_results = list(set(all_query_results))
    
        # Langkah 3: Susun ulang dokumen yang diambil.
        rerank_top_k = 8
        score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"],GetDocumentRankRequest(remove_duplicate_results, user_query))
        rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    
        # Langkah 4: Panggil model bahasa besar untuk menghasilkan jawaban.
        docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
        messages = [
            GetTextGenerationRequestMessages(role="system", content="You are a helpful assistant."),
            GetTextGenerationRequestMessages(role="user",
                                             content=f"""The provided information contains multiple independent documents, each enclosed in <article> and </article> tags. Information:\n'''{docs}'''
                                             \n\nBased on the information provided above, answer the user's question in a detailed and organized manner. Ensure your answer fully addresses the question and correctly uses the provided information. If the information is insufficient to answer the question, state "The question cannot be answered based on the provided information." Do not use any information outside of the provided context. Ensure that every statement in your answer is supported by the context. Please answer in English.
                                             \nQuestion: '''{user_query}'''""""")
        ]
        response = await ops_client.get_text_generation_async(workspace_name, service_id_config["llm"],
                                                              GetTextGenerationRequest(messages=messages))
        print("Final answer from the large model: ", response.body.result.text)
    
    
    if __name__ == "__main__":
        # Jalankan tugas asinkron.
        #    import nest_asyncio # Jika menjalankan di notebook Jupyter, hapus komentar dua baris ini.
        #    nest_asyncio.apply() # Jika menjalankan di notebook Jupyter, hapus komentar dua baris ini.
        asyncio.run(query_pipeline_execute())
              

FAQ

Saat eksekusi kode, Anda mungkin melihat pesan "Unclosed connector" karena sumber daya belum dilepas tepat waktu. Anda dapat mengabaikan pesan ini dengan aman.