在資料驅動時代,非結構化資料(文本、映像、音視頻、日誌等)與結構化、半結構化資料(JSON)共同構成企業的核心資料資產。其中,非結構化資料以更原始、多元的形態蘊含著海量的業務洞察(如使用者反饋、合約條款、產品缺陷映像),本文將會類比金融情境中對招股書、合約等PDF檔案的檢索與分析,以輔助業務進行下一步的精細化營運決策。
核心能力介紹
本最佳實務主要是對PDF非結構化資料的處理與檢索,包含的主要能力如下:
非結構化資料(Object Table):支援通過表的形式讀取OSS中非結構化資料(PDF/IMAGE/PPT等)。
AI Function:在Hologres中可以用標準SQL的方式調用AI Function,自動調用內建大模型,完成AI服務建設情境
資料加工:提供Embed、Chunk運算元,可以對非結構化資料加工成結構化資料存放區,無需使用外部演算法就能自動Embed。
資料檢索和分析:提供
ai_gen、ai_summarize等運算元,能夠通過SQL對資料進行推理、問題總結及翻譯等操作。
Dynamic Table介紹:支援增量重新整理模式對非結構化資料自動加工,每次只計算增量的資料有效減少重複計算,降低資源使用率。
向量檢索:支援標準SQL的向量檢索,用於非結構化資料的相似性搜尋、情境識別等,在同一個查詢中可以自由地實現向量和標量的檢索。
全文檢索索引:通過倒排索引、分詞等機制實現對非結構化資料的高效檢索,支援關鍵詞匹配、短語檢索等豐富的檢索方式,實現更加靈活的檢索。
方案優勢
通過如上核心能力,在Hologres中多模態AI檢索與分析的核心優勢如下:
完整的AI資料處理流程:涵蓋從資料Embed、Chunk、增量加工、檢索/分析的全流程,開發人員可以使用巨量資料系統一樣,輕鬆構建AI應用。
標準SQL加工和分析非結構化資料:無需使用專用開發語言,純SQL就能完成非結構化資料提取、加工,也無需藉助外部系統,資料處理更加高效和簡單,降低開發人員學習成本。
檢索更精準、靈活和智能:可以輕鬆構建“關鍵詞+語義+多模態”的混合檢索鏈路,覆蓋從精準搜尋到意圖理解的全情境需求。還能結合AI Function實現對使用者意圖的深度理解,語義關聯和上下文推理,實現更智能的檢索能力。
資料不出庫,更安全:不需要將資料匯出到外部系統,與hologres的多種安全能力無縫整合,高效保護資料安全。
本實踐文檔將會介紹如何通過上訴核心能力在Hologres中對非結構化資料加工和檢索,助力搭建企業級多模態AI資料平台,打破資料孤島,釋放全域資料價值
方案流程
本次方案的流程如下:
資料集準備。
將金融資料集中的PDF檔案上傳至OSS儲存。
PDF資料加工。
使用Object Table讀取PDF的中繼資料資訊,然後建立增量重新整理的Dynamic Table,並對資料進行Embed和Chunk,同時也對Dynamic Table構建向量索引和全文索引,以便後續檢索可以使用索引的能力。
使用
ai_embed運算元對將自然語言的問題進行Embedding,然後使用全文和向量雙路召回結果,並對結果進行排序,結合大模型的推理能力,最終輸出相似性最高的答案。
準備工作
資料準備
本文使用ModelScope公開的金融資料集中的PDF檔案夾中的檔案,共80份公司招股說明書。
環境準備
購買Hologres V4.0及以上版本執行個體並建立資料庫。
本文以large-96core-512GB-384GB、1個節點為例。
模型部署。本次方案使用的模型以及分配的資源為:
模型名稱
模型類別
模型作用描述
單副本CPU(Core)
單副本記憶體
單副本GPU
資源副本數
to_doc
ds4sd/docling-models
將PDF轉換成文檔。
20
100 GB
1卡(48 GB)
1
chunk
recursive-character-text-splitter
文檔切片,每個PDF較大,建議使用切片。
15
30 GB
0卡(0 GB)
1
pdf_embed
BAAI/bge-base-zh-v1.5
文檔Embedding。
7
30 GB
1卡(96 GB)
1
llm
Qwen/Qwen3-32B
使用大模型對檢索出的文檔內容按照提示詞推理.
7
30 GB
1卡(96 GB)
1
說明上述模型的資源均為預設分配的資源。
操作步驟
下載PDF檔案並上傳至OSS。
下載博金大模型挑戰賽-金融千問14b資料集中80份招股書(PDF)。
帳號授權。
登入RAM控制台,建立阿里雲RAM角色並授予OSS的相關許可權。
推薦授予AliyunOSSReadOnlyAccess許可權。
為上述阿里雲RAM角色添加登入和Hologres的存取權限。
阿里雲帳號(主帳號)
修改RAM角色的信任策略。重點需更新如下參數:
Action:更新為
sts:AssumeRole。Service:更新為
hologres.aliyuncs.com。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::1866xxxx:root" ], "Service": [ "hologres.aliyuncs.com" ] } } ], "Version": "1" }RAM使用者(子帳號)
為RAM使用者授權。
為已建立的RAM角色授權。
修改RAM角色的信任策略。重點需更新如下參數:
Action:更新為
sts:AssumeRole。Service:更新為
hologres.aliyuncs.com。
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "RAM": [ "acs:ram::1866xxxx:root" ], "Service": [ "hologres.aliyuncs.com" ] } } ], "Version": "1" }
對PDF檔案進行Embedding和Chunk。
需要建立Object Table和Dynamic Table對PDF的中繼資料讀取以及加工。因為流程較長,Hologres直接將其封裝為預存程序。該預存程序包括的能力如下:
會建立一張Object Table,用於取PDF的中繼資料。
建立一張增量重新整理模式的Dynamic Table結果表,用於儲存加工後的資料。同時,該表需設定向量索引和全文索引,且Dynamic Table不設定自動重新整理,需要手動重新整理。
Dynamic Table的重新整理過程中會使用
ai_embed、ai_chunk對資料進行Embed和切片。
該預存程序代碼如下:
CALL create_rag_corpus_from_oss( oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf', oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com', oss_role_arn => 'acs:ram::186xxxx:role/xxxx', corpus_table => 'public.dt_bs_challenge_financial' );重新整理結果表。
通過如上預存程序建立的Object Table和Dynamic Table均需手動重新整理,才能完成資料加工。該步驟已被封裝為PDF加工預存程序,該預存程序包括的能力如下:
重新整理一次Object Table擷取PDF中繼資料
重新整理一次Dynamic Table,進行PDF的Embedding和Chunk加工。
該預存程序使用代碼如下:
CALL refresh_rag_corpus_table( corpus_table => 'public.dt_bs_challenge_financial' );PDF檢索。
加工好的資料可以根據業務使用情境,通過向量、全文等方式進行檢索。例如:可以根據招股書來查詢某個公司的業績走勢,以此來判斷公司後續的走勢是悲觀還是樂觀,以便對後續的投資意向提供輔助建議。
向量檢索
在向量檢索時,為了檢索方便,我們將問題Embedding和Prompt構建,大模型輸出答案等過程封裝成為向量檢索函數,直接調用如下該預存程序可以實現向量召回。
-- 向量單路召回 + AI重排 SELECT qa_vector_search_retrieval( question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}' )檢索答案如下:
qa_retrieval --------- "根據提供的資訊,對湖南國科微電子股份有限公司的業績走勢進行分析,可以得出以下結論: ### 一、業績走勢分析:悲觀 #### 1. **營業收入增長乏力** - 2014年度營業收入較上年增長 **15.13%**,但2015年度營業收入卻 **下降5.21%**,2016年度資料未提供,但可以看出營業收入增長趨勢在2015年出現明顯下滑。 - 2012年至2014年營業收入的年複合增長率僅為 **4.47%**,表明公司業務擴張較為緩慢。 #### 2. **淨利潤增長持續下降** - 2014年度淨利潤增長 **5.43%**,2015年度淨利潤 **下降3.29%**。 - 扣除非經常性損益後,2014年度歸屬於母公司股東的淨利潤增長 **-3.14%**,2015年度進一步下降 **-5.60%**,表明公司主營業務盈利能力在持續惡化。 - 2012年至2014年扣除非經常性損益後淨利潤的年複合增長率為 **-4.38%**,遠低於營業收入增長,說明公司主營業務盈利能力不足,增長主要依賴非經常性損益。 #### 3. **非經常性損益佔比偏高** - 報告期內,非經常性損益占淨利潤的比例較高,2014年、2013年、2012年分別為 **17.54%、10.25%、8.06%**,表明公司利潤中有一部分來自政策扶持、政府補貼等非經常性因素,而非核心業務的持續增長。 - 依賴非經常性損益來維持利潤增長,不利於公司長期的穩定發展。 #### 4. **淨資產收益率下降** - 加權平均淨資產收益率從2014年的 **18.10%** 下降到2015年的 **24.82%**,再到2016年的 **28.23%**,雖然資料看似增長,但需注意該指標是以扣除非經常性損益後的淨利潤計算的,而淨利潤本身在下降,因此這種增長可能與資本結構變化有關,而非盈利能力的實質性提升。 ### 二、原因總結 1. **主營業務增長乏力**:營業收入和淨利潤增長均呈現下降趨勢,尤其是淨利潤的下降表明公司盈利能力在減弱。 2. **非經常性損益依賴度高**:公司利潤中非經常性損益佔比較高,說明主營業務的盈利能力不足,公司業績的持久性存疑。 3. **市場競爭激烈**:公司採購的工控機、顯示器、電源等產品市場競爭激烈,價格平穩,利潤空間受到擠壓。 4. **行業環境影響**:不鏽鋼市場價格波動、原材料價格波動可能對公司經營業績造成一定影響,雖然公司已採取措施降低影響,但長期來看仍需關注。 ### 三、結論 總體來看,湖南國科微電子股份有限公司的業績走勢偏 **悲觀**。公司主營業務增長乏力,淨利潤持續下降,對非經常性損益依賴度高,未來盈利能力的可持久性存疑。公司需要加強核心業務的競爭力,最佳化成本結構,提高主營業務盈利能力,以實現長期穩健發展。"全文檢索索引
在全文檢索索引時,為了檢索方便,我們將問題Embedding和Prompt構建,大模型輸出答案等過程封裝為全文檢索索引函數,直接調用如下預存程序可以實現全文召回:
--全文檢索索引召回 SELECT qa_text_search_retrieval( question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}' );檢索答案如下:
qa_text_search_retrieval ---------------- "根據提供的資訊,湖南國科微電子股份有限公司在2014年、2015年和2016年的業績走勢整體上呈現**悲觀**趨勢,具體原因如下: ### 1. **營業收入增長乏力** - 2014年營業收入增長率為**15.13%**,但2015年營業收入增長率轉為**-5.21%**,即出現負增長。 - 2012年至2014年的營業收入年複合增長率僅為**4.47%**,說明公司營業收入增長較為緩慢,業務發展不夠強勁。 - 2015年上半年營業收入預測與2014年同期相比大致持平,但2015年上半年淨利潤較上年同期**略有下降**,表明盈利能力下降。 ### 2. **淨利潤和扣非淨利潤增長不佳** - 2014年淨利潤增長率為**5.43%**,2015年淨利潤增長率下降為**-3.29%**,即淨利潤出現下滑。 - 扣除非經常性損益後的淨利潤增長率在2014年為**-3.14%**,2015年進一步下降為**-5.60%**,說明公司主營業務的盈利能力持續下降。 - 2012年至2014年扣非淨利潤的年複合增長率為**-4.38%**,明顯低於營業收入的年複合增長率,說明公司利潤品質不高,主營業務盈利能力較弱。 ### 3. **經營活動現金流波動** - 2014年銷售商品、提供勞務收到的現金占營業收入的比例較前兩年有所下降,主要與部分收入確認專案的**收款跨期**有關,說明公司現金流管理存在問題。 - 2013年購買商品、接受勞務支付的現金占營業成本比例較高,主要是由於當年**採購原材料並完成生產**,但部分成本在2014年才結轉,導致2014年該比例較低,反映公司採購和生產節奏不夠穩定。 ### 4. **投資和盈利能力指標** - 加權平均淨資產收益率(ROE)在2014年為**18.10%**,2015年上升至**24.82%**,2016年進一步上升至**28.23%**,雖然有所提升,但ROE的提高可能主要依賴**財務槓桿**,而非核心業務盈利能力的提升。 - 考慮到淨利潤和扣非淨利潤持續下降,ROE的提升並不能完全反映公司經營品質的改善。 ### 5. **2015年上半年業績預測** - 2015年上半年營業收入預計為**8,505萬元至10,395萬元**,與2014年同期的**10,127.35萬元**大致持平,但淨利潤預計為**2,340萬元至2,860萬元**,低於2014年同期的**2,912.66萬元**,說明公司盈利能力下降。 ### 總結 綜合來看,湖南國科微電子股份有限公司在2014年至2016年的業績走勢**偏向悲觀**。雖然ROE有所提升,但營業收入增長乏力、淨利潤和扣非淨利潤持續下降、經營活動現金流波動較大,表明公司主營業務盈利能力較弱,經營品質有待提升。"向量+全文混合檢索
在向量、全文結合Rank排序混合檢索情境中,為了檢索方便,Hologres將其封裝為向量+全文雙路召回+Rank排序函數,該預存程序的能力如下:
根據問題使用向量計算,召回TOP 20的答案。
根據問題使用全文檢索索引,召回TOP 20的答案。
使用
ai_rank,對向量和全文召回的答案進行排序,最後輸出Top1的答案。使用
ai_gen,結合大模型根據提示詞以及檢索的答案,產生最終答案並進行輸出。
-- 全文、向量雙路召回 + AI重排 SELECT qa_hybrid_retrieval( question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}' );檢索答案如下:
qa_hybrid_retrieval --- "根據提供的資訊,我們可以對湖南國科微電子股份有限公司的業績走勢進行如下分析,並判斷其趨勢是悲觀還是樂觀: --- ### 一、**營業收入走勢分析** 1. **2012-2014年複合增長率**: - 營業收入的年複合增長率為 **4.47%**,表明公司營業收入的增長較為平穩。 - 2014年營業收入為 **18,154.06萬元**,較2013年增長 **15.13%**。 - 2015年營業收入較上年 **下降5.21%**,出現了負增長。 2. **結論**: - 營業收入的增長在2014年有所回升,但2015年出現明顯下滑,表明公司業務擴張遇到了一定阻力。 --- ### 二、**淨利潤走勢分析** 1. **2012-2014年複合增長率**: - 扣除非經常性損益後的淨利潤年複合增長率為 **-4.38%**,低於營業收入的年複合增長率,說明公司盈利能力有所下降。 - 2014年扣非淨利潤為 **42,731,071.18元**,較2013年下降 **3.14%**。 - 2015年扣非淨利潤較上年進一步下降 **5.60%**。 2. **非經常性損益影響**: - 2014年、2013年及2012年非經常性損益占淨利潤的比例分別為 **17.54%、10.25%、8.06%**,呈上升趨勢。 - 非經常性損益的增加主要來自於政府補貼和理財產品收益,而非主營業務帶來的持續增長。 3. **結論**: - 扣非淨利潤連續兩年下降,說明公司主營業務盈利能力減弱,業績增長依賴於非經常性損益,這是令人擔憂的訊號。 --- ### 三、**現金流量與經營穩定性** 1. **經營活動現金流**: - 2014年營業收入為 **18,154.06萬元**,但銷售商品、提供勞務收到的現金並未明確給出,無法判斷現金流是否健康。 - 報告期內,公司銀行存款分別為 **13,063.38萬元、4,152.54萬元、9,864.61萬元**,資金流動性波動較大,但主要客戶、供應商及經營模式保持穩定。 2. **結論**: - 雖然公司現金流存在波動,但客戶和供應商穩定,經營模式未發生重大變化,這為公司未來的發展提供了一定保障。 --- ### 四、**2015年上半年業績預測** - 2015年1至6月預計營業收入為 **8,505萬元至10,395萬元**,較2014年同期的 **4,641.19萬元**有明顯增長。 - 但2015年1至3月淨利潤較上年同期下降 **48.26%**,主要是因為確認收入的專案毛利率較低。 --- ### 五、**綜合分析與判斷** 1. **樂觀因素**: - 2014年營業收入增長較快,達到 **15.13%**。 - 2015年上半年營業收入預計增長顯著,表明公司可能正在逐步恢複。 - 主要客戶、供應商及經營模式保持穩定,為公司提供了良好的營運基礎。 2. **悲觀因素**: - 2015年營業收入較上年 **下降5.21%**,淨利潤也出現下滑。 - 扣非淨利潤連續兩年下降,表明公司主營業務盈利能力不足。 - 非經常性損益佔比上升,業績增長依賴於政府補貼和理財產品收益,缺乏內生增長動力。 - 2015年1至3月淨利潤大幅下滑 **48.26%**,表明短期業績波動較大。 --- ### **最終結論:整體趨勢偏悲觀** - 儘管公司在2014年營業收入有所回升,且2015年上半年預計增長,但 **扣非淨利潤連續下降**、**淨利潤增長依賴非經常性損益**、**短期業績波動較大**,表明公司目前的業績增長缺乏持久性和穩定性。 - 因此,從長期來看,公司業績走勢偏 **悲觀**,需關注其主營業務盈利能力的改善和非經常性損益的依賴問題。 --- ### **建議** 1. 關注公司未來主營業務的盈利能力是否能有所提升。 2. 降低對非經常性損益的依賴,提高內生增長動力。 3. 穩定客戶和供應商關係,最佳化業務結構,提高毛利率。"向量+全文雙路召回+RRF排序
使用向量和全文檢索索引後,通過RRF (Reciprocal Rank Fusion)排序召回結果。為了檢索方便,Hologres已經封裝為向量+全文雙路召回+RRF排序函數(詳細定義見下方附錄),該預存程序的能力如下:
根據問題使用向量計算,召回TOP 20的答案。
根據問題使用全文檢索索引,召回TOP 20的答案。
對向量和全文召回的答案,計算RRF分數,最後輸出Top N的答案。
使用
ai_gen、大模型根據提示詞,以及檢索的答案,拼裝成最終答案並輸出。
-- 全文、向量雙路召回 + RRF重排 SELECT qa_hybrid_retrieval_rrf( question => '報告期內,湖南國科微電子股份有限公司2014年度、2015年度、2016年度營業收入和淨利潤分別較上年增長多大幅度?', corpus_table => 'dt_bs_challenge_financial', prompt => '請分析如下業績走勢是悲觀還是樂觀,並給出原因:${question}\n\n 參考資訊:\n\n ${context}' );檢索答案如下:
qa_hybrid_retrieval_rrf ------------------ "根據提供的資訊,對湖南國科微電子股份有限公司的業績走勢進行分析,可以得出以下結論: ### **業績走勢判斷:悲觀** #### **原因分析如下:** 1. **淨利潤增長低於營業收入增長:** - 提供的資訊指出,公司2012年至2014年的**營業收入年複合增長率為4.47%**,表明公司整體業務增長較為平穩。 - 但**扣除非經常性損益後歸屬於母公司股東的淨利潤年複合增長率為-4.38%**,明顯低於營業收入的增長率。這說明公司在收入增長的同時,盈利能力並未同步提升,甚至出現下滑,可能受到成本上升、毛利率下降或非經常性損益減少等因素影響。 2. **淨利潤波動較大:** - 2015年1至3月的淨利潤較上年同期減少了48.26%,且主要原因在於確認收入的專案毛利率較低(如無錫地鐵一號線專案以模組外購為主)。這表明公司短期業績容易受到業務結構變化的影響,存在一定的不穩定性。 3. **毛利率和盈利能力下降:** - 提到“主營業務利潤對公司淨利潤的貢獻”在2014年較2013年略有下降,而2013年又較2012年下降。這說明公司核心業務的盈利能力可能在減弱,可能受到市場競爭加劇、成本上升或產品結構變化的影響。 4. **2015年上半年預測利潤下降:** - 2015年1至6月預計營業收入為8,505萬元至10,395萬元,與2014年同期大致持平,但淨利潤預計為2,340萬元至2,860萬元,低於2014年同期的2,912.66萬元。這表明公司盈利能力在進一步下降,可能面臨一定的經營壓力。 5. **業務增長乏力:** - 雖然營業收入增長較為平穩,但淨利潤的下降表明公司業務增長的品質不高,未能有效轉化為利潤。這可能影響投資者對公司未來發展的信心。 ### **總結:** 湖南國科微電子股份有限公司的業績走勢整體偏向**悲觀**。雖然營業收入保持了平穩增長,但淨利潤的增長明顯滯後甚至出現負增長,表明公司盈利能力在下降,業務發展品質不高,且存在短期業績波動的風險。如果公司不能有效提升毛利率、控製成本或最佳化產品結構,未來業績可能繼續承壓。"
附錄:預存程序定義
上述文檔中使用的預存程序定義如下,方便您做參考。
在Hologres中不支援建立Function,如下預存程序僅做參考,無法修改後直接執行。
PDF加工預存程序
建立Object Table和Dynamic Table
CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss( oss_path TEXT, oss_endpoint TEXT, oss_role_arn TEXT, corpus_table TEXT, embedding_model TEXT DEFAULT NULL, parse_document_model TEXT DEFAULT NULL, chunk_model TEXT DEFAULT NULL, chunk_size INT DEFAULT 300, chunk_overlap INT DEFAULT 50, overwrite BOOLEAN DEFAULT FALSE ) AS $$ DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; embed_expr TEXT; chunk_expr TEXT; parse_expr TEXT; embedding_dims INT; BEGIN -- 1. 拆 schema name + table name IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := 'public'; corpus_name := corpus_table; END IF; obj_table_name := corpus_name || '_obj_table'; full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); -- 2. 如果需要覆蓋,先刪表和索引 IF overwrite THEN DECLARE dyn_table_exists BOOLEAN; rec RECORD; BEGIN -- 檢查 dynamic table 是否存在 SELECT EXISTS ( SELECT 1 FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = corpus_name AND n.nspname = corpus_schema ) INTO dyn_table_exists; IF dyn_table_exists THEN -- 2.1 關閉動態表自動重新整理 -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident); -- 2.2 尋找 RUNNING 重新整理任務並取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP; -- 2.3 刪除 Dynamic Table EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident); ELSE RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident; END IF; -- 2.4 無論如何,Object Table 都要刪除 EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident); END; END IF; -- 3. 建立 Object Table RAISE NOTICE 'Create object table: %', obj_table_name; EXECUTE format( $f$ CREATE OBJECT TABLE %s WITH ( path = %L, oss_endpoint = %L, role_arn = %L ); $f$, full_obj_ident, oss_path, oss_endpoint, oss_role_arn ); COMMIT; -- 4. 重新整理 Object Table RAISE NOTICE 'Refresh object table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident); COMMIT; -- 5. 文檔解析模型選擇 IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')'; ELSE parse_expr := format( 'ai_parse_document(%L, file, ''auto'', ''markdown'')', parse_document_model ); END IF; -- 6. chunk 模型選擇 IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap); ELSE chunk_expr := format( 'ai_chunk(%L, doc, %s, %s)', chunk_model, chunk_size, chunk_overlap ); END IF; -- 7. embedding 模型選擇 IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN embed_expr := 'ai_embed(chunk)'; EXECUTE 'SELECT array_length(ai_embed(''dummy''), 1)' INTO embedding_dims; ELSE embed_expr := format('ai_embed(%L, chunk)', embedding_model); EXECUTE format( 'SELECT array_length(ai_embed(%L, ''dummy''), 1)', embedding_model ) INTO embedding_dims; END IF; RAISE NOTICE 'embedding dimension is: %', embedding_dims; -- 8. 建立 RAG 輸出動態表 RAISE NOTICE 'create dynamic table: %', corpus_name; EXECUTE format( $f$ CREATE DYNAMIC TABLE %s( CHECK(array_ndims(embedding_vector) = 1 AND array_length(embedding_vector, 1) = %s) ) WITH ( vectors = '{ "embedding_vector": { "algorithm": "HGraph", "distance_method": "Cosine", "builder_params": { "base_quantization_type": "sq8_uniform", "max_degree": 64, "ef_construction": 400, "precise_quantization_type": "fp32", "use_reorder": true } } }', auto_refresh_mode = 'incremental', freshness = '5 minutes', auto_refresh_enable = 'false' ) AS WITH parsed_doc AS ( SELECT object_uri, etag, %s AS doc FROM %s ), chunked_doc AS ( SELECT object_uri, etag, unnest(%s) AS chunk FROM parsed_doc ) SELECT object_uri, etag, chunk, %s AS embedding_vector FROM chunked_doc; $f$, full_corpus_ident, embedding_dims, parse_expr, full_obj_ident, chunk_expr, embed_expr ); COMMIT; -- 9. 建立全文索引(索引名 = 表名 || '_fulltext_idx') EXECUTE format( 'CREATE INDEX %I ON %s USING FULLTEXT (chunk);', corpus_name || '_fulltext_idx', full_corpus_ident ); RAISE NOTICE ''; RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table; RAISE NOTICE ' Vector index is: %.embedding_vector', corpus_table; RAISE NOTICE ' TextSearch index is: %.chunk', corpus_table; END; $$ LANGUAGE plpgsql;重新整理Object Table和Dynamic Table預存程序
CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table( corpus_table TEXT ) AS $$ DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; BEGIN -- 1. 解析 schema 和表名 IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := 'public'; corpus_name := corpus_table; END IF; obj_table_name := corpus_name || '_obj_table'; full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); -- 2. 重新整理 Object Table RAISE NOTICE 'Refreshing Object Table: %', obj_table_name; EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident); -- 3. 重新整理 Dynamic Table RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name; EXECUTE format('REFRESH TABLE %s;', full_corpus_ident); RAISE NOTICE 'Refresh complete for corpus table %', corpus_table; END; $$ LANGUAGE plpgsql;刪除Object Table和Dynamic Table預存程序
CREATE OR REPLACE PROCEDURE drop_rag_corpus_table( corpus_table TEXT ) AS $$ DECLARE corpus_schema TEXT; corpus_name TEXT; obj_table_name TEXT; full_corpus_ident TEXT; full_obj_ident TEXT; rec RECORD; BEGIN -- 1. 解析 schema 和表名 IF position('.' in corpus_table) > 0 THEN corpus_schema := split_part(corpus_table, '.', 1); corpus_name := split_part(corpus_table, '.', 2); ELSE corpus_schema := 'public'; corpus_name := corpus_table; END IF; obj_table_name := corpus_name || '_obj_table'; full_corpus_ident := format('%I.%I', corpus_schema, corpus_name); full_obj_ident := format('%I.%I', corpus_schema, obj_table_name); -- 2. 刪除表 -- 2.1 關閉動態表自動重新整理 -- RAISE NOTICE 'Disabling auto refresh for %', full_corpus_ident; -- EXECUTE format('ALTER TABLE IF EXISTS %s SET (auto_refresh_enable=false)', full_corpus_ident); -- 2.2 尋找 RUNNING 重新整理任務並取消 FOR rec IN EXECUTE format( $f$ SELECT query_job_id FROM hologres.hg_dynamic_table_refresh_log(%L) WHERE status = 'RUNNING'; $f$, corpus_table ) LOOP RAISE NOTICE 'Found running refresh job: %', rec.query_job_id; IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id; ELSE RAISE WARNING 'Cancel job % failed.', rec.query_job_id; END IF; END LOOP; -- 2.3 刪除 Dynamic Table RAISE NOTICE 'Dropping Dynamic Table: %', corpus_name; EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident); -- 2.4 刪除 Object Table RAISE NOTICE 'Dropping Object Table: %', obj_table_name; EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident); RAISE NOTICE 'Drop complete for corpus: %', corpus_table; END; $$ LANGUAGE plpgsql;
向量檢索函數
-- RAG向量單路召回問答
CREATE OR REPLACE FUNCTION qa_vector_search_retrieval(
question TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
vector_col TEXT DEFAULT 'embedding_vector'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
embedding_model_valid BOOLEAN;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
END IF;
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH
embedding_recall AS (
SELECT
chunk,
approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS distance
FROM
' || corpus_table || '
ORDER BY
distance DESC
LIMIT ' || vector_recall_count || '
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
embedding_recall
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;全文檢索索引函數
CREATE OR REPLACE FUNCTION qa_text_search_retrieval(
question TEXT,
corpus_table TEXT,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
text_search_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) ||
'), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) ||
'), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH
text_search_recall AS (
SELECT
chunk
FROM
' || corpus_table || '
ORDER BY
text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
LIMIT ' || text_search_recall_count || '
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
text_search_recall
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;向量+全文雙路召回+Rank排序函數
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(
question TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
text_search_recall_count INT DEFAULT 20,
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
vector_col TEXT DEFAULT 'embedding_vector',
text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
embedding_model_valid BOOLEAN;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
END IF;
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH
embedding_recall AS (
SELECT
chunk
FROM
' || corpus_table || '
ORDER BY
approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') DESC
LIMIT ' || vector_recall_count || '
),
text_search_recall AS (
SELECT
chunk
FROM
' || corpus_table || '
ORDER BY
text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
LIMIT ' || text_search_recall_count || '
),
union_recall AS (
SELECT chunk FROM embedding_recall
UNION
SELECT chunk FROM text_search_recall
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
union_recall
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;向量+全文雙路召回+RRF排序函數
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(
question TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
text_search_recall_count INT DEFAULT 20,
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
rrf_k INT DEFAULT 60,
vector_col TEXT DEFAULT 'embedding_vector',
text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
embedding_model_valid BOOLEAN;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) <> '');
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) <> '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) <> '');
IF embedding_model_valid THEN
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
END IF;
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH embedding_recall AS (
SELECT
chunk,
vec_score,
ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec
FROM (
SELECT
chunk,
approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS vec_score
FROM
' || corpus_table || '
) t
ORDER BY vec_score DESC
LIMIT ' || vector_recall_count || '
),
text_search_recall AS (
SELECT
chunk,
text_score,
ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text
FROM (
SELECT
chunk,
text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score
FROM
' || corpus_table || '
) ts
WHERE text_score > 0
ORDER BY text_score DESC
LIMIT ' || text_search_recall_count || '
),
rrf_scores AS (
SELECT
chunk,
SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score
FROM (
SELECT chunk, rank_vec AS rank_val FROM embedding_recall
UNION ALL
SELECT chunk, rank_text AS rank_val FROM text_search_recall
) sub
GROUP BY chunk
),
top_chunks AS (
SELECT chunk
FROM rrf_scores
ORDER BY rrf_score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM top_chunks
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;