All Products
Search
Document Center

DataWorks:Milvus

Last Updated:Mar 27, 2026

DataWorks connects to Milvus as a data source, letting you read from and write to a Milvus vector database through batch synchronization tasks.

Supported Milvus versions

  • Milvus 2.4.x

  • Milvus 2.5.x

Supported field types

The following table lists the data type mappings between Milvus native types and the DataWorks internal type classifications used by Milvus Writer.

Milvus data type DataWorks type classification
Int8 LONG
Int16 LONG
Int32 LONG
Int64 LONG
Float DOUBLE
Double DOUBLE
FloatVector DOUBLE
String STRING
VarChar STRING
SparseFloatVector STRING
JSON STRING
Array STRING
Bool BOOLEAN
BFloat16Vector BYTES
Float16Vector BYTES
BinaryVector BYTES

Prerequisites

Before you begin, ensure that you have:

  • A running Milvus instance and its endpoint URL, for example http://xxxx.milvus.aliyuncs.com:19530

  • Milvus authentication credentials (username and password)

  • A DataWorks workspace with permission to manage data sources and create synchronization tasks

Add a data source

Add a Milvus data source to DataWorks before developing a synchronization task. Follow the instructions in Data source management. Parameter descriptions are available directly in the DataWorks console when you add the data source.

Develop a synchronization task

DataWorks supports offline synchronization tasks that read from or write to Milvus collections. The data flow is: reader pulls records from the source → field type mapping → writer batches records into the destination Milvus collection.

Configure an offline synchronization task for a single table

Use either of the following approaches:

Appendix: Script demo and parameters

The following scripts and parameter tables apply to batch synchronization tasks configured through the code editor. For the general script format, see Configure a task in the code editor.

Reader

Script demo

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "milvusreader",
          "parameter": {
            "endpoint": "http://xxxx.milvus.aliyuncs.com:19530",
            "database": "default",
            "username": "root",
            "password": "xxxxxxx",
            "collection": "testColection",
            "column": [
              {
                "name": "id",
                "type": "Int64",
                "primaryKey": "true"
              },
              {
                "name": "int8col",
                "type": "Int8"
              },
              {
                "name": "int16col",
                "type": "Int16"
              }
            ]
          }
        },
        "writer": {
          "name": "Writer",
          "category": "writer",
          "stepType": "stream",
          "parameter": {}
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": "0"
      },
      "speed": {
        "throttle": false,
        "concurrent": 1,
        "channel": 1
      }
    }
  }
}

Reader parameters

Parameter Description Required Default
collection The collection to read from. Yes None
column The fields to read. Configure as an array of field objects, each with name and type. For the primary key field, also set primaryKey to "true". To read dynamic fields, see the dynamic field options below. Yes None
filter A filter condition equivalent to a WHERE clause. For syntax, see the Milvus boolean expression documentation. No None
batchSize The number of records to read per batch. No 1024

Dynamic field options for `column`

To synchronize all dynamic fields as a single JSON field:

"column": [
  {
    "name": "dynamicName",
    "type": "json",
    "dynamicFileType": "allDynamicField"
  }
]

To synchronize a single dynamic field, where {singleDynamicName} is the field name in the collection:

"column": [
  {
    "name": "{singleDynamicName}",
    "type": "int",
    "dynamicFileType": "singleDynamicField"
  }
]

Writer

Script demo

{
  "transform": false,
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "name": "Reader",
      "category": "reader",
      "stepType": "stream",
      "parameter": {}
    },
    {
      "name": "Writer",
      "category": "writer",
      "stepType": "milvus",
      "parameter": {
        "datasource": "zm_test",
        "collection": "test",
        "schemaCreateMode": "createIfNotExist",
        "enableDynamicSchema": true,
        "envType": 1,
        "column": [
          {
            "name": "floatv1",
            "type": "FloatVector",
            "dimension": "3"
          },
          {
            "name": "incol",
            "type": "Int16"
          }
        ],
        "writeMode": "insert",
        "batchSize": 1024
      }
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "concurrent": 2,
      "throttle": false
    }
  }
}

Writer parameters

Parameter Description Required Default
datasource The name of the data source as configured in DataWorks. Yes None
collection The destination collection in Milvus. Yes None
schemaCreateMode How DataWorks handles the collection before synchronization. See Collection creation modes. Yes createIfNotExist
column The destination fields. Configure as an array of field objects. Each field requires name and type. Vector fields also require dimension, for example "dimension": "3". Yes None
writeMode The write behavior. See Write modes. No upsert
partition The destination partition. Leave blank to write to the _default partition. No _default
batchSize The number of records to write per batch. No 1024
enableDynamicSchema Enables dynamic schema when DataWorks creates the collection. No true

Write modes

Mode Behavior
upsert For collections without auto-incrementing primary keys: updates an entity based on its primary key. For collections with auto-incrementing primary keys: replaces the primary key with an auto-generated value and inserts the data.
insert Inserts data without checking for existing primary keys. Use for collections with auto-incrementing primary keys, where Milvus generates primary keys automatically. If you use insert on a collection without auto-incrementing primary keys, duplicate records may occur.

Collection creation modes

schemaCreateMode controls what DataWorks does when the destination collection does not exist.

Mode Behavior
createIfNotExist Creates the collection based on the configured column and other parameters, then starts the synchronization. If the collection already exists, uses it as-is.
Ignore Reports an error if the collection does not exist.
recreate Deletes the existing collection and creates a new one before each synchronization run.