すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Milvus コネクタ (パブリックプレビュー)

最終更新日:Oct 25, 2025

このトピックでは、Milvus コネクタの使用方法について説明します。

概要

Milvus は、画像、テキスト、音声などの大規模な非構造化データを処理するために設計された、拡張性の高いベクトルデータベースです。効率的な類似検索をサポートしており、レコメンデーションシステム、画像取得、セマンティック検索などのユースケースに最適です。Milvus コネクタは、次の機能をサポートしています:

カテゴリ

詳細

サポートされるタイプ

結果テーブル、ベクトルテーブル

実行モード

ストリーミング

データフォーマット

なし

特定のモニタリングメトリック

なし

API タイプ

SQL

更新/削除のサポート

はい

特徴

Milvus コネクタは、Apache Flink と Milvus ベクトルデータベースを緊密に統合し、リアルタイムのベクトル検索シナリオ向けに、パフォーマンス専有型で信頼性の高いデータパイプラインを作成します。Milvus コネクタの主な特徴は次のとおりです:

  • 高並列書き込み: 設定可能なシンク並列度をサポートします。

  • 自動リトライ: 失敗した操作をリトライして安定性を向上させます。

  • バッチバッファリング: フラッシュされる前にレコードをバッチ処理することで、書き込みパフォーマンスを向上させます。

  • At-least-once セマンティクス: プライマリキーに基づくべき等な更新により、結果整合性を保証します。

  • ベクトル検索: Flink SQL 内で直接リアルタイムのベクトル類似検索を可能にします。

前提条件

  • Milvus クラスターを作成済みであること。詳細については、「Milvus インスタンスをすばやく作成する」をご参照ください。

  • Milvus コレクションを作成済みであること。特定のパーティションに書き込む予定がある場合は、その Milvus パーティションがすでに存在することを確認してください。

制限事項

  • 結果テーブルへの書き込みには、Ververica Runtime (VVR) 11.1 以降が必要です。

  • ベクトルテーブルのクエリには、VVR 11.3 以降が必要です。

  • Milvus 2.4.x のみがサポートされています。

  • Milvus コネクタは at-least-once セマンティクスのみをサポートします。

構文

CREATE TEMPORARY TABLE milvus_sink (
  id BIGINT,
  f1 STRING,
  f2 BOOLEAN,
  f3 TINYINT,
  f4 SMALLINT,
  f5 INTEGER,
  f6 DATE,
  f7 TIME(3),
  f8 TIMESTAMP_LTZ(3),
  f9 TIMESTAMP(3),
  f10 FLOAT,
  f11 DOUBLE,
  f12 DECIMAL(10, 2),
  f13 ARRAY<FLOAT>,
  f14 ARRAY<DOUBLE>,
  f15 ARRAY<INTEGER>,
  f16 ARRAY<BIGINT>,
  PRIMARY KEY (id) NOT ENFORCED  -- 必須。Milvus はプライマリキーとして BIGINT または STRING のみをサポートします。
) WITH (
  'connector'='milvus',
  'endpoint'='<yourEndpoint>',
  'port'='<yourPort>',
  'userName'='<yourUserName>',
  'password'='<yourPassword>',
  'databaseName'='<yourDatabaseName>',
  'collectionName'='<yourCollectionName>'
);

コネクタオプション

一般

オプション

説明

データ型

必須

デフォルト値

注意

connector

コネクタの名前。

String

はい

milvus に設定します。

endpoint

Milvus データベースのエンドポイント (IP アドレスまたはドメイン名)。

String

はい

詳細については、「ネットワークアクセスを設定する」をご参照ください。

port

Milvus データベースのポート番号。

INTEGER

いいえ

19530

username

Milvus データベースのユーザー名。

STRING

はい

なし。

password

Milvus データベースのパスワード。

STRING

はい

databaseName

Milvus データベースの名前。

STRING

はい

collectionName

Milvus コレクションの名前。

STRING

はい

partitionName

書き込み先のパーティションの名前。

STRING

いいえ

_default

partitionKey.enabled

コレクションがパーティションキーとしてスカラーフィールドを使用するかどうかを指定します。

BOOLEAN

いいえ

false

maxRetries

失敗した操作のリトライ回数。

INTEGER

いいえ

3

なし。

シンク固有

オプション

