全部產品
Search
文件中心

Hologres:最佳實務:金融多模態AI資料分析與檢索系統

更新時間:Nov 06, 2025

在資料驅動時代,非結構化資料(文本、映像、音視頻、日誌等)與結構化、半結構化資料(JSON)共同構成企業的核心資料資產。其中,非結構化資料以更原始、多元的形態蘊含著海量的業務洞察(如使用者反饋、合約條款、產品缺陷映像),本文將會類比金融情境中對招股書、合約等PDF檔案的檢索與分析,以輔助業務進行下一步的精細化營運決策。

核心能力介紹

本最佳實務主要是對PDF非結構化資料的處理與檢索,包含的主要能力如下:

  • 非結構化資料(Object Table):支援通過表的形式讀取OSS中非結構化資料(PDF/IMAGE/PPT等)。

  • AI Function:在Hologres中可以用標準SQL的方式調用AI Function,自動調用內建大模型,完成AI服務建設情境

    • 資料加工:提供Embed、Chunk運算元,可以對非結構化資料加工成結構化資料存放區,無需使用外部演算法就能自動Embed。

    • 資料檢索和分析:提供ai_genai_summarize等運算元,能夠通過SQL對資料進行推理、問題總結及翻譯等操作。

  • Dynamic Table介紹:支援增量重新整理模式對非結構化資料自動加工,每次只計算增量的資料有效減少重複計算,降低資源使用率。

  • 向量檢索:支援標準SQL的向量檢索,用於非結構化資料的相似性搜尋、情境識別等,在同一個查詢中可以自由地實現向量和標量的檢索。

  • 全文檢索索引:通過倒排索引、分詞等機制實現對非結構化資料的高效檢索,支援關鍵詞匹配、短語檢索等豐富的檢索方式,實現更加靈活的檢索。

方案優勢

通過如上核心能力,在Hologres中多模態AI檢索與分析的核心優勢如下:

  • 完整的AI資料處理流程:涵蓋從資料Embed、Chunk、增量加工、檢索/分析的全流程,開發人員可以使用巨量資料系統一樣,輕鬆構建AI應用。

  • 標準SQL加工和分析非結構化資料:無需使用專用開發語言,純SQL就能完成非結構化資料提取、加工,也無需藉助外部系統,資料處理更加高效和簡單,降低開發人員學習成本。

  • 檢索更精準、靈活和智能:可以輕鬆構建“關鍵詞+語義+多模態”的混合檢索鏈路,覆蓋從精準搜尋到意圖理解的全情境需求。還能結合AI Function實現對使用者意圖的深度理解,語義關聯和上下文推理,實現更智能的檢索能力。

  • 資料不出庫,更安全:不需要將資料匯出到外部系統,與hologres的多種安全能力無縫整合,高效保護資料安全。

本實踐文檔將會介紹如何通過上訴核心能力在Hologres中對非結構化資料加工和檢索,助力搭建企業級多模態AI資料平台,打破資料孤島,釋放全域資料價值

方案流程

本次方案的流程如下:

  1. 資料集準備。

    金融資料集中的PDF檔案上傳至OSS儲存。

  2. PDF資料加工。

    使用Object Table讀取PDF的中繼資料資訊,然後建立增量重新整理的Dynamic Table,並對資料進行Embed和Chunk,同時也對Dynamic Table構建向量索引和全文索引,以便後續檢索可以使用索引的能力。

  3. 使用ai_embed運算元對將自然語言的問題進行Embedding,然後使用全文和向量雙路召回結果,並對結果進行排序,結合大模型的推理能力,最終輸出相似性最高的答案。

準備工作

  • 資料準備

    本文使用ModelScope公開的金融資料集中的PDF檔案夾中的檔案,共80份公司招股說明書。

  • 環境準備

    1. 購買Hologres V4.0及以上版本執行個體並建立資料庫

    2. 購買AI資源

      本文以large-96core-512GB-384GB、1個節點為例。

    3. 模型部署。本次方案使用的模型以及分配的資源為:

      模型名稱

      模型類別

      模型作用描述

      單副本CPU(Core)

      單副本記憶體

      單副本GPU

      資源副本數

      to_doc

      ds4sd/docling-models

      將PDF轉換成文檔。

      20

      100 GB

      1卡(48 GB)

      1

      chunk

      recursive-character-text-splitter

      文檔切片,每個PDF較大,建議使用切片。

      15

      30 GB

      0卡(0 GB)

      1

      pdf_embed

      BAAI/bge-base-zh-v1.5

      文檔Embedding。

      7

      30 GB

      1卡(96 GB)

      1

      llm

      Qwen/Qwen3-32B

      使用大模型對檢索出的文檔內容按照提示詞推理.

      7

      30 GB

      1卡(96 GB)

      1

      說明

      上述模型的資源均為預設分配的資源。

