Topik ini menjelaskan cara menggunakan Konektor Milvus.
Ikhtisar
Milvus adalah database vektor yang sangat skalabel dan dirancang untuk menangani data tidak terstruktur berskala besar seperti citra, teks, dan audio. Dengan dukungan pencarian kesamaan yang efisien, Milvus ideal untuk kasus penggunaan seperti sistem rekomendasi, pengambilan citra, dan pencarian semantik. Konektor Milvus mendukung kemampuan berikut:
Kategori | Rincian |
Jenis yang didukung | Tabel sink, tabel vektor |
Mode operasi | Streaming |
Format data | Tidak ada |
Metrik pemantauan spesifik | Tidak ada |
Jenis API | SQL |
Mendukung pembaruan/penghapusan | Ya |
Fitur
Konektor Milvus mengintegrasikan Apache Flink dengan database vektor Milvus secara ketat untuk menciptakan pipa data berkinerja tinggi dan andal dalam skenario pencarian vektor waktu nyata. Fitur utama dari Konektor Milvus meliputi:
Tulis dengan konkurensi tinggi: Mendukung paralelisme sink yang dapat dikonfigurasi.
Percobaan ulang otomatis: Mengulangi operasi gagal untuk meningkatkan stabilitas.
Buffering dalam batch: Meningkatkan kinerja penulisan dengan mengelompokkan catatan sebelum disiram.
Semantik setidaknya sekali: Memastikan konsistensi akhir melalui pembaruan idempoten berdasarkan kunci utama.
Pencarian vektor: Memungkinkan pencarian kesamaan vektor waktu nyata langsung di dalam Flink SQL.
Prasyarat
Anda telah membuat kluster Milvus. Untuk informasi lebih lanjut, lihat Buat instans Milvus dengan cepat.
Anda telah membuat koleksi Milvus. Jika Anda berencana menulis ke partisi tertentu, pastikan bahwa partisi Milvus sudah ada.
Keterbatasan
Menulis ke tabel sink memerlukan Ververica Runtime (VVR) versi 11.1 atau lebih baru.
Meminta tabel vektor memerlukan VVR versi 11.3 atau lebih baru.
Hanya Milvus 2.4.x yang didukung.
Konektor Milvus hanya mendukung semantik setidaknya sekali.
Sintaksis
CREATE TEMPORARY TABLE milvus_sink (
id BIGINT,
f1 STRING,
f2 BOOLEAN,
f3 TINYINT,
f4 SMALLINT,
f5 INTEGER,
f6 DATE,
f7 TIME(3),
f8 TIMESTAMP_LTZ(3),
f9 TIMESTAMP(3),
f10 FLOAT,
f11 DOUBLE,
f12 DECIMAL(10, 2),
f13 ARRAY<FLOAT>,
f14 ARRAY<DOUBLE>,
f15 ARRAY<INTEGER>,
f16 ARRAY<BIGINT>,
PRIMARY KEY (id) NOT ENFORCED -- Diperlukan. Milvus hanya mendukung BIGINT atau STRING sebagai kunci utama.
) WITH (
'connector'='milvus',
'endpoint'='<yourEndpoint>',
'port'='<yourPort>',
'userName'='<yourUserName>',
'password'='<yourPassword>',
'databaseName'='<yourDatabaseName>',
'collectionName'='<yourCollectionName>'
);Opsi konektor
Umum
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
connector | Nama konektor. | String | Ya | Atur ke | |
endpoint | Titik akhir (alamat IP atau nama domain) dari database Milvus. | String | Ya | ||
port | Nomor port dari database Milvus. | INTEGER | Tidak |
| |
username | Nama pengguna untuk database Milvus. | STRING | Ya | Tidak ada. | |
password | Kata sandi untuk database Milvus. | STRING | Ya | ||
databaseName | Nama database Milvus. | STRING | Ya | ||
collectionName | Nama koleksi Milvus. | STRING | Ya | ||
partitionName | Nama partisi untuk ditulis. | STRING | Tidak |
| |
partitionKey.enabled | Menentukan apakah koleksi menggunakan bidang skalar sebagai kunci partisi. | BOOLEAN | Tidak |
| |
maxRetries | Jumlah percobaan ulang untuk operasi gagal. | INTEGER | Tidak |
| Tidak ada. |
Spesifik Sink
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
sink.parallelism | Paralelisme untuk operator sink. | INTEGER | Tidak | Jika tidak diatur, paralelisme diwarisi dari operator hulu. | |
sink.maxRetries | Jumlah maksimum percobaan ulang saat penulisan gagal. | INTEGER | Tidak |
| Dalam VVR 11.3 dan versi lebih baru, opsi ini sudah ditinggalkan. Gunakan |
sink.buffer-flush.max-rows | Jumlah maksimum catatan untuk dibuffer (termasuk append, upsert, dan delete). Flush dipicu ketika jumlah ini tercapai. | INTEGER | Tidak |
| Atur ke |
sink.buffer-flush.interval | Interval waktu dalam milidetik (ms) untuk menyiram catatan yang dibuffer. Flush dipicu ketika interval ini terlampaui. | INTEGER | Tidak |
| Atur ke |
sink.ignoreDelete | Menentukan apakah akan mengabaikan operasi penghapusan. | BOOLEAN | Tidak |
| Nilai valid:
|
Spesifik tabel vektor
Parameter | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
search.metric | Metrik yang digunakan untuk mengukur kesamaan vektor. | String | Tidak |
| Untuk informasi tentang metrik kesamaan yang didukung, lihat dokumentasi Milvus. Milvus v2.4 saat ini mendukung metrik berikut:
|
Pemetaan tipe
Tipe Flink SQL | Tipe Milvus |
STRING | VarChar(n) |
BOOLEAN | Bool |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
DATE | VarChar(n) |
TIME(3) | VarChar(n) |
TIMESTAMP_LTZ(3) | Int64 Catatan Disimpan sebagai waktu epoch dalam milidetik. |
TIMESTAMP(3) | VarChar(n) |
FLOAT | Float |
DOUBLE | Double |
DECIMAL(10, 2) | VarChar(n) |
ARRAY<FLOAT> | FloatVector Catatan Setelah membuat koleksi Milvus, buat indeks untuk bidang vektor. |
ARRAY<DOUBLE> | Array<Double>[m] |
ARRAY<INTEGER> | Array<Int32>[m] |
ARRAY<BIGINT> | Array<Int64>[m] |
Contoh
Tulis data streaming ke Milvus
-- Buat sumber data tiruan yang menghasilkan 100 baris per detik. Simulasi data streaming.
CREATE TEMPORARY TABLE mock_source (
id STRING,
vector ARRAY<FLOAT>, -- Vektor dilewatkan sebagai larik FLOAT.
event_time AS PROCTIME() -- Atribut waktu pemrosesan.
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- Menghasilkan 100 baris per detik.
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
CREATE TEMPORARY TABLE milvus_sink (
id STRING, -- Pengenal unik, seperti ID perangkat.
vector ARRAY<FLOAT>, -- Data vektor. Panjang larik harus konsisten dengan sumber.
timestamp BIGINT -- Timestamp untuk pemrosesan aliran.
PRIMARY KEY (id) NOT ENFORCED -- Diperlukan. Milvus hanya mendukung BIGINT atau STRING sebagai kunci utama.
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- Transformasikan data dan tulis ke Milvus.
INSERT INTO milvus_sink
SELECT
id,
vector,
UNIX_TIMESTAMP() * 1000 AS timestamp -- Timestamp saat ini dalam milidetik.
FROM mock_source;Lakukan pencarian vektor
CREATE TEMPORARY TABLE milvus_table (
id STRING, -- Pengenal unik.
vector ARRAY<FLOAT>, -- Data vektor. Panjang larik harus konsisten dengan sumber.
PRIMARY KEY (id) NOT ENFORCED -- Diperlukan. Milvus hanya mendukung BIGINT atau STRING sebagai kunci utama.
) WITH (
'connector'='milvus',
'endpoint'='xxx',
'port'='19530',
'userName'='xxx',
'password'='xxx',
'databaseName'='xxxx',
'collectionName'='xxxx'
);
-- Temukan 2 item paling mirip untuk vektor [1.1, 2.2, 3.3].
SELECT * FROM
LATERAL TABLE(
VECTOR_SEARCH(
TABLE milvus_table,
DESCRIPTOR(vector),
ARRAY[1.1, 2.2, 3.3],
2));Catatan: Koleksi Milvus harus dimuat ke dalam memori sebelum menjalankan kueri. Untuk informasi lebih lanjut, lihat dokumentasi Milvus tentang Muat koleksi.