説明

データ型

必須

デフォルト値

注意

sink.parallelism

シンクオペレーターの並列度。

INTEGER

いいえ

設定されていない場合、並列度はアップストリームオペレーターから継承されます。

sink.maxRetries

書き込み失敗時の最大リトライ回数。

INTEGER

いいえ

3

VVR 11.3 以降では、このオプションは非推奨です。代わりに maxRetries を使用してください。

sink.buffer-flush.max-rows

バッファリングするレコードの最大数 (追加、アップサート、削除を含む)。この数に達するとフラッシュがトリガーされます。

INTEGER

いいえ

10000

このトリガーを無効にするには 0 に設定します。

sink.buffer-flush.interval

バッファリングされたレコードをフラッシュする時間間隔 (ミリ秒単位)。この間隔を超えるとフラッシュがトリガーされます。

INTEGER

いいえ

1000

時間ベースのフラッシュを無効にするには 0 に設定します。

sink.ignoreDelete

削除操作を無視するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true: 削除操作を無視します。

  • false: 削除操作を処理します。

ベクトルテーブル固有

パラメーター

説明

データ型

必須

デフォルト値

注意

search.metric

ベクトル類似度を測定するために使用されるメトリック。

String

いいえ

L2

サポートされている類似性メトリックの詳細については、「Milvus ドキュメント」をご参照ください。Milvus v2.4 は現在、次のメトリックをサポートしています:

  • L2: ユークリッド距離

  • IP: 内積

  • COSINE: コサイン類似度

型マッピング

Flink SQL 型

Milvus 型

STRING

VarChar(n)

BOOLEAN

Bool

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

DATE

VarChar(n)

TIME(3)

VarChar(n)

TIMESTAMP_LTZ(3)

Int64

説明

エポックタイム (ミリ秒) として保存されます。

TIMESTAMP(3)

VarChar(n)

FLOAT

Float

DOUBLE

Double

DECIMAL(10, 2)

VarChar(n)

ARRAY<FLOAT>

FloatVector

説明

Milvus コレクションを作成した後、ベクトルフィールドのインデックスを作成します。

ARRAY<DOUBLE>

Array<Double>[m]

ARRAY<INTEGER>

Array<Int32>[m]

ARRAY<BIGINT>

Array<Int64>[m]

  • ストリーミングデータを Milvus に書き込む

-- 毎秒 100 行を生成するモックデータソースを作成します。ストリーミングデータをシミュレートします。
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,       -- FLOAT 配列として渡されるベクトル。
    event_time AS PROCTIME() -- 処理時間属性。
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',  -- 毎秒 100 行を生成します。
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

CREATE TEMPORARY TABLE milvus_sink (
  id STRING,               -- デバイス ID などの一意の識別子。
  vector ARRAY<FLOAT>,     -- ベクトルデータ。配列の長さはソースと一致している必要があります。
  timestamp BIGINT         -- ストリーム処理用のタイムスタンプ。
  PRIMARY KEY (id) NOT ENFORCED  -- 必須。Milvus はプライマリキーとして BIGINT または STRING のみをサポートします。
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);

-- データを変換して Milvus に書き込みます。
INSERT INTO milvus_sink
SELECT 
    id,
    vector,
    UNIX_TIMESTAMP() * 1000 AS timestamp  -- 現在のタイムスタンプ (ミリ秒)。
FROM mock_source;
  • ベクトル検索を実行する

CREATE TEMPORARY TABLE milvus_table (
  id STRING,               -- 一意の識別子。
  vector ARRAY<FLOAT>,     -- ベクトルデータ。配列の長さはソースと一致している必要があります。
  PRIMARY KEY (id) NOT ENFORCED  -- 必須。Milvus はプライマリキーとして BIGINT または STRING のみをサポートします。
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);
-- ベクトル [1.1, 2.2, 3.3] に最も類似する上位 2 アイテムを検索します。
SELECT * FROM 
LATERAL TABLE(
  VECTOR_SEARCH(
    TABLE milvus_table, 
    DESCRIPTOR(vector), 
    ARRAY[1.1, 2.2, 3.3], 
    2));

注意: クエリを実行する前に、Milvus コレクションをメモリにロードする必要があります。詳細については、Milvus ドキュメントの「コレクションをロードする」をご参照ください。