Realtime Compute for Apache Flink: VECTOR_SEARCH

Last Updated: Mar 26, 2026

VECTOR_SEARCH queries a Milvus vector table from a Flink SQL job. Given a high-dimensional numerical vector, it returns the top-K entries in the vector table that are most semantically similar to that vector.

Limitations

  • Version support: Ververica Runtime (VVR) 11.3 and later support stream mode. VVR 11.4 and later support batch mode.

  • Vector table: Only Milvus is supported as the vector table.

  • Stream type: Only insert-only streams are supported (streams containing only INSERT messages).

  • Execution mode: Stream mode and batch mode have different minimum versions, as listed under Version support. Batch mode is not available before VVR 11.4.

Syntax

VECTOR_SEARCH(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>
  [, <CONFIG>]
)

Parameters

Parameter | Type | Description
TABLE <SEARCH_TABLE> | TABLE | The name of the vector table.
DESCRIPTOR(<COLUMN_TO_SEARCH>) | DESC | The indexed vector column in the vector table. Input data is compared against this column to compute similarity.
<COLUMN_TO_QUERY> | ARRAY<FLOAT> / ARRAY<DOUBLE> | The embedding column from the input stream, such as the embedding of an uploaded image or text.
<TOP_K> | INT | The maximum number of similar entries to return per input row.
<CONFIG> | MAP<STRING,STRING> | Optional runtime parameters. See Runtime parameters.
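
As a concrete instance of the positional form, the sketch below fills in the syntax with the table and column names from the Example section of this page (vector_table, vector_index, query_table.embedding). It follows the argument order shown under Syntax and omits the optional CONFIG argument. Treat it as an illustration of how the placeholders map to real names, not as a verified statement.

```sql
-- Positional call, in the order given under Syntax:
-- search table, indexed vector column, query column, top-K.
SELECT user_keyword, topic
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    TABLE vector_table,        -- <SEARCH_TABLE>
    DESCRIPTOR(vector_index),  -- <COLUMN_TO_SEARCH>
    query_table.embedding,     -- <COLUMN_TO_QUERY>
    2                          -- <TOP_K>
  ));
```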

Return value

VECTOR_SEARCH returns a table. Each row contains all columns from the vector table plus a score column of type DOUBLE. The score column indicates the similarity between the input data and the output data.
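
To inspect the similarity values, the score column can be selected alongside the vector table columns. The following is a minimal sketch that reuses the names from the Example section. It assumes the appended column is named score, as described above, and that topic and score do not clash with any column name in query_table.

```sql
SELECT
  user_keyword,  -- from query_table
  topic,         -- from vector_table
  score          -- appended by VECTOR_SEARCH, type DOUBLE
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2
  ));
```

How to read the score (for example, whether a larger value means a closer match) is not specified on this page and presumably depends on the metric configured for the Milvus index.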

Runtime parameters

Pass runtime parameters as a MAP<STRING,STRING> in the CONFIG argument, for example: MAP['async', 'false'].

By default, the engine selects the execution mode based on what the Milvus connector supports. If the connector supports both asynchronous and synchronous modes, the engine prefers asynchronous mode to maximize throughput. Set async explicitly to override this behavior.

Parameter | Type | Default | Description
async | Boolean | (none) | Whether to use asynchronous mode. If the connector does not support the specified mode, the engine reports an error.
max-concurrent-operations | Integer | 10 | The maximum number of concurrent requests in asynchronous mode.
output-mode | Enum | ORDERED | The output mode for asynchronous operations. Valid values: ORDERED, ALLOW_UNORDERED. For details, see "Order of results" in the Async I/O documentation.
timeout | Duration | 3 min | The timeout for an asynchronous operation, from the first call until completion. This period can include multiple retries and is reset on failover.
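
Several runtime parameters can be combined in one MAP by listing keys and values alternately. The sketch below reuses the names from the Example section and requests asynchronous mode. The specific values (20 concurrent requests, a 1 min timeout, unordered output) are illustrative assumptions, not recommendations. The duration is written in the same style as the default shown in the table, and passing the map as a named CONFIG argument is an assumption based on the parameter name in the Syntax section.

```sql
SELECT user_keyword, topic
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2,
    CONFIG => MAP[
      'async', 'true',                    -- request asynchronous mode
      'max-concurrent-operations', '20',  -- illustrative; default is 10
      'output-mode', 'ALLOW_UNORDERED',   -- default is ORDERED
      'timeout', '1 min'                  -- illustrative; default is 3 min
    ]
  ));
```

With ALLOW_UNORDERED, result rows may be emitted in a different order than their input rows arrived, so this setting only suits jobs that do not depend on output order.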

Example

Test data

vector_table:

id | topic | vector_index
1 | "BigData" | [1, 1, 0]
2 | "Streaming" | [-5, -12, -13]
3 | "Batch" | [5, 12, 13]

query_table:

id | user_keyword | embedding
1 | "Spark" | [5, 12, 13]
2 | "Flink" | [-5, -12, -13]

Query

The following statement uses each row in query_table to search vector_table and retrieve the two most similar records. The MAP['async', 'false'] option enables synchronous mode explicitly.

SELECT user_keyword, topic
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2,
    CONFIG => MAP['async', 'false'] -- Enable synchronous mode
  ));

Results

user_keyword | topic
"Spark" | "Batch"
"Spark" | "BigData"
"Flink" | "Streaming"
"Flink" | "BigData"