DataWorks connects to Milvus as a data source, letting you read from and write to a Milvus vector database through batch synchronization tasks.
Supported Milvus versions
- Milvus 2.4.x
- Milvus 2.5.x
Supported field types
The following table lists the data type mappings between Milvus native types and the DataWorks internal type classifications used by Milvus Writer.
| Milvus data type | DataWorks type classification |
|---|---|
| Int8 | LONG |
| Int16 | LONG |
| Int32 | LONG |
| Int64 | LONG |
| Float | DOUBLE |
| Double | DOUBLE |
| FloatVector | DOUBLE |
| String | STRING |
| VarChar | STRING |
| SparseFloatVector | STRING |
| JSON | STRING |
| Array | STRING |
| Bool | BOOLEAN |
| BFloat16Vector | BYTES |
| Float16Vector | BYTES |
| BinaryVector | BYTES |
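For quick lookups, the mapping above can be expressed as a small table. The sketch below is a documentation aid only; the dict and helper function are illustrative and not part of any DataWorks or Milvus API.

```python
# Illustrative lookup of the Milvus -> DataWorks type mapping in the table
# above. This dict is a documentation aid, not a DataWorks API.
MILVUS_TO_DATAWORKS = {
    "Int8": "LONG", "Int16": "LONG", "Int32": "LONG", "Int64": "LONG",
    "Float": "DOUBLE", "Double": "DOUBLE", "FloatVector": "DOUBLE",
    "String": "STRING", "VarChar": "STRING",
    "SparseFloatVector": "STRING", "JSON": "STRING", "Array": "STRING",
    "Bool": "BOOLEAN",
    "BFloat16Vector": "BYTES", "Float16Vector": "BYTES", "BinaryVector": "BYTES",
}

def dataworks_type(milvus_type: str) -> str:
    """Return the DataWorks classification for a Milvus field type."""
    return MILVUS_TO_DATAWORKS[milvus_type]
```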
Prerequisites
Before you begin, ensure that you have:
- A running Milvus instance and its endpoint URL, for example http://xxxx.milvus.aliyuncs.com:19530
- Milvus authentication credentials (username and password)
- A DataWorks workspace with permission to manage data sources and create synchronization tasks
Add a data source
Add a Milvus data source to DataWorks before developing a synchronization task. Follow the instructions in Data source management. Parameter descriptions are available directly in the DataWorks console when you add the data source.
Develop a synchronization task
DataWorks supports offline synchronization tasks that read from or write to Milvus collections. The data flow is: reader pulls records from the source → field type mapping → writer batches records into the destination Milvus collection.
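The flow above can be sketched as a toy pipeline. Everything here is a stand-in for illustration, assuming an in-memory source and destination; `read_source`, `map_record`, and `MockMilvus` are hypothetical names, not DataWorks or Milvus APIs.

```python
from itertools import islice

# Toy model of the offline sync flow: reader pulls records -> field type
# mapping -> writer flushes fixed-size batches into the destination.

def read_source():
    # Stand-in for the reader step: yields raw records from the source.
    for i in range(5):
        yield {"id": i, "score": float(i)}

def map_record(rec):
    # Stand-in for field type mapping (e.g. Int64 -> LONG, Float -> DOUBLE).
    return {"id": int(rec["id"]), "score": float(rec["score"])}

class MockMilvus:
    # Stand-in for the destination collection.
    def __init__(self):
        self.rows = []
    def insert(self, batch):
        self.rows.extend(batch)

def sync(reader, writer, batch_size=2):
    # Drain the mapped record stream in batches of batch_size.
    it = (map_record(r) for r in reader)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break
        writer.insert(batch)

dest = MockMilvus()
sync(read_source(), dest)
```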
Configure an offline synchronization task for a single table
Configure the task in the code editor. See the Appendix for a full script demo and parameter reference.
Appendix: Script demo and parameters
The following scripts and parameter tables apply to batch synchronization tasks configured through the code editor. For the general script format, see Configure a task in the code editor.
Reader
Script demo
```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "milvusreader",
          "parameter": {
            "endpoint": "http://xxxx.milvus.aliyuncs.com:19530",
            "database": "default",
            "username": "root",
            "password": "xxxxxxx",
            "collection": "testCollection",
            "column": [
              {
                "name": "id",
                "type": "Int64",
                "primaryKey": "true"
              },
              {
                "name": "int8col",
                "type": "Int8"
              },
              {
                "name": "int16col",
                "type": "Int16"
              }
            ]
          }
        },
        "writer": {
          "name": "Writer",
          "category": "writer",
          "stepType": "stream",
          "parameter": {}
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": "0"
      },
      "speed": {
        "throttle": false,
        "concurrent": 1,
        "channel": 1
      }
    }
  }
}
```
Reader parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| collection | The collection to read from. | Yes | None |
| column | The fields to read. Configure as an array of field objects, each with name and type. For the primary key field, also set primaryKey to "true". To read dynamic fields, see the dynamic field options below. | Yes | None |
| filter | A filter condition equivalent to a WHERE clause. For syntax, see the Milvus boolean expression documentation. | No | None |
| batchSize | The number of records to read per batch. | No | 1024 |
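As an example, the fragment below adds a filter to the reader parameters. The expression is a hypothetical Milvus boolean expression over the fields from the reader demo above; adapt it to your own schema.

```json
"parameter": {
  "collection": "testCollection",
  "filter": "id > 100 and int16col < 10",
  "batchSize": 2048
}
```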
Dynamic field options for `column`
To synchronize all dynamic fields as a single JSON field:
```json
"column": [
  {
    "name": "dynamicName",
    "type": "json",
    "dynamicFileType": "allDynamicField"
  }
]
```
To synchronize a single dynamic field, where {singleDynamicName} is the field name in the collection:
```json
"column": [
  {
    "name": "{singleDynamicName}",
    "type": "int",
    "dynamicFileType": "singleDynamicField"
  }
]
```
Writer
Script demo
```json
{
  "transform": false,
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "name": "Reader",
      "category": "reader",
      "stepType": "stream",
      "parameter": {}
    },
    {
      "name": "Writer",
      "category": "writer",
      "stepType": "milvus",
      "parameter": {
        "datasource": "zm_test",
        "collection": "test",
        "schemaCreateMode": "createIfNotExist",
        "enableDynamicSchema": true,
        "envType": 1,
        "column": [
          {
            "name": "floatv1",
            "type": "FloatVector",
            "dimension": "3"
          },
          {
            "name": "incol",
            "type": "Int16"
          }
        ],
        "writeMode": "insert",
        "batchSize": 1024
      }
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "concurrent": 2,
      "throttle": false
    }
  }
}
```
Writer parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| datasource | The name of the data source as configured in DataWorks. | Yes | None |
| collection | The destination collection in Milvus. | Yes | None |
| schemaCreateMode | How DataWorks handles the collection before synchronization. See Collection creation modes. | Yes | createIfNotExist |
| column | The destination fields. Configure as an array of field objects. Each field requires name and type. Vector fields also require dimension, for example "dimension": "3". | Yes | None |
| writeMode | The write behavior. See Write modes. | No | upsert |
| partition | The destination partition. Leave blank to write to the _default partition. | No | _default |
| batchSize | The number of records to write per batch. | No | 1024 |
| enableDynamicSchema | Enables dynamic schema when DataWorks creates the collection. | No | true |
Write modes
| Mode | Behavior |
|---|---|
| upsert | For collections without auto-incrementing primary keys: updates an entity based on its primary key. For collections with auto-incrementing primary keys: replaces the primary key with an auto-generated value and inserts the data. |
| insert | Inserts data without checking for existing primary keys. Use for collections with auto-incrementing primary keys, where Milvus generates primary keys automatically. If you use insert on a collection without auto-incrementing primary keys, duplicate records may occur. |
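The difference between the two modes can be modeled with plain Python containers. The sketch below is a toy illustration of the semantics for a collection without auto-incrementing primary keys, not Milvus client code; the dict stands in for a collection keyed by primary key.

```python
# Toy model of the two write modes on a collection WITHOUT auto-incrementing
# primary keys. Illustrative only, not Milvus client code.

def upsert(store, batch):
    # Updates the entity if the primary key exists, inserts otherwise.
    for rec in batch:
        store[rec["id"]] = rec

def insert(store_rows, batch):
    # Appends without checking primary keys, so duplicates can accumulate.
    store_rows.extend(batch)

by_pk = {}
upsert(by_pk, [{"id": 1, "v": "a"}])
upsert(by_pk, [{"id": 1, "v": "b"}])   # same pk: the entity is replaced

rows = []
insert(rows, [{"id": 1, "v": "a"}])
insert(rows, [{"id": 1, "v": "b"}])    # same pk: a duplicate record appears
```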
Collection creation modes
schemaCreateMode controls how DataWorks handles the destination collection before each synchronization run, including when the collection does not exist.
| Mode | Behavior |
|---|---|
| createIfNotExist | Creates the collection based on the configured column and other parameters, then starts the synchronization. If the collection already exists, uses it as-is. |
| Ignore | Reports an error if the collection does not exist. |
| recreate | Deletes the existing collection and creates a new one before each synchronization run. |
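The three behaviors can be summarized in a short dispatch sketch. This is a hypothetical helper against a mock catalog (a set of collection names), not a DataWorks API; the mode strings follow the table above.

```python
# Sketch of the three schemaCreateMode behaviors against a mock catalog.
# Hypothetical helper for illustration, not a DataWorks API.

def prepare_collection(catalog, name, mode):
    exists = name in catalog
    if mode == "createIfNotExist":
        if not exists:
            catalog.add(name)          # create the collection, then sync
    elif mode == "Ignore":
        if not exists:
            raise ValueError(f"collection {name!r} does not exist")
    elif mode == "recreate":
        catalog.discard(name)          # drop any existing collection
        catalog.add(name)              # create a fresh one each run
    else:
        raise ValueError(f"unknown schemaCreateMode {mode!r}")
    return catalog

catalog = {"test"}
prepare_collection(catalog, "test2", "createIfNotExist")  # creates "test2"
```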