本文為您介紹如何使用Milvus連接器。
背景資訊
Milvus是一個高度可擴充的向量資料庫,專為處理大規模非結構化資料(如映像、文本和音頻)設計。支援高效的相似性搜尋,適用於推薦系統、映像檢索、語義搜尋等情境。Milvus連接器支援的資訊如下:
類別 | 詳情 |
支援類型 | 結果表,向量表 |
運行模式 | 流模式 |
資料格式 | 無 |
特有監控指標 | 無 |
API種類 | SQL |
是否支援更新/刪除 | 是 |
特色功能
Milvus連接器深度整合Apache Flink與Milvus向量資料庫,為即時向量檢索情境提供高效能、高可靠的資料管道。以下為Milvus連接器核心功能特性說明:
高並發寫入:支援自訂結果表並行度;
自動重試機制:失敗後可自動重試,提升穩定性;
批量緩衝寫入:通過設定攢批寫入升寫入效能。
At-Least-Once 語義:主持主鍵等冪更新,實現資料最終一致性;
支援向量搜尋:允許使用者通過向量搜尋相似資料;
前提條件
已建立Milvus叢集,詳情請參見快速建立Milvus執行個體。
Milvus集合結構需提前建立。如使用分區寫入,請確保Milvus分區已存在。
使用限制
僅Realtime Compute引擎VVR 11.1及以上版本支援作為結果表寫入。
僅Realtime Compute引擎VVR 11.3及以上版本支援作為向量表供查詢。
僅支援2.4.x版本Milvus。
Milvus連接器僅支援At Least Once語義。
文法結構
CREATE TEMPORARY TABLE milvus_sink (
id BIGINT,
f1 STRING,
f2 BOOLEAN,
f3 TINYINT,
f4 SMALLINT,
f5 INTEGER,
f6 DATE,
f7 TIME(3),
f8 TIMESTAMP_LTZ(3),
f9 TIMESTAMP(3),
f10 FLOAT,
f11 DOUBLE,
f12 DECIMAL(10, 2),
f13 ARRAY<FLOAT>,
f14 ARRAY<DOUBLE>,
f15 ARRAY<INTEGER>,
f16 ARRAY<BIGINT>,
PRIMARY KEY (id) NOT ENFORCED -- 必須,Milvus只支援BIGINT或STRING類型作為主鍵
) WITH (
'connector'='milvus',
'endpoint'='<yourEndpoint>',
'port'='<yourPort>',
'userName'='<yourUserName>',
'password'='<yourPassword>',
'databaseName'='<yourDatabaseName>',
'collectionName'='<yourCollectionName>'
);WITH參數
通用參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 連接器名稱。 | String | 是 | 無 | 指定使用的連接器名稱,必須為 |
endPoint | Milvus資料庫的訪問地址(IP或網域名稱) | String | 是 | 無 | 詳情請參見網路訪問與安全設定。 |
port | Milvus資料庫服務的連接埠號碼。 | INTEGER | 否 | 19530 | |
username | Milvus資料庫服務的使用者名稱。 | STRING | 是 | 無 | 無。 |
password | Milvus資料庫服務的密碼。 | STRING | 是 | 無 | |
databaseName | Milvus資料庫名稱。 | STRING | 是 | 無 | |
collectionName | Milvus集合(Collection)名稱。 | STRING | 是 | 無 | |
partitionName | 寫入的分區名稱。 | STRING | 否 | _default | |
partitionKey.enabled | 集合是否啟用了標量欄位作為Partition Key。 | BOOLEAN | 否 | false | |
maxReries | 重試次數。 | INTEGER | 否 | 3 | 無。 |
結果表專屬參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
sink.parallelism | 自訂Sink的並行度。 | INTEGER | 否 | 無 | 上遊運算元的並發,由系統架構決定。 |
sink.maxRetries | 寫入失敗時的最大重試次數。 | INTEGER | 否 | 3 | 該參數從Realtime Compute引擎VVR 11.3起已廢棄,建議使用 maxReries 作為替代。 |
sink.buffer-flush.max-rows | 緩衝記錄的最大數量(包括append、upsert和delete),超過該值將重新整理資料到Milvus; | INTEGER | 否 | 10000 | 設定為 |
sink.buffer-flush.interval | 緩衝記錄的重新整理間隔時間,超過該時間將重新整理資料到Milvus。 | INTEGER | 否 | 1000 | 單位:毫秒(ms),設定為 |
sink.ignoreDelete | 是否忽略刪除操作。 | BOOLEAN | 否 | false | 參數取值如下:
|
向量表專屬參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
search.metric | 評價向量相似性指標。 | String | 否 | L2 | 支援的相似性指標請參考 milvus 文檔。目前,milvus 2.4 版本支援的指標如下:
|
類型映射
FlinkSQL類型 | Milvus類型 |
STRING | VarChar(n) |
BOOLEAN | Bool |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
DATE | VarChar(n) |
TIME(3) | VarChar(n) |
TIMESTAMP_LTZ(3) | Int64 說明 Epoch毫秒值。 |
TIMESTAMP(3) | VarChar(n) |
FLOAT | Float |
DOUBLE | Double |
DECIMAL(10, 2) | VarChar(n) |
ARRAY<FLOAT> | FloatVector 說明 在建立Milvus Collection之後,需要為向量欄位建立索引。 |
ARRAY<DOUBLE> | Array<Double>[m] |
ARRAY<INTEGER> | Array<Int32>[m] |
ARRAY<BIGINT> | Array<Int64>[m] |
使用樣本
產生類比流資料寫入
-- 產生類比流資料(每2秒產生一條)
CREATE TEMPORARY TABLE mock_source (
id STRING,
vector ARRAY<FLOAT>, -- 向量以FLOAT數組形式傳入,預設長度3
event_time AS PROCTIME() -- 事件時間(Flink自動產生)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒產生100條
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
CREATE TEMPORARY TABLE milvus_sink (
id STRING, -- 唯一標識(如裝置ID)
vector ARRAY<FLOAT>, -- 向量資料(必須為FLOAT數組,且數組長度必須和上遊一致)
timestamp BIGINT -- 時間戳記(用於流處理)
PRIMARY KEY (id) NOT ENFORCED -- 必須,Milvus只支援BIGINT或STRING類型作為主鍵
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- 轉換資料並寫入Milvus
INSERT INTO milvus_stream_sink
SELECT
id,
vector,
UNIX_TIMESTAMP() * 1000 AS timestamp -- 目前時間戳(毫秒)
FROM mock_source;向量搜尋
CREATE TEMPORARY TABLE milvus_table (
id STRING, -- 唯一標識(如裝置ID)
vector ARRAY<FLOAT>, -- 向量資料(必須為FLOAT數組,且數組長度必須和上遊一致)
PRIMARY KEY (id) NOT ENFORCED -- 必須,Milvus只支援BIGINT或STRING類型作為主鍵
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- 根據向量[1.1, 2.2, 3.3],尋找 top2 相似的資料。
SELECT * FROM
LATERAL TABLE(
VECTOR_SEARCH(
TABLE milvus_table,
DESCRIPTOR(vector),
ARRAY[1.1, 2.2, 3.3],
2));注意:在作業啟動前請先手動載入該表,具體請參考文檔。