Realtime Compute for Apache Flink: VECTOR_SEARCH_AGG

Last Updated: Mar 26, 2026

VECTOR_SEARCH_AGG searches a Milvus vector table for the top-K most similar records and returns all matches as a single array. Unlike VECTOR_SEARCH, VECTOR_SEARCH_AGG returns one row per input record, with all matches packed into an ARRAY<ROW> column named search_results.

Limitations

| Constraint | Details |
| --- | --- |
| Ververica Runtime (VVR) version | Stream mode: VVR 11.3+. Batch mode: VVR 11.4+. |
| Supported vector table | Milvus only |
| Stream type | Non-updating streams (INSERT messages only) |

Syntax

VECTOR_SEARCH_AGG(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>[,
  <CONFIG>]
)

Input parameters

| Parameter | Data type | Required | Description |
| --- | --- | --- | --- |
| TABLE <SEARCH_TABLE> | TABLE | Yes | The name of the Milvus vector table to search. |
| DESCRIPTOR(<COLUMN_TO_SEARCH>) | DESC | Yes | The indexed vector column in the vector table. Input vectors are compared against this column to compute similarity. |
| COLUMN_TO_QUERY | ARRAY<FLOAT> or ARRAY<DOUBLE> | Yes | The vector feature column from the input data, such as an image or text embedding. Each value is matched against the indexed column. |
| TOP_K | INT | Yes | The maximum number of similar records to return per input row. |
| CONFIG | MAP<STRING,STRING> | No | Runtime parameters for tuning execution behavior. |

Return value

VECTOR_SEARCH_AGG returns a table with one row per input record. Each row has a single column, search_results, of type ARRAY<ROW>, so the overall schema is ROW<ARRAY<ROW>>.

Each element in the search_results array is a ROW that includes all columns from the vector table, plus an additional score column.

About the `score` column:

  • Data type: DOUBLE

  • Indicates the similarity between the input vector and the matched record in the vector table
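
The score values in the example on this page are consistent with cosine similarity. The metric actually applied presumably depends on how the Milvus index is configured, which is an assumption and is not stated here. A minimal Python sketch, using a hypothetical helper named cosine_similarity, shows how such a score could be computed for one input vector and one stored vector:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (illustrative helper)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

query = [5, 12, 13]  # embedding of the "Spark" record in the example data

# Identical vectors: the score is close to 1.0.
print(cosine_similarity(query, [5, 12, 13]))

# The "BigData" vector: the score is close to 17/26, roughly 0.6538.
print(cosine_similarity(query, [1, 1, 0]))
```

Under this assumption, a score near 1 means the vectors point in almost the same direction, and a negative score means they point in opposing directions.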

Runtime parameters

Pass runtime parameters as a MAP<STRING,STRING> in the CONFIG argument.

| Parameter | Data type | Default | Description |
| --- | --- | --- | --- |
| async | Boolean | (none) | Whether to use asynchronous mode. If the connector does not support the specified mode, the engine reports an error. By default, the engine selects the mode the connector supports, preferring asynchronous mode when both are available. |
| max-concurrent-operations | Integer | 10 | Maximum number of in-flight requests in asynchronous mode. |
| output-mode | Enum | ORDERED | Output ordering for asynchronous operations. Valid values: ORDERED, ALLOW_UNORDERED. For details, see Async I/O. |
| timeout | Duration | 3 min | Timeout for a single asynchronous operation, measured from the first call to final completion. Covers multiple retries and resets on failover. |
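
To make the asynchronous parameters more concrete, the following Python sketch imitates them with asyncio. It is only an analogy and not how the engine is implemented: lookup and run are hypothetical names, the semaphore stands in for max-concurrent-operations, asyncio.wait_for stands in for timeout (retries and failover are not modeled), and the ordered flag contrasts ORDERED with ALLOW_UNORDERED.

```python
import asyncio

async def lookup(key, delay):
    """Stand-in for one asynchronous vector search request (hypothetical)."""
    await asyncio.sleep(delay)
    return key

async def run(requests, ordered, max_concurrent=10, timeout=180.0):
    limiter = asyncio.Semaphore(max_concurrent)  # cap on in-flight requests

    async def guarded(key, delay):
        async with limiter:
            # Fail a single request that runs longer than the timeout.
            return await asyncio.wait_for(lookup(key, delay), timeout)

    tasks = [asyncio.create_task(guarded(k, d)) for k, d in requests]
    if ordered:
        # Results are returned in input order, whatever the completion order.
        return await asyncio.gather(*tasks)
    # Results are returned in the order in which the requests complete.
    return [await finished for finished in asyncio.as_completed(tasks)]

# Three requests whose completion order differs from their input order.
requests = [("a", 0.15), ("b", 0.05), ("c", 0.10)]
ordered_result = asyncio.run(run(requests, ordered=True))
unordered_result = asyncio.run(run(requests, ordered=False))
print(ordered_result)    # input order: a, b, c
print(unordered_result)  # completion order: b, c, a
```

In this sketch the ordered variant holds back fast results until earlier requests finish, which is the usual trade-off between ordering guarantees and latency.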

Example

The following example searches vector_table for each record in query_table and retrieves the 2 most similar entries. Synchronous mode is selected explicitly by setting the async runtime parameter to false.

Test data

vector_table:

| id | topic | vector_index |
| --- | --- | --- |
| 1 | "BigData" | [1, 1, 0] |
| 2 | "Streaming" | [-5, -12, -13] |
| 3 | "Batch" | [5, 12, 13] |

query_table:

| id | user_keyword | embedding |
| --- | --- | --- |
| 1 | "Spark" | [5, 12, 13] |
| 2 | "Flink" | [-5, -12, -13] |

SQL

SELECT user_keyword, search_results
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH_AGG(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2,
    CONFIG => MAP['async', 'false'] -- Use synchronous mode
    ));
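
To see how the result of this query takes shape, the test data can be run through a small brute-force emulation in Python. This is a sketch under stated assumptions, not the connector's implementation: vector_search_agg and cosine are hypothetical local functions, cosine similarity is assumed as the metric, and Milvus would normally use an index instead of scanning every row.

```python
import math

# Test data from this example: (id, topic, vector_index) and (id, user_keyword, embedding).
vector_table = [
    (1, "BigData", [1, 1, 0]),
    (2, "Streaming", [-5, -12, -13]),
    (3, "Batch", [5, 12, 13]),
]
query_table = [
    (1, "Spark", [5, 12, 13]),
    (2, "Flink", [-5, -12, -13]),
]

def cosine(a, b):
    """Cosine similarity; assumed metric for this illustration."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / math.sqrt(sum(x * x for x in a) * sum(y * y for y in b))

def vector_search_agg(query_vec, table, top_k):
    """Score every row, keep the top_k best, and return them as ONE list."""
    scored = [(rid, topic, vec, cosine(query_vec, vec)) for rid, topic, vec in table]
    scored.sort(key=lambda row: row[3], reverse=True)  # highest score first
    return scored[:top_k]  # at most top_k matches, packed into a single array

# One output entry per input record, mirroring the search_results column.
results = {kw: vector_search_agg(emb, vector_table, 2) for _, kw, emb in query_table}
for user_keyword, search_results in results.items():
    print(user_keyword, search_results)
```

Each input record yields exactly one entry whose value holds up to TOP_K matches, and each match carries the vector table columns followed by the score.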

Output

| user_keyword | search_results |
| --- | --- |
| "Spark" | [(3, "Batch", [5.0, 12.0, 13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], 0.6538461538461539)] |
| "Flink" | [(2, "Streaming", [-5.0, -12.0, -13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], -0.6538461538461539)] |