Qdrantは、ベクトルデータを保存、検索、および管理するために使用されるベクトル類似性検索エンジンです。 Pythonプログラミング言語を使用して、自己管理型QdrantクラスターからAnalyticDB for PostgreSQLインスタンスに収集データを移行できます。
前提条件
Qdrantクラスターが作成されます。
Pythonがインストールされています。 Python 3.8以降を使用することを推奨します。
必要なPythonライブラリがインストールされます。
pip install psycopg2 pip install qdrant-client==1.6.0 pip install pyaml pip install tqdm
手順
手順1: Qdrantクラスターからのデータのエクスポート
データエクスポート用に
export.pyスクリプトとqdrant2csv.yaml設定ファイルを準備し、出力ディレクトリを作成します。 このトピックでは、outputをディレクトリ名として使用します。export.pyスクリプトには、次のコンテンツが含まれています。import yaml import json from qdrant_client import QdrantClient import os from enum import IntEnum from tqdm import tqdm with open("./qdrant2csv.yaml", "r") as f: config = yaml.safe_load(f) print("configuration:") print(config) qdrant_config = config["qdrant"] class DataType(IntEnum): ID = 1 FLOAT_VECTOR = 2 JSON = 3 def data_convert_to_str(data, dtype, delimeter): if dtype == DataType.ID: return str(data) elif dtype == DataType.FLOAT_VECTOR: return "{" + ", ".join(str(x) for x in data) + "}" elif dtype == DataType.JSON: return str(data).replace(delimeter, f"\\{delimeter}").replace("\"", "\\\"") Exception(f"Unsupported DataType {dtype}") def csv_write_rows(datum, fd, fields_types, delimiter="|"): for data in datum: for i in range(len(data)): data[i] = data_convert_to_str(data[i], fields_types[i], delimiter) fd.write(delimiter.join(data) + "\n") def csv_write_header(headers, fd, delimiter="|"): fd.write(delimiter.join(headers) + "\n") def dump_collection(collection_name: str): results = [] file_cnt = 0 print("connecting to qdrant...") client = QdrantClient(**qdrant_config) export_config = config["export"] tmp_path = os.path.join(export_config["output_path"], collection_name) if not os.path.exists(tmp_path): os.mkdir(tmp_path) # fetch info of collection fields_meta_list = ["id bigint"] fields_types = [DataType.ID] headers = ["id"] collection = client.get_collection(collection_name) total_num = collection.points_count if isinstance(collection.config.params.vectors, dict): # multi vectors for vec_name in collection.config.params.vectors.keys(): fields_types.append(DataType.FLOAT_VECTOR) fields_meta_list.append(f"{vec_name} real[]") headers.append(vec_name) else: # single vector fields_types.append(DataType.FLOAT_VECTOR) fields_meta_list.append("vector real[]") headers.append("vector") fields_types.append(DataType.JSON) fields_meta_list.append("payload json") headers.append("payload") fields_meta_str = ','.join(fields_meta_list) create_table_sql = f"CREATE TABLE {collection_name} " \ f" ({fields_meta_str});" with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f_d: f_d.write(create_table_sql) print(create_table_sql) def write_to_csv_file(col_names, data): if len(results) == 0: return nonlocal file_cnt assert(file_cnt <= 1e9) output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv") with open(output_file_name, "w", newline="") as csv_file: # write header csv_write_header(col_names, csv_file) # write data csv_write_rows(data, csv_file, fields_types) file_cnt += 1 results.clear() offset_id = None with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar: while True: res = client.scroll(collection_name=collection_name, limit=1000, offset=offset_id, with_payload=True, with_vectors=True) records = res[0] for record in records: # append id record_list = [record.id] # append vectors if isinstance(record.vector, dict): # multi vector for vector_name in headers[1:-1]: record_list.append(record.vector[vector_name]) else: # single vector record_list.append(record.vector) # append payload record_list.append(json.dumps(record.payload, ensure_ascii=False)) results.append(record_list) if len(results) >= export_config["max_line_in_file"]: write_to_csv_file(headers, data=results) pbar.update(1) if len(res) == 0 or len(res[0]) == 0 or res[1] is None: # finished break else: offset_id = res[1] write_to_csv_file(headers, data=results) for name in config["export"]["collections"]: dump_collection(name)qdrant2csv.yaml設定ファイルには、次の内容が含まれています。qdrant: # The configuration items that are used to connect to the Qdrant cluster. host: 'localhost' # The host address of the Qdrant service. port: 6333 # The port number of the Qdrant service. Default value: 6433. grpc_port: 6434 # The port number of the gRPC interface. Default value: 6334. api_key: '' # The API key for authentication in Qdrant Cloud. url: '' # The hostname or string."Optional[scheme], host, Optional[port], Optional[prefix]" location: '' # If you set this field to memory, this prompts the script to connect to the Qdrant cluster in in-memory mode. If you enter a regular string, this operation is the same as specifying a complete URL in the url field. If you do not specify this field, the cluster is connected by using the host and port fields. export: collections: - 'test_collection' - 'multi' # All Qdrant collections that you want to export. max_line_in_file: 40000 # The maximum number of lines that are contained in each output CSV file. output_path: './output' # The path to the directory in which the exported CSV files are stored.export.pyスクリプト、qdrant2csv.yaml設定ファイル、およびoutputディレクトリを同じディレクトリに保存します。 ディレクトリ階層:├── export.py ├── qdrant2csv.yaml └── outputQdrantクラスターに関する情報に基づいて、
qdrant2csv.yaml設定ファイルの設定項目を変更します。Pythonスクリプトを実行し、出力を表示します。
python export.pyサンプル出力:
. ├── export.py ├── qdrant2csv.yaml └── output ├── test_collection │ ├── 0000000000.csv │ ├── 0000000001.csv │ ├── 0000000002.csv │ └── create_table.sql └── multi ├── 0000000000.csv └── create_table.sql
手順2: AnalyticDB for PostgreSQLベクトルデータベースへのデータのインポート
import.pyスクリプト、csv2adbpg.yaml設定ファイル、および手順1で作成した出力ディレクトリのデータをインポート用に準備します。import.pyスクリプトには、次のコンテンツが含まれています。import psycopg2 import yaml import glob import os if __name__ == "__main__": with open('csv2adbpg.yaml', 'r') as config_file: config = yaml.safe_load(config_file) print("current config:" + str(config)) db_host = config['database']['host'] db_port = config['database']['port'] db_name = config['database']['name'] schema_name = config['database']['schema'] db_user = config['database']['user'] db_password = config['database']['password'] data_path = config['data_path'] conn = psycopg2.connect( host=db_host, port=db_port, database=db_name, user=db_user, password=db_password, options=f'-c search_path={schema_name},public' ) cur = conn.cursor() # check schema cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,)) existing_schema = cur.fetchone() if existing_schema: print(f"Schema {schema_name} already exists.") else: # create schema cur.execute(f"CREATE SCHEMA {schema_name}") print(f"Created schema: {schema_name}") for table_name in os.listdir(data_path): table_folder = os.path.join(data_path, table_name) print(f"Begin Process table: {table_name}") if os.path.isdir(table_folder): create_table_file = os.path.join(table_folder, 'create_table.sql') with open(create_table_file, 'r') as file: create_table_sql = file.read() try: cur.execute(create_table_sql) except psycopg2.errors.DuplicateTable as e: print(e) conn.rollback() continue print(f"Created table: {table_name}") cnt = 0 csv_files = glob.glob(os.path.join(table_folder, '*.csv')) for csv_file in csv_files: with open(csv_file, 'r') as file: copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER" cur.copy_expert(copy_command, file) cnt += 1 print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done") conn.commit() print(f"Finished import table: {table_name}") print('#'*60) cur.close() conn.close()csv2adbpg.yaml設定ファイルには、次の内容が含まれています。database: host: "192.16.XX.XX" # The public endpoint of the AnalyticDB for PostgreSQL instance. port: 5432 # The port number of the AnalyticDB for PostgreSQL instance. name: "vector_database" # The name of the destination database. user: "username" # The database account of the AnalyticDB for PostgreSQL instance. password: "" # The password of the database account. schema: "public" # The name of the schema. If the schema does not exist, the schema is automatically created. data_path: "./data" # The data source.import.pyスクリプト、csv2adbpg.yaml設定ファイル、およびインポートするデータを同じディレクトリに保存します。 ディレクトリ階層:. ├── csv2adbpg.yaml ├── data │ ├── test_collection │ │ ├── 0000000000.csv │ │ ├── 0000000001.csv │ │ ├── 0000000002.csv │ │ └── create_table.sql │ └── multi │ ├── 0000000000.csv │ └── create_table.sql └── import.pyAnalyticDB for PostgreSQLインスタンスに関する情報に基づいて、
csv2adbpg.yaml設定ファイルの設定項目を変更します。Pythonスクリプトを実行します。
python import.pyデータがAnalyticDB for PostgreSQLベクトルデータベースにインポートされているかどうかを確認します。
必要なインデックスを再構築します。 詳細については、「ベクトルインデックスの作成」をご参照ください。
関連ドキュメント
Qdrantの詳細については、「Qdrantドキュメント」をご参照ください。