All Products
Search
Document Center

AnalyticDB:Gunakan Realtime Compute for Apache Flink untuk mengimpor data vektor

Last Updated:Jan 14, 2026

AnalyticDB for PostgreSQL memungkinkan Anda mengimpor data vektor menggunakan flink-adbpg-connector. Topik ini menjelaskan cara mengimpor data vektor ke AnalyticDB for PostgreSQL. Dalam contoh ini, data ApsaraMQ for Kafka digunakan.

Prasyarat

  • Sebuah instance AnalyticDB for PostgreSQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat instance.

  • Sebuah ruang kerja Flink yang sepenuhnya dikelola telah dibuat. Ruang kerja Flink berada di dalam virtual private cloud (VPC) yang sama dengan instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Aktifkan Flink yang sepenuhnya dikelola.

  • Ekstensi pengambilan vektor FastANN telah diinstal di database AnalyticDB for PostgreSQL.

    Anda dapat menjalankan perintah \dx fastann pada klien psql untuk memeriksa apakah ekstensi FastANN telah diinstal.

    • Jika informasi relevan tentang ekstensi dikembalikan, ekstensi telah diinstal.

    • Jika tidak ada informasi yang dikembalikan, Submit a ticket untuk menginstal ekstensi.

  • Sebuah instance ApsaraMQ for Kafka telah dibeli dan diterapkan. Instance tersebut berada di VPC yang sama dengan instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Beli dan terapkan instance yang terhubung ke Internet dan VPC.

  • Blok CIDR dari ruang kerja Flink dan instance ApsaraMQ for Kafka ditambahkan ke daftar putih alamat IP instance AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP.

Data uji

Untuk memudahkan pengujian Anda, AnalyticDB for PostgreSQL menyediakan file data uji bernama vector_sample_data.csv.

Tabel berikut menjelaskan skema file tersebut.

Bidang

Tipe

Deskripsi

id

bigint

Nomor seri mobil.

market_time

timestamp

Waktu ketika mobil diluncurkan ke pasar.

color

varchar(10)

Warna mobil.

price

int

Harga mobil.

feature

float4[]

Vektor fitur gambar mobil.

Di sistem Linux, Anda dapat menjalankan perintah berikut untuk mengunduh data uji:

wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.cs

Prosedur

  1. Buat indeks terstruktur dan indeks vektor.

  2. Tulis data uji vektor ke topik ApsaraMQ for Kafka.

  3. Buat tabel pemetaan dan impor data.

Buat indeks terstruktur dan indeks vektor

  1. Sambungkan ke database AnalyticDB for PostgreSQL. Dalam contoh ini, klien psql digunakan untuk menyambungkan ke instance. Untuk informasi lebih lanjut, lihat bagian "psql" dari topik Gunakan alat klien untuk menyambungkan ke instance.

  2. Buat dan beralih ke database uji.

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. Buat tabel tujuan.

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. Buat indeks terstruktur dan indeks vektor.

    -- Ubah format penyimpanan kolom vektor menjadi PLAIN. 
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- Buat indeks terstruktur. 
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- Buat indeks vektor. 
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');

Tulis data uji vektor ke topik ApsaraMQ for Kafka

  1. Buat topik ApsaraMQ for Kafka.

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 
    --bootstrap-server <your_broker_list>
  2. Tulis data uji vektor ke topik ApsaraMQ for Kafka.

    bin/kafka-console-producer.sh 
    --bootstrap-server <your_broker_list>
    --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>: titik akhir instance ApsaraMQ for Kafka. Anda dapat masuk ke Konsol ApsaraMQ for Kafka dan melihat titik akhir instance di bagian Endpoint Information halaman Instance Details.

Buat tabel pemetaan dan impor data

  1. Buat draf Flink.

    1. Masuk ke Konsol Realtime Compute for Apache Flink. Pada tab Flink yang Sepenuhnya Dikelola, temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

    2. Di panel navigasi di sebelah kiri, klik Development > ETL. Di pojok kiri atas halaman SQL Editor, klik New. Di kotak dialog Draf Baru, klik Blank Stream Draft pada tab Skrip SQL dan klik Next.

    3. Di kotak dialog New Draft, konfigurasikan parameter draf. Tabel berikut menjelaskan parameter tersebut.

      Parameter

      Deskripsi

      Contoh

      Name

      Nama draf yang ingin Anda buat.

      Catatan

      Nama draf harus unik di proyek saat ini.

      adbpg-test

      Location

      Folder tempat file kode draf disimpan.

      Anda dapat mengklik ikon 新建文件夹 di sebelah kanan folder untuk membuat subfolder.

      Draft

      Engine Version

      Versi mesin Flink yang digunakan oleh draf. Untuk informasi lebih lanjut tentang versi mesin, pemetaan versi, dan titik waktu penting dalam siklus hidup setiap versi, lihat Versi Mesin.

      vvr-6.0.6-flink-1.15

  2. Buat tabel pemetaan AnalyticDB for PostgreSQL.

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    Untuk informasi lebih lanjut tentang parameter, lihat Gunakan Realtime Compute for Apache Flink untuk menulis data ke AnalyticDB for PostgreSQL.

  3. Buat tabel pemetaan ApsaraMQ for Kafka.

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    Tabel berikut menjelaskan parameter tersebut.

    Parameter

    Diperlukan

    Deskripsi

    connector

    Ya

    Nama konektor. Setel nilai ke kafka.

    properties.bootstrap.servers

    Ya

    Titik akhir instance ApsaraMQ for Kafka. Anda dapat masuk ke Konsol ApsaraMQ for Kafka dan melihat titik akhir instance di bagian Informasi Titik Akhir halaman Detail Instance.

    topic

    Ya

    Nama topik yang berisi pesan ApsaraMQ for Kafka.

    format

    Ya

    Format yang digunakan untuk menulis bidang nilai pesan ApsaraMQ for Kafka. Nilai valid:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    csv.field-delimiter

    Ya

    Pemisah bidang CSV.

    scan.startup.mode

    Ya

    Offset awal dari mana data dibaca dari instance ApsaraMQ for Kafka. Nilai valid:

    • earliest-offset: Data dibaca dari partisi paling awal instance ApsaraMQ for Kafka.

    • latest-offset: Data dibaca dari partisi paling baru instance ApsaraMQ for Kafka.

  4. Buat tugas impor.

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;