すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:自己管理型QdrantクラスターからAnalyticDB for PostgreSQLインスタンスへのデータの移行

最終更新日:Sep 24, 2024

Qdrantは、ベクトルデータを保存、検索、および管理するために使用されるベクトル類似性検索エンジンです。 Pythonプログラミング言語を使用して、自己管理型QdrantクラスターからAnalyticDB for PostgreSQLインスタンスに収集データを移行できます。

前提条件

  • Qdrantクラスターが作成されます。

  • Pythonがインストールされています。 Python 3.8以降を使用することを推奨します。

  • 必要なPythonライブラリがインストールされます。

    pip install psycopg2
    pip install qdrant-client==1.6.0
    pip install pyaml
    pip install tqdm

手順

手順1: Qdrantクラスターからのデータのエクスポート

  1. データエクスポート用にexport.pyスクリプトとqdrant2csv.yaml設定ファイルを準備し、出力ディレクトリを作成します。 このトピックでは、outputをディレクトリ名として使用します。

    export.pyスクリプトには、次のコンテンツが含まれています。

    import yaml
    import json
    from qdrant_client import QdrantClient
    import os
    from enum import IntEnum
    from tqdm import tqdm
    
    with open("./qdrant2csv.yaml", "r") as f:
        config = yaml.safe_load(f)
    
    
    print("configuration:")
    print(config)
    
    qdrant_config = config["qdrant"]
    
    
    class DataType(IntEnum):
        ID = 1
        FLOAT_VECTOR = 2
        JSON = 3
    
    
    def data_convert_to_str(data, dtype, delimeter):
        if dtype == DataType.ID:
            return str(data)
        elif dtype == DataType.FLOAT_VECTOR:
            return "{" + ", ".join(str(x) for x in data) + "}"
        elif dtype == DataType.JSON:
            return str(data).replace(delimeter, f"\\{delimeter}").replace("\"", "\\\"")
        Exception(f"Unsupported DataType {dtype}")
    
    
    def csv_write_rows(datum, fd, fields_types, delimiter="|"):
        for data in datum:
            for i in range(len(data)):
                data[i] = data_convert_to_str(data[i], fields_types[i], delimiter)
            fd.write(delimiter.join(data) + "\n")
    
    
    def csv_write_header(headers, fd, delimiter="|"):
        fd.write(delimiter.join(headers) + "\n")
    
    
    def dump_collection(collection_name: str):
        results = []
        file_cnt = 0
        print("connecting to qdrant...")
        client = QdrantClient(**qdrant_config)
    
        export_config = config["export"]
        tmp_path = os.path.join(export_config["output_path"], collection_name)
        if not os.path.exists(tmp_path):
            os.mkdir(tmp_path)
    
        # fetch info of collection
        fields_meta_list = ["id bigint"]
        fields_types = [DataType.ID]
        headers = ["id"]
        collection = client.get_collection(collection_name)
        total_num = collection.points_count
        if isinstance(collection.config.params.vectors, dict):
            # multi vectors
            for vec_name in collection.config.params.vectors.keys():
                fields_types.append(DataType.FLOAT_VECTOR)
                fields_meta_list.append(f"{vec_name} real[]")
                headers.append(vec_name)
        else:
            # single vector
            fields_types.append(DataType.FLOAT_VECTOR)
            fields_meta_list.append("vector real[]")
            headers.append("vector")
    
        fields_types.append(DataType.JSON)
        fields_meta_list.append("payload json")
        headers.append("payload")
    
        fields_meta_str = ','.join(fields_meta_list)
        create_table_sql = f"CREATE TABLE {collection_name} " \
                           f" ({fields_meta_str});"
    
        with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f_d:
            f_d.write(create_table_sql)
    
        print(create_table_sql)
    
        def write_to_csv_file(col_names, data):
            if len(results) == 0:
                return
            nonlocal file_cnt
            assert(file_cnt <= 1e9)
            output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
            with open(output_file_name, "w", newline="") as csv_file:
                # write header
                csv_write_header(col_names, csv_file)
                # write data
                csv_write_rows(data, csv_file, fields_types)
                file_cnt += 1
                results.clear()
    
        offset_id = None
    
        with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
            while True:
                res = client.scroll(collection_name=collection_name,
                                    limit=1000,
                                    offset=offset_id,
                                    with_payload=True,
                                    with_vectors=True)
    
                records = res[0]
                for record in records:
                    # append id
                    record_list = [record.id]
                    # append vectors
                    if isinstance(record.vector, dict):
                        # multi vector
                        for vector_name in headers[1:-1]:
                            record_list.append(record.vector[vector_name])
                    else:
                        # single vector
                        record_list.append(record.vector)
                    # append payload
                    record_list.append(json.dumps(record.payload, ensure_ascii=False))
                    results.append(record_list)
    
                    if len(results) >= export_config["max_line_in_file"]:
                        write_to_csv_file(headers, data=results)
    
                    pbar.update(1)
    
                if len(res) == 0 or len(res[0]) == 0 or res[1] is None:
                    # finished
                    break
                else:
                    offset_id = res[1]
    
        write_to_csv_file(headers, data=results)
    
    
    for name in config["export"]["collections"]:
        dump_collection(name)
    

    qdrant2csv.yaml設定ファイルには、次の内容が含まれています。

    qdrant: # The configuration items that are used to connect to the Qdrant cluster.
        host: 'localhost'  # The host address of the Qdrant service.
        port: 6333        # The port number of the Qdrant service. Default value: 6433. 
        grpc_port: 6434   # The port number of the gRPC interface. Default value:  6334.
        api_key: ''  # The API key for authentication in Qdrant Cloud. 
        url: ''      # The hostname or string."Optional[scheme], host, Optional[port], Optional[prefix]"
        location: '' # If you set this field to memory, this prompts the script to connect to the Qdrant cluster in in-memory mode. If you enter a regular string, this operation is the same as specifying a complete URL in the url field. If you do not specify this field, the cluster is connected by using the host and port fields.        
    
    export:
       collections:
        - 'test_collection'
        - 'multi'                 # All Qdrant collections that you want to export.
      max_line_in_file: 40000     # The maximum number of lines that are contained in each output CSV file.
      output_path: './output'     # The path to the directory in which the exported CSV files are stored.
    
  2. export.pyスクリプト、qdrant2csv.yaml設定ファイル、およびoutputディレクトリを同じディレクトリに保存します。 ディレクトリ階層:

    ├── export.py
    ├── qdrant2csv.yaml
    └── output
  3. Qdrantクラスターに関する情報に基づいて、qdrant2csv.yaml設定ファイルの設定項目を変更します。

  4. Pythonスクリプトを実行し、出力を表示します。

    python export.py

    サンプル出力:

    .
    ├── export.py
    ├── qdrant2csv.yaml
    └── output
        ├── test_collection
        │   ├── 0000000000.csv
        │   ├── 0000000001.csv
        │   ├── 0000000002.csv
        │   └── create_table.sql
        └── multi
            ├── 0000000000.csv
            └── create_table.sql

