VECTOR_SEARCH is a table-valued function that queries a Milvus vector table from a Flink SQL job. For each input row, it compares the row's high-dimensional embedding vector against an indexed vector column and returns the top-K most semantically similar entries.
Limitations
Version support: Ververica Runtime (VVR) 11.3 and later support stream mode. VVR 11.4 and later support batch mode.
Vector table: Only Milvus is supported as the vector table.
Stream type: Only insert-only streams are supported (streams that contain only INSERT messages).
Execution mode: VVR 11.3 supports only stream mode for VECTOR_SEARCH. Batch mode requires VVR 11.4 or later.
Syntax
```sql
VECTOR_SEARCH(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>
  [, <CONFIG>]
)
```

Parameters
| Parameter | Type | Description |
|---|---|---|
| `TABLE <SEARCH_TABLE>` | TABLE | The name of the vector table. |
| `DESCRIPTOR(<COLUMN_TO_SEARCH>)` | DESCRIPTOR | The indexed vector column in the vector table. Input data is compared against this column to compute similarity. |
| `COLUMN_TO_QUERY` | `ARRAY<FLOAT>` / `ARRAY<DOUBLE>` | The embedding column from the input stream, such as the embedding of an uploaded image or text. |
| `TOP_K` | INT | The maximum number of similar entries to return per input row. |
| `CONFIG` | `MAP<STRING,STRING>` | Optional runtime parameters. See Runtime parameters. |
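For reference, here is a minimal sketch of the positional form. It assumes the `vector_table` and `query_table` tables from the Example section and follows the argument order in the Syntax block. `CONFIG` is omitted, so the engine chooses the execution mode itself:

```sql
-- Positional arguments: search table, indexed vector column, query embedding, top-K.
-- CONFIG is omitted, so the engine picks async or sync mode based on the connector.
SELECT user_keyword, topic
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    TABLE vector_table,
    DESCRIPTOR(vector_index),
    query_table.embedding,
    2
  ));
```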
Return value
VECTOR_SEARCH returns a table. Each row contains all columns from the vector table plus a score column of type DOUBLE. The score column indicates the similarity between the input data and the output data.
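To inspect the similarity values, select the `score` column next to the vector-table columns. This is a sketch that reuses the tables from the Example section. It assumes nothing about the range or sort direction of `score`, which depend on how the Milvus index is configured and are not specified on this page:

```sql
-- Return the similarity score of each match together with the matched topic.
-- CONFIG is optional and omitted here.
SELECT user_keyword, topic, score
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2
  ));
```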
Runtime parameters
Pass runtime parameters as a MAP<STRING,STRING> in the CONFIG argument, for example: MAP['async', 'false'].
By default, the engine selects the execution mode based on what the Milvus connector supports. If the connector supports both asynchronous and synchronous modes, the engine prefers asynchronous mode to maximize throughput. Set async explicitly to override this behavior.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `async` | Boolean | (none) | Whether to use asynchronous mode. If the connector does not support the specified mode, the engine reports an error. |
| `max-concurrent-operations` | Integer | 10 | The maximum number of concurrent requests in asynchronous mode. |
| `output-mode` | Enum | ORDERED | The output mode for asynchronous operations. Valid values: ORDERED, ALLOW_UNORDERED. For details, see the Order of Results section of the Flink Async I/O documentation. |
| `timeout` | Duration | 3 min | The timeout for an asynchronous operation, from the first call until completion. This period can include multiple retries and is reset on failover. |
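The following sketch runs the Example query in asynchronous mode and sets every runtime parameter from the table above explicitly. The values (20 concurrent requests, unordered output, a 1 min timeout) are illustrative only, not tuning recommendations. All values are written as strings because `CONFIG` is a `MAP<STRING,STRING>`:

```sql
SELECT user_keyword, topic
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2,
    CONFIG => MAP[
      'async', 'true',                    -- force async mode; errors if the connector lacks it
      'max-concurrent-operations', '20',  -- allow up to 20 in-flight requests
      'output-mode', 'ALLOW_UNORDERED',   -- emit results as soon as they complete
      'timeout', '1 min'                  -- per-operation timeout, including retries
    ]
  ));
```

With `ALLOW_UNORDERED`, result rows may not follow the order of the input rows. Keep the default `ORDERED` if downstream logic depends on input order.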
Example
Test data
vector_table:
| id | topic | vector_index |
|---|---|---|
| 1 | "BigData" | [1, 1, 0] |
| 2 | "Streaming" | [-5, -12, -13] |
| 3 | "Batch" | [5, 12, 13] |
query_table:
| id | user_keyword | embedding |
|---|---|---|
| 1 | "Spark" | [5, 12, 13] |
| 2 | "Flink" | [-5, -12, -13] |
Query
The following statement uses each row in query_table to search vector_table and retrieve the two most similar records. The MAP['async', 'false'] option enables synchronous mode explicitly.
```sql
SELECT user_keyword, topic
FROM
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table,
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index),
    COLUMN_TO_QUERY => query_table.embedding,
    TOP_K => 2,
    CONFIG => MAP['async', 'false'] -- Enable synchronous mode
  ));
```

Results
| user_keyword | topic |
|---|---|
| "Spark" | "Batch" |
| "Spark" | "BigData" |
| "Flink" | "Streaming" |
| "Flink" | "BigData" |