操作步驟

  1. 下載PDF檔案並上傳至OSS。

    1. 下載博金大模型挑戰賽-金融千問14b資料集中80份招股書(PDF)。

    2. 登入OSS管理主控台建立Bucket並將已下載的PDF檔案上傳至該Bucket路徑下。上傳操作詳情,請參見簡單上傳

  2. 帳號授權。

    1. 登入RAM控制台,建立阿里雲RAM角色並授予OSS的相關許可權。

      推薦授予AliyunOSSReadOnlyAccess許可權。

    2. 為上述阿里雲RAM角色添加登入和Hologres的存取權限。

      • 阿里雲帳號(主帳號)

        修改RAM角色的信任策略。重點需更新如下參數:

        • Action:更新為sts:AssumeRole

        • Service:更新為hologres.aliyuncs.com

        {
          "Statement": [
            {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                "RAM": [
                  "acs:ram::1866xxxx:root"
                ],
                "Service": [
                  "hologres.aliyuncs.com"
                ]
              }
            }
          ],
          "Version": "1"
        }
      • RAM使用者(子帳號)

        1. 為RAM使用者授權。

          1. 許可權管理 > 權限原則頁面,單擊建立權限原則,並選擇指令碼編輯模式建立權限原則。具體操作,請參見建立自訂權限原則

            Hologres可通過該策略判斷當前RAM使用者是否具備建立對應RAM角色的許可權。權限原則內容如下。

            {
              "Version": "1",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Action": "hologram:GrantAssumeRole",
                  "Resource": "<arn帳號>"
                }
              ]
            }
          2. 身份管理 > 使用者頁面,單擊目標RAM使用者操作列中的添加許可權,為RAM使用者(子帳號)授予上述步驟已建立的權限原則。具體操作,請參見為RAM使用者授權

        2. 為已建立的RAM角色授權。

          修改RAM角色的信任策略。重點需更新如下參數:

          • Action:更新為sts:AssumeRole

          • Service:更新為hologres.aliyuncs.com

          {
            "Statement": [
              {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                  "RAM": [
                    "acs:ram::1866xxxx:root"
                  ],
                  "Service": [
                    "hologres.aliyuncs.com"
                  ]
                }
              }
            ],
            "Version": "1"
          }
  3. 對PDF檔案進行Embedding和Chunk。

    需要建立Object Table和Dynamic Table對PDF的中繼資料讀取以及加工。因為流程較長,Hologres直接將其封裝為預存程序。該預存程序包括的能力如下:

    • 會建立一張Object Table,用於取PDF的中繼資料。

    • 建立一張增量重新整理模式的Dynamic Table結果表,用於儲存加工後的資料。同時,該表需設定向量索引和全文索引,且Dynamic Table不設定自動重新整理,需要手動重新整理。

    • Dynamic Table的重新整理過程中會使用ai_embedai_chunk對資料進行Embed和切片。

    該預存程序代碼如下:

    CALL create_rag_corpus_from_oss(
        oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf',
        oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com',
        oss_role_arn => 'acs:ram::186xxxx:role/xxxx',
        corpus_table => 'public.dt_bs_challenge_financial'
    );
  4. 重新整理結果表。

    通過如上預存程序建立的Object Table和Dynamic Table均需手動重新整理,才能完成資料加工。該步驟已被封裝為PDF加工預存程序,該預存程序包括的能力如下:

    • 重新整理一次Object Table擷取PDF中繼資料

    • 重新整理一次Dynamic Table,進行PDF的Embedding和Chunk加工。

    該預存程序使用代碼如下:

    CALL refresh_rag_corpus_table(
        corpus_table => 'public.dt_bs_challenge_financial'
    );
  5. PDF檢索。

    加工好的資料可以根據業務使用情境,通過向量、全文等方式進行檢索。例如:可以根據招股書來查詢某個公司的業績走勢,以此來判斷公司後續的走勢是悲觀還是樂觀,以便對後續的投資意向提供輔助建議。

    向量檢索

    在向量檢索時,為了檢索方便,我們將問題Embedding和Prompt構建,大模型輸出答案等過程封裝成為向量檢索函數,直接調用如下該預存程序可以實現向量召回。

    -- 向量單路召回 + AI重排
    SELECT qa_vector_search_retrieval(
      question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?',
      corpus_table => 'dt_bs_challenge_financial',
      prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}'
    )

    檢索答案如下:

    qa_retrieval
    ---------
    "根據提供的資訊,對湖南國科微電子股份有限公司的業績走勢進行分析,可以得出以下結論:
    
    ### 一、業績走勢分析:悲觀
    
    #### 1. **營業收入增長乏力**
    - 2014年度營業收入較上年增長 **15.13%**,但2015年度營業收入卻 **下降5.21%**,2016年度資料未提供,但可以看出營業收入增長趨勢在2015年出現明顯下滑。
    - 2012年至2014年營業收入的年複合增長率僅為 **4.47%**,表明公司業務擴張較為緩慢。
    
    #### 2. **淨利潤增長持續下降**
    - 2014年度淨利潤增長 **5.43%**,2015年度淨利潤 **下降3.29%**。
    - 扣除非經常性損益後,2014年度歸屬於母公司股東的淨利潤增長 **-3.14%**,2015年度進一步下降 **-5.60%**,表明公司主營業務盈利能力在持續惡化。
    - 2012年至2014年扣除非經常性損益後淨利潤的年複合增長率為 **-4.38%**,遠低於營業收入增長,說明公司主營業務盈利能力不足,增長主要依賴非經常性損益。
    
    #### 3. **非經常性損益佔比偏高**
    - 報告期內,非經常性損益占淨利潤的比例較高,2014年、2013年、2012年分別為 **17.54%、10.25%、8.06%**,表明公司利潤中有一部分來自政策扶持、政府補貼等非經常性因素,而非核心業務的持續增長。
    - 依賴非經常性損益來維持利潤增長,不利於公司長期的穩定發展。
    
    #### 4. **淨資產收益率下降**
    - 加權平均淨資產收益率從2014年的 **18.10%** 下降到2015年的 **24.82%**,再到2016年的 **28.23%**,雖然資料看似增長,但需注意該指標是以扣除非經常性損益後的淨利潤計算的,而淨利潤本身在下降,因此這種增長可能與資本結構變化有關,而非盈利能力的實質性提升。
    
    ### 二、原因總結
    1. **主營業務增長乏力**:營業收入和淨利潤增長均呈現下降趨勢,尤其是淨利潤的下降表明公司盈利能力在減弱。
    2. **非經常性損益依賴度高**:公司利潤中非經常性損益佔比較高,說明主營業務的盈利能力不足,公司業績的持久性存疑。
    3. **市場競爭激烈**:公司採購的工控機、顯示器、電源等產品市場競爭激烈,價格平穩,利潤空間受到擠壓。
    4. **行業環境影響**:不鏽鋼市場價格波動、原材料價格波動可能對公司經營業績造成一定影響,雖然公司已採取措施降低影響,但長期來看仍需關注。
    
    ### 三、結論
    總體來看,湖南國科微電子股份有限公司的業績走勢偏 **悲觀**。公司主營業務增長乏力,淨利潤持續下降,對非經常性損益依賴度高,未來盈利能力的可持久性存疑。公司需要加強核心業務的競爭力,最佳化成本結構,提高主營業務盈利能力,以實現長期穩健發展。"

    全文檢索索引

    在全文檢索索引時,為了檢索方便,我們將問題Embedding和Prompt構建,大模型輸出答案等過程封裝為全文檢索索引函數,直接調用如下預存程序可以實現全文召回:

    --全文檢索索引召回
    SELECT qa_text_search_retrieval(
        question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?',
        corpus_table => 'dt_bs_challenge_financial',
        prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}'
    );

    檢索答案如下:

    qa_text_search_retrieval
    ----------------
    "根據提供的資訊,湖南國科微電子股份有限公司在2014年、2015年和2016年的業績走勢整體上呈現**悲觀**趨勢,具體原因如下:
    
    ### 1. **營業收入增長乏力**
    - 2014年營業收入增長率為**15.13%**,但2015年營業收入增長率轉為**-5.21%**,即出現負增長。
    - 2012年至2014年的營業收入年複合增長率僅為**4.47%**,說明公司營業收入增長較為緩慢,業務發展不夠強勁。
    - 2015年上半年營業收入預測與2014年同期相比大致持平,但2015年上半年淨利潤較上年同期**略有下降**,表明盈利能力下降。
    
    ### 2. **淨利潤和扣非淨利潤增長不佳**
    - 2014年淨利潤增長率為**5.43%**,2015年淨利潤增長率下降為**-3.29%**,即淨利潤出現下滑。
    - 扣除非經常性損益後的淨利潤增長率在2014年為**-3.14%**,2015年進一步下降為**-5.60%**,說明公司主營業務的盈利能力持續下降。
    - 2012年至2014年扣非淨利潤的年複合增長率為**-4.38%**,明顯低於營業收入的年複合增長率,說明公司利潤品質不高,主營業務盈利能力較弱。
    
    ### 3. **經營活動現金流波動**
    - 2014年銷售商品、提供勞務收到的現金占營業收入的比例較前兩年有所下降,主要與部分收入確認專案的**收款跨期**有關,說明公司現金流管理存在問題。
    - 2013年購買商品、接受勞務支付的現金占營業成本比例較高,主要是由於當年**採購原材料並完成生產**,但部分成本在2014年才結轉,導致2014年該比例較低,反映公司採購和生產節奏不夠穩定。
    
    ### 4. **投資和盈利能力指標**
    - 加權平均淨資產收益率(ROE)在2014年為**18.10%**,2015年上升至**24.82%**,2016年進一步上升至**28.23%**,雖然有所提升,但ROE的提高可能主要依賴**財務槓桿**,而非核心業務盈利能力的提升。
    - 考慮到淨利潤和扣非淨利潤持續下降,ROE的提升並不能完全反映公司經營品質的改善。
    
    ### 5. **2015年上半年業績預測**
    - 2015年上半年營業收入預計為**8,505萬元至10,395萬元**,與2014年同期的**10,127.35萬元**大致持平,但淨利潤預計為**2,340萬元至2,860萬元**,低於2014年同期的**2,912.66萬元**,說明公司盈利能力下降。
    
    ### 總結
    綜合來看,湖南國科微電子股份有限公司在2014年至2016年的業績走勢**偏向悲觀**。雖然ROE有所提升,但營業收入增長乏力、淨利潤和扣非淨利潤持續下降、經營活動現金流波動較大,表明公司主營業務盈利能力較弱,經營品質有待提升。"

    向量+全文混合檢索

    在向量、全文結合Rank排序混合檢索情境中,為了檢索方便,Hologres將其封裝為向量+全文雙路召回+Rank排序函數,該預存程序的能力如下:

    • 根據問題使用向量計算,召回TOP 20的答案。

    • 根據問題使用全文檢索索引,召回TOP 20的答案。

    • 使用ai_rank,對向量和全文召回的答案進行排序,最後輸出Top1的答案。

    • 使用ai_gen,結合大模型根據提示詞以及檢索的答案,產生最終答案並進行輸出。

    -- 全文、向量雙路召回 + AI重排
    SELECT qa_hybrid_retrieval(
        question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?',
        corpus_table => 'dt_bs_challenge_financial',
        prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}'
    );

    檢索答案如下:

    qa_hybrid_retrieval
    ---
    "根據提供的資訊,我們可以對湖南國科微電子股份有限公司的業績走勢進行如下分析,並判斷其趨勢是悲觀還是樂觀:
    
    ---
    
    ### 一、**營業收入走勢分析**
    1. **2012-2014年複合增長率**:
       - 營業收入的年複合增長率為 **4.47%**,表明公司營業收入的增長較為平穩。
       - 2014年營業收入為 **18,154.06萬元**,較2013年增長 **15.13%**。
       - 2015年營業收入較上年 **下降5.21%**,出現了負增長。
    
    2. **結論**:
       - 營業收入的增長在2014年有所回升,但2015年出現明顯下滑,表明公司業務擴張遇到了一定阻力。
    
    ---
    
    ### 二、**淨利潤走勢分析**
    1. **2012-2014年複合增長率**:
       - 扣除非經常性損益後的淨利潤年複合增長率為 **-4.38%**,低於營業收入的年複合增長率,說明公司盈利能力有所下降。
       - 2014年扣非淨利潤為 **42,731,071.18元**,較2013年下降 **3.14%**。
       - 2015年扣非淨利潤較上年進一步下降 **5.60%**。
    
    2. **非經常性損益影響**:
       - 2014年、2013年及2012年非經常性損益占淨利潤的比例分別為 **17.54%、10.25%、8.06%**,呈上升趨勢。
       - 非經常性損益的增加主要來自於政府補貼和理財產品收益,而非主營業務帶來的持續增長。
    
    3. **結論**:
       - 扣非淨利潤連續兩年下降,說明公司主營業務盈利能力減弱,業績增長依賴於非經常性損益,這是令人擔憂的訊號。
    
    ---
    
    ### 三、**現金流量與經營穩定性**
    1. **經營活動現金流**:
       - 2014年營業收入為 **18,154.06萬元**,但銷售商品、提供勞務收到的現金並未明確給出,無法判斷現金流是否健康。
       - 報告期內,公司銀行存款分別為 **13,063.38萬元、4,152.54萬元、9,864.61萬元**,資金流動性波動較大,但主要客戶、供應商及經營模式保持穩定。
    
    2. **結論**:
       - 雖然公司現金流存在波動,但客戶和供應商穩定,經營模式未發生重大變化,這為公司未來的發展提供了一定保障。
    
    ---
    
    ### 四、**2015年上半年業績預測**
    - 2015年1至6月預計營業收入為 **8,505萬元至10,395萬元**,較2014年同期的 **4,641.19萬元**有明顯增長。
    - 但2015年1至3月淨利潤較上年同期下降 **48.26%**,主要是因為確認收入的專案毛利率較低。
    
    ---
    
    ### 五、**綜合分析與判斷**
    1. **樂觀因素**:
       - 2014年營業收入增長較快,達到 **15.13%**。
       - 2015年上半年營業收入預計增長顯著,表明公司可能正在逐步恢複。
       - 主要客戶、供應商及經營模式保持穩定,為公司提供了良好的營運基礎。
    
    2. **悲觀因素**:
       - 2015年營業收入較上年 **下降5.21%**,淨利潤也出現下滑。
       - 扣非淨利潤連續兩年下降,表明公司主營業務盈利能力不足。
       - 非經常性損益佔比上升,業績增長依賴於政府補貼和理財產品收益,缺乏內生增長動力。
       - 2015年1至3月淨利潤大幅下滑 **48.26%**,表明短期業績波動較大。
    
    ---
    
    ### **最終結論:整體趨勢偏悲觀**
    - 儘管公司在2014年營業收入有所回升,且2015年上半年預計增長,但 **扣非淨利潤連續下降**、**淨利潤增長依賴非經常性損益**、**短期業績波動較大**,表明公司目前的業績增長缺乏持久性和穩定性。
    - 因此,從長期來看,公司業績走勢偏 **悲觀**,需關注其主營業務盈利能力的改善和非經常性損益的依賴問題。
    
    ---
    
    ### **建議**
    1. 關注公司未來主營業務的盈利能力是否能有所提升。
    2. 降低對非經常性損益的依賴,提高內生增長動力。
    3. 穩定客戶和供應商關係,最佳化業務結構,提高毛利率。"

    向量+全文雙路召回+RRF排序

    使用向量和全文檢索索引後,通過RRF (Reciprocal Rank Fusion)排序召回結果。為了檢索方便,Hologres已經封裝為向量+全文雙路召回+RRF排序函數(詳細定義見下方附錄),該預存程序的能力如下:

    • 根據問題使用向量計算,召回TOP 20的答案。

    • 根據問題使用全文檢索索引,召回TOP 20的答案。

    • 對向量和全文召回的答案,計算RRF分數,最後輸出Top N的答案。

    • 使用ai_gen、大模型根據提示詞,以及檢索的答案,拼裝成最終答案並輸出。

    -- 全文、向量雙路召回 + RRF重排
    SELECT qa_hybrid_retrieval_rrf(
        question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?',
        corpus_table => 'dt_bs_challenge_financial',
        prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}'
    );

    檢索答案如下:

    qa_hybrid_retrieval_rrf
    ------------------
    "根據提供的資訊,對湖南國科微電子股份有限公司的業績走勢進行分析,可以得出以下結論:
    
    ### **業績走勢判斷:悲觀**
    #### **原因分析如下:**
    
    1. **淨利潤增長低於營業收入增長:**
       - 提供的資訊指出,公司2012年至2014年的**營業收入年複合增長率為4.47%**,表明公司整體業務增長較為平穩。
       - 但**扣除非經常性損益後歸屬於母公司股東的淨利潤年複合增長率為-4.38%**,明顯低於營業收入的增長率。這說明公司在收入增長的同時,盈利能力並未同步提升,甚至出現下滑,可能受到成本上升、毛利率下降或非經常性損益減少等因素影響。
    
    2. **淨利潤波動較大:**
       - 2015年1至3月的淨利潤較上年同期減少了48.26%,且主要原因在於確認收入的專案毛利率較低(如無錫地鐵一號線專案以模組外購為主)。這表明公司短期業績容易受到業務結構變化的影響,存在一定的不穩定性。
    
    3. **毛利率和盈利能力下降:**
       - 提到“主營業務利潤對公司淨利潤的貢獻”在2014年較2013年略有下降,而2013年又較2012年下降。這說明公司核心業務的盈利能力可能在減弱,可能受到市場競爭加劇、成本上升或產品結構變化的影響。
    
    4. **2015年上半年預測利潤下降:**
       - 2015年1至6月預計營業收入為8,505萬元至10,395萬元,與2014年同期大致持平,但淨利潤預計為2,340萬元至2,860萬元,低於2014年同期的2,912.66萬元。這表明公司盈利能力在進一步下降,可能面臨一定的經營壓力。
    
    5. **業務增長乏力:**
       - 雖然營業收入增長較為平穩,但淨利潤的下降表明公司業務增長的品質不高,未能有效轉化為利潤。這可能影響投資者對公司未來發展的信心。
    
    ### **總結:**
    湖南國科微電子股份有限公司的業績走勢整體偏向**悲觀**。雖然營業收入保持了平穩增長,但淨利潤的增長明顯滯後甚至出現負增長,表明公司盈利能力在下降,業務發展品質不高,且存在短期業績波動的風險。如果公司不能有效提升毛利率、控製成本或最佳化產品結構,未來業績可能繼續承壓。"

