All Products
Search
Document Center

AnalyticDB:Migrate data from a self-managed Qdrant cluster to an AnalyticDB for PostgreSQL instance

Last Updated:Feb 28, 2026

Qdrant is a vector similarity search engine for storing, searching, and managing vector data. This topic describes how to export collection data from a self-managed Qdrant cluster and import it into an AnalyticDB for PostgreSQL instance by using Python scripts. The migration follows four phases: export, import, verify, and index.

How it works

A Python script reads data from Qdrant collections through the scroll API and writes the results to pipe-delimited CSV files. A second Python script bulk-loads those CSV files into AnalyticDB for PostgreSQL tables by using the PostgreSQL COPY command.

Each Qdrant collection maps to a set of CSV files and a CREATE TABLE SQL statement during export. During import, each collection becomes a table in AnalyticDB for PostgreSQL.

Data type mapping

The following table shows how Qdrant data types map to PostgreSQL column types in the generated CREATE TABLE statements.

Qdrant conceptPostgreSQL column typeDescription
Point IDbigintUnique identifier for each data point
Dense vector (single)real[]Stored as a PostgreSQL array of floats. Column named vector.
Dense vector (named/multi)real[]One column per named vector, using the vector name from Qdrant
PayloadjsonAll payload fields stored as a single JSON column
Sparse vectors are not supported by this migration tool. Only dense vectors are exported.

Prerequisites

Before you begin, make sure that you have:

Source (Qdrant)

  • A running, accessible self-managed Qdrant cluster

  • The host address, port, and (if applicable) API key for the Qdrant cluster

  • Collections that contain data (empty collections produce no CSV data files)

Target (AnalyticDB for PostgreSQL)

  • An AnalyticDB for PostgreSQL instance (for more information, see the product documentation)

  • The public endpoint, port, database name, and database account credentials for the instance

  • Network connectivity between the migration host and the AnalyticDB for PostgreSQL instance

Migration host

  • Python 3.8 or later

  • The following Python libraries:

      pip install psycopg2
      pip install qdrant-client==1.6.0
      pip install pyaml
      pip install tqdm

