全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Milvus (pratinjau publik)

更新时间:Oct 25, 2025

Topik ini menjelaskan cara menggunakan Konektor Milvus.

Ikhtisar

Milvus adalah database vektor yang sangat skalabel dan dirancang untuk menangani data tidak terstruktur berskala besar seperti citra, teks, dan audio. Dengan dukungan pencarian kesamaan yang efisien, Milvus ideal untuk kasus penggunaan seperti sistem rekomendasi, pengambilan citra, dan pencarian semantik. Konektor Milvus mendukung kemampuan berikut:

Kategori

Rincian

Jenis yang didukung

Tabel sink, tabel vektor

Mode operasi

Streaming

Format data

Tidak ada

Metrik pemantauan spesifik

Tidak ada

Jenis API

SQL

Mendukung pembaruan/penghapusan

Ya

Fitur

Konektor Milvus mengintegrasikan Apache Flink dengan database vektor Milvus secara ketat untuk menciptakan pipa data berkinerja tinggi dan andal dalam skenario pencarian vektor waktu nyata. Fitur utama dari Konektor Milvus meliputi:

  • Tulis dengan konkurensi tinggi: Mendukung paralelisme sink yang dapat dikonfigurasi.

  • Percobaan ulang otomatis: Mengulangi operasi gagal untuk meningkatkan stabilitas.

  • Buffering dalam batch: Meningkatkan kinerja penulisan dengan mengelompokkan catatan sebelum disiram.

  • Semantik setidaknya sekali: Memastikan konsistensi akhir melalui pembaruan idempoten berdasarkan kunci utama.

  • Pencarian vektor: Memungkinkan pencarian kesamaan vektor waktu nyata langsung di dalam Flink SQL.

Prasyarat

  • Anda telah membuat kluster Milvus. Untuk informasi lebih lanjut, lihat Buat instans Milvus dengan cepat.

  • Anda telah membuat koleksi Milvus. Jika Anda berencana menulis ke partisi tertentu, pastikan bahwa partisi Milvus sudah ada.

Keterbatasan

  • Menulis ke tabel sink memerlukan Ververica Runtime (VVR) versi 11.1 atau lebih baru.

  • Meminta tabel vektor memerlukan VVR versi 11.3 atau lebih baru.

  • Hanya Milvus 2.4.x yang didukung.

  • Konektor Milvus hanya mendukung semantik setidaknya sekali.

Sintaksis

CREATE TEMPORARY TABLE milvus_sink (
  id BIGINT,
  f1 STRING,
  f2 BOOLEAN,
  f3 TINYINT,
  f4 SMALLINT,
  f5 INTEGER,
  f6 DATE,
  f7 TIME(3),
  f8 TIMESTAMP_LTZ(3),
  f9 TIMESTAMP(3),
  f10 FLOAT,
  f11 DOUBLE,
  f12 DECIMAL(10, 2),
  f13 ARRAY<FLOAT>,
  f14 ARRAY<DOUBLE>,
  f15 ARRAY<INTEGER>,
  f16 ARRAY<BIGINT>,
  PRIMARY KEY (id) NOT ENFORCED  -- Diperlukan. Milvus hanya mendukung BIGINT atau STRING sebagai kunci utama.
) WITH (
  'connector'='milvus',
  'endpoint'='<yourEndpoint>',
  'port'='<yourPort>',
  'userName'='<yourUserName>',
  'password'='<yourPassword>',
  'databaseName'='<yourDatabaseName>',
  'collectionName'='<yourCollectionName>'
);

Opsi konektor

Umum

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

connector

Nama konektor.

String

Ya

Atur ke milvus.

endpoint

Titik akhir (alamat IP atau nama domain) dari database Milvus.

String

Ya

Lihat Konfigurasikan akses jaringan.

port

Nomor port dari database Milvus.

INTEGER

Tidak

19530

username

Nama pengguna untuk database Milvus.

STRING

Ya

Tidak ada.

password

Kata sandi untuk database Milvus.

STRING

Ya

databaseName

Nama database Milvus.

STRING

Ya

collectionName

Nama koleksi Milvus.

STRING

Ya

partitionName

Nama partisi untuk ditulis.

STRING

Tidak

_default

partitionKey.enabled

Menentukan apakah koleksi menggunakan bidang skalar sebagai kunci partisi.

BOOLEAN

Tidak

false

maxRetries

Jumlah percobaan ulang untuk operasi gagal.

INTEGER

Tidak

3

Tidak ada.

Spesifik Sink

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

sink.parallelism

Paralelisme untuk operator sink.

INTEGER

Tidak

Jika tidak diatur, paralelisme diwarisi dari operator hulu.

sink.maxRetries

