Data Integration embedding vectorization lets you extract records from disparate data sources — such as OSS, MaxCompute, and Hadoop Distributed File System (HDFS) — convert them into vectors using an embedding model, and write the vectors directly to a vector storage destination. Supported destinations include Milvus, Elasticsearch, OpenSearch, and Hologres vector tables. This eliminates the need to write custom extract, transform, and load (ETL) scripts and helps you build AI scenarios such as retrieval-augmented generation (RAG), intelligent customer service, and search and recommendation.
How it works
Data Integration runs three stages in a single sync task:
| Stage | Role | Description |
|---|---|---|
| Read | Reader | Extract records from the source (MaxCompute, OSS, and others). |
| Embed | embedding-transformer | Pass selected fields through an embedding model to produce vectors. |
| Write | Writer | Store the source fields and the generated vectors in the destination collection. |
Because each stage runs as an independent operator, you can chain multiple transformers between a single Reader and Writer. When concurrency is set to 2, the job runs two parallel data processing streams.
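The three stages can be pictured as chained operators. The following is a minimal sketch, not the Data Integration implementation: the function names, the in-memory source and destination, and `toy_model` (a stand-in for a real embedding model) are all hypothetical and exist only for illustration.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Record = Dict[str, object]
Model = Callable[[List[str]], List[List[float]]]


def read(rows: Iterable[Record]) -> Iterator[Record]:
    """Reader stage: yield source records one by one."""
    for row in rows:
        yield dict(row)


def _embed_batch(batch: List[Record], field: str, out_field: str, model: Model) -> Iterator[Record]:
    """Call the model once for the whole batch and attach one vector per record."""
    vectors = model([str(record[field]) for record in batch])
    for record, vector in zip(batch, vectors):
        yield {**record, out_field: vector}


def embed(records: Iterable[Record], field: str, out_field: str,
          model: Model, batch_size: int = 10) -> Iterator[Record]:
    """Embed stage: group records into batches before calling the model."""
    batch: List[Record] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield from _embed_batch(batch, field, out_field, model)
            batch = []
    if batch:  # flush the last, possibly smaller, batch
        yield from _embed_batch(batch, field, out_field, model)


def write(records: Iterable[Record], destination: List[Record]) -> int:
    """Writer stage: store source fields plus vectors and return the row count."""
    count = 0
    for record in records:
        destination.append(record)
        count += 1
    return count


def toy_model(texts: List[str]) -> List[List[float]]:
    """Hypothetical stand-in for an embedding model: a 2-dimensional vector per text."""
    return [[float(len(t)), float(sum(map(ord, t)) % 97)] for t in texts]


source = [{"sentence": "hello"}, {"sentence": "vector search"}, {"sentence": "RAG"}]
collection: List[Record] = []
written = write(embed(read(source), "sentence", "sentence_e", toy_model, batch_size=2), collection)
print(written, collection[0])
```

Each destination row keeps the source field and gains the vector field, which mirrors the Write stage in the table above.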
Two configuration modes are available:
- Codeless UI configuration: configure an offline embedding sync task through a visual interface.
- Code editor configuration: write a JSON script for advanced or custom pipeline requirements.
Limitations
- This feature is available only for workspaces where the new version of Data Development is enabled.
- Only Serverless resource groups are supported.
- This feature is currently available only for some offline synchronization channels.
Billing
Data Integration tasks that use AI-assisted processing incur two types of costs: the standard Data Integration task cost and the cost of calling the embedding model. For details, see Data Integration scenarios.
Billing varies by model provider:
| Model provider | Billing reference |
|---|---|
| Alibaba Cloud DataWorks model service | Billing of Serverless resource groups — Large language model services |
| Alibaba Cloud Model Studio | Model inference (call) billing |
| Alibaba Cloud PAI model marketplace | Elastic Algorithm Service (EAS) billing |
Prerequisites
Before you begin, make sure you have:
- A workspace with the new version of Data Development enabled
- A Serverless resource group attached to the workspace
- An embedding model service from one of the supported providers:
  - Alibaba Cloud DataWorks model service: Deploy a model in Large Language Model Service Management.
  - Alibaba Cloud Model Studio: Activate Model Studio and obtain an API key.
  - Alibaba Cloud PAI model marketplace: Activate Platform for AI (PAI) and obtain a token for the model service.
- Source and destination data sources created for the offline sync task
The examples in this document use MaxCompute as the source and Milvus as the destination. Create both data sources before proceeding.
Codeless UI configuration
This section shows how to configure an offline sync task using the visual interface. The example reads from MaxCompute, vectorizes the sentence field, and writes the output to Milvus.
Step 1: Create an offline sync node
- Go to the Workspaces page in the DataWorks console. In the top navigation bar, select the target region. Find the workspace and choose Shortcuts > Data Studio in the Actions column.
- In the project folder, click  > Create Node > Data Integration > Batch Synchronization. Set the Data Source and Destination (source: MaxCompute, destination: Milvus) and the node Name, then click Confirm.
Step 2: Configure the offline sync task
- Configure basic information.
  - Data Source: Select the data sources for the source and destination.
  - Resource Group: Select the Serverless resource group attached to the workspace.
  If no data sources or resource groups are available, complete the prerequisites first.
- Configure the Data Source. The following table describes the key parameters for the MaxCompute source. If you use a different source, parameters may vary. Click Data Preview to verify the configuration.
  | Parameter | Description |
  |---|---|
  | Tunnel Resource Group | The default value for Tunnel Quota is Public Transmission Resource, which is the free quota for MaxCompute. For details on selecting a transmission resource, see Purchase and use exclusive Data Transmission Service resource groups. Important: If an exclusive Tunnel Quota becomes unavailable due to an overdue payment or expiration, a running job automatically switches to Public Transmission Resource. |
  | Table | The source table to synchronize. If no source tables are available, prepare the test data first. |
  | Filtering Method | Supports Partition Filter and Data Filtering: if the source table is partitioned, filter by partition; if it is not partitioned, use a WHERE clause to select rows. |
- Configure Data Processing.
  - Enable the data processing switch. In the Data Processing List, click Add Node > Data Embedding to add an embedding processing node.
  - Configure the data vectorization node.
    Note:
    - Throughput depends on the embedding model. Alibaba Cloud Model Studio (Qwen models) enforces a queries per second (QPS) limit. For the Alibaba Cloud PAI model marketplace, performance depends on the resource specifications of the deployed EAS service.
    - For a given set of parameters, embedding models produce deterministic vectors. Data Integration uses a Least Frequently Used (LFU) cache to skip redundant embedding calls for identical input data, which improves throughput and reduces cost.
    | Parameter | Description |
    |---|---|
    | Model Provider | The embedding model provider. Supported options: Alibaba Cloud DataWorks model service, Alibaba Cloud Model Studio, and Alibaba Cloud PAI model marketplace. |
    | Model Name | The embedding model to use. Select one based on your provider. |
    | Model API Key | The API key for the selected model. For Alibaba Cloud Model Studio, see Obtain a Model Studio API key. For the Alibaba Cloud PAI model marketplace, open Online Debugging on the deployed EAS task and copy the value of the Authorization header. |
    | Model Endpoint | Required when Model Provider is Alibaba Cloud PAI Model Marketplace. Enter the endpoint API address. |
    | Batch Size | The number of records sent to the embedding model per call. Batch processing improves throughput and reduces cost. Default: 10. |
    | Select Fields To Vectorize | The source columns to vectorize. Supports a single field or a concatenation of multiple fields. Also defines the output field name. |
    | Vectorization Output Field | The name of the field that stores the output vector. |
    | Vector Dimension | The dimension of the output vector. The configured embedding model must support the defined vector dimensions. Default: 1024. |
    | Convert NULL To Empty String | Embedding models reject NULL inputs. Enable this option to convert NULL values to empty strings and avoid errors. Disabled by default. |
    | Concatenate Field Name | When enabled, prepends the field name to the field value before vectorization. Requires configuring Field Name Delimiter. Disabled by default. |
    | Skip Empty Fields | When concatenating multiple fields for vectorization, specifies whether to skip empty fields. By default, this option is selected, and empty fields are skipped. |
  - Preview the output. Click Data Output Preview in the upper-right corner of the vectorization node configuration area, then click Preview to inspect the vectorized results before running the full task.
    Tip: Click Simulate Run at the top of the offline sync edit page for an alternative preview.
- Configure the Destination. The following table describes the key parameters for the Milvus destination.
  | Parameter | Description |
  |---|---|
  | Collection | The Milvus collection that receives the vector data. |
  | Partition Key | Optional. If the collection is partitioned, specify the target partition. |
  | Write Mode | upsert: if autoid is disabled, updates an entity by primary key; if autoid is enabled, replaces the primary key with an auto-generated one and inserts the record. insert: inserts data into collections with autoid enabled; Milvus generates the primary key automatically. Note: Using insert on a collection without autoid enabled causes data duplication. |
- Configure Destination Field Mapping. After configuring the source, data processing, and destination, the task generates a field mapping automatically. Because the destination has no fixed schema, mappings are created row by row. Click Edit next to a Source Field or Target Field to adjust the mapping order or remove unnecessary fields. For this example, delete the fields that are not needed.
- Configure Advanced Configuration. Click Advanced Configuration on the right side of the node configuration page to set task concurrency, sync rate, and dirty data policy.
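The note on the data vectorization node mentions an LFU cache that skips embedding calls for identical inputs. How Data Integration implements this cache (its capacity, key, and eviction details) is not documented here, so the following is only a sketch of the general idea. `LFUCache`, `cached_embed`, and `toy_model` are hypothetical names.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional

Vector = List[float]


class LFUCache:
    """Tiny LFU cache: evicts the least frequently used key when full."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.values: Dict[str, Vector] = {}
        self.hits: Dict[str, int] = defaultdict(int)

    def get(self, key: str) -> Optional[Vector]:
        if key in self.values:
            self.hits[key] += 1
            return self.values[key]
        return None

    def put(self, key: str, value: Vector) -> None:
        if self.capacity <= 0:
            return
        if key not in self.values and len(self.values) >= self.capacity:
            # Evict the entry with the lowest use count (oldest one on ties).
            coldest = min(self.values, key=lambda k: self.hits[k])
            del self.values[coldest]
            del self.hits[coldest]
        self.values[key] = value
        self.hits[key] += 1


def cached_embed(texts: List[str], model: Callable[[List[str]], List[Vector]],
                 cache: LFUCache) -> List[Vector]:
    """Embed texts, calling the model only for inputs missing from the cache."""
    results: Dict[str, Vector] = {}
    missing: List[str] = []
    for text in texts:
        hit = cache.get(text)
        if hit is not None:
            results[text] = hit
        elif text not in missing:
            missing.append(text)
    if missing:
        for text, vector in zip(missing, model(missing)):
            cache.put(text, vector)
            results[text] = vector
    return [results[text] for text in texts]


calls: List[List[str]] = []


def toy_model(texts: List[str]) -> List[Vector]:
    """Hypothetical model stub that records every call it receives."""
    calls.append(list(texts))
    return [[float(len(t))] for t in texts]


cache = LFUCache(capacity=2)
first = cached_embed(["a", "bb", "a"], toy_model, cache)
second = cached_embed(["a", "ccc"], toy_model, cache)
print(calls)
```

The repeated input is sent to the model once, which is why deterministic embeddings make this kind of caching safe.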
Step 3: Test run
- On the right side of the offline sync node edit page, click Run Configuration. Set the Resource Group and Script Parameters, then click Run in the top toolbar to test the pipeline.
- Go to Milvus and verify that the destination collection contains the expected data.
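When you compare row counts after repeated test runs, the Write Mode chosen for the destination matters. The following toy in-memory model illustrates why rerunning a task with insert on a collection that has autoid disabled duplicates rows, while upsert does not. It does not use the Milvus client API; the `insert` and `upsert` functions and the `id` primary key are hypothetical stand-ins.

```python
from typing import Dict, List

Entity = Dict[str, object]


def insert(collection: List[Entity], entity: Entity) -> None:
    """insert: always appends; an existing primary key is not checked."""
    collection.append(dict(entity))


def upsert(collection: List[Entity], entity: Entity, pk: str = "id") -> None:
    """upsert with autoid disabled: replace the entity that has the same primary key."""
    collection[:] = [e for e in collection if e[pk] != entity[pk]]
    collection.append(dict(entity))


rows = [{"id": 1, "sentence": "hello"}, {"id": 2, "sentence": "world"}]

inserted: List[Entity] = []
upserted: List[Entity] = []
for _ in range(2):  # simulate running the same sync task twice
    for row in rows:
        insert(inserted, row)
        upsert(upserted, row)

print(len(inserted), len(upserted))
```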
Step 4: Configure scheduling and publish
Click Scheduling on the right side of the offline sync task. Set the scheduling configuration parameters for periodic runs, then click Publish in the top toolbar. Follow the on-screen instructions to publish the task.
Code editor configuration
This section shows how to configure the same MaxCompute-to-Milvus pipeline using a JSON script. The code editor supports advanced configuration that the visual interface does not expose.
Step 1: Create an offline sync node
Follow the same steps as Step 1 in the codeless UI configuration.
Step 2: Configure the sync script
- In the toolbar, click the icon that switches to the code editor.
- Enter the JSON configuration for the sync task. All examples use the embedding-transformer step to define the embedding stage between the Reader and Writer.
  - For Reader and Writer parameter details, see MaxCompute data source and Milvus data source.
  - For other source and destination types, see Data source list.
  The script for this example is:
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "odps",
      "parameter": {
        "partition": [
          "split=dev"
        ],
        "datasource": "MaxCompute_Source",
        "successOnNoPartition": true,
        "tunnelQuota": "default",
        "column": [
          "sentence"
        ],
        "enableWhere": false,
        "table": "test_tb"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "category": "flatmap",
      "stepType": "embedding-transformer",
      "parameter": {
        "modelProvider": "bailian",
        "modelName": "text-embedding-v4",
        "embeddingColumns": {
          "sourceColumnNames": [
            "sentence"
          ],
          "embeddingColumnName": "sentence_e"
        },
        "apiKey": "sk-****",
        "dimension": 128,
        "nullAsEmptyString": true
      },
      "displayName": "sentence2emb",
      "description": ""
    },
    {
      "stepType": "milvus",
      "parameter": {
        "schemaCreateMode": "ignore",
        "enableDynamicSchema": true,
        "datasource": "Milvus_Source",
        "column": [
          {
            "name": "sentence",
            "type": "VarChar",
            "elementType": "None",
            "maxLength": "32"
          },
          {
            "name": "sentence_e",
            "type": "FloatVector",
            "dimension": "128",
            "elementType": "None",
            "maxLength": "65535"
          }
        ],
        "writeMode": "insert",
        "collection": "Milvus_Collection",
        "batchSize": 1024,
        "columnMapping": [
          {
            "sourceColName": "sentence",
            "dstColName": "sentence"
          },
          {
            "sourceColName": "sentence_e",
            "dstColName": "sentence_e"
          }
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "concurrent": 2,
      "throttle": false
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
- The embedding-transformer step parameters are:
  | Parameter | Description | Required |
  |---|---|---|
  | modelProvider | The embedding model provider. Options: dataworksModelService (DataWorks Large Language Model Service), bailian (Alibaba Cloud Model Studio, supports Qwen models), paiModelGallery (Alibaba Cloud PAI model marketplace, supports BGE-M3 models). | Yes |
  | modelName | The embedding model name. When modelProvider is bailian: text-embedding-v4 or text-embedding-v3. When modelProvider is paiModelGallery: bge-m3. | Yes |
  | apiKey | The API key for the model provider. | Yes |
  | endpoint | The endpoint API address. Required when modelProvider is paiModelGallery. | No |
  | batchSize | Number of records per embedding call. Batch processing improves throughput and reduces cost. Default: 10. | No |
  | embeddingColumns | Defines which source columns to vectorize and the output field name. Supports a single field or multiple concatenated fields. Example: {"sourceColumnNames": ["col1", "col2"], "embeddingColumnName": "my_vector"}. | Yes |
  | appendDelimiter | Delimiter used to join multiple field values before vectorization. Default: \n. | No |
  | skipEmptyValue | When concatenating multiple fields, skip fields with empty values. Default: false. | No |
  | dimension | Output vector dimensions. The chosen model must support the configured value. Default: 1024. | No |
  | nullAsEmptyString | Convert NULL field values to empty strings before embedding. Embedding models reject NULL inputs. Default: false. | No |
  | appendFieldNameEnable | Prepend the field name to the field value before vectorization. When enabled, also configure appendFieldNameDelimiter. Default: false. | No |
  | appendFieldNameDelimiter | Delimiter for concatenating field names. Takes effect only when appendFieldNameEnable is true. | No |
- Click Dry Run at the top of the offline sync node edit page. Then, click Start Sampling and Preview to view the vectorized results and confirm that the configuration is correct.
- Click Advanced Configuration on the right side of the node configuration page to set task concurrency, sync rate, and dirty data policy.
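To see how the concatenation parameters of the embedding-transformer step interact, the following sketch builds the text that would be sent to the model for one record. It is an interpretation of the parameter descriptions, not the actual implementation: the function `build_embedding_text` is hypothetical, the order in which NULL conversion and empty-value skipping are applied is an assumption, and the `":"` default for the field name delimiter is an assumption because the source does not state one.

```python
from typing import Dict, List, Optional


def build_embedding_text(
    record: Dict[str, Optional[str]],
    source_column_names: List[str],
    append_delimiter: str = "\n",            # appendDelimiter
    skip_empty_value: bool = False,          # skipEmptyValue
    null_as_empty_string: bool = False,      # nullAsEmptyString
    append_field_name_enable: bool = False,  # appendFieldNameEnable
    append_field_name_delimiter: str = ":",  # appendFieldNameDelimiter (assumed default)
) -> str:
    """Join the selected columns of one record into a single string to embed."""
    parts: List[str] = []
    for name in source_column_names:
        value = record.get(name)
        if value is None:
            if not null_as_empty_string:
                # Embedding models reject NULL inputs, so fail loudly here.
                raise ValueError(f"column {name!r} is NULL")
            value = ""
        if value == "" and skip_empty_value:
            continue
        if append_field_name_enable:
            value = f"{name}{append_field_name_delimiter}{value}"
        parts.append(value)
    return append_delimiter.join(parts)


record = {"title": "Milvus", "summary": None, "body": "vector database"}

plain = build_embedding_text(record, ["title", "body"])
labeled = build_embedding_text(
    record,
    ["title", "summary", "body"],
    null_as_empty_string=True,
    skip_empty_value=True,
    append_field_name_enable=True,
)
print(repr(plain))
print(repr(labeled))
```

Use Dry Run on the real task to confirm how your configuration behaves, because this sketch only models the documented descriptions.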
Step 3: Test run
- Click Run Configuration on the right side of the edit page. Set the Resource Group and Script Parameters, then click Run to test the pipeline.
- Go to Milvus and verify that the destination collection contains the expected data.
Step 4: Configure scheduling and publish
Click Scheduling on the right side of the task. Set the scheduling configuration parameters for periodic runs, then click Publish in the top toolbar. Follow the on-screen instructions to publish the task.
Appendix 1: Code editor format
The basic structure of a sync script is:
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "xxx",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "xxx",
      "parameter": {},
      "name": "transformer1",
      "category": "map/flatmap"
    },
    {
      "stepType": "xxx",
      "parameter": {},
      "name": "transformer2",
      "category": "map/flatmap"
    },
    {
      "stepType": "xxx",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {}
}
The steps array must contain at least one Reader and one Writer. Include as many Transformer steps as needed between them. Data flows through steps in the order they appear in the JSON. Each Reader, Transformer, and Writer runs as an independent operator; when concurrency is set to 2, the job runs two parallel data processing streams.
For the full parameter reference for all supported sources and destinations, see Supported data sources and sync solutions.
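The structural rules above can be checked before a script is submitted. The following is a small sketch of such a check; `validate_steps` is a hypothetical helper, not part of DataWorks, and it interprets "between them" as "after the first Reader and before the last Writer in the steps array", which is an assumption.

```python
from typing import Dict, List

TRANSFORMER_CATEGORIES = {"map", "flatmap"}


def validate_steps(script: Dict) -> List[str]:
    """Return a list of problems found in the steps array (empty if none)."""
    steps = script.get("steps", [])
    categories = [step.get("category") for step in steps]
    problems: List[str] = []

    if "reader" not in categories:
        problems.append("steps must contain at least one Reader")
    if "writer" not in categories:
        problems.append("steps must contain at least one Writer")
    if problems:
        return problems

    first_reader = categories.index("reader")
    last_writer = len(categories) - 1 - categories[::-1].index("writer")
    for position, step in enumerate(steps):
        is_transformer = step.get("category") in TRANSFORMER_CATEGORIES
        if is_transformer and not (first_reader < position < last_writer):
            problems.append(
                f"transformer {step.get('name')!r} is not between the Reader and the Writer"
            )
    return problems


script = {
    "type": "job",
    "version": "2.0",
    "steps": [
        {"stepType": "odps", "parameter": {}, "name": "Reader", "category": "reader"},
        {"stepType": "embedding-transformer", "parameter": {}, "name": "embedding", "category": "flatmap"},
        {"stepType": "milvus", "parameter": {}, "name": "Writer", "category": "writer"},
    ],
    "setting": {},
}
# Same steps, but with the transformer placed before the Reader.
misordered = {**script, "steps": [script["steps"][1], script["steps"][0], script["steps"][2]]}

print(validate_steps(script))
print(validate_steps(misordered))
```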
Appendix 2: OSS-to-Milvus example
This example reads JSONL files from OSS, extracts a nested JSON field, vectorizes the text, and writes the results to Milvus.
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "oss",
      "parameter": {
        "datasource": "${OSS_Data_Source_Name}",
        "column": [
          {
            "name": "chunk_text",
            "index": 0,
            "type": "string"
          }
        ],
        "fieldDelimiter": ",",
        "encoding": "UTF-8",
        "fileFormat": "jsonl",
        "object": [
          "embedding/chunk1.jsonl"
        ]
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "json-extracting",
      "parameter": {
        "column": [
          {
            "name": "text",
            "fromColumn": "chunk_text",
            "jsonPath": "$.text",
            "type": "STRING",
            "nullOrInvalidDataAction": "DIRTY_DATA"
          }
        ]
      },
      "name": "jsonextract",
      "category": "flatmap"
    },
    {
      "stepType": "embedding-transformer",
      "parameter": {
        "modelProvider": "bailian",
        "modelName": "text-embedding-v4",
        "apiKey": "${Your_API_Key}",
        "embeddingColumns": {
          "sourceColumnNames": [
            "text"
          ],
          "embeddingColumnName": "my_vector"
        },
        "batchSize": 8,
        "dimension": 1024
      },
      "name": "embedding",
      "category": "flatmap"
    },
    {
      "stepType": "milvus",
      "parameter": {
        "schemaCreateMode": "ignore",
        "enableDynamicSchema": true,
        "datasource": "${Milvus_Data_Source_Name}",
        "column": [
          {
            "name": "my_vector",
            "type": "FloatVector",
            "dimension": "1024",
            "elementType": "None",
            "maxLength": "65535"
          },
          {
            "name": "text",
            "type": "VarChar",
            "elementType": "None",
            "maxLength": "65535"
          }
        ],
        "collection": "yunshi_vector_07171130",
        "writeMode": "insert",
        "batchSize": 1024,
        "columnMapping": [
          {
            "sourceColName": "my_vector",
            "dstColName": "my_vector"
          },
          {
            "sourceColName": "text",
            "dstColName": "text"
          }
        ]
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "concurrent": 1
    }
  }
}
This pipeline uses a json-extracting transformer to parse the text field from the raw JSONL before passing it to embedding-transformer. When your source data contains nested JSON, embeddingColumns.sourceColumnNames must reference flat column names — use a json-extracting step to extract nested fields first.
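Two wiring mistakes are easy to make in scripts like this one: pointing sourceColumnNames at a column that no earlier step produces, and configuring a vector dimension that differs from the FloatVector column in the Writer. The following sketch checks both on a trimmed copy of the script. `check_embedding_wiring` is a hypothetical helper, and it assumes that Writer column names match the embedding output name directly, as they do in the examples in this document.

```python
import copy
from typing import Dict, List


def column_names(columns: List) -> List[str]:
    """Column entries are either plain strings or objects with a name key."""
    return [c["name"] if isinstance(c, dict) else c for c in columns]


def check_embedding_wiring(script: Dict) -> List[str]:
    problems: List[str] = []
    available = set()  # flat column names produced by the steps seen so far
    vector_dims = {}   # embedding output column -> configured dimension

    for step in script["steps"]:
        params = step.get("parameter", {})
        if step.get("stepType") == "embedding-transformer":
            columns = params["embeddingColumns"]
            for name in columns["sourceColumnNames"]:
                if name not in available:
                    problems.append(f"{name!r} is not a flat column produced by an earlier step")
            output = columns["embeddingColumnName"]
            vector_dims[output] = int(params.get("dimension", 1024))  # documented default
            available.add(output)
        elif step.get("category") == "writer":
            for column in params.get("column", []):
                if not isinstance(column, dict):
                    continue
                expected = vector_dims.get(column["name"])
                if expected is not None and int(column.get("dimension", expected)) != expected:
                    problems.append(
                        f"{column['name']!r}: Writer dimension {column['dimension']} != embedding dimension {expected}"
                    )
        else:
            available.update(column_names(params.get("column", [])))
    return problems


script = {
    "steps": [
        {"stepType": "oss", "category": "reader",
         "parameter": {"column": [{"name": "chunk_text", "index": 0, "type": "string"}]}},
        {"stepType": "json-extracting", "category": "flatmap",
         "parameter": {"column": [{"name": "text", "fromColumn": "chunk_text", "jsonPath": "$.text"}]}},
        {"stepType": "embedding-transformer", "category": "flatmap",
         "parameter": {"embeddingColumns": {"sourceColumnNames": ["text"],
                                            "embeddingColumnName": "my_vector"},
                       "dimension": 1024}},
        {"stepType": "milvus", "category": "writer",
         "parameter": {"column": [{"name": "my_vector", "type": "FloatVector", "dimension": "1024"},
                                  {"name": "text", "type": "VarChar"}]}},
    ]
}

# A deliberately broken copy: nested reference and mismatched dimension.
broken = copy.deepcopy(script)
broken["steps"][2]["parameter"]["embeddingColumns"]["sourceColumnNames"] = ["chunk_text.text"]
broken["steps"][3]["parameter"]["column"][0]["dimension"] = "128"

print(check_embedding_wiring(script))
print(check_embedding_wiring(broken))
```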