AnalyticDB for PostgreSQL memungkinkan Anda mengimpor data vektor menggunakan flink-adbpg-connector. Topik ini menjelaskan cara mengimpor data vektor ke AnalyticDB for PostgreSQL. Dalam contoh ini, data ApsaraMQ for Kafka digunakan.
Prasyarat
Sebuah instance AnalyticDB for PostgreSQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat instance.
Sebuah ruang kerja Flink yang sepenuhnya dikelola telah dibuat. Ruang kerja Flink berada di dalam virtual private cloud (VPC) yang sama dengan instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Aktifkan Flink yang sepenuhnya dikelola.
Jika Anda menggunakan Flink sumber terbuka yang dikelola sendiri, pastikan bahwa flink-adbpg-connector telah diinstal di direktori
$FLINK_HOME/lib.Jika Anda menggunakan Flink yang sepenuhnya dikelola, tidak ada operasi tambahan yang diperlukan.
Ekstensi pengambilan vektor FastANN telah diinstal di database AnalyticDB for PostgreSQL.
Anda dapat menjalankan perintah
\dx fastannpada klien psql untuk memeriksa apakah ekstensi FastANN telah diinstal.Jika informasi relevan tentang ekstensi dikembalikan, ekstensi telah diinstal.
Jika tidak ada informasi yang dikembalikan, Submit a ticket untuk menginstal ekstensi.
Sebuah instance ApsaraMQ for Kafka telah dibeli dan diterapkan. Instance tersebut berada di VPC yang sama dengan instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Beli dan terapkan instance yang terhubung ke Internet dan VPC.
Blok CIDR dari ruang kerja Flink dan instance ApsaraMQ for Kafka ditambahkan ke daftar putih alamat IP instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP.
Data uji
Untuk memudahkan pengujian Anda, AnalyticDB for PostgreSQL menyediakan file data uji bernama vector_sample_data.csv.
Tabel berikut menjelaskan skema file tersebut.
Bidang | Tipe | Deskripsi |
id | bigint | Nomor seri mobil. |
market_time | timestamp | Waktu ketika mobil diluncurkan ke pasar. |
color | varchar(10) | Warna mobil. |
price | int | Harga mobil. |
feature | float4[] | Vektor fitur gambar mobil. |
Di sistem Linux, Anda dapat menjalankan perintah berikut untuk mengunduh data uji:
wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.csProsedur
Buat indeks terstruktur dan indeks vektor
Sambungkan ke database AnalyticDB for PostgreSQL. Dalam contoh ini, klien psql digunakan untuk menyambungkan ke instance. Untuk informasi lebih lanjut, lihat bagian "psql" dari topik Gunakan alat klien untuk menyambungkan ke instance.
Buat dan beralih ke database uji.
CREATE DATABASE adbpg_test; \c adbpg_testBuat tabel tujuan.
CREATE SCHEMA IF NOT EXISTS vector_test; CREATE TABLE IF NOT EXISTS vector_test.car_info ( id bigint NOT NULL, market_time timestamp, color varchar(10), price int, feature float4[], PRIMARY KEY(id) ) DISTRIBUTED BY(id);Buat indeks terstruktur dan indeks vektor.
-- Ubah format penyimpanan kolom vektor menjadi PLAIN. ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN; -- Buat indeks terstruktur. CREATE INDEX ON vector_test.car_info(market_time); CREATE INDEX ON vector_test.car_info(color); CREATE INDEX ON vector_test.car_info(price); -- Buat indeks vektor. CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
Tulis data uji vektor ke topik ApsaraMQ for Kafka
Buat topik ApsaraMQ for Kafka.
bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 --bootstrap-server <your_broker_list>Tulis data uji vektor ke topik ApsaraMQ for Kafka.
bin/kafka-console-producer.sh --bootstrap-server <your_broker_list> --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list>: titik akhir instance ApsaraMQ for Kafka. Anda dapat masuk ke Konsol ApsaraMQ for Kafka dan melihat titik akhir instance di bagian Endpoint Information halaman Instance Details.
Buat tabel pemetaan dan impor data
Buat draf Flink.
Masuk ke Konsol Realtime Compute for Apache Flink. Pada tab Flink yang Sepenuhnya Dikelola, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi di sebelah kiri, klik . Di pojok kiri atas halaman SQL Editor, klik New. Di kotak dialog Draf Baru, klik Blank Stream Draft pada tab Skrip SQL dan klik Next.
Di kotak dialog New Draft, konfigurasikan parameter draf. Tabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
Contoh
Name
Nama draf yang ingin Anda buat.
CatatanNama draf harus unik di proyek saat ini.
adbpg-test
Location
Folder tempat file kode draf disimpan.
Anda dapat mengklik ikon
di sebelah kanan folder untuk membuat subfolder. Draft
Engine Version
Versi mesin Flink yang digunakan oleh draf. Untuk informasi lebih lanjut tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi Mesin.
vvr-6.0.6-flink-1.15
Buat tabel pemetaan AnalyticDB for PostgreSQL.
CREATE TABLE vector_ingest ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature VARCHAR )WITH ( 'connector' = 'adbpg-nightly-1.13', 'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test', 'tablename' = 'car_info', 'username' = '<your_username>', 'password' = '<your_password>', 'targetschema' = 'vector_test', 'maxretrytimes' = '2', 'batchsize' = '3000', 'batchwritetimeoutms' = '10000', 'connectionmaxactive' = '20', 'conflictmode' = 'ignore', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );Untuk informasi lebih lanjut tentang parameter, lihat Gunakan Realtime Compute for Apache Flink untuk menulis data ke AnalyticDB for PostgreSQL.
Buat tabel pemetaan ApsaraMQ for Kafka.
CREATE TABLE vector_kafka ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature string ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<your_broker_list>', 'topic' = 'vector_ingest', 'format' = 'csv', 'csv.field-delimiter' = '\t', 'scan.startup.mode' = 'earliest-offset' );Tabel berikut menjelaskan parameter tersebut.
Parameter
Diperlukan
Deskripsi
connector
Ya
Nama konektor. Setel nilai ke kafka.
properties.bootstrap.servers
Ya
Titik akhir instance ApsaraMQ for Kafka. Anda dapat masuk ke Konsol ApsaraMQ for Kafka dan melihat titik akhir instance di bagian Informasi Titik Akhir halaman Detail Instance.
topic
Ya
Nama topik yang berisi pesan ApsaraMQ for Kafka.
format
Ya
Format yang digunakan untuk menulis bidang nilai pesan ApsaraMQ for Kafka. Nilai valid:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
csv.field-delimiter
Ya
Pemisah bidang CSV.
scan.startup.mode
Ya
Offset awal dari mana data dibaca dari instance ApsaraMQ for Kafka. Nilai valid:
earliest-offset: Data dibaca dari partisi paling awal instance ApsaraMQ for Kafka.
latest-offset: Data dibaca dari partisi paling baru instance ApsaraMQ for Kafka.
Buat tugas impor.
INSERT INTO vector_ingest SELECT * FROM vector_kafka;