附錄:預存程序定義

上述文檔中使用的預存程序定義如下,方便您做參考。

說明

在Hologres中不支援建立Function,如下預存程序僅做參考,無法修改後直接執行。

PDF加工預存程序

  • 建立Object Table和Dynamic Table

    CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss(
        oss_path TEXT,
        oss_endpoint TEXT,
        oss_role_arn TEXT,
        corpus_table TEXT,
        embedding_model TEXT DEFAULT NULL,
        parse_document_model TEXT DEFAULT NULL,
        chunk_model TEXT DEFAULT NULL,
        chunk_size INT DEFAULT 300,
        chunk_overlap INT DEFAULT 50,
        overwrite BOOLEAN DEFAULT FALSE
    )
    AS $$
    DECLARE
        corpus_schema TEXT;
        corpus_name TEXT;
        obj_table_name TEXT;
        full_corpus_ident TEXT;
        full_obj_ident TEXT;
        embed_expr TEXT;
        chunk_expr TEXT;
        parse_expr TEXT;
        embedding_dims INT;
    BEGIN
        -- 1. 拆 schema name + table name
        IF position('.' in corpus_table) > 0 THEN
            corpus_schema := split_part(corpus_table, '.', 1);
            corpus_name   := split_part(corpus_table, '.', 2);
        ELSE
            corpus_schema := 'public';
            corpus_name   := corpus_table;
        END IF;
    
        obj_table_name := corpus_name || '_obj_table';
    
        full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
        full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);
        
        -- 2. 如果需要覆蓋,先刪表和索引
        IF overwrite THEN
            DECLARE
                dyn_table_exists BOOLEAN;
                rec RECORD;
            BEGIN
                -- 檢查 dynamic table 是否存在
                SELECT EXISTS (
                    SELECT 1
                    FROM pg_class c
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE c.relname = corpus_name
                    AND n.nspname = corpus_schema
                )
                INTO dyn_table_exists;
    
                IF dyn_table_exists THEN
                    -- 2.1 關閉動態表自動重新整理
                    -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident;
                    -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
    
                    -- 2.2 尋找 RUNNING 重新整理任務並取消
                    FOR rec IN
                        EXECUTE format(
                            $f$
                            SELECT query_job_id
                                FROM hologres.hg_dynamic_table_refresh_log(%L)
                                WHERE status = 'RUNNING';
                            $f$,
                            corpus_table
                        )
                    LOOP
                        RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
                        IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
                            RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
                        ELSE
                            RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
                        END IF;
                    END LOOP;
    
                    -- 2.3 刪除 Dynamic Table
                    EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
                ELSE
                    RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident;
                END IF;
    
                -- 2.4 無論如何,Object Table 都要刪除
                EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
            END;
        END IF;
    
        -- 3. 建立 Object Table
        RAISE NOTICE 'Create object table: %', obj_table_name;
        EXECUTE format(
            $f$
            CREATE OBJECT TABLE %s
            WITH (
                path = %L,
                oss_endpoint = %L,
                role_arn = %L
            );
            $f$,
            full_obj_ident,
            oss_path,
            oss_endpoint,
            oss_role_arn
        );
    
        COMMIT;
    
        -- 4. 重新整理 Object Table
        RAISE NOTICE 'Refresh object table: %', obj_table_name;
        EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
    
        COMMIT;
    
        -- 5. 文檔解析模型選擇
        IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN
            parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')';
        ELSE
            parse_expr := format(
                'ai_parse_document(%L, file, ''auto'', ''markdown'')',
                parse_document_model
            );
        END IF;
    
        -- 6. chunk 模型選擇
        IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN
            chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap);
        ELSE
            chunk_expr := format(
                'ai_chunk(%L, doc, %s, %s)',
                chunk_model,
                chunk_size,
                chunk_overlap
            );
        END IF;
    
        -- 7. embedding 模型選擇
        IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN
            embed_expr := 'ai_embed(chunk)';
    
            EXECUTE 'SELECT array_length(ai_embed(''dummy''), 1)'
            INTO embedding_dims;
        ELSE
            embed_expr := format('ai_embed(%L, chunk)', embedding_model);
    
            EXECUTE format(
                'SELECT array_length(ai_embed(%L, ''dummy''), 1)',
                embedding_model
            )
            INTO embedding_dims;
        END IF;
    
        RAISE NOTICE 'embedding dimension is: %', embedding_dims;
    
        -- 8. 建立 RAG 輸出動態表
        RAISE NOTICE 'create dynamic table: %', corpus_name;
        EXECUTE format(
            $f$
            CREATE DYNAMIC TABLE %s(
                CHECK(array_ndims(embedding_vector) = 1 AND array_length(embedding_vector, 1) = %s)
            )
            WITH (
                vectors = '{
                  "embedding_vector": {
                    "algorithm": "HGraph",
                    "distance_method": "Cosine",
                    "builder_params": {
                    "base_quantization_type": "sq8_uniform",
                    "max_degree": 64,
                    "ef_construction": 400,
                    "precise_quantization_type": "fp32",
                    "use_reorder": true
                    }
                  }
                }',
                auto_refresh_mode = 'incremental',
                freshness = '5 minutes',
                auto_refresh_enable = 'false'
            ) AS
            WITH parsed_doc AS (
                SELECT object_uri,
                       etag,
                       %s AS doc
                  FROM %s
            ),
            chunked_doc AS (
                SELECT object_uri,
                       etag,
                       unnest(%s) AS chunk
                  FROM parsed_doc
            )
            SELECT
                object_uri,
                etag,
                chunk,
                %s AS embedding_vector
              FROM chunked_doc;
            $f$,
            full_corpus_ident,
            embedding_dims,
            parse_expr,
            full_obj_ident,
            chunk_expr,
            embed_expr
        );
        COMMIT;
    
        -- 9. 建立全文索引(索引名 = 表名 || '_fulltext_idx')
        EXECUTE format(
            'CREATE INDEX %I ON %s USING FULLTEXT (chunk);',
            corpus_name || '_fulltext_idx',
            full_corpus_ident
        );
    
        RAISE NOTICE '';
        RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table;
        RAISE NOTICE '    Vector index is: %.embedding_vector', corpus_table;
        RAISE NOTICE '    TextSearch index is: %.chunk', corpus_table;
    END;
    $$ LANGUAGE plpgsql;
  • 重新整理Object Table和Dynamic Table預存程序

    CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table(
        corpus_table TEXT
    )
    AS $$
    DECLARE
        corpus_schema TEXT;
        corpus_name   TEXT;
        obj_table_name TEXT;
        full_corpus_ident TEXT;
        full_obj_ident    TEXT;
    BEGIN
        -- 1. 解析 schema 和表名
        IF position('.' in corpus_table) > 0 THEN
            corpus_schema := split_part(corpus_table, '.', 1);
            corpus_name   := split_part(corpus_table, '.', 2);
        ELSE
            corpus_schema := 'public';
            corpus_name   := corpus_table;
        END IF;
    
        obj_table_name := corpus_name || '_obj_table';
    
        full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
        full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);
    
        -- 2. 重新整理 Object Table
        RAISE NOTICE 'Refreshing Object Table: %', obj_table_name;
        EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
    
        -- 3. 重新整理 Dynamic Table
        RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name;
        EXECUTE format('REFRESH TABLE %s;', full_corpus_ident);
    
        RAISE NOTICE 'Refresh complete for corpus table %', corpus_table;
    END;
    $$ LANGUAGE plpgsql;
  • 刪除Object Table和Dynamic Table預存程序

    CREATE OR REPLACE PROCEDURE drop_rag_corpus_table(
        corpus_table TEXT
    )
    AS $$
    DECLARE
        corpus_schema TEXT;
        corpus_name   TEXT;
        obj_table_name TEXT;
        full_corpus_ident TEXT;
        full_obj_ident    TEXT;
        rec RECORD;
    BEGIN
        -- 1. 解析 schema 和表名
        IF position('.' in corpus_table) > 0 THEN
            corpus_schema := split_part(corpus_table, '.', 1);
            corpus_name   := split_part(corpus_table, '.', 2);
        ELSE
            corpus_schema := 'public';
            corpus_name   := corpus_table;
        END IF;
    
        obj_table_name := corpus_name || '_obj_table';
    
        full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
        full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);
    
        -- 2. 刪除表
        -- 2.1 關閉動態表自動重新整理
        -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident;
        -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident);
    
        -- 2.2 尋找 RUNNING 重新整理任務並取消
        FOR rec IN
            EXECUTE format(
                $f$
                SELECT query_job_id
                    FROM hologres.hg_dynamic_table_refresh_log(%L)
                    WHERE status = 'RUNNING';
                $f$,
                corpus_table
            )
        LOOP
            RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
            IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
                RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
            ELSE
                RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
            END IF;
        END LOOP;
    
        -- 2.3 刪除 Dynamic Table
        RAISE NOTICE 'Dropping Dynamic Table: %', corpus_name;
        EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
    
        -- 2.4 刪除 Object Table
        RAISE NOTICE 'Dropping Object Table: %', obj_table_name;
        EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
    
        RAISE NOTICE 'Drop complete for corpus: %', corpus_table;
    END;
    $$ LANGUAGE plpgsql;

