VECTOR_SEARCH_AGG searches a Milvus vector table for the top-K most similar records and returns all matches as a single array. Unlike VECTOR_SEARCH, VECTOR_SEARCH_AGG returns one row per input record, with all matches packed into an ARRAY<ROW> column named search_results.
Limitations
| Constraint | Details |
|---|---|
| Ververica Runtime (VVR) version | Stream mode: VVR 11.3+. Batch mode: VVR 11.4+. |
| Supported vector table | Milvus only |
| Stream type | Non-updating streams (INSERT messages only) |
Syntax
VECTOR_SEARCH_AGG(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>[,
  <CONFIG>]
)
Input parameters
| Parameter | Data type | Required | Description |
|---|---|---|---|
| TABLE <SEARCH_TABLE> | TABLE | Yes | The name of the Milvus vector table to search. |
| DESCRIPTOR(<COLUMN_TO_SEARCH>) | DESC | Yes | The indexed vector column in the vector table. Input vectors are compared against this column to compute similarity. |
| COLUMN_TO_QUERY | ARRAY<FLOAT> or ARRAY<DOUBLE> | Yes | The vector feature column from the input data, such as an image or text embedding. Each value is matched against the indexed column. |
| TOP_K | INT | Yes | The maximum number of similar records to return per input row. |
| CONFIG | MAP<STRING,STRING> | No | Runtime parameters for tuning execution behavior. |
Return value
VECTOR_SEARCH_AGG returns a table with one row per input record. The schema is ROW<ARRAY<ROW>>.
The array column is named search_results. Each element in the array is a ROW that includes all columns from the vector table, plus an additional score column.
About the `score` column:
Data type: DOUBLE
Indicates the similarity between the input vector and the matched record from the vector table.
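To make the "one row per input record" shape concrete, the following Python sketch packs flat match rows (one row per query and match pair) into the aggregated form. It only models the result shape and does not call Flink or Milvus. The flat layout is an assumption about what an unaggregated result would look like, `pack_matches` is a hypothetical helper, and the sample values are taken from the Example section of this page.

```python
from collections import defaultdict

# Assumed flat layout: one row per (query record, match) pair.
# Columns: user_keyword, id, topic, vector_index, score.
flat_rows = [
    ("Spark", 3, "Batch", [5.0, 12.0, 13.0], 1.0),
    ("Spark", 1, "BigData", [1.0, 1.0, 0.0], 0.6538461538461539),
    ("Flink", 2, "Streaming", [-5.0, -12.0, -13.0], 1.0),
    ("Flink", 1, "BigData", [1.0, 1.0, 0.0], -0.6538461538461539),
]


def pack_matches(rows):
    """Group flat match rows by query key into one row per query.

    Each output row carries a search_results list whose elements hold the
    vector table columns followed by the score.
    """
    grouped = defaultdict(list)
    for user_keyword, *match in rows:
        grouped[user_keyword].append(tuple(match))
    return [
        {"user_keyword": key, "search_results": matches}
        for key, matches in grouped.items()
    ]


aggregated = pack_matches(flat_rows)
for row in aggregated:
    # One line per input record, regardless of how many matches it has.
    print(row["user_keyword"], len(row["search_results"]))
```

The score is the last field of each element, so a consumer reading `search_results` can filter or rank matches without a second lookup.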
Runtime parameters
Pass runtime parameters as a MAP<STRING,STRING> in the CONFIG argument.
| Parameter | Data type | Default | Description |
|---|---|---|---|
| async | Boolean | (none) | Whether to use asynchronous mode. If the connector does not support the specified mode, the engine reports an error. By default, the engine selects the mode the connector supports, preferring asynchronous mode when both are available. |
| max-concurrent-operations | Integer | 10 | Maximum number of in-flight requests in asynchronous mode. |
| output-mode | Enum | ORDERED | Output ordering for asynchronous operations. Valid values: ORDERED, ALLOW_UNORDERED. For details, see Async I/O. |
| timeout | Duration | 3 min | Timeout for a single asynchronous operation, measured from the first call to final completion. Covers multiple retries and resets on failover. |
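The interaction between max-concurrent-operations and output-mode can be illustrated with a small asyncio model. This is a conceptual sketch, not the engine's implementation: `lookup` is a hypothetical stand-in for one asynchronous search request, the delays are invented, and the ordering behavior shown is the general Async I/O contract as understood here (ORDERED preserves input order, ALLOW_UNORDERED emits results as they complete).

```python
import asyncio


async def lookup(record, delay):
    """Hypothetical stand-in for one asynchronous vector search request."""
    await asyncio.sleep(delay)
    return record


async def run(records, delays, max_concurrent, ordered):
    # Models max-concurrent-operations: caps the number of in-flight requests.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(record, delay):
        async with semaphore:
            return await lookup(record, delay)

    tasks = [asyncio.create_task(bounded(r, d)) for r, d in zip(records, delays)]
    if ordered:
        # Models ORDERED: results come back in input order,
        # even if later requests finish first.
        return list(await asyncio.gather(*tasks))
    # Models ALLOW_UNORDERED: results are emitted as each request completes.
    return [await finished for finished in asyncio.as_completed(tasks)]


records = ["a", "b", "c"]
delays = [0.15, 0.05, 0.10]  # "b" finishes first, then "c", then "a"

ordered_results = asyncio.run(run(records, delays, 10, ordered=True))
unordered_results = asyncio.run(run(records, delays, 10, ordered=False))
print(ordered_results)
print(unordered_results)
```

In this model, ordered output waits on the slowest early request before anything behind it is emitted, which is the usual latency cost of ORDERED compared with ALLOW_UNORDERED.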
Example
The following example searches vector_table for each record in query_table and retrieves the top 2 most similar entries. Synchronous mode is enabled explicitly via the async runtime parameter.
Test data
vector_table:
| id | topic | vector_index |
|---|---|---|
| 1 | "BigData" | [1, 1, 0] |
| 2 | "Streaming" | [-5, -12, -13] |
| 3 | "Batch" | [5, 12, 13] |
query_table:
| id | user_keyword | embedding |
|---|---|---|
| 1 | "Spark" | [5, 12, 13] |
| 2 | "Flink" | [-5, -12, -13] |
SQL
SELECT user_keyword, search_results
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH_AGG(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2,
    MAP['async', 'false'] -- Enable synchronous mode
  ))
Output
| user_keyword | search_results |
|---|---|
| "Spark" | [(3, "Batch", [5.0, 12.0, 13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], 0.6538461538461539)] |
| "Flink" | [(2, "Streaming", [-5.0, -12.0, -13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], -0.6538461538461539)] |
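The scores in the output above are consistent with cosine similarity, although this page does not state which metric the Milvus index uses. Under that assumption, the score for "Spark" against "BigData" is (5·1 + 12·1 + 13·0) / (√338 · √2) = 17/26 ≈ 0.6538. The Python sketch below recomputes the top 2 matches for both queries from the test data. `cosine_similarity` and `top_k` are illustrative helpers, not part of any Flink or Milvus API.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Test data from the example: (id, topic, vector_index).
vector_table = [
    (1, "BigData", [1, 1, 0]),
    (2, "Streaming", [-5, -12, -13]),
    (3, "Batch", [5, 12, 13]),
]


def top_k(query, k):
    """Score every row against the query and keep the k highest scores."""
    scored = [
        (row_id, topic, cosine_similarity(query, vector))
        for row_id, topic, vector in vector_table
    ]
    scored.sort(key=lambda item: item[2], reverse=True)
    return scored[:k]


spark = top_k([5, 12, 13], 2)     # embedding of "Spark"
flink = top_k([-5, -12, -13], 2)  # embedding of "Flink"
for name, matches in (("Spark", spark), ("Flink", flink)):
    print(name, [(row_id, topic, round(score, 4)) for row_id, topic, score in matches])
```

For "Flink", the third candidate ("Batch") points in the exact opposite direction and would score -1.0, which is why "BigData" appears in the top 2 despite its negative score.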