すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:VECTOR_SEARCH

最終更新日:Dec 10, 2025

このトピックでは、ベクトル検索に VECTOR_SEARCH 関数を使用する方法について説明します。この関数は、指定された高次元数値ベクトルに基づいて、意味的に最も類似した項目を検索します。

制限事項

  • バージョンサポート:Ververica Runtime (VVR) 11.3 以降はストリームモードをサポートし、VVR 11.4 以降はバッチモードをサポートします。

  • ベクトルテーブル:ベクトルテーブルとしてサポートされているのは Milvus のみです。

  • ストリームタイプ:非更新ストリーム (INSERT メッセージのみを含む) のみがサポートされています。

  • 実行モード:この関数はストリームモードでのみ実行され、バッチモードはサポートされていません。

構文

VECTOR_SEARCH(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>[,
  <CONFIG>]
)

入力パラメーター

パラメーター

データ型

説明

TABLE <SEARCH_TABLE>

TABLE

ベクトルテーブルの名前。

DESCRIPTOR(<COLUMN_TO_SEARCH>)

DESC

ベクトルテーブル内のインデックス付きベクトル列。入力データはこの列と比較され、類似度が計算されます。

COLUMN_TO_QUERY

ARRAY<FLOAT>/ARRAY<DOUBLE>

アップロードされた画像やテキストの埋め込みなど、入力データからのベクトル特徴列。この列は、インデックス付きベクトル列と照合され、類似項目を検索します。

TOP_K

INT

返される類似データエントリの最大数。

CONFIG

MAP<STRING,STRING>

設定可能な実行時パラメーター

戻り値

VECTOR_SEARCH 関数はテーブルを返します。各行には、ベクトルテーブルのすべての列と追加のスコア列が含まれます。スコア列は DOUBLE データ型で、入力データと出力データの類似度を示します。

実行時パラメーター

パラメーター

データ型

デフォルト

説明

async

ブール値

(なし)

非同期モードを有効にするかどうかを指定します。ベクトルテーブルのコネクタが指定されたモードをサポートしていない場合、エンジンはエラーを報告します。

デフォルトでは、エンジンはコネクタがサポートする内容に基づいて実行モードを選択します。コネクタが非同期モードと同期モードの両方をサポートしている場合、エンジンはスループットを向上させるために非同期モードを優先します。

max-concurrent-operations

整数

10

非同期モードでの同時リクエストの最大数。

output-mode

列挙型

ORDERED

非同期操作の出力モード。

有効な値:

  • ORDERED

  • ALLOW_UNORDERED

これらの値の詳細については、「Async I/O」をご参照ください。

timeout

期間

3 min

非同期操作のタイムアウト。最初の呼び出しから完了までの期間です。この期間には複数のリトライが含まれる場合があり、フェールオーバー時にリセットされます。

テストデータ

vector_table には次のデータが含まれていると仮定します:

id

topic

vector_index

1

"BigData"

[1, 1, 0]

2

"Streaming"

[-5, -12, -13]

3

"Batch"

[5, 12, 13]

query_table には次のデータが含まれていると仮定します:

id

user_keyword

embedding

1

"Spark"

[5, 12, 13]

2

"Flink"

[-5, -12, -13]

テスト文

次の SQL 文は、query_table の各行を使用して vector_table を検索し、最も類似した上位 2 つのレコードを取得します。

SELECT user_keyword, topic
FROM 
  query_table,
  LATERAL TABLE (VECTOR_SEARCH(
    SEARCH_TABLE => TABLE vector_table, 
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index), 
    COLUMN_TO_QUERY => query_table.embedding, 
    TOP_K => 2,
    MAP['async', 'false'] -- 同期モードを有効にする
    ))

結果

user_keyword

topic

"Spark"

"Batch"

"Spark"

"BigData"

"Flink"

"Streaming"

"Flink"

"BigData"