本文介紹了如何通過 VECTOR_SEARCH_AGG 函數使用向量搜尋的功能。向量搜尋允許根據給定的高維數字向量以尋找語義上最相似的項。相較於VECTOR_SEARCH,本函數以數組形式返回所有匹配項。
使用限制
Realtime ComputeFlink版VVR引擎11.3及以上版本支援流模式,11.4及以上版本支援批模式。
僅支援 Milvus 作為向量表進行搜尋。
僅支援處理非更新流(只包含INSERT類型訊息的稱為非更新流)。
文法
VECTOR_SEARCH_AGG(
TABLE <SEARCH_TABLE>,
DESCRIPTOR(<COLUMN_TO_SEARCH>),
<COLUMN_TO_QUERY>,
<TOP_K>[,
<CONFIG>]
)入參
參數 | 資料類型 | 說明 |
TABLE <SEARCH_TABLE> | TABLE | 向量表的表名。 |
DESCRIPTOR(<COLUMN_TO_SEARCH>) | DESC | 向量表中建立索引的向量列。輸入資料將會跟指定的向量列比較,並計算相似性。 |
COLUMN_TO_QUERY | ARRAY<FLOAT>/ARRAY<DOUBLE> | 輸入中的向量特徵列(例如使用者上傳的映像/文本的嵌入向量),並與建立索引的向量列進行相似性匹配。 |
TOP_K | INT | 輸出相似資料的最大條數。 |
CONFIG | MAP<STRING,STRING> | 可配置的運行參數。 |
返回
VECTOR_SEARCH_AGG 返回一張只有一條記錄的資料表,表結構為 ROW<ARRAY<ROW>>。
數組列名為 search_results,其中每個 ROW 都包含了向量表中的所有列以及一個額外的附加列 score。該附加列 score 採用 DOUBLE 類型描述了輸入資料與輸出資料的相似性。
運行參數
參數 | 資料類型 | 預設值 | 說明 |
async | Boolean | (none) | 是否啟用 async 模式。如果向量表使用的 connector 不支援指定的模式,引擎會報錯告知使用者。 預設情況下,引擎會根據連接器支援的模式來選擇執行模式。若連接器同時支援非同步和同步模式,引擎優先使用非同步模式提升整體的吞吐。 |
max-concurrent-operations | Integer | 10 | 非同步模式下,最大並行請求數。 |
output-mode | Enum | ORDERED | 非同步作業的輸出模式。 可能的取值:
關於兩個取值的含義,請參考Async I/O。 |
timeout | Duration | 3min | 從首次調用到非同步作業最終完成的逾時時間,可能包含多次重試,並且在發生容錯移轉(failover)時會被重設。 |
樣本
測試資料
假設向量表 vector_table中有如下資料:
id | topic | vector_index |
1 | "BigData" | [1, 1, 0] |
2 | "Streaming" | [-5, -12, -13] |
3 | "Batch" | [5, 12, 13] |
假設查詢資料表 query_table中有如下資料:
id | user_keyword | embedding |
1 | "Spark" | [5, 12, 13] |
2 | "Flink" | [-5, -12, -13] |
測試語句
通過以下 SQL 為 query_table中的每條資料搜尋vector_table擷取相似性前2的資料。
SELECT user_keyword, search_results
FROM
query_table,
LATERAL TABLE (VECTOR_SEARCH_AGG(
SEARCH_TABLE => TABLE vector_table,
COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
COLUMN_TO_QUERY => query_table.embedding,
TOP_K => 2,
MAP['async', 'false'] -- 開啟同步模式
))輸出結果
user_keyword | search_results |
"Spark" | [(3, "Batch", [5.0, 12.0, 13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], 0.6538461538461539)] |
"Flink" | [(2, "Streaming", [-5.0, -12.0, -13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], -0.6538461538461539)] |