全部產品
Search
文件中心

Realtime Compute for Apache Flink:VECTOR_SEARCH_AGG

更新時間:Dec 04, 2025

本文介紹了如何通過 VECTOR_SEARCH_AGG 函數使用向量搜尋的功能。向量搜尋允許根據給定的高維數字向量以尋找語義上最相似的項。相較於VECTOR_SEARCH,本函數以數組形式返回所有匹配項。

使用限制

  • Realtime ComputeFlink版VVR引擎11.3及以上版本支援流模式,11.4及以上版本支援批模式。

  • 僅支援 Milvus 作為向量表進行搜尋。

  • 僅支援處理非更新流(只包含INSERT類型訊息的稱為非更新流)。

文法

VECTOR_SEARCH_AGG(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>[,
  <CONFIG>]
)

入參

參數

資料類型

說明

TABLE <SEARCH_TABLE>

TABLE

向量表的表名。

DESCRIPTOR(<COLUMN_TO_SEARCH>)

DESC

向量表中建立索引的向量列。輸入資料將會跟指定的向量列比較,並計算相似性。

COLUMN_TO_QUERY

ARRAY<FLOAT>/ARRAY<DOUBLE>

輸入中的向量特徵列(例如使用者上傳的映像/文本的嵌入向量),並與建立索引的向量列進行相似性匹配。

TOP_K

INT

輸出相似資料的最大條數。

CONFIG

MAP<STRING,STRING>

可配置的運行參數

返回

VECTOR_SEARCH_AGG 返回一張只有一條記錄的資料表,表結構為 ROW<ARRAY<ROW>>。

數組列名為 search_results,其中每個 ROW 都包含了向量表中的所有列以及一個額外的附加列 score。該附加列 score 採用 DOUBLE 類型描述了輸入資料與輸出資料的相似性。

運行參數

參數

資料類型

預設值

說明

async

Boolean

(none)

是否啟用 async 模式。如果向量表使用的 connector 不支援指定的模式,引擎會報錯告知使用者。

預設情況下,引擎會根據連接器支援的模式來選擇執行模式。若連接器同時支援非同步和同步模式,引擎優先使用非同步模式提升整體的吞吐。

max-concurrent-operations

Integer

10

非同步模式下,最大並行請求數。

output-mode

Enum

ORDERED

非同步作業的輸出模式。

可能的取值:

  • ORDERED

  • ALLOW_UNORDERED

關於兩個取值的含義,請參考Async I/O

timeout

Duration

3min

從首次調用到非同步作業最終完成的逾時時間,可能包含多次重試,並且在發生容錯移轉(failover)時會被重設。

樣本

測試資料

假設向量表 vector_table中有如下資料:

id

topic

vector_index

1

"BigData"

[1, 1, 0]

2

"Streaming"

[-5, -12, -13]

3

"Batch"

[5, 12, 13]

假設查詢資料表 query_table中有如下資料:

id

user_keyword

embedding

1

"Spark"

[5, 12, 13]

2

"Flink"

[-5, -12, -13]

測試語句

通過以下 SQL 為 query_table中的每條資料搜尋vector_table擷取相似性前2的資料。

SELECT user_keyword, search_results
FROM 
  query_table,
  LATERAL TABLE (VECTOR_SEARCH_AGG(
    SEARCH_TABLE => TABLE vector_table, 
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index), 
    COLUMN_TO_QUERY => query_table.embedding, 
    TOP_K => 2,
    MAP['async', 'false'] -- 開啟同步模式
    ))

輸出結果

user_keyword

search_results

"Spark"

[(3, "Batch", [5.0, 12.0, 13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], 0.6538461538461539)]

"Flink"

[(2, "Streaming", [-5.0, -12.0, -13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], -0.6538461538461539)]