このトピックでは、Milvus コネクタの使用方法について説明します。
概要
Milvus は、画像、テキスト、音声などの大規模な非構造化データを処理するために設計された、拡張性の高いベクトルデータベースです。効率的な類似検索をサポートしており、レコメンデーションシステム、画像取得、セマンティック検索などのユースケースに最適です。Milvus コネクタは、次の機能をサポートしています:
カテゴリ | 詳細 |
サポートされるタイプ | 結果テーブル、ベクトルテーブル |
実行モード | ストリーミング |
データフォーマット | なし |
特定のモニタリングメトリック | なし |
API タイプ | SQL |
更新/削除のサポート | はい |
特徴
Milvus コネクタは、Apache Flink と Milvus ベクトルデータベースを緊密に統合し、リアルタイムのベクトル検索シナリオ向けに、パフォーマンス専有型で信頼性の高いデータパイプラインを作成します。Milvus コネクタの主な特徴は次のとおりです:
高並列書き込み: 設定可能なシンク並列度をサポートします。
自動リトライ: 失敗した操作をリトライして安定性を向上させます。
バッチバッファリング: フラッシュされる前にレコードをバッチ処理することで、書き込みパフォーマンスを向上させます。
At-least-once セマンティクス: プライマリキーに基づくべき等な更新により、結果整合性を保証します。
ベクトル検索: Flink SQL 内で直接リアルタイムのベクトル類似検索を可能にします。
前提条件
Milvus クラスターを作成済みであること。詳細については、「Milvus インスタンスをすばやく作成する」をご参照ください。
Milvus コレクションを作成済みであること。特定のパーティションに書き込む予定がある場合は、その Milvus パーティションがすでに存在することを確認してください。
制限事項
結果テーブルへの書き込みには、Ververica Runtime (VVR) 11.1 以降が必要です。
ベクトルテーブルのクエリには、VVR 11.3 以降が必要です。
Milvus 2.4.x のみがサポートされています。
Milvus コネクタは at-least-once セマンティクスのみをサポートします。
構文
CREATE TEMPORARY TABLE milvus_sink (
id BIGINT,
f1 STRING,
f2 BOOLEAN,
f3 TINYINT,
f4 SMALLINT,
f5 INTEGER,
f6 DATE,
f7 TIME(3),
f8 TIMESTAMP_LTZ(3),
f9 TIMESTAMP(3),
f10 FLOAT,
f11 DOUBLE,
f12 DECIMAL(10, 2),
f13 ARRAY<FLOAT>,
f14 ARRAY<DOUBLE>,
f15 ARRAY<INTEGER>,
f16 ARRAY<BIGINT>,
PRIMARY KEY (id) NOT ENFORCED -- 必須。Milvus はプライマリキーとして BIGINT または STRING のみをサポートします。
) WITH (
'connector'='milvus',
'endpoint'='<yourEndpoint>',
'port'='<yourPort>',
'userName'='<yourUserName>',
'password'='<yourPassword>',
'databaseName'='<yourDatabaseName>',
'collectionName'='<yourCollectionName>'
);コネクタオプション
一般
オプション | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
connector | コネクタの名前。 | String | はい |
| |
endpoint | Milvus データベースのエンドポイント (IP アドレスまたはドメイン名)。 | String | はい | 詳細については、「ネットワークアクセスを設定する」をご参照ください。 | |
port | Milvus データベースのポート番号。 | INTEGER | いいえ |
| |
username | Milvus データベースのユーザー名。 | STRING | はい | なし。 | |
password | Milvus データベースのパスワード。 | STRING | はい | ||
databaseName | Milvus データベースの名前。 | STRING | はい | ||
collectionName | Milvus コレクションの名前。 | STRING | はい | ||
partitionName | 書き込み先のパーティションの名前。 | STRING | いいえ |
| |
partitionKey.enabled | コレクションがパーティションキーとしてスカラーフィールドを使用するかどうかを指定します。 | BOOLEAN | いいえ |
| |
maxRetries | 失敗した操作のリトライ回数。 | INTEGER | いいえ |
| なし。 |
シンク固有
オプション | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
sink.parallelism | シンクオペレーターの並列度。 | INTEGER | いいえ | 設定されていない場合、並列度はアップストリームオペレーターから継承されます。 | |
sink.maxRetries | 書き込み失敗時の最大リトライ回数。 | INTEGER | いいえ |
| VVR 11.3 以降では、このオプションは非推奨です。代わりに |
sink.buffer-flush.max-rows | バッファリングするレコードの最大数 (追加、アップサート、削除を含む)。この数に達するとフラッシュがトリガーされます。 | INTEGER | いいえ |
| このトリガーを無効にするには |
sink.buffer-flush.interval | バッファリングされたレコードをフラッシュする時間間隔 (ミリ秒単位)。この間隔を超えるとフラッシュがトリガーされます。 | INTEGER | いいえ |
| 時間ベースのフラッシュを無効にするには |
sink.ignoreDelete | 削除操作を無視するかどうかを指定します。 | BOOLEAN | いいえ |
| 有効な値:
|
ベクトルテーブル固有
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
search.metric | ベクトル類似度を測定するために使用されるメトリック。 | String | いいえ |
| サポートされている類似性メトリックの詳細については、「Milvus ドキュメント」をご参照ください。Milvus v2.4 は現在、次のメトリックをサポートしています:
|
型マッピング
Flink SQL 型 | Milvus 型 |
STRING | VarChar(n) |
BOOLEAN | Bool |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
DATE | VarChar(n) |
TIME(3) | VarChar(n) |
TIMESTAMP_LTZ(3) | Int64 説明 エポックタイム (ミリ秒) として保存されます。 |
TIMESTAMP(3) | VarChar(n) |
FLOAT | Float |
DOUBLE | Double |
DECIMAL(10, 2) | VarChar(n) |
ARRAY<FLOAT> | FloatVector 説明 Milvus コレクションを作成した後、ベクトルフィールドのインデックスを作成します。 |
ARRAY<DOUBLE> | Array<Double>[m] |
ARRAY<INTEGER> | Array<Int32>[m] |
ARRAY<BIGINT> | Array<Int64>[m] |
例
ストリーミングデータを Milvus に書き込む
-- 毎秒 100 行を生成するモックデータソースを作成します。ストリーミングデータをシミュレートします。
CREATE TEMPORARY TABLE mock_source (
id STRING,
vector ARRAY<FLOAT>, -- FLOAT 配列として渡されるベクトル。
event_time AS PROCTIME() -- 処理時間属性。
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 毎秒 100 行を生成します。
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
CREATE TEMPORARY TABLE milvus_sink (
id STRING, -- デバイス ID などの一意の識別子。
vector ARRAY<FLOAT>, -- ベクトルデータ。配列の長さはソースと一致している必要があります。
timestamp BIGINT -- ストリーム処理用のタイムスタンプ。
PRIMARY KEY (id) NOT ENFORCED -- 必須。Milvus はプライマリキーとして BIGINT または STRING のみをサポートします。
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- データを変換して Milvus に書き込みます。
INSERT INTO milvus_sink
SELECT
id,
vector,
UNIX_TIMESTAMP() * 1000 AS timestamp -- 現在のタイムスタンプ (ミリ秒)。
FROM mock_source;ベクトル検索を実行する
CREATE TEMPORARY TABLE milvus_table (
id STRING, -- 一意の識別子。
vector ARRAY<FLOAT>, -- ベクトルデータ。配列の長さはソースと一致している必要があります。
PRIMARY KEY (id) NOT ENFORCED -- 必須。Milvus はプライマリキーとして BIGINT または STRING のみをサポートします。
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- ベクトル [1.1, 2.2, 3.3] に最も類似する上位 2 アイテムを検索します。
SELECT * FROM
LATERAL TABLE(
VECTOR_SEARCH(
TABLE milvus_table,
DESCRIPTOR(vector),
ARRAY[1.1, 2.2, 3.3],
2));注意: クエリを実行する前に、Milvus コレクションをメモリにロードする必要があります。詳細については、Milvus ドキュメントの「コレクションをロードする」をご参照ください。