すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Realtime Compute for Apache Flinkを使用してベクトルデータをインポートする

最終更新日:Jan 16, 2025

AnalyticDB for PostgreSQLでは、flink-adbpg-connectorを使用してベクトルデータをインポートできます。 このトピックでは、ベクトルデータをAnalyticDB for PostgreSQLにインポートする方法について説明します。 この例では、ApsaraMQ for Kafkaデータが使用されます。

前提条件

  • AnalyticDB for PostgreSQLインスタンスが作成されました。 詳細については、「インスタンスの作成」をご参照ください。

  • フルマネージドFlinkワークスペースが作成されます。 Flinkワークスペースは、AnalyticDB for PostgreSQLインスタンスと同じ仮想プライベートクラウド (VPC) にあります。 詳細については、「フルマネージドFlinkの有効化」をご参照ください。

    • オープンソースのセルフマネージドFlinkを使用する場合は、flink-adbpg-connector$FLINK_HOME/libディレクトリにインストールされていることを確認してください。

    • フルマネージドFlinkを使用する場合、操作は必要ありません。

  • ベクトル検索拡張機能FastANNは、AnalyticDB for PostgreSQLデータベースにインストールされます。

    psqlクライアントで \dx fastannコマンドを実行して、FastANN拡張機能がインストールされているかどうかを確認できます。

    • 拡張機能に関する関連情報が返された場合、拡張機能がインストールされます。

    • 情報が返されない場合、 拡張機能をインストールするには、チケットを起票してください。

  • ApsaraMQ for Kafkaインスタンスが購入され、デプロイされます。 インスタンスは、AnalyticDB for PostgreSQLインスタンスと同じVPCにあります。 詳細については、「インターネットおよびVPC接続インスタンスの購入とデプロイ」をご参照ください。

  • FlinkワークスペースとApsaraMQ for KafkaインスタンスのCIDRブロックが、AnalyticDB for PostgreSQLインスタンスのIPアドレスホワイトリストに追加されます。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。

テストデータ

テストを容易にするために、AnalyticDB for PostgreSQLvector_sample_data.csvという名前のテストデータファイルを提供します。

次の表に、ファイルのスキーマを示します。

項目

データ型

説明

id

bigint

車のシリアル番号。

market_time

timestamp

車が市場に投入される時間。

color

varchar (10)

車の色。

price

int

車の価格。

特徴

float4[]

車の画像の特徴ベクトル。

Linuxシステムでは、コマンドを実行してテストデータをダウンロードできます。 サンプルコマンド:

wget https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230606/uzkx/vector_sample_data.cs

手順

  1. 構造化インデックスとベクトルインデックスの作成

  2. ApsaraMQ for Kafkaトピックにベクターテストデータを書き込みます

  3. マッピングテーブルの作成とデータのインポート

構造化インデックスとベクトルインデックスの作成

  1. AnalyticDB for PostgreSQLデータベースに接続します。 この例では、psqlクライアントを使用してインスタンスに接続します。 詳細については、「クライアントツールを使用してインスタンスに接続する」トピックの「psql」セクションを参照してください。

  2. テストデータベースを作成して切り替えます。

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. 宛先テーブルを作成します。

    vector_testが存在しない場合は

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. 構造化インデックスとベクトルインデックスを作成します。

    -- Change the storage format of the vector column to PLAIN. 
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- Create structured indexes. 
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- Create a vector index. 
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');

ApsaraMQ for Kafkaトピックへのベクターテストデータの書き込み

  1. ApsaraMQ for Kafkaトピックを作成します。

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1 
    --bootstrap-server <your_broker_list>
  2. ApsaraMQ for Kafkaトピックにベクターテストデータを書き込みます。

    bin/kafka-console-producer.sh 
    --bootstrap-server <your_broker_list>
    --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>: ApsaraMQ for Kafkaインスタンスのエンドポイント。 ApsaraMQ for Kafkaコンソールに移動し、[インスタンスの詳細] ページの [エンドポイント情報] セクションでインスタンスのエンドポイントを表示できます。

マッピングテーブルの作成とデータのインポート

  1. Flinkドラフトを作成します。

    1. Realtime Compute for Apache Flinkコンソールにログインします。 [完全管理Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

    2. 左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。 SQLエディターページの左上隅で、[新規作成] をクリックします。 [新しいドラフト] ダイアログボックスで、[SQLスクリプト] タブの [空白のストリームドラフト] をクリックし、[次へ] をクリックします。

    3. [新しいドラフト] ダイアログボックスで、ドラフトのパラメーターを設定します。 下表に、各パラメーターを説明します。

      パラメーター

      説明

      名前

      作成するドラフトの名前。

      説明

      ドラフト名は、現在のプロジェクトで一意である必要があります。

      adbpg-test

      場所

      ドラフトのコードファイルが格納されているフォルダ。

      フォルダの右側にある新建文件夹アイコンをクリックすると、サブフォルダを作成できます。

      ドラフト

      エンジン版

      ドラフトで使用されるFlinkのエンジンバージョン。 エンジンのバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

      vvr-6.0.6-flink-1.15

  2. AnalyticDB for PostgreSQLマッピングテーブルを作成します。

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    パラメーターの詳細については、「Realtime Compute For Apache Flinkを使用してAnalyticDB for PostgreSQLにデータを書き込む」をご参照ください。

  3. ApsaraMQ for Kafkaマッピングテーブルを作成します。

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    下表に、各パラメーターを説明します。

    パラメーター

    必須

    説明

    コネクター

    継続する

    コネクタの名前。 値をkafkaに設定します。

    properties.bootstrap.servers

    継続する

    ApsaraMQ for Kafkaインスタンスのエンドポイント。 ApsaraMQ for Kafkaコンソールに移動し、[インスタンスの詳細] ページの [エンドポイント情報] セクションでインスタンスのエンドポイントを表示できます。

    topic

    継続する

    ApsaraMQ for Kafkaメッセージを含むトピックの名前。

    フォーマット

    継続する

    ApsaraMQ for Kafkaメッセージの値フィールドの書き込みに使用される形式。 有効な値:

    • csv

    • JSON

    • avro

    • debezium-json

    • 運河-json

    • maxwell-json

    • avro-confluent

    csv.field-区切り文字

    継続する

    CSVフィールドの区切り文字。

    scan.startup.mode

    継続する

    ApsaraMQ for Kafkaインスタンスからデータを読み取る開始オフセット。 有効な値:

    • bearest-offset: ApsaraMQ for Kafkaインスタンスの最も早いパーティションからデータが読み取られます。

    • latest-offset: ApsaraMQ for Kafkaインスタンスの最新パーティションからデータが読み取られます。

  4. インポートタスクを作成します。

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;