Step 1: Export data from the Qdrant cluster

  1. Prepare the export.py script, the qdrant2csv.yaml configuration file, and an output directory.

    The export.py script contains the following content:

       import yaml
       import json
       from qdrant_client import QdrantClient
       import os
       from enum import IntEnum
       from tqdm import tqdm
    
       with open("./qdrant2csv.yaml", "r") as f:
           config = yaml.safe_load(f)
    
    
       print("configuration:")
       print(config)
    
       qdrant_config = config["qdrant"]
    
    
       class DataType(IntEnum):
           ID = 1
           FLOAT_VECTOR = 2
           JSON = 3
    
    
       def data_convert_to_str(data, dtype, delimeter):
           if dtype == DataType.ID:
               return str(data)
           elif dtype == DataType.FLOAT_VECTOR:
               return "{" + ", ".join(str(x) for x in data) + "}"
           elif dtype == DataType.JSON:
               return str(data).replace(delimeter, f"\\{delimeter}").replace("\"", "\\\"")
           Exception(f"Unsupported DataType {dtype}")
    
    
       def csv_write_rows(datum, fd, fields_types, delimiter="|"):
           for data in datum:
               for i in range(len(data)):
                   data[i] = data_convert_to_str(data[i], fields_types[i], delimiter)
               fd.write(delimiter.join(data) + "\n")
    
    
       def csv_write_header(headers, fd, delimiter="|"):
           fd.write(delimiter.join(headers) + "\n")
    
    
       def dump_collection(collection_name: str):
           results = []
           file_cnt = 0
           print("connecting to qdrant...")
           client = QdrantClient(**qdrant_config)
    
           export_config = config["export"]
           tmp_path = os.path.join(export_config["output_path"], collection_name)
           if not os.path.exists(tmp_path):
               os.mkdir(tmp_path)
    
           # fetch info of collection
           fields_meta_list = ["id bigint"]
           fields_types = [DataType.ID]
           headers = ["id"]
           collection = client.get_collection(collection_name)
           total_num = collection.points_count
           if isinstance(collection.config.params.vectors, dict):
               # multi vectors
               for vec_name in collection.config.params.vectors.keys():
                   fields_types.append(DataType.FLOAT_VECTOR)
                   fields_meta_list.append(f"{vec_name} real[]")
                   headers.append(vec_name)
           else:
               # single vector
               fields_types.append(DataType.FLOAT_VECTOR)
               fields_meta_list.append("vector real[]")
               headers.append("vector")
    
           fields_types.append(DataType.JSON)
           fields_meta_list.append("payload json")
           headers.append("payload")
    
           fields_meta_str = ','.join(fields_meta_list)
           create_table_sql = f"CREATE TABLE {collection_name} " \
                              f" ({fields_meta_str});"
    
           with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f_d:
               f_d.write(create_table_sql)
    
           print(create_table_sql)
    
           def write_to_csv_file(col_names, data):
               if len(results) == 0:
                   return
               nonlocal file_cnt
               assert(file_cnt <= 1e9)
               output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
               with open(output_file_name, "w", newline="") as csv_file:
                   # write header
                   csv_write_header(col_names, csv_file)
                   # write data
                   csv_write_rows(data, csv_file, fields_types)
                   file_cnt += 1
                   results.clear()
    
           offset_id = None
    
           with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
               while True:
                   res = client.scroll(collection_name=collection_name,
                                       limit=1000,
                                       offset=offset_id,
                                       with_payload=True,
                                       with_vectors=True)
    
                   records = res[0]
                   for record in records:
                       # append id
                       record_list = [record.id]
                       # append vectors
                       if isinstance(record.vector, dict):
                           # multi vector
                           for vector_name in headers[1:-1]:
                               record_list.append(record.vector[vector_name])
                       else:
                           # single vector
                           record_list.append(record.vector)
                       # append payload
                       record_list.append(json.dumps(record.payload, ensure_ascii=False))
                       results.append(record_list)
    
                       if len(results) >= export_config["max_line_in_file"]:
                           write_to_csv_file(headers, data=results)
    
                       pbar.update(1)
    
                   if len(res) == 0 or len(res[0]) == 0 or res[1] is None:
                       # finished
                       break
                   else:
                       offset_id = res[1]
    
           write_to_csv_file(headers, data=results)
    
    
       for name in config["export"]["collections"]:
           dump_collection(name)

    The qdrant2csv.yaml configuration file contains the following content:

       qdrant: # The configuration items that are used to connect to the Qdrant cluster.
           host: 'localhost'  # The host address of the Qdrant service.
           port: 6333        # The port number of the Qdrant service. Default value: 6433.
           grpc_port: 6434   # The port number of the gRPC interface. Default value:  6334.
           api_key: ''  # The API key for authentication in Qdrant Cloud.
           url: ''      # The hostname or string."Optional[scheme], host, Optional[port], Optional[prefix]"
           location: '' # If you set this field to memory, this prompts the script to connect to the Qdrant cluster in in-memory mode. If you enter a regular string, this operation is the same as specifying a complete URL in the url field. If you do not specify this field, the cluster is connected by using the host and port fields.
    
       export:
          collections:
           - 'test_collection'
           - 'multi'                 # All Qdrant collections that you want to export.
         max_line_in_file: 40000     # The maximum number of lines that are contained in each output CSV file.
         output_path: './output'     # The path to the directory in which the exported CSV files are stored.
    Note: [Needs verification] The YAML comments list default port values (6433 for REST, 6334 for gRPC) that differ from the configured values (6333 and 6434) and from the standard Qdrant defaults (REST: 6333, gRPC: 6334). Verify the correct port values for your Qdrant deployment before running the export.
  2. Place export.py, qdrant2csv.yaml, and the output directory in the same directory:

       ├── export.py
       ├── qdrant2csv.yaml
       └── output
  3. Modify the configuration in qdrant2csv.yaml based on your Qdrant cluster settings.

  4. Run the export script:

       python export.py

    The script displays a progress bar during export. After the export completes, the output directory contains one subdirectory per collection. Each subdirectory includes numbered CSV files and a create_table.sql file:

       .
       ├── export.py
       ├── qdrant2csv.yaml
       └── output
           ├── test_collection
           │   ├── 0000000000.csv
           │   ├── 0000000001.csv
           │   ├── 0000000002.csv
           │   └── create_table.sql
           └── multi
               ├── 0000000000.csv
               └── create_table.sql

