Run SQL. Get AI-powered answers from your PDF documents—no external embeddings service, no dedicated vector store, no ETL pipeline to maintain.
This guide walks through building a Retrieval-Augmented Generation (RAG) system on top of Hologres using a financial dataset: 80 company prospectuses sourced from ModelScope. By the end, you'll have a pipeline that ingests PDFs from Object Storage Service (OSS), chunks and embeds them incrementally, and answers natural language questions using four retrieval strategies.
How it works
The pipeline has three stages:
Ingest — An Object Table reads PDF metadata directly from OSS. A Dynamic Table in incremental refresh mode processes new files automatically, applying ai_parse_document, ai_chunk, and ai_embed in sequence. Only new or changed files are processed on each refresh, so you're not re-embedding the entire corpus every time.
Index — The Dynamic Table's sink table holds the chunked text and its embedding vectors. A vector index (HGraph algorithm) and a full-text index are built on this table at creation time.
Retrieve and answer — At query time, embed the question with ai_embed, run one of the four retrieval functions, rerank or fuse the results, then pass the top chunks to ai_gen to generate a final answer.
All of this runs in standard SQL. No Python orchestration layer, no external API calls to a separate AI service.
Key concepts
| Concept | What it does in this pipeline |
|---|---|
| Object Table | Reads unstructured files (PDF, image, PPT) from OSS as a queryable table |
| Dynamic Table | Materializes query results incrementally; triggers embedding and chunking only on new data |
| AI functions | SQL operators (ai_embed, ai_chunk, ai_gen, ai_rank, ai_summarize) powered by built-in large language models (LLMs) |
| Vector search | Approximate nearest neighbor (ANN) search using cosine distance on embedding vectors |
| Full-text search | Inverted index search with tokenization; supports keyword matching and phrase queries |
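To make the chunking step concrete: ai_chunk splits each parsed document into overlapping segments (this pipeline defaults to a chunk size of 300 characters with an overlap of 50). The following Python sketch shows fixed-window chunking with overlap; the actual recursive-character-text-splitter chooses split boundaries more carefully, so treat this as an approximation:

```python
def chunk_text(text, chunk_size=300, chunk_overlap=50):
    """Split text into chunk_size-character windows; consecutive windows share chunk_overlap characters."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - chunk_overlap, 1), step)]

doc = "A" * 1000
chunks = chunk_text(doc)
print(len(chunks), [len(c) for c in chunks])  # 4 [300, 300, 300, 250]
```

The overlap means a sentence cut at one chunk boundary still appears intact in the next chunk, which helps retrieval recall.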
When to use each retrieval mode
Four retrieval functions are available. Choose based on your query characteristics:
| Mode | Best for | Not recommended for | Function |
|---|---|---|---|
| Vector search | Semantic questions, intent matching, queries where exact words don't appear in the document | Lookups requiring exact financial figures or codes | qa_vector_search_retrieval |
| Full-text search | Exact term lookup, financial figures, company names, specific codes | Open-ended semantic questions | qa_text_search_retrieval |
| Hybrid with reranking | Mixed queries needing both semantic understanding and keyword precision; uses ai_rank to select the single best chunk | Environments without a fine-tuned reranking model | qa_hybrid_retrieval |
| Hybrid with RRF | Same as hybrid, but uses Reciprocal Rank Fusion (RRF) scoring instead of a model-based reranker; faster and more predictable | When you have a fine-tuned reranker available | qa_hybrid_retrieval_rrf |
For financial document Q&A—where users ask about specific figures but also need contextual analysis—hybrid search typically outperforms either mode alone.
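For intuition about the vector mode: retrieval ranks chunks by the cosine similarity between the question's embedding and each chunk's embedding. A toy Python sketch with made-up 3-dimensional vectors (real embeddings from the deployed embedding model have hundreds of dimensions):

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 means same direction, near 0 means unrelated."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

question = [0.1, 0.9, 0.2]     # toy embedding of the user's question
chunk_close = [0.1, 0.8, 0.3]  # chunk with similar meaning
chunk_far = [0.9, 0.1, 0.0]    # unrelated chunk
print(cosine_similarity(question, chunk_close) > cosine_similarity(question, chunk_far))  # True
```

The HGraph vector index approximates this comparison at scale instead of scanning every chunk.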
Prerequisites
Before you begin, ensure you have:
A Hologres instance V4.0 or later with a database created (Purchase a Hologres instance, Create a database)
AI resources purchased for your instance (Purchase AI resources) — this guide uses one large-96core-512GB-384GB node
The following models deployed (Deploy AI models):
| Model name | Model | Purpose | vCPUs | Memory | GPU | Replicas |
|---|---|---|---|---|---|---|
| to_doc | ds4sd/docling-models | Convert PDF to structured document | 20 | 100 GB | 1 card (48 GB) | 1 |
| chunk | recursive-character-text-splitter | Chunk documents into segments | 15 | 30 GB | 0 | 1 |
| pdf_embed | BAAI/bge-base-zh-v1.5 | Generate embedding vectors | 7 | 30 GB | 1 card (96 GB) | 1 |
| llm | Qwen/Qwen3-32B | Generate answers from retrieved context | 7 | 30 GB | 1 card (96 GB) | 1 |
The table above uses the default resource allocations for each model.
The Bosera-JM 14B Challenge Dataset for Finance downloaded locally (80 company prospectuses in the pdf/ folder)
Set up the pipeline
Step 1: Upload PDFs to OSS
Log on to the OSS console, create a bucket, and upload the 80 PDF files to your chosen bucket path. For upload instructions, see Simple upload.
Step 2: Grant permissions
Hologres needs a RAM role that can read from your OSS bucket.
For an Alibaba Cloud root account:
Log on to the Resource Access Management (RAM) console and create a RAM role.
Grant the role the AliyunOSSReadOnlyAccess permission.
Update the trust policy so Hologres can assume the role:
{
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"RAM": [
"acs:ram::1866xxxx:root"
],
"Service": [
"hologres.aliyuncs.com"
]
}
}
],
"Version": "1"
}
Set Action to sts:AssumeRole and Service to hologres.aliyuncs.com.
For a RAM user (sub-account):
On the Access Control > Permission Model page, click Create Policy and select Script Editor. Paste the following policy, replacing <RoleARN> with your role's ARN:
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": "hologram:GrantAssumeRole",
"Resource": "<RoleARN>"
}
]
}
This policy lets Hologres check whether the RAM user has permission to use the RAM role.
On the Identity Management > User page, click Add Permissions for the target RAM user and attach the policy you created. For details, see Manage permissions for RAM users.
Update the trust policy for the RAM role using the same JSON shown above.
Step 3: Create the Object Table and Dynamic Table
Call create_rag_corpus_from_oss to set up the entire ingestion layer in one step. This stored procedure:
Creates an Object Table that reads PDF metadata from your OSS path
Creates a Dynamic Table (incremental refresh mode) as the sink for processed chunks
Builds a vector index and a full-text index on the sink table
CALL create_rag_corpus_from_oss(
oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf',
oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com',
oss_role_arn => 'acs:ram::186xxxx:role/xxxx',
corpus_table => 'public.dt_bs_challenge_financial'
);
Replace the placeholder values:
| Parameter | Description | Example |
|---|---|---|
| oss_path | OSS URI pointing to your PDF folder | oss://<bucket>/<path>/pdf |
| oss_endpoint | Internal OSS endpoint for your region | oss-cn-hangzhou-internal.aliyuncs.com |
| oss_role_arn | ARN of the RAM role you created in Step 2 | acs:ram::186xxxx:role/xxxx |
| corpus_table | Target table name (schema.table format) | public.dt_bs_challenge_financial |
Step 4: Process the PDFs
After the tables are created, trigger the first data load by calling refresh_rag_corpus_table. This refreshes the Object Table to pick up file metadata, then refreshes the Dynamic Table to parse, chunk, and embed each PDF.
CALL refresh_rag_corpus_table(
corpus_table => 'public.dt_bs_challenge_financial'
);
Processing 80 PDFs takes time. The Dynamic Table uses incremental refresh, so subsequent calls to refresh_rag_corpus_table only process new or changed files.
Query the corpus
The following examples all use the same question against Goke Microelectronics' prospectus. This lets you compare how each retrieval mode handles the same query.
Question: *By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?*
Vector search
qa_vector_search_retrieval embeds the question, retrieves the top 20 semantically similar chunks, reranks them with ai_rank, and passes the top 5 to the LLM.
When to use: The query is conceptual or the exact wording differs from the source document. Vector search excels at intent matching.
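A note on the prompt parameter used in the calls below: ${question}, ${context}, and ${language} are placeholders that the retrieval functions fill in via nested replace calls before invoking the LLM. A minimal Python equivalent of that substitution (the template text here is shortened for illustration):

```python
def fill_prompt(prompt, question, context, language="Chinese"):
    """Substitute the template placeholders the same way the retrieval functions do."""
    return (prompt
            .replace("${question}", question)
            .replace("${context}", context)
            .replace("${language}", language))

template = "Answer in ${language}.\nQuestion: ${question}\nReference:\n${context}"
print(fill_prompt(template, "What grew in 2014?", "Revenue grew 15.13%.", "English"))
```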
-- Vector-only retrieval + AI reranking
SELECT qa_vector_search_retrieval(
question => 'By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
corpus_table => 'dt_bs_challenge_financial',
prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);
Sample output:
"Based on the information provided, the analysis of the performance trend of Goke Microelectronics leads to the following conclusions:
### I. Performance Trend Analysis: Pessimistic
#### 1. **Sluggish revenue growth**
- Revenue in 2014 increased by **15.13%** year-over-year, but in 2015, it **decreased by 5.21%**. Data for 2016 is not provided, but it is clear that the revenue growth trend saw a significant decline in 2015.
- The compound annual growth rate (CAGR) of revenue from 2012 to 2014 was only **4.47%**, indicating slow business expansion.
#### 2. **Continuous decline in net profit growth**
- Net profit grew by **5.43%** in 2014 but **decreased by 3.29%** in 2015.
- After deducting non-recurring gains and losses, the net profit attributable to parent company shareholders decreased by **3.14%** in 2014 and further reduced by **5.60%** in 2015.
- The CAGR of net profit after deducting non-recurring gains and losses from 2012 to 2014 was **-4.38%**—much lower than the revenue growth—indicating that the main business is not profitable and growth relies on non-recurring gains and losses.
...
### III. Conclusion
Overall, the performance trend of Goke Microelectronics is **pessimistic**."
Full-text search
qa_text_search_retrieval retrieves chunks using an inverted index, reranks with ai_rank, and generates an answer.
When to use: The question contains specific financial figures, company names, or terms that appear verbatim in the documents. Full-text search is faster and more precise for exact-term lookups.
-- Full-text search retrieval
SELECT qa_text_search_retrieval(
question => 'By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
corpus_table => 'dt_bs_challenge_financial',
prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);
Sample output:
"Based on the information provided, the overall performance trend of Goke Microelectronics in 2014, 2015, and 2016 is **pessimistic**.
### 1. **Sluggish revenue growth**
- Revenue growth rate in 2014 was **15.13%**, but turned to **-5.21%** in 2015.
- The CAGR of revenue from 2012 to 2014 was only **4.47%**.
### 2. **Poor growth in net profit**
- Net profit growth rate in 2014 was **5.43%**, decreased to **-3.29%** in 2015.
- Net profit after deducting non-recurring gains and losses: **-3.14%** in 2014, **-5.60%** in 2015.
- CAGR of net profit excluding non-recurring items from 2012 to 2014 was **-4.38%**.
..."
Hybrid search with reranking
qa_hybrid_retrieval runs vector search and full-text search in parallel (top 20 each), unions the results, reranks all candidates with ai_rank, and selects the top chunk to send to the LLM.
When to use: Queries mix semantic intent with specific terminology. For financial document Q&A, this mode typically produces richer, more structured analysis than either single-mode approach.
-- Full-text and vector dual-retrieval + AI reranking
SELECT qa_hybrid_retrieval(
question => 'By what percentage did the operating revenue and net profit of Hunan Goke Microelectronics Co., Ltd. increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
corpus_table => 'dt_bs_challenge_financial',
prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);
Sample output (the hybrid mode produced a more structured, multi-dimensional analysis):
"Based on the information provided, we can analyze the performance trend of Goke Microelectronics as follows:
### I. **Operating revenue trend analysis**
1. **2012–2014 CAGR**: 4.47%, with operating income of CNY 181.5406 million in 2014—an increase of 15.13% from 2013.
2. In 2015, operating income **decreased by 5.21%** year-over-year.
### II. **Net profit trend analysis**
1. CAGR of net profit after deducting non-recurring gains and losses: **-4.38%** from 2012 to 2014.
2. Non-recurring gains and losses as a proportion of net profit: **17.54%** (2014), **10.25%** (2013), **8.06%** (2012).
...
### **Final conclusion: overall trend is pessimistic**
The continuous decline in net profit excluding non-recurring items, reliance on non-recurring gains and losses, and large short-term performance fluctuations indicate that performance growth lacks sustainability."
Hybrid search with RRF
qa_hybrid_retrieval_rrf uses Reciprocal Rank Fusion (RRF) instead of a model-based reranker to combine vector and full-text results.
When to use: Use RRF when you want deterministic, model-free fusion. Use qa_hybrid_retrieval when you have a fine-tuned reranking model available.
How RRF scoring works: Each chunk receives a score based on its rank position in both the vector and full-text result lists. A chunk ranked 3rd in vector search and 9th in full-text search gets a score of 1/(60+3) + 1/(60+9) ≈ 0.0159 + 0.0145 = 0.0304. The constant 60 is the rrf_k parameter. A higher rrf_k reduces the weight difference between rank positions, making scores more uniform across the list. The default value of 60 is a standard choice that balances rank position sensitivity without over-weighting the top result.
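The worked example above can be checked with a small Python sketch of RRF fusion (the chunk IDs and both rankings are made up for illustration):

```python
def rrf_fuse(vector_ranking, text_ranking, rrf_k=60):
    """Combine two ranked lists: each chunk scores 1/(rrf_k + rank) per list it appears in."""
    scores = {}
    for ranking in (vector_ranking, text_ranking):
        for rank, chunk in enumerate(ranking, start=1):
            scores[chunk] = scores.get(chunk, 0.0) + 1.0 / (rrf_k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

vec = ["a", "b", "c", "d"]                           # "c" is rank 3 in vector search
txt = ["e", "f", "g", "h", "i", "j", "k", "l", "c"]  # "c" is rank 9 in full-text search
fused = rrf_fuse(vec, txt)
print(fused[0][0], round(fused[0][1], 4))  # c 0.0304: appearing in both lists beats a top rank in one
```

This mirrors the rrf_scores CTE in the qa_hybrid_retrieval_rrf definition shown in the appendix.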
-- Full-text and vector dual-retrieval + RRF reranking
SELECT qa_hybrid_retrieval_rrf(
question => 'By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
corpus_table => 'dt_bs_challenge_financial',
prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);
Sample output:
"Based on the information provided, the analysis of the performance trend of Goke Microelectronics leads to the following conclusions:
### **Performance trend judgment: Pessimistic**
1. **Net profit growth is lower than revenue growth:** CAGR of operating income from 2012 to 2014 was 4.47%, but CAGR of net profit after deducting non-recurring gains and losses was **-4.38%**—significantly lower.
2. **Significant fluctuations in net profit:** Net profit from January to March 2015 decreased by 48.26% year-over-year, mainly due to lower gross profit margins on recognized projects.
3. **Projected profit decline in H1 2015:** Estimated net profit of CNY 23.40M–28.60M, lower than H1 2014's CNY 29.1266M.
### **Summary:**
Although operating revenue maintained stable growth, net profit growth significantly lagged or turned negative. If the company cannot increase gross profit margins or control costs, future performance may continue under pressure."
Appendix: Stored procedure and function definitions
Hologres does not support user-created functions. The definitions below are provided for reference only. Do not modify or execute them directly.
PDF processing stored procedures
Create the Object Table and Dynamic Table
CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss(
oss_path TEXT,
oss_endpoint TEXT,
oss_role_arn TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
parse_document_model TEXT DEFAULT NULL,
chunk_model TEXT DEFAULT NULL,
chunk_size INT DEFAULT 300,
chunk_overlap INT DEFAULT 50,
overwrite BOOLEAN DEFAULT FALSE
)
AS $$
DECLARE
corpus_schema TEXT;
corpus_name TEXT;
obj_table_name TEXT;
full_corpus_ident TEXT;
full_obj_ident TEXT;
embed_expr TEXT;
chunk_expr TEXT;
parse_expr TEXT;
embedding_dims INT;
BEGIN
-- 1. Split the schema name and table name.
IF position('.' in corpus_table) > 0 THEN
corpus_schema := split_part(corpus_table, '.', 1);
corpus_name := split_part(corpus_table, '.', 2);
ELSE
corpus_schema := 'public';
corpus_name := corpus_table;
END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2. If overwrite is needed, drop the table and index first.
IF overwrite THEN
DECLARE
dyn_table_exists BOOLEAN;
rec RECORD;
BEGIN
-- Check if the dynamic table exists.
SELECT EXISTS (
SELECT 1
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = corpus_name
AND n.nspname = corpus_schema
)
INTO dyn_table_exists;
IF dyn_table_exists THEN
-- 2.2 Find and cancel RUNNING refresh tasks.
FOR rec IN
EXECUTE format(
$f$
SELECT query_job_id
FROM hologres.hg_dynamic_table_refresh_log(%L)
WHERE status = 'RUNNING';
$f$,
corpus_table
)
LOOP
RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
ELSE
RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
END IF;
END LOOP;
-- 2.3 Drop the Dynamic Table.
EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
ELSE
RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident;
END IF;
-- 2.4 In any case, the Object Table must be dropped.
EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
END;
END IF;
-- 3. Create an Object Table.
RAISE NOTICE 'Create object table: %', obj_table_name;
EXECUTE format(
$f$
CREATE OBJECT TABLE %s
WITH (
path = %L,
oss_endpoint = %L,
role_arn = %L
);
$f$,
full_obj_ident,
oss_path,
oss_endpoint,
oss_role_arn
);
COMMIT;
-- 4. Refresh the Object Table.
RAISE NOTICE 'Refresh object table: %', obj_table_name;
EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
COMMIT;
-- 5. Select a document parsing model.
IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN
parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')';
ELSE
parse_expr := format(
'ai_parse_document(%L, file, ''auto'', ''markdown'')',
parse_document_model
);
END IF;
-- 6. Select a chunking model.
IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN
chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap);
ELSE
chunk_expr := format(
'ai_chunk(%L, doc, %s, %s)',
chunk_model,
chunk_size,
chunk_overlap
);
END IF;
-- 7. Select an embedding model.
IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN
embed_expr := 'ai_embed(chunk)';
EXECUTE 'SELECT array_length(ai_embed(''dummy''), 1)'
INTO embedding_dims;
ELSE
embed_expr := format('ai_embed(%L, chunk)', embedding_model);
EXECUTE format(
'SELECT array_length(ai_embed(%L, ''dummy''), 1)',
embedding_model
)
INTO embedding_dims;
END IF;
RAISE NOTICE 'embedding dimension is: %', embedding_dims;
-- 8. Create a dynamic table to store RAG outputs.
RAISE NOTICE 'create dynamic table: %', corpus_name;
EXECUTE format(
$f$
CREATE DYNAMIC TABLE %s(
CHECK(array_ndims(embedding_vector) = 1 AND array_length(embedding_vector, 1) = %s)
)
WITH (
vectors = '{
"embedding_vector": {
"algorithm": "HGraph",
"distance_method": "Cosine",
"builder_params": {
"base_quantization_type": "sq8_uniform",
"max_degree": 64,
"ef_construction": 400,
"precise_quantization_type": "fp32",
"use_reorder": true
}
}
}',
auto_refresh_mode = 'incremental',
freshness = '5 minutes',
auto_refresh_enable = 'false'
) AS
WITH parsed_doc AS (
SELECT object_uri,
etag,
%s AS doc
FROM %s
),
chunked_doc AS (
SELECT object_uri,
etag,
unnest(%s) AS chunk
FROM parsed_doc
)
SELECT
object_uri,
etag,
chunk,
%s AS embedding_vector
FROM chunked_doc;
$f$,
full_corpus_ident,
embedding_dims,
parse_expr,
full_obj_ident,
chunk_expr,
embed_expr
);
COMMIT;
-- 9. Create a full-text index (index name = table name || '_fulltext_idx').
EXECUTE format(
'CREATE INDEX %I ON %s USING FULLTEXT (chunk);',
corpus_name || '_fulltext_idx',
full_corpus_ident
);
RAISE NOTICE '';
RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table;
RAISE NOTICE ' Vector index is: %.embedding_vector', corpus_table;
RAISE NOTICE ' TextSearch index is: %.chunk', corpus_table;
END;
$$ LANGUAGE plpgsql;
Refresh the Object Table and Dynamic Table
CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table(
corpus_table TEXT
)
AS $$
DECLARE
corpus_schema TEXT;
corpus_name TEXT;
obj_table_name TEXT;
full_corpus_ident TEXT;
full_obj_ident TEXT;
BEGIN
-- 1. Parse the schema and table name.
IF position('.' in corpus_table) > 0 THEN
corpus_schema := split_part(corpus_table, '.', 1);
corpus_name := split_part(corpus_table, '.', 2);
ELSE
corpus_schema := 'public';
corpus_name := corpus_table;
END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2. Refresh the Object Table.
RAISE NOTICE 'Refreshing Object Table: %', obj_table_name;
EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);
-- 3. Refresh the Dynamic Table.
RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name;
EXECUTE format('REFRESH TABLE %s;', full_corpus_ident);
RAISE NOTICE 'Refresh complete for corpus table %', corpus_table;
END;
$$ LANGUAGE plpgsql;
Drop the Object Table and Dynamic Table
CREATE OR REPLACE PROCEDURE drop_rag_corpus_table(
corpus_table TEXT
)
AS $$
DECLARE
corpus_schema TEXT;
corpus_name TEXT;
obj_table_name TEXT;
full_corpus_ident TEXT;
full_obj_ident TEXT;
rec RECORD;
BEGIN
-- 1. Parse the schema and table name.
IF position('.' in corpus_table) > 0 THEN
corpus_schema := split_part(corpus_table, '.', 1);
corpus_name := split_part(corpus_table, '.', 2);
ELSE
corpus_schema := 'public';
corpus_name := corpus_table;
END IF;
obj_table_name := corpus_name || '_obj_table';
full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
full_obj_ident := format('%I.%I', corpus_schema, obj_table_name);
-- 2.2 Find and cancel RUNNING refresh tasks.
FOR rec IN
EXECUTE format(
$f$
SELECT query_job_id
FROM hologres.hg_dynamic_table_refresh_log(%L)
WHERE status = 'RUNNING';
$f$,
corpus_table
)
LOOP
RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
ELSE
RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
END IF;
END LOOP;
-- 2.3 Drop the Dynamic Table.
RAISE NOTICE 'Dropping Dynamic Table: %', corpus_name;
EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
-- 2.4 Drop the Object Table.
RAISE NOTICE 'Dropping Object Table: %', obj_table_name;
EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
RAISE NOTICE 'Drop complete for corpus: %', corpus_table;
END;
$$ LANGUAGE plpgsql;
Retrieval functions
Vector search function
-- Vector search for Q&A
CREATE OR REPLACE FUNCTION qa_vector_search_retrieval(
question TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
vector_col TEXT DEFAULT 'embedding_vector'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
embedding_model_valid BOOLEAN;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
END IF;
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH
embedding_recall AS (
SELECT
chunk,
approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS distance
FROM
' || corpus_table || '
ORDER BY
distance DESC
LIMIT ' || vector_recall_count || '
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
embedding_recall
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
Full-text search function
CREATE OR REPLACE FUNCTION qa_text_search_retrieval(
question TEXT,
corpus_table TEXT,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
text_search_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) ||
'), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) ||
'), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH
text_search_recall AS (
SELECT
chunk
FROM
' || corpus_table || '
ORDER BY
text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
LIMIT ' || text_search_recall_count || '
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
text_search_recall
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
Hybrid search function with reranking
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(
question TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
text_search_recall_count INT DEFAULT 20,
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
vector_col TEXT DEFAULT 'embedding_vector',
text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
embedding_model_valid BOOLEAN;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');
IF embedding_model_valid THEN
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
END IF;
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH
embedding_recall AS (
SELECT
chunk
FROM
' || corpus_table || '
ORDER BY
approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') DESC
LIMIT ' || vector_recall_count || '
),
text_search_recall AS (
SELECT
chunk
FROM
' || corpus_table || '
ORDER BY
text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
LIMIT ' || text_search_recall_count || '
),
union_recall AS (
SELECT chunk FROM embedding_recall
UNION
SELECT chunk FROM text_search_recall
),
rerank AS (
SELECT
chunk,
' || ai_rank_expr || ' AS score
FROM
union_recall
ORDER BY
score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
Hybrid search function with RRF
CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(
question TEXT,
corpus_table TEXT,
embedding_model TEXT DEFAULT NULL,
llm_model TEXT DEFAULT NULL,
ranking_model TEXT DEFAULT NULL,
prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
language TEXT DEFAULT 'Chinese',
text_search_recall_count INT DEFAULT 20,
vector_recall_count INT DEFAULT 20,
rerank_recall_count INT DEFAULT 5,
rrf_k INT DEFAULT 60,
vector_col TEXT DEFAULT 'embedding_vector',
text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
final_answer TEXT;
sql TEXT;
embedding_expr TEXT;
ai_rank_expr TEXT;
ai_gen_expr TEXT;
embedding_model_valid BOOLEAN;
llm_model_valid BOOLEAN;
ranking_model_valid BOOLEAN;
BEGIN
embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) <> '');
llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) <> '');
ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) <> '');
IF embedding_model_valid THEN
embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
ELSE
embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
END IF;
IF ranking_model_valid THEN
ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
ELSE
ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
END IF;
IF llm_model_valid THEN
ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
', replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
ELSE
ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
END IF;
sql := '
WITH embedding_recall AS (
SELECT
chunk,
vec_score,
ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec
FROM (
SELECT
chunk,
approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS vec_score
FROM
' || corpus_table || '
) t
ORDER BY vec_score DESC
LIMIT ' || vector_recall_count || '
),
text_search_recall AS (
SELECT
chunk,
text_score,
ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text
FROM (
SELECT
chunk,
text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score
FROM
' || corpus_table || '
) ts
WHERE text_score > 0
ORDER BY text_score DESC
LIMIT ' || text_search_recall_count || '
),
rrf_scores AS (
SELECT
chunk,
SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score
FROM (
SELECT chunk, rank_vec AS rank_val FROM embedding_recall
UNION ALL
SELECT chunk, rank_text AS rank_val FROM text_search_recall
) sub
GROUP BY chunk
),
top_chunks AS (
SELECT chunk
FROM rrf_scores
ORDER BY rrf_score DESC
LIMIT ' || rerank_recall_count || '
),
concat_top_chunks AS (
SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM top_chunks
)
SELECT ' || ai_gen_expr || '
FROM concat_top_chunks;
';
EXECUTE sql INTO final_answer;
RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
What's next
Object Table — manage unstructured data from OSS in tabular format
AI functions — full reference for ai_embed, ai_chunk, ai_gen, ai_rank, and other operators
Dynamic Table — configure incremental refresh modes and freshness settings