全部產品
Search
文件中心

Realtime Compute for Apache Flink:Milvus

更新時間:Oct 24, 2025

本文為您介紹如何使用Milvus連接器。

背景資訊

Milvus是一個高度可擴充的向量資料庫,專為處理大規模非結構化資料(如映像、文本和音頻)設計。支援高效的相似性搜尋,適用於推薦系統、映像檢索、語義搜尋等情境。Milvus連接器支援的資訊如下:

類別

詳情

支援類型

結果表,向量表

運行模式

流模式

資料格式

特有監控指標

API種類

SQL

是否支援更新/刪除

特色功能

Milvus連接器深度整合Apache Flink與Milvus向量資料庫,為即時向量檢索情境提供高效能、高可靠的資料管道。以下為Milvus連接器核心功能特性說明:

  • 高並發寫入:支援自訂結果表並行度;

  • 自動重試機制:失敗後可自動重試,提升穩定性;

  • 批量緩衝寫入:通過設定攢批寫入升寫入效能。

  • At-Least-Once 語義:主持主鍵等冪更新,實現資料最終一致性;

  • 支援向量搜尋:允許使用者通過向量搜尋相似資料;

前提條件

  • 已建立Milvus叢集,詳情請參見快速建立Milvus執行個體

  • Milvus集合結構需提前建立。如使用分區寫入,請確保Milvus分區已存在。

使用限制

  • 僅Realtime Compute引擎VVR 11.1及以上版本支援作為結果表寫入。

  • 僅Realtime Compute引擎VVR 11.3及以上版本支援作為向量表供查詢。

  • 僅支援2.4.x版本Milvus

  • Milvus連接器僅支援At Least Once語義。

文法結構

CREATE TEMPORARY TABLE milvus_sink (
  id BIGINT,
  f1 STRING,
  f2 BOOLEAN,
  f3 TINYINT,
  f4 SMALLINT,
  f5 INTEGER,
  f6 DATE,
  f7 TIME(3),
  f8 TIMESTAMP_LTZ(3),
  f9 TIMESTAMP(3),
  f10 FLOAT,
  f11 DOUBLE,
  f12 DECIMAL(10, 2),
  f13 ARRAY<FLOAT>,
  f14 ARRAY<DOUBLE>,
  f15 ARRAY<INTEGER>,
  f16 ARRAY<BIGINT>,
  PRIMARY KEY (id) NOT ENFORCED  -- 必須,Milvus只支援BIGINT或STRING類型作為主鍵
) WITH (
  'connector'='milvus',
  'endpoint'='<yourEndpoint>',
  'port'='<yourPort>',
  'userName'='<yourUserName>',
  'password'='<yourPassword>',
  'databaseName'='<yourDatabaseName>',
  'collectionName'='<yourCollectionName>'
);

WITH參數

通用參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

連接器名稱。

String

指定使用的連接器名稱,必須為milvus

endPoint

Milvus資料庫的訪問地址(IP或網域名稱)

String

詳情請參見網路訪問與安全設定

port

Milvus資料庫服務的連接埠號碼。

INTEGER

19530

username

Milvus資料庫服務的使用者名稱。

STRING

無。

password

Milvus資料庫服務的密碼。

STRING

databaseName

Milvus資料庫名稱。

STRING

collectionName

Milvus集合(Collection)名稱。

STRING

partitionName

寫入的分區名稱。

STRING

_default

partitionKey.enabled

集合是否啟用了標量欄位作為Partition Key。

BOOLEAN

false

maxReries

重試次數。

INTEGER

3

無。

結果表專屬參數

參數

說明

資料類型

是否必填

預設值

備忘

sink.parallelism

自訂Sink的並行度。

INTEGER

上遊運算元的並發,由系統架構決定。

sink.maxRetries

寫入失敗時的最大重試次數。

INTEGER

3

該參數從Realtime Compute引擎VVR 11.3起已廢棄,建議使用 maxReries 作為替代。

sink.buffer-flush.max-rows

緩衝記錄的最大數量(包括append、upsert和delete),超過該值將重新整理資料到Milvus;

INTEGER

10000

設定為0表示禁用。

sink.buffer-flush.interval

緩衝記錄的重新整理間隔時間,超過該時間將重新整理資料到Milvus。

INTEGER

1000

單位:毫秒(ms),設定為0表示禁用。

sink.ignoreDelete

是否忽略刪除操作。

BOOLEAN

false

參數取值如下:

  • true:忽略Delete操作。

  • false:不忽略Delete操作。

向量表專屬參數

參數

說明

資料類型

是否必填

預設值

備忘

search.metric

評價向量相似性指標。

String

L2

支援的相似性指標請參考 milvus 文檔。目前,milvus 2.4 版本支援的指標如下:

  • L2:使用歐幾裡得距離衡量相似性。

  • IP:使用向量點積計算相似性。

  • COSINE: 使用三角函數 Cosine 計算相似性。

類型映射

FlinkSQL類型

Milvus類型

STRING

VarChar(n)

BOOLEAN

Bool

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

DATE

VarChar(n)

TIME(3)

VarChar(n)

TIMESTAMP_LTZ(3)

Int64

說明

Epoch毫秒值。

TIMESTAMP(3)

VarChar(n)

FLOAT

Float

DOUBLE

Double

DECIMAL(10, 2)

VarChar(n)

ARRAY<FLOAT>

FloatVector

說明

在建立Milvus Collection之後,需要為向量欄位建立索引。

ARRAY<DOUBLE>

Array<Double>[m]

ARRAY<INTEGER>

Array<Int32>[m]

ARRAY<BIGINT>

Array<Int64>[m]

使用樣本

  • 產生類比流資料寫入

-- 產生類比流資料(每2秒產生一條)
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,       -- 向量以FLOAT數組形式傳入,預設長度3
    event_time AS PROCTIME() -- 事件時間(Flink自動產生)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',  -- 每秒產生100條
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

CREATE TEMPORARY TABLE milvus_sink (
  id STRING,               -- 唯一標識(如裝置ID)
  vector ARRAY<FLOAT>,     -- 向量資料(必須為FLOAT數組,且數組長度必須和上遊一致)
  timestamp BIGINT         -- 時間戳記(用於流處理)
  PRIMARY KEY (id) NOT ENFORCED  -- 必須,Milvus只支援BIGINT或STRING類型作為主鍵
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);

-- 轉換資料並寫入Milvus
INSERT INTO milvus_stream_sink
SELECT 
    id,
    vector,
    UNIX_TIMESTAMP() * 1000 AS timestamp  -- 目前時間戳(毫秒)
FROM mock_source;
  • 向量搜尋

CREATE TEMPORARY TABLE milvus_table (
  id STRING,               -- 唯一標識(如裝置ID)
  vector ARRAY<FLOAT>,     -- 向量資料(必須為FLOAT數組,且數組長度必須和上遊一致)
  PRIMARY KEY (id) NOT ENFORCED  -- 必須,Milvus只支援BIGINT或STRING類型作為主鍵
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);
-- 根據向量[1.1, 2.2, 3.3],尋找 top2 相似的資料。
SELECT * FROM 
LATERAL TABLE(
  VECTOR_SEARCH(
    TABLE milvus_table, 
    DESCRIPTOR(vector), 
    ARRAY[1.1, 2.2, 3.3], 
    2));

注意:在作業啟動前請先手動載入該表,具體請參考文檔