Jumlah maksimum percobaan ulang saat penulisan gagal.

INTEGER

Tidak

3

Dalam VVR 11.3 dan versi lebih baru, opsi ini sudah ditinggalkan. Gunakan maxRetries sebagai gantinya.

sink.buffer-flush.max-rows

Jumlah maksimum catatan untuk dibuffer (termasuk append, upsert, dan delete). Flush dipicu ketika jumlah ini tercapai.

INTEGER

Tidak

10000

Atur ke 0 untuk menonaktifkan pemicu ini.

sink.buffer-flush.interval

Interval waktu dalam milidetik (ms) untuk menyiram catatan yang dibuffer. Flush dipicu ketika interval ini terlampaui.

INTEGER

Tidak

1000

Atur ke 0 untuk menonaktifkan penyiraman berbasis waktu.

sink.ignoreDelete

Menentukan apakah akan mengabaikan operasi penghapusan.

BOOLEAN

Tidak

false

Nilai valid:

  • true: Mengabaikan operasi penghapusan.

  • false: Memproses operasi penghapusan.

Spesifik tabel vektor

Parameter

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

search.metric

Metrik yang digunakan untuk mengukur kesamaan vektor.

String

Tidak

L2

Untuk informasi tentang metrik kesamaan yang didukung, lihat dokumentasi Milvus. Milvus v2.4 saat ini mendukung metrik berikut:

  • L2: Jarak Euclidean

  • IP: Produk dalam

  • COSINE: Kesamaan kosinus

Pemetaan tipe

Tipe Flink SQL

Tipe Milvus

STRING

VarChar(n)

BOOLEAN

Bool

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

DATE

VarChar(n)

TIME(3)

VarChar(n)

TIMESTAMP_LTZ(3)

Int64

Catatan

Disimpan sebagai waktu epoch dalam milidetik.

TIMESTAMP(3)

VarChar(n)

FLOAT

Float

DOUBLE

Double

DECIMAL(10, 2)

VarChar(n)

ARRAY<FLOAT>

FloatVector

Catatan

Setelah membuat koleksi Milvus, buat indeks untuk bidang vektor.

ARRAY<DOUBLE>

Array<Double>[m]

ARRAY<INTEGER>

Array<Int32>[m]

ARRAY<BIGINT>

Array<Int64>[m]

Contoh

  • Tulis data streaming ke Milvus

-- Buat sumber data tiruan yang menghasilkan 100 baris per detik. Simulasi data streaming.
CREATE TEMPORARY TABLE mock_source (
    id STRING,
    vector ARRAY<FLOAT>,       -- Vektor dilewatkan sebagai larik FLOAT.
    event_time AS PROCTIME() -- Atribut waktu pemrosesan.
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',  -- Menghasilkan 100 baris per detik.
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

CREATE TEMPORARY TABLE milvus_sink (
  id STRING,               -- Pengenal unik, seperti ID perangkat.
  vector ARRAY<FLOAT>,     -- Data vektor. Panjang larik harus konsisten dengan sumber.
  timestamp BIGINT         -- Timestamp untuk pemrosesan aliran.
  PRIMARY KEY (id) NOT ENFORCED  -- Diperlukan. Milvus hanya mendukung BIGINT atau STRING sebagai kunci utama.
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);

-- Transformasikan data dan tulis ke Milvus.
INSERT INTO milvus_sink
SELECT 
    id,
    vector,
    UNIX_TIMESTAMP() * 1000 AS timestamp  -- Timestamp saat ini dalam milidetik.
FROM mock_source;
  • Lakukan pencarian vektor

CREATE TEMPORARY TABLE milvus_table (
  id STRING,               -- Pengenal unik.
  vector ARRAY<FLOAT>,     -- Data vektor. Panjang larik harus konsisten dengan sumber.
  PRIMARY KEY (id) NOT ENFORCED  -- Diperlukan. Milvus hanya mendukung BIGINT atau STRING sebagai kunci utama.
) WITH (
  'connector'='milvus',
  'endpoint'='xxx',
  'port'='19530',
  'userName'='xxx',
  'password'='xxx',
  'databaseName'='xxxx',
  'collectionName'='xxxx'
);
-- Temukan 2 item paling mirip untuk vektor [1.1, 2.2, 3.3].
SELECT * FROM 
LATERAL TABLE(
  VECTOR_SEARCH(
    TABLE milvus_table, 
    DESCRIPTOR(vector), 
    ARRAY[1.1, 2.2, 3.3], 
    2));

Catatan: Koleksi Milvus harus dimuat ke dalam memori sebelum menjalankan kueri. Untuk informasi lebih lanjut, lihat dokumentasi Milvus tentang Muat koleksi.