向量檢索函數

-- RAG向量單路召回問答
CREATE OR REPLACE FUNCTION qa_vector_search_retrieval(
    question TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    vector_col TEXT DEFAULT 'embedding_vector'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    embedding_model_valid BOOLEAN;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
    ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');

    IF embedding_model_valid THEN
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
    END IF;

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

    sql := '
      WITH
        embedding_recall AS (
            SELECT
              chunk,
              approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS distance
            FROM
              ' || corpus_table || '
            ORDER BY
              distance DESC
            LIMIT ' || vector_recall_count || '
        ),
        rerank AS (
            SELECT
              chunk,
              ' || ai_rank_expr || ' AS score
            FROM
              embedding_recall
            ORDER BY
              score DESC
            LIMIT ' || rerank_recall_count || '
        ),
        concat_top_chunks AS (
            SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
        )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

全文檢索索引函數

CREATE OR REPLACE FUNCTION qa_text_search_retrieval(
    question TEXT,
    corpus_table TEXT,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    text_search_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    llm_model_valid     := (llm_model IS NOT NULL AND trim(llm_model) != '');
    ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) ||
               '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) ||
               '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

    sql := '
      WITH
        text_search_recall AS (
            SELECT
              chunk
            FROM
              ' || corpus_table || '
            ORDER BY
              text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
            LIMIT ' || text_search_recall_count || '
        ),
        rerank AS (
            SELECT
              chunk,
              ' || ai_rank_expr || ' AS score
            FROM
              text_search_recall
            ORDER BY
              score DESC
            LIMIT ' || rerank_recall_count || '
        ),
        concat_top_chunks AS (
            SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
        )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

向量+全文雙路召回+Rank排序函數

CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(
    question TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    text_search_recall_count INT DEFAULT 20,
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    vector_col TEXT DEFAULT 'embedding_vector',
    text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    embedding_model_valid BOOLEAN;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
    llm_model_valid       := (llm_model IS NOT NULL AND trim(llm_model) != '');
    ranking_model_valid   := (ranking_model IS NOT NULL AND trim(ranking_model) != '');

    IF embedding_model_valid THEN
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
    END IF;

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

    sql := '
      WITH
        embedding_recall AS (
            SELECT
              chunk
            FROM
              ' || corpus_table || '
            ORDER BY
              approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') DESC
            LIMIT ' || vector_recall_count || '
        ),
        text_search_recall AS (
            SELECT
              chunk
            FROM
              ' || corpus_table || '
            ORDER BY
              text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
            LIMIT ' || text_search_recall_count || '
        ),
        union_recall AS (
            SELECT chunk FROM embedding_recall
            UNION
            SELECT chunk FROM text_search_recall
        ),
        rerank AS (
            SELECT
              chunk,
              ' || ai_rank_expr || ' AS score
            FROM
              union_recall
            ORDER BY
              score DESC
            LIMIT ' || rerank_recall_count || '
        ),
        concat_top_chunks AS (
            SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
        )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

向量+全文雙路召回+RRF排序函數

CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(
    question TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    text_search_recall_count INT DEFAULT 20,
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    rrf_k INT DEFAULT 60,
    vector_col TEXT DEFAULT 'embedding_vector',
    text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    embedding_model_valid BOOLEAN;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) <> '');
    llm_model_valid       := (llm_model IS NOT NULL AND trim(llm_model) <> '');
    ranking_model_valid   := (ranking_model IS NOT NULL AND trim(ranking_model) <> '');

    IF embedding_model_valid THEN
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
    END IF;

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
                ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
                ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

    sql := '
      WITH embedding_recall AS (
        SELECT
          chunk,
          vec_score,
          ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec
        FROM (
          SELECT
            chunk,
            approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS vec_score
          FROM
            ' || corpus_table || '
        ) t
        ORDER BY vec_score DESC
        LIMIT ' || vector_recall_count || '
      ),
      text_search_recall AS (
        SELECT
          chunk,
          text_score,
          ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text
        FROM (
          SELECT
            chunk,
            text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score
          FROM
            ' || corpus_table || '
        ) ts
        WHERE text_score > 0
        ORDER BY text_score DESC
        LIMIT ' || text_search_recall_count || '
      ),
      rrf_scores AS (
        SELECT
          chunk,
          SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score
        FROM (
          SELECT chunk, rank_vec AS rank_val FROM embedding_recall
          UNION ALL
          SELECT chunk, rank_text AS rank_val FROM text_search_recall
        ) sub
        GROUP BY chunk
      ),
      top_chunks AS (
        SELECT chunk
        FROM rrf_scores
        ORDER BY rrf_score DESC
        LIMIT ' || rerank_recall_count || '
      ),
      concat_top_chunks AS (
        SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM top_chunks
      )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;