Hologres: Build a multimodal AI system for financial data analytics and retrieval

Last Updated: Mar 26, 2026

Run SQL. Get AI-powered answers from your PDF documents—no external embeddings service, no dedicated vector store, no ETL pipeline to maintain.

This guide walks through building a Retrieval-Augmented Generation (RAG) system on top of Hologres using a financial dataset: 80 company prospectuses sourced from ModelScope. By the end, you'll have a pipeline that ingests PDFs from Object Storage Service (OSS), chunks and embeds them incrementally, and answers natural language questions using four retrieval strategies.

How it works

The pipeline has three stages:

  1. Ingest — An Object Table reads PDF metadata directly from OSS. A Dynamic Table in incremental refresh mode processes new files automatically, applying ai_parse_document, ai_chunk, and ai_embed in sequence. Only new or changed files are processed on each refresh, so you're not re-embedding the entire corpus every time.

  2. Index — The Dynamic Table's sink table holds chunked text and its embedding vector. A vector index (HGraph algorithm) and a full-text index are built on this table at creation time.

  3. Retrieve and answer — At query time, embed the question with ai_embed, run one of the four retrieval functions, rerank or fuse results, then pass the top chunks to ai_gen to generate a final answer.

All of this runs in standard SQL. No Python orchestration layer, no external API calls to a separate AI service.

Key concepts

| Concept | What it does in this pipeline |
| --- | --- |
| Object Table | Reads unstructured files (PDF, image, PPT) from OSS as a queryable table |
| Dynamic Table | Materializes query results incrementally; triggers embedding and chunking only on new data |
| AI functions | SQL operators (ai_embed, ai_chunk, ai_gen, ai_rank, ai_summarize) powered by built-in large language models (LLMs) |
| Vector search | Approximate nearest neighbor (ANN) search using cosine distance on embedding vectors |
| Full-text search | Inverted index search with tokenization; supports keyword matching and phrase queries |

When to use each retrieval mode

Four retrieval functions are available. Choose based on your query characteristics:

| Mode | Best for | Not recommended for | Function |
| --- | --- | --- | --- |
| Vector search | Semantic questions, intent matching, queries where exact words don't appear in the document | Lookups requiring exact financial figures or codes | qa_vector_search_retrieval |
| Full-text search | Exact term lookup, financial figures, company names, specific codes | Open-ended semantic questions | qa_text_search_retrieval |
| Hybrid with reranking | Mixed queries needing both semantic understanding and keyword precision; uses ai_rank to select the top chunks (5 by default) | Environments without a fine-tuned reranking model | qa_hybrid_retrieval |
| Hybrid with RRF | Same as hybrid, but uses Reciprocal Rank Fusion (RRF) scoring instead of a model-based reranker; faster and more predictable | When you have a fine-tuned reranker available | qa_hybrid_retrieval_rrf |

For financial document Q&A—where users ask about specific figures but also need contextual analysis—hybrid search typically outperforms either mode alone.

Prerequisites

Before you begin, ensure you have the following models deployed:

| Model name | Model | Purpose | vCPUs | Memory | GPU | Replicas |
| --- | --- | --- | --- | --- | --- | --- |
| to_doc | ds4sd/docling-models | Convert PDF to structured document | 20 | 100 GB | 1 card (48 GB) | 1 |
| chunk | recursive-character-text-splitter | Chunk documents into segments | 15 | 30 GB | 0 | 1 |
| pdf_embed | BAAI/bge-base-zh-v1.5 | Generate embedding vectors | 7 | 30 GB | 1 card (96 GB) | 1 |
| llm | Qwen/Qwen3-32B | Generate answers from retrieved context | 7 | 30 GB | 1 card (96 GB) | 1 |
The table above uses the default resource allocations for each model.

Set up the pipeline

Step 1: Upload PDFs to OSS

Log on to the OSS console, create a bucket, and upload the 80 PDF files to your chosen bucket path. For upload instructions, see Simple upload.

Step 2: Grant permissions

Hologres needs a RAM role that can read from your OSS bucket.

For an Alibaba Cloud root account:

  1. Log on to the Resource Access Management (RAM) console and create a RAM role.

  2. Grant the role the AliyunOSSReadOnlyAccess permission.

  3. Update the trust policy so Hologres can assume the role:

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "RAM": [
          "acs:ram::1866xxxx:root"
        ],
        "Service": [
          "hologres.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

Set Action to sts:AssumeRole and Service to hologres.aliyuncs.com.

For a RAM user (sub-account):

  1. On the Access Control > Permission Model page, click Create Policy and select Script Editor. Paste the following policy, replacing <RoleARN> with your role's ARN:

{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "hologram:GrantAssumeRole",
      "Resource": "<RoleARN>"
    }
  ]
}

This policy lets Hologres check whether the RAM user has permission to use the RAM role.

  2. On the Identity Management > User page, click Add Permissions for the target RAM user and attach the policy you created. For details, see Manage permissions for RAM users.

  3. Update the trust policy for the RAM role using the same trust policy JSON shown in the root account procedure.

Step 3: Create the Object Table and Dynamic Table

Call create_rag_corpus_from_oss to set up the entire ingestion layer in one step. This stored procedure:

  • Creates an Object Table that reads PDF metadata from your OSS path

  • Creates a Dynamic Table (incremental refresh mode) as the sink for processed chunks

  • Builds a vector index and a full-text index on the sink table

CALL create_rag_corpus_from_oss(
    oss_path => 'oss://xxxx/bs_challenge_financial_14b_dataset/pdf',
    oss_endpoint => 'oss-cn-hangzhou-internal.aliyuncs.com',
    oss_role_arn => 'acs:ram::186xxxx:role/xxxx',
    corpus_table => 'public.dt_bs_challenge_financial'
);

Replace the placeholder values:

| Parameter | Description | Example |
| --- | --- | --- |
| oss_path | OSS URI pointing to your PDF folder | oss://<bucket>/<path>/pdf |
| oss_endpoint | Internal OSS endpoint for your region | oss-cn-hangzhou-internal.aliyuncs.com |
| oss_role_arn | ARN of the RAM role you created in Step 2 | acs:ram::186xxxx:role/xxxx |
| corpus_table | Target table name (schema.table format) | public.dt_bs_challenge_financial |

Step 4: Process the PDFs

After the tables are created, trigger the first data load by calling refresh_rag_corpus_table. This refreshes the Object Table to pick up file metadata, then refreshes the Dynamic Table to parse, chunk, and embed each PDF.

CALL refresh_rag_corpus_table(
    corpus_table => 'public.dt_bs_challenge_financial'
);

Processing 80 PDFs takes time. The Dynamic Table uses incremental refresh, so subsequent calls to refresh_rag_corpus_table only process new or changed files.
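
To make "only new or changed files" concrete, here is a minimal Python sketch of change detection. It is an illustration only, not how Hologres implements incremental refresh. The function name `files_to_process` and the sample URIs and etags are hypothetical, and the idea that a change shows up as a different `etag` is an assumption inferred from the `object_uri` and `etag` columns that the Dynamic Table carries.

```python
def files_to_process(previous, current):
    """Return URIs that are new or whose etag differs from the last refresh.

    previous, current: dicts mapping object_uri -> etag (hypothetical snapshots).
    """
    return sorted(
        uri for uri, etag in current.items()
        if previous.get(uri) != etag
    )


# Hypothetical file listings before and after an upload.
before = {"oss://bucket/pdf/a.pdf": "e1", "oss://bucket/pdf/b.pdf": "e2"}
after = {
    "oss://bucket/pdf/a.pdf": "e1",  # unchanged: skipped
    "oss://bucket/pdf/b.pdf": "e9",  # re-uploaded with new content: reprocessed
    "oss://bucket/pdf/c.pdf": "e3",  # new file: processed
}

pending = files_to_process(before, after)
print(pending)
```

Under this model, a second refresh with no uploads in between has nothing to do, which is why repeated calls stay cheap.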

Query the corpus

The following examples all ask the same question about Goke Microelectronics' prospectus (the hybrid example words it slightly differently and uses the company's full name). This lets you compare how each retrieval mode handles the same query.

Question: *By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?*

Vector search

qa_vector_search_retrieval embeds the question, retrieves the top 20 semantically similar chunks, reranks them with ai_rank, and passes the top 5 to the LLM.

When to use: The query is conceptual or the exact wording differs from the source document. Vector search excels at intent matching.

-- Vector-only retrieval + AI reranking
SELECT qa_vector_search_retrieval(
  question => 'By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
  corpus_table => 'dt_bs_challenge_financial',
  prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);

Sample output:

"Based on the information provided, the analysis of the performance trend of Goke Microelectronics leads to the following conclusions:

### I. Performance Trend Analysis: Pessimistic

#### 1. **Sluggish revenue growth**
- Revenue in 2014 increased by **15.13%** year-over-year, but in 2015, it **decreased by 5.21%**. Data for 2016 is not provided, but it is clear that the revenue growth trend saw a significant decline in 2015.
- The compound annual growth rate (CAGR) of revenue from 2012 to 2014 was only **4.47%**, indicating slow business expansion.

#### 2. **Continuous decline in net profit growth**
- Net profit grew by **5.43%** in 2014 but **decreased by 3.29%** in 2015.
- After deducting non-recurring gains and losses, the net profit attributable to parent company shareholders decreased by **3.14%** in 2014 and further reduced by **5.60%** in 2015.
- The CAGR of net profit after deducting non-recurring gains and losses from 2012 to 2014 was **-4.38%**—much lower than the revenue growth—indicating that the main business is not profitable and growth relies on non-recurring gains and losses.

...

### III. Conclusion
Overall, the performance trend of Goke Microelectronics is **pessimistic**."
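
The two-stage flow behind qa_vector_search_retrieval (recall by vector similarity, then rerank a smaller set) can be pictured with the Python sketch below. It is a simplified model, not the function's implementation: the exact cosine computation stands in for the approximate index lookup, `toy_reranker` is a hypothetical stand-in for ai_rank, and the two-dimensional vectors are made-up data.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def recall_then_rerank(query_vec, chunks, rerank_score, recall_k=20, final_k=5):
    """chunks: list of (text, vector); rerank_score: callable text -> float."""
    # Stage 1: keep the recall_k chunks most similar to the query vector.
    recalled = sorted(
        chunks, key=lambda c: cosine_similarity(query_vec, c[1]), reverse=True
    )[:recall_k]
    # Stage 2: rescore only the recalled chunks and keep the best final_k.
    reranked = sorted(recalled, key=lambda c: rerank_score(c[0]), reverse=True)
    return [text for text, _ in reranked[:final_k]]


def toy_reranker(text):
    # Hypothetical relevance score: prefers chunks that mention "profit".
    return 1.0 if "profit" in text else 0.0


toy_chunks = [
    ("revenue grew", [1.0, 0.0]),
    ("net profit fell", [0.8, 0.6]),
    ("office address", [0.0, 1.0]),
]
top = recall_then_rerank([1.0, 0.0], toy_chunks, toy_reranker, recall_k=2, final_k=1)
print(top)
```

The reranker only ever sees what the recall stage lets through, so a chunk that misses the recall cut cannot be recovered later. That is the main reason to keep the recall count well above the final count.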

Full-text search

qa_text_search_retrieval retrieves the top 20 chunks by full-text relevance using an inverted index, reranks them with ai_rank, and passes the top 5 to the LLM to generate an answer.

When to use: The question contains specific financial figures, company names, or terms that appear verbatim in the documents. Full-text search is faster and more precise for exact-term lookups.

-- Full-text search retrieval
SELECT qa_text_search_retrieval(
    question => 'By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
    corpus_table => 'dt_bs_challenge_financial',
    prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);

Sample output:

"Based on the information provided, the overall performance trend of Goke Microelectronics in 2014, 2015, and 2016 is **pessimistic**.

### 1. **Sluggish revenue growth**
- Revenue growth rate in 2014 was **15.13%**, but turned to **-5.21%** in 2015.
- The CAGR of revenue from 2012 to 2014 was only **4.47%**.

### 2. **Poor growth in net profit**
- Net profit growth rate in 2014 was **5.43%**, decreased to **-3.29%** in 2015.
- Net profit after deducting non-recurring gains and losses: **-3.14%** in 2014, **-5.60%** in 2015.
- CAGR of net profit excluding non-recurring items from 2012 to 2014 was **-4.38%**.

..."

Hybrid search with reranking

qa_hybrid_retrieval runs vector search and full-text search in parallel (top 20 each), unions the results, reranks all candidates with ai_rank, and sends the top-ranked chunks (5 by default, set by rerank_recall_count) to the LLM.

When to use: Queries mix semantic intent with specific terminology. For financial document Q&A, this mode typically produces richer, more structured analysis than either single-mode approach.

-- Full-text and vector dual-retrieval + AI reranking
SELECT qa_hybrid_retrieval(
    question => 'By what percentage did the operating revenue and net profit of Hunan Goke Microelectronics Co., Ltd. increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
    corpus_table => 'dt_bs_challenge_financial',
    prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);

Sample output (the hybrid mode produced a more structured, multi-dimensional analysis):

"Based on the information provided, we can analyze the performance trend of Goke Microelectronics as follows:

### I. **Operating revenue trend analysis**
1. **2012–2014 CAGR**: 4.47%, with operating income of CNY 181.5406 million in 2014—an increase of 15.13% from 2013.
2. In 2015, operating income **decreased by 5.21%** year-over-year.

### II. **Net profit trend analysis**
1. CAGR of net profit after deducting non-recurring gains and losses: **-4.38%** from 2012 to 2014.
2. Non-recurring gains and losses as a proportion of net profit: **17.54%** (2014), **10.25%** (2013), **8.06%** (2012).

...

### **Final conclusion: overall trend is pessimistic**
The continuous decline in net profit excluding non-recurring items, reliance on non-recurring gains and losses, and large short-term performance fluctuations indicate that performance growth lacks sustainability."
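
The merge-then-rerank step of hybrid retrieval can be sketched in Python as follows. This is a simplified illustration rather than the function's code: the chunk names and the `toy_scores` table are hypothetical, and the score lookup stands in for ai_rank.

```python
def hybrid_candidates(vector_hits, text_hits):
    """Merge two recall lists and drop duplicates, as a SQL UNION would.

    SQL UNION gives no ordering guarantee; first-seen order is kept here
    only to make the output easy to read.
    """
    seen = set()
    merged = []
    for chunk in vector_hits + text_hits:
        if chunk not in seen:
            seen.add(chunk)
            merged.append(chunk)
    return merged


def rerank(candidates, score, top_k=5):
    """Keep the top_k candidates by descending score."""
    return sorted(candidates, key=score, reverse=True)[:top_k]


vector_hits = ["chunk A", "chunk B", "chunk C"]
text_hits = ["chunk B", "chunk D"]  # "chunk B" was found by both searches

merged = hybrid_candidates(vector_hits, text_hits)

# Hypothetical relevance scores standing in for a reranking model.
toy_scores = {"chunk A": 0.2, "chunk B": 0.9, "chunk C": 0.1, "chunk D": 0.7}
best = rerank(merged, toy_scores.get, top_k=2)
print(merged, best)
```

Because duplicates are removed before reranking, a chunk found by both searches is scored once, and the reranker alone decides the final order.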

Hybrid search with RRF

qa_hybrid_retrieval_rrf uses Reciprocal Rank Fusion (RRF) instead of a model-based reranker to combine vector and full-text results.

When to use: Use RRF when you want deterministic, model-free fusion. Use qa_hybrid_retrieval when you have a fine-tuned reranking model available.

How RRF scoring works: Each chunk receives a score based on its rank position in both the vector and full-text result lists. A chunk ranked 3rd in vector search and 9th in full-text search gets a score of 1/(60+3) + 1/(60+9) ≈ 0.0159 + 0.0145 = 0.0304. The constant 60 is the rrf_k parameter. A higher rrf_k reduces the weight difference between rank positions, making scores more uniform across the list. The default value of 60 is a standard choice that balances rank position sensitivity without over-weighting the top result.
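
The arithmetic above can be reproduced with a short Python sketch. It is an illustration of the RRF formula, not the body of qa_hybrid_retrieval_rrf: the function name `rrf_scores` and the placeholder chunk names are hypothetical, and treating a chunk that is absent from a list as contributing nothing from that list is an assumption.

```python
def rrf_scores(rankings, rrf_k=60):
    """Sum 1 / (rrf_k + rank) over every ranked list a chunk appears in.

    rankings: list of ranked lists, best result first (rank starts at 1).
    """
    scores = {}
    for ranking in rankings:
        for rank, chunk in enumerate(ranking, start=1):
            scores[chunk] = scores.get(chunk, 0.0) + 1.0 / (rrf_k + rank)
    return scores


# "target" is ranked 3rd by vector search and 9th by full-text search.
vector_ranking = ["v1", "v2", "target"]
text_ranking = ["t1", "t2", "t3", "t4", "t5", "t6", "t7", "t8", "target"]

scores = rrf_scores([vector_ranking, text_ranking])
fused = sorted(scores, key=scores.get, reverse=True)

print(round(scores["target"], 4))  # 0.0304
print(fused[0])                    # target
```

Note that "target" outranks "v1" even though "v1" is first in the vector list: appearing in both lists is worth more than a single top position, which is the behavior that makes RRF useful for fusing two retrievers.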

-- Full-text and vector dual-retrieval + RRF reranking
SELECT qa_hybrid_retrieval_rrf(
    question => 'By what percentage did the operating income and net profit of Goke Microelectronics increase year-over-year in 2014, 2015, and 2016 during the reporting period?',
    corpus_table => 'dt_bs_challenge_financial',
    prompt => 'Please analyze whether the following performance trend is pessimistic or optimistic and provide reasons: ${question}\n\n Reference information:\n\n ${context}'
);

Sample output:

"Based on the information provided, the analysis of the performance trend of Goke Microelectronics leads to the following conclusions:

### **Performance trend judgment: Pessimistic**

1. **Net profit growth is lower than revenue growth:** CAGR of operating income from 2012 to 2014 was 4.47%, but CAGR of net profit after deducting non-recurring gains and losses was **-4.38%**—significantly lower.

2. **Significant fluctuations in net profit:** Net profit from January to March 2015 decreased by 48.26% year-over-year, mainly due to lower gross profit margins on recognized projects.

3. **Projected profit decline in H1 2015:** Estimated net profit of CNY 23.40M–28.60M, lower than H1 2014's CNY 29.1266M.

### **Summary:**
Although operating revenue maintained stable growth, net profit growth significantly lagged or turned negative. If the company cannot increase gross profit margins or control costs, future performance may continue under pressure."

Appendix: Stored procedure and function definitions

Hologres does not support user-created functions. The definitions below are provided for reference only. Do not modify or execute them directly.

PDF processing stored procedures

Create the Object Table and Dynamic Table

CREATE OR REPLACE PROCEDURE create_rag_corpus_from_oss(
    oss_path TEXT,
    oss_endpoint TEXT,
    oss_role_arn TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    parse_document_model TEXT DEFAULT NULL,
    chunk_model TEXT DEFAULT NULL,
    chunk_size INT DEFAULT 300,
    chunk_overlap INT DEFAULT 50,
    overwrite BOOLEAN DEFAULT FALSE
)
AS $$
DECLARE
    corpus_schema TEXT;
    corpus_name TEXT;
    obj_table_name TEXT;
    full_corpus_ident TEXT;
    full_obj_ident TEXT;
    embed_expr TEXT;
    chunk_expr TEXT;
    parse_expr TEXT;
    embedding_dims INT;
BEGIN
    -- 1. Split the schema name and table name.
    IF position('.' in corpus_table) > 0 THEN
        corpus_schema := split_part(corpus_table, '.', 1);
        corpus_name   := split_part(corpus_table, '.', 2);
    ELSE
        corpus_schema := 'public';
        corpus_name   := corpus_table;
    END IF;

    obj_table_name := corpus_name || '_obj_table';

    full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
    full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);

    -- 2. If overwrite is needed, drop the table and index first.
    IF overwrite THEN
        DECLARE
            dyn_table_exists BOOLEAN;
            rec RECORD;
        BEGIN
            -- 2.1 Check if the dynamic table exists.
            SELECT EXISTS (
                SELECT 1
                FROM pg_class c
                JOIN pg_namespace n ON n.oid = c.relnamespace
                WHERE c.relname = corpus_name
                AND n.nspname = corpus_schema
            )
            INTO dyn_table_exists;

            IF dyn_table_exists THEN
                -- 2.2 Find and cancel RUNNING refresh tasks.
                FOR rec IN
                    EXECUTE format(
                        $f$
                        SELECT query_job_id
                            FROM hologres.hg_dynamic_table_refresh_log(%L)
                            WHERE status = 'RUNNING';
                        $f$,
                        corpus_table
                    )
                LOOP
                    RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
                    IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
                        RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
                    ELSE
                        RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
                    END IF;
                END LOOP;

                -- 2.3 Drop the Dynamic Table.
                EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);
            ELSE
                RAISE NOTICE 'Dynamic table % does not exist, skip cancel job and drop.', full_corpus_ident;
            END IF;

            -- 2.4 In any case, the Object Table must be dropped.
            EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);
        END;
    END IF;

    -- 3. Create an Object Table.
    RAISE NOTICE 'Create object table: %', obj_table_name;
    EXECUTE format(
        $f$
        CREATE OBJECT TABLE %s
        WITH (
            path = %L,
            oss_endpoint = %L,
            role_arn = %L
        );
        $f$,
        full_obj_ident,
        oss_path,
        oss_endpoint,
        oss_role_arn
    );

    COMMIT;

    -- 4. Refresh the Object Table.
    RAISE NOTICE 'Refresh object table: %', obj_table_name;
    EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);

    COMMIT;

    -- 5. Select a document parsing model.
    IF parse_document_model IS NULL OR length(trim(parse_document_model)) = 0 THEN
        parse_expr := 'ai_parse_document(file, ''auto'', ''markdown'')';
    ELSE
        parse_expr := format(
            'ai_parse_document(%L, file, ''auto'', ''markdown'')',
            parse_document_model
        );
    END IF;

    -- 6. Select a chunking model.
    IF chunk_model IS NULL OR length(trim(chunk_model)) = 0 THEN
        chunk_expr := format('ai_chunk(doc, %s, %s)', chunk_size, chunk_overlap);
    ELSE
        chunk_expr := format(
            'ai_chunk(%L, doc, %s, %s)',
            chunk_model,
            chunk_size,
            chunk_overlap
        );
    END IF;

    -- 7. Select an embedding model.
    IF embedding_model IS NULL OR length(trim(embedding_model)) = 0 THEN
        embed_expr := 'ai_embed(chunk)';

        EXECUTE 'SELECT array_length(ai_embed(''dummy''), 1)'
        INTO embedding_dims;
    ELSE
        embed_expr := format('ai_embed(%L, chunk)', embedding_model);

        EXECUTE format(
            'SELECT array_length(ai_embed(%L, ''dummy''), 1)',
            embedding_model
        )
        INTO embedding_dims;
    END IF;

    RAISE NOTICE 'embedding dimension is: %', embedding_dims;

    -- 8. Create a dynamic table to store RAG outputs.
    RAISE NOTICE 'create dynamic table: %', corpus_name;
    EXECUTE format(
        $f$
        CREATE DYNAMIC TABLE %s(
            CHECK(array_ndims(embedding_vector) = 1 AND array_length(embedding_vector, 1) = %s)
        )
        WITH (
            vectors = '{
              "embedding_vector": {
                "algorithm": "HGraph",
                "distance_method": "Cosine",
                "builder_params": {
                "base_quantization_type": "sq8_uniform",
                "max_degree": 64,
                "ef_construction": 400,
                "precise_quantization_type": "fp32",
                "use_reorder": true
                }
              }
            }',
            auto_refresh_mode = 'incremental',
            freshness = '5 minutes',
            auto_refresh_enable = 'false'
        ) AS
        WITH parsed_doc AS (
            SELECT object_uri,
                   etag,
                   %s AS doc
              FROM %s
        ),
        chunked_doc AS (
            SELECT object_uri,
                   etag,
                   unnest(%s) AS chunk
              FROM parsed_doc
        )
        SELECT
            object_uri,
            etag,
            chunk,
            %s AS embedding_vector
          FROM chunked_doc;
        $f$,
        full_corpus_ident,
        embedding_dims,
        parse_expr,
        full_obj_ident,
        chunk_expr,
        embed_expr
    );
    COMMIT;

    -- 9. Create a full-text index (index name = table name || '_fulltext_idx').
    EXECUTE format(
        'CREATE INDEX %I ON %s USING FULLTEXT (chunk);',
        corpus_name || '_fulltext_idx',
        full_corpus_ident
    );

    RAISE NOTICE '';
    RAISE NOTICE 'Create RAG corpus success to table: %', corpus_table;
    RAISE NOTICE '    Vector index is: %.embedding_vector', corpus_table;
    RAISE NOTICE '    TextSearch index is: %.chunk', corpus_table;
END;
$$ LANGUAGE plpgsql;
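
The chunk_size and chunk_overlap parameters above are easier to reason about with a small example. The Python sketch below is a plain sliding-window splitter, not the recursive-character-text-splitter that ai_chunk uses, which would also try to break on natural separators. Measuring sizes in characters is an assumption, and `chunk_with_overlap` is a hypothetical name.

```python
def chunk_with_overlap(text, chunk_size=300, chunk_overlap=50):
    """Split text into windows of chunk_size that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


# 700 characters of filler text.
sample = "".join(str(i % 10) for i in range(700))
pieces = chunk_with_overlap(sample)

print([len(p) for p in pieces])        # [300, 300, 200]
print(pieces[0][-50:] == pieces[1][:50])  # True
```

The overlap means that a sentence cut at a chunk boundary still appears whole in one of the two neighboring chunks, at the cost of embedding some text twice.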

Refresh the Object Table and Dynamic Table

CREATE OR REPLACE PROCEDURE refresh_rag_corpus_table(
    corpus_table TEXT
)
AS $$
DECLARE
    corpus_schema TEXT;
    corpus_name   TEXT;
    obj_table_name TEXT;
    full_corpus_ident TEXT;
    full_obj_ident    TEXT;
BEGIN
    -- 1. Parse the schema and table name.
    IF position('.' in corpus_table) > 0 THEN
        corpus_schema := split_part(corpus_table, '.', 1);
        corpus_name   := split_part(corpus_table, '.', 2);
    ELSE
        corpus_schema := 'public';
        corpus_name   := corpus_table;
    END IF;

    obj_table_name := corpus_name || '_obj_table';

    full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
    full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);

    -- 2. Refresh the Object Table.
    RAISE NOTICE 'Refreshing Object Table: %', obj_table_name;
    EXECUTE format('REFRESH OBJECT TABLE %s;', full_obj_ident);

    -- 3. Refresh the Dynamic Table.
    RAISE NOTICE 'Refreshing Dynamic Table: %', corpus_name;
    EXECUTE format('REFRESH TABLE %s;', full_corpus_ident);

    RAISE NOTICE 'Refresh complete for corpus table %', corpus_table;
END;
$$ LANGUAGE plpgsql;

Drop the Object Table and Dynamic Table

CREATE OR REPLACE PROCEDURE drop_rag_corpus_table(
    corpus_table TEXT
)
AS $$
DECLARE
    corpus_schema TEXT;
    corpus_name   TEXT;
    obj_table_name TEXT;
    full_corpus_ident TEXT;
    full_obj_ident    TEXT;
    rec RECORD;
BEGIN
    -- 1. Parse the schema and table name.
    IF position('.' in corpus_table) > 0 THEN
        corpus_schema := split_part(corpus_table, '.', 1);
        corpus_name   := split_part(corpus_table, '.', 2);
    ELSE
        corpus_schema := 'public';
        corpus_name   := corpus_table;
    END IF;

    obj_table_name := corpus_name || '_obj_table';

    full_corpus_ident := format('%I.%I', corpus_schema, corpus_name);
    full_obj_ident    := format('%I.%I', corpus_schema, obj_table_name);

    -- 2. Find and cancel RUNNING refresh tasks.
    FOR rec IN
        EXECUTE format(
            $f$
            SELECT query_job_id
                FROM hologres.hg_dynamic_table_refresh_log(%L)
                WHERE status = 'RUNNING';
            $f$,
            corpus_table
        )
    LOOP
        RAISE NOTICE 'Found running refresh job: %', rec.query_job_id;
        IF hologres.hg_internal_cancel_query_job(rec.query_job_id::bigint) THEN
            RAISE NOTICE 'Cancel job % succeeded.', rec.query_job_id;
        ELSE
            RAISE WARNING 'Cancel job % failed.', rec.query_job_id;
        END IF;
    END LOOP;

    -- 3. Drop the Dynamic Table.
    RAISE NOTICE 'Dropping Dynamic Table: %', corpus_name;
    EXECUTE format('DROP TABLE IF EXISTS %s;', full_corpus_ident);

    -- 4. Drop the Object Table.
    RAISE NOTICE 'Dropping Object Table: %', obj_table_name;
    EXECUTE format('DROP OBJECT TABLE IF EXISTS %s;', full_obj_ident);

    RAISE NOTICE 'Drop complete for corpus: %', corpus_table;
END;
$$ LANGUAGE plpgsql;

Retrieval functions

Vector search function

-- Vector search for Q&A
CREATE OR REPLACE FUNCTION qa_vector_search_retrieval(
    question TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    vector_col TEXT DEFAULT 'embedding_vector'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    embedding_model_valid BOOLEAN;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
    llm_model_valid := (llm_model IS NOT NULL AND trim(llm_model) != '');
    ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');

    IF embedding_model_valid THEN
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
    END IF;

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

    sql := '
      WITH
        embedding_recall AS (
            SELECT
              chunk,
              approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS distance
            FROM
              ' || corpus_table || '
            ORDER BY
              distance DESC
            LIMIT ' || vector_recall_count || '
        ),
        rerank AS (
            SELECT
              chunk,
              ' || ai_rank_expr || ' AS score
            FROM
              embedding_recall
            ORDER BY
              score DESC
            LIMIT ' || rerank_recall_count || '
        ),
        concat_top_chunks AS (
            SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
        )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
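
The prompt assembly in the function above (join the reranked chunks, then substitute the three placeholders) can be mirrored in Python for readers who want to preview what the LLM receives. This is a sketch for illustration only: `build_prompt`, the sample question, and the sample chunks are hypothetical, while the separator and the replacement order follow the SQL.

```python
def build_prompt(template, question, chunks, language="Chinese"):
    """Fill ${question}, ${context}, and ${language} in the same order as the SQL."""
    # The SQL joins the top chunks with this separator via string_agg.
    context = "\n\n----\n\n".join(chunks)
    return (
        template
        .replace("${question}", question)
        .replace("${context}", context)
        .replace("${language}", language)
    )


# The default template from the function signature, with real newlines.
template = (
    "Please answer the following question in ${language} based on the "
    "reference information.\n\n Question: ${question}\n\n "
    "Reference information:\n\n ${context}"
)

filled = build_prompt(
    template,
    "What was the revenue growth in 2014?",  # hypothetical question
    ["chunk one", "chunk two"],              # hypothetical reranked chunks
    language="English",
)
print(filled)
```

A custom prompt that omits `${language}`, like the one used in the query examples, simply leaves the language choice to the model, because a replacement with no matching placeholder changes nothing.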

Full-text search function

CREATE OR REPLACE FUNCTION qa_text_search_retrieval(
    question TEXT,
    corpus_table TEXT,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    text_search_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    llm_model_valid     := (llm_model IS NOT NULL AND trim(llm_model) != '');
    ranking_model_valid := (ranking_model IS NOT NULL AND trim(ranking_model) != '');

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) ||
               '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) ||
               '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

    sql := '
      WITH
        text_search_recall AS (
            SELECT
              chunk
            FROM
              ' || corpus_table || '
            ORDER BY
              text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
            LIMIT ' || text_search_recall_count || '
        ),
        rerank AS (
            SELECT
              chunk,
              ' || ai_rank_expr || ' AS score
            FROM
              text_search_recall
            ORDER BY
              score DESC
            LIMIT ' || rerank_recall_count || '
        ),
        concat_top_chunks AS (
            SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
        )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;

Hybrid search function with reranking

CREATE OR REPLACE FUNCTION qa_hybrid_retrieval(
    question TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT 'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    text_search_recall_count INT DEFAULT 20,
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    vector_col TEXT DEFAULT 'embedding_vector',
    text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    embedding_model_valid BOOLEAN;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) != '');
    llm_model_valid       := (llm_model IS NOT NULL AND trim(llm_model) != '');
    ranking_model_valid   := (ranking_model IS NOT NULL AND trim(ranking_model) != '');

    IF embedding_model_valid THEN
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
    END IF;

    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
               ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

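    -- corpus_table, vector_col, and text_search_col are concatenated as raw
    -- identifiers (not quoted), so pass only trusted values for them.
    sql := '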
      WITH
        embedding_recall AS (
            SELECT
              chunk
            FROM
              ' || corpus_table || '
            ORDER BY
              approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') DESC
            LIMIT ' || vector_recall_count || '
        ),
        text_search_recall AS (
            SELECT
              chunk
            FROM
              ' || corpus_table || '
            ORDER BY
              text_search(' || text_search_col || ', ' || quote_literal(question) || ') DESC
            LIMIT ' || text_search_recall_count || '
        ),
        union_recall AS (
            SELECT chunk FROM embedding_recall
            UNION
            SELECT chunk FROM text_search_recall
        ),
        rerank AS (
            SELECT
              chunk,
              ' || ai_rank_expr || ' AS score
            FROM
              union_recall
            ORDER BY
              score DESC
            LIMIT ' || rerank_recall_count || '
        ),
        concat_top_chunks AS (
            SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM rerank
        )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
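
A call to this function might look like the sketch below. It assumes your Hologres version accepts PostgreSQL named-argument notation (=>). The table name my_schema.prospectus_chunks and the question are hypothetical placeholders, so substitute the sink table of your own Dynamic Table.

```sql
-- Hypothetical table name and question; only the parameter names come from
-- the qa_hybrid_retrieval definition above.
SELECT qa_hybrid_retrieval(
    question            => 'What were the company''s main revenue sources during the reporting period?',
    corpus_table        => 'my_schema.prospectus_chunks',
    language            => 'English',
    vector_recall_count => 30,
    rerank_recall_count => 5
);
```

Omitting embedding_model, llm_model, and ranking_model leaves them NULL, so the function builds the ai_embed, ai_rank, and ai_gen calls without an explicit model name.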

Hybrid search function with RRF

CREATE OR REPLACE FUNCTION qa_hybrid_retrieval_rrf(
    question TEXT,
    corpus_table TEXT,
    embedding_model TEXT DEFAULT NULL,
    llm_model TEXT DEFAULT NULL,
    ranking_model TEXT DEFAULT NULL,
    prompt TEXT DEFAULT E'Please answer the following question in ${language} based on the reference information.\n\n Question: ${question}\n\n Reference information:\n\n ${context}',
    language TEXT DEFAULT 'Chinese',
    text_search_recall_count INT DEFAULT 20,
    vector_recall_count INT DEFAULT 20,
    rerank_recall_count INT DEFAULT 5,
    rrf_k INT DEFAULT 60,
    vector_col TEXT DEFAULT 'embedding_vector',
    text_search_col TEXT DEFAULT 'chunk'
)
RETURNS TEXT AS
$$
DECLARE
    final_answer TEXT;
    sql TEXT;
    embedding_expr TEXT;
    ai_rank_expr TEXT;
    ai_gen_expr TEXT;
    embedding_model_valid BOOLEAN;
    llm_model_valid BOOLEAN;
    ranking_model_valid BOOLEAN;
BEGIN
    embedding_model_valid := (embedding_model IS NOT NULL AND trim(embedding_model) <> '');
    llm_model_valid       := (llm_model IS NOT NULL AND trim(llm_model) <> '');
    ranking_model_valid   := (ranking_model IS NOT NULL AND trim(ranking_model) <> '');

    IF embedding_model_valid THEN
        embedding_expr := 'ai_embed(' || quote_literal(embedding_model) || ', ' || quote_literal(question) || ')';
    ELSE
        embedding_expr := 'ai_embed(' || quote_literal(question) || ')';
    END IF;

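    -- ai_rank_expr is not referenced in the RRF query below, so ranking_model
    -- is accepted but has no effect on the result of this function.
    IF ranking_model_valid THEN
        ai_rank_expr := 'ai_rank(' || quote_literal(ranking_model) || ', ' || quote_literal(question) || ', chunk)';
    ELSE
        ai_rank_expr := 'ai_rank(' || quote_literal(question) || ', chunk)';
    END IF;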

    IF llm_model_valid THEN
        ai_gen_expr := 'ai_gen(' || quote_literal(llm_model) ||
            ', replace(replace(replace(' || quote_literal(prompt) ||
                ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || ') )';
    ELSE
        ai_gen_expr := 'ai_gen(replace(replace(replace(' || quote_literal(prompt) ||
                ', ''${question}'', ' || quote_literal(question) || '), ''${context}'', merged_chunks), ''${language}'', ' || quote_literal(language) || '))';
    END IF;

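    -- corpus_table, vector_col, and text_search_col are concatenated as raw
    -- identifiers (not quoted), so pass only trusted values for them.
    sql := '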
      WITH embedding_recall AS (
        SELECT
          chunk,
          vec_score,
          ROW_NUMBER() OVER (ORDER BY vec_score DESC) AS rank_vec
        FROM (
          SELECT
            chunk,
            approx_cosine_distance(' || vector_col || ', ' || embedding_expr || ') AS vec_score
          FROM
            ' || corpus_table || '
        ) t
        ORDER BY vec_score DESC
        LIMIT ' || vector_recall_count || '
      ),
      text_search_recall AS (
        SELECT
          chunk,
          text_score,
          ROW_NUMBER() OVER (ORDER BY text_score DESC) AS rank_text
        FROM (
          SELECT
            chunk,
            text_search(' || text_search_col || ', ' || quote_literal(question) || ') AS text_score
          FROM
            ' || corpus_table || '
        ) ts
        WHERE text_score > 0
        ORDER BY text_score DESC
        LIMIT ' || text_search_recall_count || '
      ),
      rrf_scores AS (
        SELECT
          chunk,
          SUM(1.0 / (' || rrf_k || ' + rank_val)) AS rrf_score
        FROM (
          SELECT chunk, rank_vec AS rank_val FROM embedding_recall
          UNION ALL
          SELECT chunk, rank_text AS rank_val FROM text_search_recall
        ) sub
        GROUP BY chunk
      ),
      top_chunks AS (
        SELECT chunk
        FROM rrf_scores
        ORDER BY rrf_score DESC
        LIMIT ' || rerank_recall_count || '
      ),
      concat_top_chunks AS (
        SELECT string_agg(chunk, E''\n\n----\n\n'') AS merged_chunks FROM top_chunks
      )
      SELECT ' || ai_gen_expr || '
      FROM concat_top_chunks;
    ';

    EXECUTE sql INTO final_answer;
    RETURN final_answer;
END;
$$ LANGUAGE plpgsql;
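
To see what the rrf_scores step computes, the sketch below runs the same SUM(1.0 / (k + rank)) aggregation over two hand-written rank lists with k = 60, the default for rrf_k. The chunk labels and ranks are toy values, and the query uses only standard PostgreSQL syntax with no Hologres AI functions, so treat it as an illustration of the arithmetic rather than part of the pipeline.

```sql
-- Toy rank lists, not real retrieval output.
WITH embedding_recall(chunk, rank_vec) AS (
    VALUES ('A', 1), ('B', 2), ('C', 3)
),
text_search_recall(chunk, rank_text) AS (
    VALUES ('B', 1), ('D', 2), ('A', 3)
),
rrf_scores AS (
    -- Same fusion as in qa_hybrid_retrieval_rrf, with rrf_k fixed at 60.
    SELECT chunk, SUM(1.0 / (60 + rank_val)) AS rrf_score
    FROM (
        SELECT chunk, rank_vec AS rank_val FROM embedding_recall
        UNION ALL
        SELECT chunk, rank_text AS rank_val FROM text_search_recall
    ) sub
    GROUP BY chunk
)
SELECT chunk, round(rrf_score, 5) AS rrf_score
FROM rrf_scores
ORDER BY rrf_score DESC, chunk;
```

Chunks B and A appear in both lists, so each accumulates two terms and they outrank D and C, which appear in only one. The resulting order is B, A, D, C. A larger rrf_k flattens the difference between adjacent ranks, while a smaller one gives more weight to the top positions.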

What's next

  • Object Table — manage unstructured data from OSS in tabular format

  • AI functions — full reference for ai_embed, ai_chunk, ai_gen, ai_rank, and other operators

  • Dynamic Table — configure incremental refresh modes and freshness settings