すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:VECTOR_SEARCH_AGG

最終更新日:Dec 04, 2025

このトピックでは、VECTOR_SEARCH_AGG 関数を使用してベクトル検索を実行する方法について説明します。ベクトル検索を使用すると、指定された高次元数値ベクターに基づいて、意味的に最も類似した項目を見つけることができます。VECTOR_SEARCH 関数とは異なり、この関数は一致するすべての項目を配列で返します。

制限事項

  • ストリームモードは、Ververica Runtime (VVR) エンジン 11.3 以降を使用する Realtime Compute for Apache Flink でサポートされています。バッチモードは、VVR エンジン 11.4 以降でサポートされています。

  • 検索は Milvus ベクターテーブルでのみ実行できます。

  • 非更新ストリームのみがサポートされています。非更新ストリームには INSERT メッセージのみが含まれます。

構文

VECTOR_SEARCH_AGG(
  TABLE <SEARCH_TABLE>,
  DESCRIPTOR(<COLUMN_TO_SEARCH>),
  <COLUMN_TO_QUERY>,
  <TOP_K>[,
  <CONFIG>]
)

入力パラメーター

パラメーター

データ型

説明

TABLE <SEARCH_TABLE>

TABLE

ベクターテーブルの名前。

DESCRIPTOR(<COLUMN_TO_SEARCH>)

DESC

ベクターテーブル内のインデックス付きベクター列。入力データはこの列と比較され、類似度が計算されます。

COLUMN_TO_QUERY

ARRAY<FLOAT>/ARRAY<DOUBLE>

ユーザーがアップロードした画像やテキストの埋め込みなど、入力からのベクター特徴量列。この列は、インデックス付きベクター列との類似度マッチングに使用されます。

TOP_K

INT

返される類似データレコードの最大数。

CONFIG

MAP<STRING,STRING>

設定可能な実行時パラメーター

戻り値

VECTOR_SEARCH_AGG は、単一のレコードを持つデータテーブルを返します。テーブルスキーマは ROW<ARRAY<ROW>> です。

配列列の名前は search_results です。配列内の各 ROW には、ベクターテーブルのすべての列と、score という名前の追加列が含まれます。score 列は DOUBLE 型で、入力データと出力データの類似度を示します。

実行時パラメーター

パラメーター

データ型

デフォルト値

説明

async

Boolean

(なし)

非同期モードを有効にするかどうかを指定します。ベクターテーブルで使用されるコネクタが指定されたモードをサポートしていない場合、エンジンはエラーをレポートします。

デフォルトでは、エンジンはコネクタがサポートするモードに基づいて実行モードを選択します。コネクタが非同期モードと同期モードの両方をサポートしている場合、エンジンは全体的なスループットを向上させるために非同期モードを優先します。

max-concurrent-operations

Integer

10

非同期モードでの同時リクエストの最大数。

output-mode

Enum

ORDERED

非同期操作の出力モード。

指定可能な値:

  • ORDERED

  • ALLOW_UNORDERED

これらの値の詳細については、「Async I/O」をご参照ください。

timeout

Duration

3 min

最初の呼び出しから非同期操作が完了するまでのタイムアウト期間。この期間には複数の再試行が含まれる場合があり、フェールオーバー時にリセットされます。

テストデータ

ベクターテーブル vector_table には、次のデータが含まれていると仮定します。

id

topic

vector_index

1

"BigData"

[1, 1, 0]

2

"Streaming"

[-5, -12, -13]

3

"Batch"

[5, 12, 13]

データテーブル query_table には、次のデータが含まれていると仮定します。

id

user_keyword

embedding

1

"Spark"

[5, 12, 13]

2

"Flink"

[-5, -12, -13]

テスト文

次の SQL 文は、query_table 内の各レコードについて vector_table を検索し、最も類似性の高い上位 2 件の項目を取得します。

SELECT user_keyword, search_results
FROM 
  query_table,
  LATERAL TABLE (VECTOR_SEARCH_AGG(
    SEARCH_TABLE => TABLE vector_table, 
    COLUMN_TO_SEARCH => DESCRIPTOR(vector_index), 
    COLUMN_TO_QUERY => query_table.embedding, 
    TOP_K => 2,
    MAP['async', 'false'] -- 同期モードを有効にする
    ))

出力

user_keyword

search_results

"Spark"

[(3, "Batch", [5.0, 12.0, 13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], 0.6538461538461539)]

"Flink"

[(2, "Streaming", [-5.0, -12.0, -13.0], 1.0), (1, "BigData", [1.0, 1.0, 0.0], -0.6538461538461539)]