AnalyticDB:Migrate data from a self-managed Milvus cluster to an AnalyticDB for PostgreSQL instance

Last Updated: Aug 05, 2025

Milvus is a vector database designed to handle queries over input vectors and to index vectors at trillion scale. This topic describes how to migrate data from a self-managed Milvus cluster to an AnalyticDB for PostgreSQL instance by using Python scripts.

Prerequisites

  • A Milvus cluster of version 2.3.x or later is created.

  • Python 3.8 or later is installed.

  • The required Python libraries are installed.

    pip install psycopg2
    pip install pymilvus==2.3.0
    pip install pyaml
    pip install tqdm
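
You can optionally run a short connectivity check before you start the migration. The following is a minimal sketch; the endpoints, ports, and credentials are placeholders that you must replace with your own values.

    # Optional connectivity check. All endpoints, ports, and credentials below
    # are placeholders and must be replaced with your own values.
    import psycopg2
    from pymilvus import connections, utility

    # Verify that the Milvus cluster is reachable and list its collections.
    connections.connect("default", host="<milvus_host>", port="19530")
    print("Milvus collections:", utility.list_collections())

    # Verify that the AnalyticDB for PostgreSQL instance is reachable.
    conn = psycopg2.connect(
        host="<adbpg_host>",
        port=5432,
        dbname="<database_name>",
        user="<user_name>",
        password="<password>",
    )
    with conn.cursor() as cur:
        cur.execute("SELECT version();")
        print("AnalyticDB for PostgreSQL:", cur.fetchone()[0])
    conn.close()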

Migration operations

Step 1: Export data from Milvus

  1. Prepare the export.py export script and the milvus2csv.yaml export configuration file, and create an output folder. In this example, the folder is named output.

    The export.py export script is as follows.

    import yaml
    import json
    from pymilvus import (
        connections,
        DataType,
        Collection,
    )
    import os
    from tqdm import tqdm
    
    with open("./milvus2csv.yaml", "r") as f:
        config = yaml.safe_load(f)
    
    print("configuration:")
    print(config)
    
    milvus_config = config["milvus"]
    
    milvus_type_to_adbpg_type = {
        DataType.BOOL: "bool",
        DataType.INT8: "smallint",
        DataType.INT16: "smallint",
        DataType.INT32: "integer",
        DataType.INT64: "bigint",
    
        DataType.FLOAT: "real",
        DataType.DOUBLE: "double precision",
    
        DataType.STRING: "text",
        DataType.VARCHAR: "varchar",
        DataType.JSON: "json",
    
        DataType.BINARY_VECTOR: "bit[]",
        DataType.FLOAT_VECTOR: "real[]",
    }
    
    
    def convert_to_binary(binary_data):
        decimal_value = int.from_bytes(binary_data, byteorder='big')
        binary_string = bin(decimal_value)[2:].zfill(len(binary_data)*8)
        return ','.join(list(binary_string))
    
    
    def data_convert_to_str(data, dtype, delimiter):
        if dtype == DataType.BOOL:
            return "1" if data else "0"
        elif dtype in [DataType.INT8, DataType.INT16,
                       DataType.INT32, DataType.INT64,
                       DataType.FLOAT, DataType.DOUBLE]:
            return str(data)
        elif dtype in [DataType.STRING, DataType.VARCHAR, DataType.JSON]:
            # Escape the column delimiter, double quotation marks, and newline characters.
            return str(data).replace(delimiter, f"\\{delimiter}").replace("\"", "\\\"").replace("\n", "\\n")
        elif dtype == DataType.BINARY_VECTOR:
            return "{" + ','.join([convert_to_binary(d) for d in data]) + "}"
        elif dtype == DataType.FLOAT_VECTOR:
            # Float vectors are already formatted as array-literal strings by the caller.
            return data

        raise Exception(f"Unsupported DataType {dtype}")
    
    
    def csv_write_rows(datum, fd, fields_types, delimiter="|"):
        for data in datum:
            for i in range(len(data)):
                ftype = fields_types[i]
                data[i] = data_convert_to_str(data[i], ftype, delimiter)
            fd.write(delimiter.join(data) + "\n")
    
    
    def csv_write_header(headers, fd, delimiter="|"):
        fd.write(delimiter.join(headers) + "\n")
    
    
    def dump_collection(collection_name: str):
        results = []
        file_cnt = 0
        print("connecting to milvus...")
        connections.connect("default", **milvus_config)
    
        export_config = config["export"]
        collection = Collection(collection_name)
        collection.load()
        tmp_path = os.path.join(export_config["output_path"], collection_name)
        if not os.path.exists(tmp_path):
            os.mkdir(tmp_path)
    
        fields_meta_str = ""
        fields_types = []
        headers = []
        for schema in collection.schema.fields:
            print(schema)
            fields_types.append(schema.dtype)
            headers.append(schema.name)
            if len(fields_meta_str) != 0:
                fields_meta_str += ","
            fields_meta_str += f"{schema.name} {milvus_type_to_adbpg_type[schema.dtype]}"
            if schema.dtype == DataType.VARCHAR and "max_length" in schema.params.keys():
                fields_meta_str += f"({schema.params['max_length']})"
            if schema.is_primary:
                fields_meta_str += " PRIMARY KEY"
    
        create_table_sql = f"CREATE TABLE {collection.name} " \
                           f" ({fields_meta_str});"
    
        with open(os.path.join(export_config["output_path"], collection_name, "create_table.sql"), "w") as f:
            f.write(create_table_sql)
    
        print(create_table_sql)
    
        print(headers)
    
        total_num = collection.num_entities
        collection.load()
        query_iterator = collection.query_iterator(batch_size=1000, expr="", output_fields=headers)
    
        def write_to_csv_file(col_names, data):
            if len(results) == 0:
                return
            nonlocal file_cnt
            assert(file_cnt <= 1e9)
            output_file_name = os.path.join(export_config["output_path"], collection_name, f"{str(file_cnt).zfill(10)}.csv")
            with open(output_file_name, "w", newline="") as csv_file:
                # write header
                csv_write_header(col_names, csv_file)
                # write data
                csv_write_rows(data, csv_file, fields_types)
                file_cnt += 1
                results.clear()
    
        with tqdm(total=total_num, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") as pbar:
            while True:
                res = query_iterator.next()
                if len(res) == 0:
                    print("query iteration finished, close")
                    # close the iterator
                    query_iterator.close()
                    break
                for row in res:
                    row_list = []
                    for i in range(len(headers)):
                        field = row[headers[i]]
                        if isinstance(field, list) and fields_types[i] != DataType.BINARY_VECTOR:
                            row_list.append("{" + ", ".join(str(x) for x in field) + "}")
                        elif isinstance(field, dict):
                            row_list.append(json.dumps(field, ensure_ascii=False))
                        else:
                            row_list.append(field)
                    results.append(row_list)
                    if len(results) >= export_config["max_line_in_file"]:
                        write_to_csv_file(headers, data=results)
                    pbar.update(1)
    
        write_to_csv_file(headers, data=results)
    
    if __name__ == "__main__":
      for name in config["export"]["collections"]:
          dump_collection(name)
    

    The milvus2csv.yaml export configuration file is as follows.

    milvus:
      host: '<localhost>'         # The host address of the Milvus service.
      port: 19530                 # The service port of Milvus.
      user: '<user_name>'         # The username.
      password: '<password>'      # The password.
      db_name: '<database_name>'  # The database name.
      token: '<token_id>'         # The access token.

    export:
      collections:                # Specify the names of all collections that you want to export.
        - 'test'
        - 'medium_articles_with_json'
        # - 'hello_milvus'
        # - 'car'
        # - 'medium_articles_with_dynamic'
      max_line_in_file: 40000     # The maximum number of data rows per exported CSV file.
      output_path: './output'     # The destination folder for the export. This topic uses ./output as an example.
  2. Store the export.py export script, the milvus2csv.yaml export configuration file, and the output folder in the same directory. The directory structure is as follows.

    ├── export.py
    ├── milvus2csv.yaml
    └── output
  3. Modify the configuration items in the milvus2csv.yaml file based on the information about the Milvus cluster.

  4. Run the Python script and view the output.

    python export.py

    The output is as follows. To roughly verify the number of exported rows in each collection, you can use the sketch after this procedure.

    .
    ├── export.py
    ├── milvus2csv.yaml
    └── output
        ├── medium_articles_with_json
        │   ├── 0000000000.csv
        │   ├── 0000000001.csv
        │   ├── 0000000002.csv
        │   └── create_table.sql
        └── test
            ├── 0000000000.csv
            └── create_table.sql
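
Before you import the data, you can roughly verify the export by counting the data rows in the generated CSV files. The following is a minimal sketch that assumes the default ./output path used in this example and that each CSV file contains exactly one header line, as written by the export script.

    # Count the exported data rows per collection (header lines excluded).
    # This is only a rough sanity check; adjust output_path if you changed it.
    import glob
    import os

    output_path = "./output"
    for collection_name in sorted(os.listdir(output_path)):
        collection_dir = os.path.join(output_path, collection_name)
        if not os.path.isdir(collection_dir):
            continue
        total_rows = 0
        for csv_file in sorted(glob.glob(os.path.join(collection_dir, "*.csv"))):
            with open(csv_file) as f:
                # Subtract 1 to exclude the header line of each file.
                total_rows += sum(1 for _ in f) - 1
        print(f"{collection_name}: {total_rows} rows exported")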

Step 2: Import data to the AnalyticDB for PostgreSQL vector database

  1. Prepare the import.py import script, the csv2adbpg.yaml import configuration file, and the data to import. The data to import is the content of the output folder that is generated in Step 1. In this example, it is placed in a folder named data.

    The import.py import script is as follows.

    import psycopg2
    import yaml
    import glob
    import os
    
    if __name__ == "__main__":
        with open('csv2adbpg.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)
    
        print("current config:" + str(config))
    
        db_host = config['database']['host']
        db_port = config['database']['port']
        db_name = config['database']['name']
        schema_name = config['database']['schema']
        db_user = config['database']['user']
        db_password = config['database']['password']
        data_path = config['data_path']
    
        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password,
            options=f'-c search_path={schema_name},public'
        )
    
        cur = conn.cursor()
    
        # check schema
        cur.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = %s", (schema_name,))
        existing_schema = cur.fetchone()
        if existing_schema:
            print(f"Schema {schema_name} already exists.")
        else:
            # create schema
            cur.execute(f"CREATE SCHEMA {schema_name}")
            print(f"Created schema: {schema_name}")
    
        for table_name in os.listdir(data_path):
            table_folder = os.path.join(data_path, table_name)
            print(f"Begin Process table: {table_name}")
            if os.path.isdir(table_folder):
                create_table_file = os.path.join(table_folder, 'create_table.sql')
                with open(create_table_file, 'r') as file:
                    create_table_sql = file.read()
                try:
                    cur.execute(create_table_sql)
                except psycopg2.errors.DuplicateTable as e:
                    print(e)
                    conn.rollback()
                    continue
                print(f"Created table: {table_name}")
    
                cnt = 0
                csv_files = glob.glob(os.path.join(table_folder, '*.csv'))
                for csv_file in csv_files:
                    with open(csv_file, 'r') as file:
                        copy_command = f"COPY {table_name} FROM STDIN DELIMITER '|' HEADER"
                        cur.copy_expert(copy_command, file)
                    cnt += 1
                    print(f"Imported data from: {csv_file} | {cnt}/{len(csv_files)} file(s) Done")
    
            conn.commit()
            print(f"Finished import table: {table_name}")
            print(' # '*60)
    
        cur.close()
        conn.close()
    

    The csv2adbpg.yaml import configuration file is as follows.

    database:
      host: "192.16.XX.XX"         # The public endpoint of the AnalyticDB for PostgreSQL instance.
      port: 5432                   # The port of the AnalyticDB for PostgreSQL instance.
      name: "vector_database"      # The name of the destination database. 
      user: "username"             # The database account of the AnalyticDB for PostgreSQL instance.
      password: ""                 # The password of the account.
      schema: "public"             # The name of the schema to import. If the schema does not exist, it is automatically created.
    
    data_path: "./data"            # The data source to import.
  2. Store the import.py import script and the csv2adbpg.yaml import configuration file in the same directory as the data to import. The directory structure is as follows.

    .
    ├── csv2adbpg.yaml
    ├── data
    │   ├── medium_articles_with_json
    │   │   ├── 0000000000.csv
    │   │   ├── 0000000001.csv
    │   │   ├── 0000000002.csv
    │   │   └── create_table.sql
    │   └── test
    │       ├── 0000000000.csv
    │       └── create_table.sql
    └── import.py
  3. Modify the configuration items in the csv2adbpg.yaml file based on the information about the AnalyticDB for PostgreSQL instance.

  4. Run the Python script.

    python import.py
  5. Check whether the data is imported to the AnalyticDB for PostgreSQL vector database correctly. For a simple row-count comparison, you can use the sketch after this procedure.

  6. Rebuild the required indexes. For more information, see Create a vector index.
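
The following is a minimal sketch for the check in step 5. It assumes the example ./data directory and the connection parameters from csv2adbpg.yaml; the placeholder values must be replaced with your own. It compares the number of data rows in the CSV files of each collection with the row count of the corresponding table.

    # Rough import check: compare CSV data rows with table row counts.
    # All connection parameters below are placeholders; reuse the values
    # that you configured in csv2adbpg.yaml.
    import glob
    import os
    import psycopg2

    data_path = "./data"
    schema_name = "public"
    conn = psycopg2.connect(
        host="<adbpg_host>",
        port=5432,
        database="<database_name>",
        user="<user_name>",
        password="<password>",
        options=f"-c search_path={schema_name},public",
    )
    with conn.cursor() as cur:
        for table_name in sorted(os.listdir(data_path)):
            table_dir = os.path.join(data_path, table_name)
            if not os.path.isdir(table_dir):
                continue
            csv_rows = 0
            for csv_file in glob.glob(os.path.join(table_dir, "*.csv")):
                with open(csv_file) as f:
                    csv_rows += sum(1 for _ in f) - 1  # exclude the header line
            cur.execute(f"SELECT count(*) FROM {table_name}")
            table_rows = cur.fetchone()[0]
            print(f"{table_name}: csv={csv_rows}, table={table_rows}, match={csv_rows == table_rows}")
    conn.close()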

References

For more information about Milvus, see Milvus product documentation.