Step 2: Import data into the AnalyticDB for PostgreSQL instance

  1. Prepare the import.py script, the csv2adbpg.yaml configuration file, and the output directory from Step 1.

    The import.py script contains the following content:

       import psycopg2
       import yaml
       import glob
       import os
    
       if __name__ == "__main__":
           with open('csv2adbpg.yaml', 'r') as config_file:
               config = yaml.safe_load(config_file)
    
           print("current config:" + str(config))
    
           db_host = config['database']['host']
           db_port = config['database']['port']
           db_name = config['database']['name']
           schema_name = config['database']['schema']
           db_user = config['database']['user']
           db_password = config['database']['password']
           data_path = config['data_path']
    
           conn = psycopg2.connect(
               host=db_host,
               port=db_port,
               database=db_name,
               user=db_user,
               password=db_password,
               options=f'-c search_path={schema_name},public'
           )
    
           cur = conn.cursor()
    
           # check schema
           cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
           existing_schema = cur.fetchone()
           if existing_schema:
               print(f"Schema {schema_name} already exists.")
           else:
               # create schema
               cur.execute(f"CREATE SCHEMA {schema_name}")
               print(f"Created schema: {schema_name}")
    
           for table_name in os.listdir(data_path):
               table_folder = os.path.join(data_path, table_name)
               print(f"Begin Process table: {table_name}")
               if os.path.isdir(table_folder):
                   create_table_file = os.path.join(table_folder, 'create_table.sql')
                   with open(create_table_file, 'r') as file:
                       create_table_sql = file.read()
                   try:
                       cur.execute(create_table_sql)
                   except psycopg2.errors.DuplicateTable as e:
                       print(e)
                       conn.rollback()
                       continue
                   print(f"Created table: {table_name}")
    
                   cnt = 0
                   csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
                   for csv_file in csv_files:
                       with open(csv_file, 'r') as file:
                           copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                           cur.copy_expert(copy_command, file)
                       cnt += 1
                       print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
    
               conn.commit()
               print(f"Finished import table: {table_name}")
               print('#'*60)
    
           cur.close()
           conn.close()

    The csv2adbpg.yaml configuration file contains the following content:

       database:
           host: "192.16.XX.XX"         # The public endpoint of the AnalyticDB for PostgreSQL instance.
           port: 5432                   # The port number of the AnalyticDB for PostgreSQL instance.
           name: "vector_database"      # The name of the destination database.
           user: "username"             # The database account of the AnalyticDB for PostgreSQL instance.
           password: ""                 # The password of the database account.
           schema: "public"             # The name of the schema. If the schema does not exist, the schema is automatically created.
    
       data_path: "./data"            # The data source.
  2. Place import.py, csv2adbpg.yaml, and the data directory in the same directory:

       .
       ├── csv2adbpg.yaml
       ├── data
       │   ├── test_collection
       │   │   ├── 0000000000.csv
       │   │   ├── 0000000001.csv
       │   │   ├── 0000000002.csv
       │   │   └── create_table.sql
       │   └── multi
       │       ├── 0000000000.csv
       │       └── create_table.sql
       └── import.py
  3. Modify the configuration in csv2adbpg.yaml based on your AnalyticDB for PostgreSQL instance settings.

  4. Run the import script:

       python import.py

    The script creates the schema (if it does not exist), creates tables from the generated SQL files, and bulk-loads data from the CSV files. If a table already exists, the script skips that table and continues with the next one.

Step 3: Verify the imported data

After the import completes, connect to the AnalyticDB for PostgreSQL instance and verify the data.

  1. Compare row counts. Run the following query for each imported table and compare the result against the points_count of the corresponding Qdrant collection:

       SELECT COUNT(*) FROM <table_name>;
  2. Run a sample query to verify that vectors and payloads were imported correctly:

       SELECT id, payload FROM <table_name> LIMIT 5;
  3. If the row counts match and the sample data looks correct, the data import is complete.

Step 4: Rebuild indexes

The CSV-based migration does not preserve indexes from the source Qdrant cluster. After verifying the data, create vector indexes on the imported tables to enable efficient similarity search. For more information, see Create a vector index.

Limitations

LimitationDescription
No index preservationQdrant indexes (HNSW or other) are not migrated. Rebuild indexes after import.
CSV format constraintsThe export uses a pipe-delimited (|) CSV format. Payload values that contain the pipe character are escaped, but complex nested data may require manual validation.
qdrant-client versionThe export script requires qdrant-client==1.6.0. Other versions may not be compatible.
Point ID typePoint IDs are stored as bigint. If your Qdrant collection uses UUID-style string IDs, the export script may not handle them correctly.
Table name conflictsIf a table with the same name as a Qdrant collection already exists in the target schema, the import script skips that table without importing data.

References