手順2: AnalyticDB for PostgreSQLベクトルデータベースへのデータのインポート

  1. import.pyスクリプト、csv2adbpg.yaml設定ファイル、および手順1で作成した出力ディレクトリのデータをインポート用に準備します。

    import.pyスクリプトには、次のコンテンツが含まれています。

    import psycopg2
    import yaml
    import glob
    import os
    
    if __name__ == "__main__":
        with open('csv2adbpg.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)
    
        print("current config:" + str(config))
    
        db_host = config['database']['host']
        db_port = config['database']['port']
        db_name = config['database']['name']
        schema_name = config['database']['schema']
        db_user = config['database']['user']
        db_password = config['database']['password']
        data_path = config['data_path']
    
        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
            options=f'-c search_path={schema_name},public'
        )
    
        cur = conn.cursor()
    
        # check schema
        cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
        existing_schema = cur.fetchone()
        if existing_schema:
            print(f"Schema {schema_name} already exists.")
        else:
            # create schema
            cur.execute(f"CREATE SCHEMA {schema_name}")
            print(f"Created schema: {schema_name}")
    
        for table_name in os.listdir(data_path):
            table_folder = os.path.join(data_path, table_name)
            print(f"Begin Process table: {table_name}")
            if os.path.isdir(table_folder):
                create_table_file = os.path.join(table_folder, 'create_table.sql')
                with open(create_table_file, 'r') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")
    
                cnt = 0
                csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
                for csv_file in csv_files:
                    with open(csv_file, 'r') as file:
                        copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                        cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
    
            conn.commit()
            print(f"Finished import table: {table_name}")
            print('#'*60)
    
        cur.close()
        conn.close()
    

    csv2adbpg.yaml設定ファイルには、次の内容が含まれています。

    database:
        host: "192.16.XX.XX"         # The public endpoint of the AnalyticDB for PostgreSQL instance.
        port: 5432                   # The port number of the AnalyticDB for PostgreSQL instance.
        name: "vector_database"      # The name of the destination database. 
        user: "username"             # The database account of the AnalyticDB for PostgreSQL instance.
        password: ""                 # The password of the database account.
        schema: "public"             # The name of the schema. If the schema does not exist, the schema is automatically created.
    
    data_path: "./data"            # The data source.
    
  2. import.pyスクリプト、csv2adbpg.yaml設定ファイル、およびインポートするデータを同じディレクトリに保存します。 ディレクトリ階層:

    .
    ├── csv2adbpg.yaml
    ├── data
    │   ├── test_collection
    │   │   ├── 0000000000.csv
    │   │   ├── 0000000001.csv
    │   │   ├── 0000000002.csv
    │   │   └── create_table.sql
    │   └── multi
    │       ├── 0000000000.csv
    │       └── create_table.sql
    └── import.py
  3. AnalyticDB for PostgreSQLインスタンスに関する情報に基づいて、csv2adbpg.yaml設定ファイルの設定項目を変更します。

  4. Pythonスクリプトを実行します。

    python import.py
  5. データがAnalyticDB for PostgreSQLベクトルデータベースにインポートされているかどうかを確認します。

  6. 必要なインデックスを再構築します。 詳細については、「ベクトルインデックスの作成」をご参照ください。

関連ドキュメント

Qdrantの詳細については、「Qdrantドキュメント」をご参照ください。