Qdrant is a vector similarity search engine for storing, searching, and managing vector data. This topic describes how to export collection data from a self-managed Qdrant cluster and import it into an AnalyticDB for PostgreSQL instance by using Python scripts. The migration follows four phases: export, import, verify, and index.
How it works
A Python script reads data from Qdrant collections through the scroll API and writes the results to pipe-delimited CSV files. A second Python script bulk-loads those CSV files into AnalyticDB for PostgreSQL tables by using the PostgreSQL COPY command.
Each Qdrant collection maps to a set of CSV files and a CREATE TABLE SQL statement during export. During import, each collection becomes a table in AnalyticDB for PostgreSQL.
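The row format can be illustrated with a simplified sketch. The function name point_to_row and the sample point are hypothetical. The conversions follow the ones in the export script in Step 1: the ID as text, the vector as a PostgreSQL array literal, and the payload as JSON with the delimiter and double quotes backslash-escaped.

```python
import json


def point_to_row(point_id, vector, payload, delimiter="|"):
    """Render one point as a delimited text row: id, vector, payload."""
    id_field = str(point_id)
    # A dense vector becomes a PostgreSQL array literal such as {0.1, 0.2}.
    vector_field = "{" + ", ".join(str(x) for x in vector) + "}"
    # The payload is serialized to JSON; the delimiter and double quotes
    # are then backslash-escaped.
    payload_field = (
        json.dumps(payload, ensure_ascii=False)
        .replace(delimiter, "\\" + delimiter)
        .replace('"', '\\"')
    )
    return delimiter.join([id_field, vector_field, payload_field])


# Hypothetical sample point with a pipe character inside the payload.
row = point_to_row(7, [0.25, 0.5], {"tag": "a|b"})
print(row)
```

The pipe inside the payload value is escaped, so it is not read as a column boundary when the file is loaded.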
Data type mapping
The following table shows how Qdrant data types map to PostgreSQL column types in the generated CREATE TABLE statements.
| Qdrant concept | PostgreSQL column type | Description |
|---|---|---|
| Point ID | bigint | Unique identifier for each data point |
| Dense vector (single) | real[] | Stored as a PostgreSQL array of floats. Column named vector. |
| Dense vector (named/multi) | real[] | One column per named vector, using the vector name from Qdrant |
| Payload | json | All payload fields stored as a single JSON column |
Sparse vectors are not supported by this migration tool. Only dense vectors are exported.
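As a minimal sketch of this mapping, the following function builds the CREATE TABLE statement for a collection. The function name build_create_table and the vector names image and caption are hypothetical. The column layout follows the table above.

```python
from typing import List, Optional


def build_create_table(collection_name: str,
                       vector_names: Optional[List[str]] = None) -> str:
    """Build a CREATE TABLE statement that follows the mapping table."""
    columns = ["id bigint"]
    if vector_names:
        # Named vectors: one real[] column per vector name.
        columns += [f"{name} real[]" for name in vector_names]
    else:
        # Single unnamed vector: one real[] column named vector.
        columns.append("vector real[]")
    # All payload fields go into a single json column.
    columns.append("payload json")
    return f"CREATE TABLE {collection_name} ({', '.join(columns)});"


print(build_create_table("test_collection"))
print(build_create_table("multi", ["image", "caption"]))
```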
Prerequisites
Before you begin, make sure that you have:
Source (Qdrant)
A running, accessible self-managed Qdrant cluster
The host address, port, and (if applicable) API key for the Qdrant cluster
Collections that contain data (empty collections produce no CSV data files)
Target (AnalyticDB for PostgreSQL)
An AnalyticDB for PostgreSQL instance (for more information, see the product documentation)
The public endpoint, port, database name, and database account credentials for the instance
Network connectivity between the migration host and the AnalyticDB for PostgreSQL instance
Migration host
Python 3.8 or later
The following Python libraries:
    pip install psycopg2
    pip install qdrant-client==1.6.0
    pip install pyyaml
    pip install tqdm
Step 1: Export data from the Qdrant cluster
Prepare the export.py script, the qdrant2csv.yaml configuration file, and an output directory.

The export.py script contains the following content:

    import json
    import os
    from enum import IntEnum

    import yaml
    from qdrant_client import QdrantClient
    from tqdm import tqdm

    with open("./qdrant2csv.yaml", "r") as f:
        config = yaml.safe_load(f)

    print("configuration:")
    print(config)

    qdrant_config = config["qdrant"]


    class DataType(IntEnum):
        ID = 1
        FLOAT_VECTOR = 2
        JSON = 3


    def data_convert_to_str(data, dtype, delimiter):
        """Convert one field to its text representation in the output file."""
        if dtype == DataType.ID:
            return str(data)
        elif dtype == DataType.FLOAT_VECTOR:
            # PostgreSQL array literal, for example {0.1, 0.2}
            return "{" + ", ".join(str(x) for x in data) + "}"
        elif dtype == DataType.JSON:
            # Escape backslashes first, then the delimiter and double quotes,
            # so that COPY in text format restores the original JSON string.
            return (
                str(data)
                .replace("\\", "\\\\")
                .replace(delimiter, f"\\{delimiter}")
                .replace("\"", "\\\"")
            )
        raise ValueError(f"Unsupported DataType {dtype}")


    def csv_write_rows(datum, fd, fields_types, delimiter="|"):
        for data in datum:
            for i in range(len(data)):
                data[i] = data_convert_to_str(data[i], fields_types[i], delimiter)
            fd.write(delimiter.join(data) + "\n")


    def csv_write_header(headers, fd, delimiter="|"):
        fd.write(delimiter.join(headers) + "\n")


    def dump_collection(collection_name: str):
        results = []
        file_cnt = 0
        print("connecting to qdrant...")
        client = QdrantClient(**qdrant_config)

        export_config = config["export"]
        tmp_path = os.path.join(export_config["output_path"], collection_name)
        # Create the collection subdirectory (and the output path) if missing.
        os.makedirs(tmp_path, exist_ok=True)

        # Fetch the collection info and derive the table columns.
        fields_meta_list = ["id bigint"]
        fields_types = [DataType.ID]
        headers = ["id"]
        collection = client.get_collection(collection_name)
        total_num = collection.points_count
        if isinstance(collection.config.params.vectors, dict):
            # Named (multiple) vectors: one column per vector name.
            for vec_name in collection.config.params.vectors.keys():
                fields_types.append(DataType.FLOAT_VECTOR)
                fields_meta_list.append(f"{vec_name} real[]")
                headers.append(vec_name)
        else:
            # Single vector.
            fields_types.append(DataType.FLOAT_VECTOR)
            fields_meta_list.append("vector real[]")
            headers.append("vector")

        fields_types.append(DataType.JSON)
        fields_meta_list.append("payload json")
        headers.append("payload")

        fields_meta_str = ','.join(fields_meta_list)
        create_table_sql = f"CREATE TABLE {collection_name} " \
                           f" ({fields_meta_str});"

        with open(os.path.join(tmp_path, "create_table.sql"), "w") as f_d:
            f_d.write(create_table_sql)

        print(create_table_sql)

        def write_to_csv_file(col_names, data):
            if len(results) == 0:
                return
            nonlocal file_cnt
            assert file_cnt <= 1e9
            output_file_name = os.path.join(tmp_path, f"{str(file_cnt).zfill(10)}.csv")
            with open(output_file_name, "w", newline="", encoding="utf-8") as csv_file:
                # Write the header, then the buffered rows.
                csv_write_header(col_names, csv_file)
                csv_write_rows(data, csv_file, fields_types)
                file_cnt += 1
                results.clear()

        offset_id = None
        with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
            while True:
                res = client.scroll(collection_name=collection_name,
                                    limit=1000,
                                    offset=offset_id,
                                    with_payload=True,
                                    with_vectors=True)
                records = res[0]
                for record in records:
                    # ID
                    record_list = [record.id]
                    # Vectors
                    if isinstance(record.vector, dict):
                        # Named vectors, in header order.
                        for vector_name in headers[1:-1]:
                            record_list.append(record.vector[vector_name])
                    else:
                        # Single vector.
                        record_list.append(record.vector)
                    # Payload
                    record_list.append(json.dumps(record.payload, ensure_ascii=False))
                    results.append(record_list)
                    if len(results) >= export_config["max_line_in_file"]:
                        write_to_csv_file(headers, data=results)
                    pbar.update(1)
                if len(res) == 0 or len(res[0]) == 0 or res[1] is None:
                    # Finished: no more pages.
                    break
                else:
                    offset_id = res[1]

        # Flush the remaining rows.
        write_to_csv_file(headers, data=results)


    for name in config["export"]["collections"]:
        dump_collection(name)

The qdrant2csv.yaml configuration file contains the following content:

    qdrant:  # The configuration items that are used to connect to the Qdrant cluster.
      host: 'localhost'  # The host address of the Qdrant service.
      port: 6333  # The REST port of the Qdrant service. Default value: 6333.
      grpc_port: 6334  # The port of the gRPC interface. Default value: 6334.
      api_key: ''  # The API key for authentication in Qdrant Cloud.
      url: ''  # Either a host or a string in the form "Optional[scheme], host, Optional[port], Optional[prefix]".
      location: ''  # If you set this field to :memory:, the script connects to an in-memory Qdrant instance. If you enter a regular string, it is treated the same as a complete URL in the url field. If you do not specify this field, the connection uses the host and port fields.
    export:
      collections:  # All Qdrant collections that you want to export.
        - 'test_collection'
        - 'multi'
      max_line_in_file: 40000  # The maximum number of lines that are contained in each output CSV file.
      output_path: './output'  # The path to the directory in which the exported CSV files are stored.

Note: Standard Qdrant deployments listen on port 6333 for REST and port 6334 for gRPC. Verify the port values for your Qdrant deployment before you run the export.
Place export.py, qdrant2csv.yaml, and the output directory in the same directory:

    .
    ├── export.py
    ├── qdrant2csv.yaml
    └── output

Modify the configuration in qdrant2csv.yaml based on your Qdrant cluster settings.

Run the export script:

    python export.py

The script displays a progress bar during export. After the export completes, the output directory contains one subdirectory per collection. Each subdirectory includes numbered CSV files and a create_table.sql file:

    .
    ├── export.py
    ├── qdrant2csv.yaml
    └── output
        ├── test_collection
        │   ├── 0000000000.csv
        │   ├── 0000000001.csv
        │   ├── 0000000002.csv
        │   └── create_table.sql
        └── multi
            ├── 0000000000.csv
            └── create_table.sql
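Before moving on, it can help to check that the exported files contain the expected number of rows. The following is a minimal sketch, not part of the migration scripts. The function name count_exported_rows is hypothetical, and it assumes that every CSV file starts with one header line and holds one point per line, as in the export described above.

```python
import glob
import os


def count_exported_rows(output_path):
    """Return {collection_name: number_of_data_rows} for an export directory."""
    counts = {}
    for name in sorted(os.listdir(output_path)):
        folder = os.path.join(output_path, name)
        if not os.path.isdir(folder):
            continue
        total = 0
        for csv_file in glob.glob(os.path.join(folder, "*.csv")):
            with open(csv_file, "r", encoding="utf-8") as f:
                # Subtract the header line at the top of each file.
                total += max(sum(1 for _ in f) - 1, 0)
        counts[name] = total
    return counts
```

Calling count_exported_rows("./output") returns one entry per collection subdirectory. The totals can then be compared with the point counts of the source collections.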
Step 2: Import data into the AnalyticDB for PostgreSQL instance
Prepare the import.py script, the csv2adbpg.yaml configuration file, and the data to import. The data is the output directory from Step 1. The sample configuration reads it from ./data, so either copy the Step 1 output directory to a directory named data or set data_path to the location of the output directory.

The import.py script contains the following content:

    import glob
    import os

    import psycopg2
    import psycopg2.errors
    import yaml

    if __name__ == "__main__":
        with open('csv2adbpg.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)

        print("current config:" + str(config))

        db_host = config['database']['host']
        db_port = config['database']['port']
        db_name = config['database']['name']
        schema_name = config['database']['schema']
        db_user = config['database']['user']
        db_password = config['database']['password']
        data_path = config['data_path']

        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
            options=f'-c search_path={schema_name},public'
        )

        cur = conn.cursor()

        # Check whether the schema exists and create it if it does not.
        cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
        existing_schema = cur.fetchone()
        if existing_schema:
            print(f"Schema {schema_name} already exists.")
        else:
            cur.execute(f"CREATE SCHEMA {schema_name}")
            print(f"Created schema: {schema_name}")

        # Each subdirectory of data_path is one table.
        for table_name in os.listdir(data_path):
            table_folder = os.path.join(data_path, table_name)
            print(f"Begin Process table: {table_name}")
            if os.path.isdir(table_folder):
                create_table_file = os.path.join(table_folder, 'create_table.sql')
                with open(create_table_file, 'r') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    # The table already exists: skip it.
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")

                cnt = 0
                csv_files = sorted(glob.glob(os.path.join(table_folder, '*.csv')))
                for csv_file in csv_files:
                    with open(csv_file, 'r', encoding='utf-8') as file:
                        copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                        cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
                conn.commit()
                print(f"Finished import table: {table_name}")
                print('#' * 60)

        cur.close()
        conn.close()

The csv2adbpg.yaml configuration file contains the following content:

    database:
      host: "192.16.XX.XX"  # The public endpoint of the AnalyticDB for PostgreSQL instance.
      port: 5432  # The port number of the AnalyticDB for PostgreSQL instance.
      name: "vector_database"  # The name of the destination database.
      user: "username"  # The database account of the AnalyticDB for PostgreSQL instance.
      password: ""  # The password of the database account.
      schema: "public"  # The name of the schema. If the schema does not exist, the schema is automatically created.
    data_path: "./data"  # The data source.

Place import.py, csv2adbpg.yaml, and the data directory in the same directory:

    .
    ├── csv2adbpg.yaml
    ├── data
    │   ├── test_collection
    │   │   ├── 0000000000.csv
    │   │   ├── 0000000001.csv
    │   │   ├── 0000000002.csv
    │   │   └── create_table.sql
    │   └── multi
    │       ├── 0000000000.csv
    │       └── create_table.sql
    └── import.py

Modify the configuration in csv2adbpg.yaml based on your AnalyticDB for PostgreSQL instance settings.

Run the import script:

    python import.py

The script creates the schema (if it does not exist), creates tables from the generated SQL files, and bulk-loads data from the CSV files. If a table already exists, the script skips that table and continues with the next one.
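Because import.py opens create_table.sql in every subdirectory of the data directory, a missing file stops the run partway through. A small pre-flight check can catch this before any table is created. The following is a sketch under that assumption. The function name find_import_problems is hypothetical and is not part of the migration scripts.

```python
import glob
import os


def find_import_problems(data_path):
    """Return a list of human-readable problems found under data_path."""
    problems = []
    for name in sorted(os.listdir(data_path)):
        folder = os.path.join(data_path, name)
        if not os.path.isdir(folder):
            continue
        # Each table directory needs its generated CREATE TABLE statement.
        if not os.path.isfile(os.path.join(folder, "create_table.sql")):
            problems.append(f"{name}: create_table.sql is missing")
        # A table directory without CSV files yields an empty table.
        if not glob.glob(os.path.join(folder, "*.csv")):
            problems.append(f"{name}: no CSV files found")
    return problems
```

An empty list from find_import_problems("./data") means that every table directory has the files that the import expects.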
Step 3: Verify the imported data
After the import completes, connect to the AnalyticDB for PostgreSQL instance and verify the data.
Compare row counts. Run the following query for each imported table and compare the result against the points_count of the corresponding Qdrant collection:

    SELECT COUNT(*) FROM <table_name>;

Run a sample query to verify that IDs and payloads were imported correctly. To inspect vectors as well, add the vector column (vector, or the named vector columns) to the select list:

    SELECT id, payload FROM <table_name> LIMIT 5;

If the row counts match and the sample data looks correct, the data import is complete.
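The count comparison can also be scripted. The following sketch uses hypothetical helper names. It assumes that client is a connected QdrantClient, whose count method with exact=True returns an object with a count attribute, and that cursor is an open psycopg2 cursor on the target database.

```python
def qdrant_point_count(client, collection_name):
    """Exact point count of a collection (assumes a qdrant-client client)."""
    return client.count(collection_name=collection_name, exact=True).count


def table_row_count(cursor, table_name):
    """Row count of an imported table (assumes an open database cursor)."""
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    return cursor.fetchone()[0]


def find_mismatches(expected, actual):
    """Map each name whose counts differ to (expected, actual)."""
    return {
        name: (count, actual.get(name))
        for name, count in expected.items()
        if actual.get(name) != count
    }
```

For example, build one dictionary of source counts and one of table counts keyed by collection name, then pass both to find_mismatches. An empty result means that all counts agree.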
Step 4: Rebuild indexes
The CSV-based migration does not preserve indexes from the source Qdrant cluster. After verifying the data, create vector indexes on the imported tables to enable efficient similarity search. For more information, see Create a vector index.
Limitations
| Limitation | Description |
|---|---|
| No index preservation | Qdrant indexes (HNSW or other) are not migrated. Rebuild indexes after import. |
| CSV format constraints | The export uses a pipe-delimited (|) CSV format. Payload values that contain the pipe character are escaped, but complex nested data may require manual validation. |
| qdrant-client version | The export script requires qdrant-client==1.6.0. Other versions may not be compatible. |
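| Point ID type | Point IDs are stored as bigint. If a Qdrant collection uses UUID string IDs, the exported ID values are not valid bigint values, and the import for that table is expected to fail. Check the ID type of each collection before you migrate. |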
| Table name conflicts | If a table with the same name as a Qdrant collection already exists in the target schema, the import script skips that table without importing data. |
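To check the point ID limitation before import, the exported files can be scanned for IDs that are not integers. The following is a minimal sketch, not part of the migration scripts. The function name non_integer_ids is hypothetical, and it assumes the pipe-delimited layout described above, with the ID as the first field of each row.

```python
def non_integer_ids(csv_file, delimiter="|", limit=5):
    """Return up to `limit` ID values in csv_file that are not integers."""
    bad = []
    with open(csv_file, "r", encoding="utf-8") as f:
        next(f, None)  # Skip the header line.
        for line in f:
            # The ID is the first field and never contains the delimiter.
            first = line.rstrip("\n").split(delimiter, 1)[0]
            try:
                int(first)
            except ValueError:
                bad.append(first)
                if len(bad) >= limit:
                    break
    return bad
```

A non-empty result for any file suggests that the collection uses string IDs and that the id column type in create_table.sql needs attention before the import.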