DataWorks: Embedding

Last Updated: Mar 26, 2026

The embedding vectorization feature of Data Integration lets you extract records from disparate data sources, such as OSS, MaxCompute, and Hadoop Distributed File System (HDFS), convert them into vectors using an embedding model, and write the vectors directly to a vector storage destination. Supported destinations include Milvus, Elasticsearch, OpenSearch, and Hologres vector tables. This removes the need to write custom extract, transform, and load (ETL) scripts and supports AI scenarios such as retrieval-augmented generation (RAG), intelligent customer service, and search and recommendation.

How it works

Data Integration runs three stages in a single sync task:

| Stage | Role | Description |
| --- | --- | --- |
| Read | Reader | Extract records from the source (MaxCompute, OSS, and others). |
| Embed | embedding-transformer | Pass selected fields through an embedding model to produce vectors. |
| Write | Writer | Store the source fields and the generated vectors in the destination collection. |

Because each stage runs as an independent operator, you can chain multiple transformers between a single Reader and Writer. When concurrency is set to 2, the job runs two parallel data processing streams.
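The three stages can be pictured as a chain of functions. The following Python sketch only illustrates the data flow and is not how Data Integration is implemented: `read_records`, `fake_embed`, `embed_stage`, and `write_records` are hypothetical stand-ins, and the toy embedding is derived from a hash rather than returned by a real model.

```python
import hashlib
from typing import Dict, Iterable, Iterator, List

Record = Dict[str, object]


def read_records() -> Iterator[Record]:
    """Reader stage: yield source rows (hard-coded here for illustration)."""
    yield {"sentence": "Fast delivery. The book is genuine."}
    yield {"sentence": "The silicone smell is too strong."}


def fake_embed(text: str, dimension: int = 4) -> List[float]:
    """Hypothetical stand-in for an embedding model call.

    Builds a deterministic pseudo-vector from a SHA-256 digest so the
    sketch runs offline. A real model returns meaningful vectors.
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [byte / 255.0 for byte in digest[:dimension]]


def embed_stage(records: Iterable[Record], source: str, target: str) -> Iterator[Record]:
    """Embed stage: add a vector field next to the original fields."""
    for record in records:
        enriched = dict(record)
        enriched[target] = fake_embed(str(record[source]))
        yield enriched


def write_records(records: Iterable[Record]) -> List[Record]:
    """Writer stage: collect rows (a real Writer sends them to the destination)."""
    return list(records)


rows = write_records(embed_stage(read_records(), "sentence", "sentence_e"))
for row in rows:
    print(row["sentence"], len(row["sentence_e"]))
```

Each output row keeps its source field and gains the vector field, which matches the description of the Write stage in the table above.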

Two configuration modes are available: the codeless UI and the code editor.

Limitations

  • This feature is only available for workspaces where the new version of Data Development is enabled.

  • Only Serverless resource groups are supported.

  • This feature is currently available for only some offline synchronization channels.

Billing

Data Integration tasks that use AI-assisted processing incur two types of costs: the standard Data Integration task cost and the cost of calling the embedding model. For details, see Data Integration scenarios.

Billing varies by model provider:

| Model provider | Billing reference |
| --- | --- |
| Alibaba Cloud DataWorks model service | Billing of Serverless resource groups - Large language model services |
| Alibaba Cloud Model Studio | Model inference (call) billing |
| Alibaba Cloud PAI model marketplace | Elastic Algorithm Service (EAS) billing |

Prerequisites

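Before you begin, make sure you have:

  • A workspace in which the new version of Data Development is enabled.

  • A Serverless resource group attached to the workspace.

  • Data sources for the source and destination. The examples in this document use MaxCompute as the source and Milvus as the destination, so create both data sources before proceeding.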

Prepare test data

The test data in this document comes from a public dataset (E-commerce Product Review Sentiment Prediction Dataset). User reviews are vectorized and written to Milvus for subsequent similarity searches.

MaxCompute source — create a test table and insert sample data:

-- Create a test table.
CREATE TABLE IF NOT EXISTS test_tb (
    sentence STRING,
    label STRING,
    dataset STRING
)
PARTITIONED BY (
    split STRING
)
LIFECYCLE 30;

-- Insert test data.
INSERT INTO test_tb PARTITION (split = 'dev')
SELECT * FROM VALUES
  ('Good for cleaning glass, but it is too small.', '1', 'jd'),
  ('The seller is so irresponsible. The quality of the clothes is terrible and not as shown in the picture.', '0', 'jd'),
  ('A great gift for international friends. Very nice!', '1', 'jd'),
  ('Very good. It will look beautiful once assembled.', '1', 'jd'),
  ('I returned the item to you, and you are trying to rip me off!!!', '0', 'jd'),
  ('Fast delivery. The book is genuine. JD.com is always my first choice for buying books!', '1', 'jd'),
  ('The taste is so good that I want to buy it again.', '1', 'jd'),
  ('The silicone smell is too strong, and it looks very different from the picture.', '0', 'jd'),
  ('So sad. I bought it for my Samsung N4, but it does not work. The customer service was unhelpful.', '0', 'jd'),
  ('The quality is good, the size is right, and it should be genuine! But I bought the black and gray one, and they sent me pure black. I am too lazy to exchange it, so I am leaving a bad review. Hope for improvement in the future!', '0', 'jd')
AS t (sentence, label, dataset);

-- Query the data.
SELECT * FROM test_tb WHERE split = 'dev';

Milvus destination — create a collection to receive the vectorized data. The collection has autoid enabled.

| Field name | Type | Description |
| --- | --- | --- |
| id | Int64 | Primary key. Auto-incrementing. |
| sentence | VarChar(32) | Stores the raw text. |
| sentence_e | FloatVector(128) | Vector field for similarity search. Uses the COSINE measure. |
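The COSINE measure scores two vectors by the cosine of the angle between them: 1 for the same direction, 0 for orthogonal vectors, and -1 for opposite directions. As a reminder of what the measure computes, here is a minimal pure-Python sketch. The function name is illustrative only; Milvus performs this calculation internally during a search.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return dot(a, b) / (|a| * |b|) for two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))   # same direction
print(cosine_similarity([1.0, 0.0], [0.0, 3.0]))   # orthogonal
print(cosine_similarity([1.0, 0.0], [-1.0, 0.0]))  # opposite direction
```

Because the score depends only on direction, vector length does not affect the ranking of search results under this measure.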

Codeless UI configuration

This section shows how to configure an offline sync task using the visual interface. The example reads from MaxCompute, vectorizes the sentence field, and writes the output to Milvus.

Step 1: Create an offline sync node

  1. Go to the Workspaces page in the DataWorks console. In the top navigation bar, select the target region. Find the workspace and choose Shortcuts > Data Studio in the Actions column.

  2. In the project folder, click ![image](https://help-static-aliyun-doc.aliyuncs.com/assets/img/en-US/8271236571/p998470.png) > Create Node > Data Integration > Batch Synchronization. Set the Data Source and Destination (source: MaxCompute, destination: Milvus) and the node Name, then click Confirm.

Step 2: Configure the offline sync task

  1. Configure basic information.

    • Data Source: Select the data sources for the source and destination.

    • Resource Group: Select the Serverless resource group attached to the workspace.

    If no data sources or resource groups are available, complete the prerequisites first.
  2. Configure the Data Source. The following table describes the key parameters for the MaxCompute source. If you use a different source, parameters may vary. Click Data Preview to verify the configuration.

    | Parameter | Description |
    | --- | --- |
    | Tunnel Resource Group | The default value for Tunnel Quota is Public Transmission Resource, which is the free quota for MaxCompute. For details on selecting a transmission resource, see Purchase and use exclusive Data Transmission Service resource groups. Important: If an exclusive Tunnel Quota becomes unavailable due to an overdue payment or expiration, a running job automatically switches to Public Transmission Resource. |
    | Table | The source table to synchronize. If no source tables are available, prepare the test data first. |
    | Filtering Method | Supports Partition Filter and Data Filtering: if the source table is partitioned, filter by partition; if it is not partitioned, use a WHERE clause to select rows. |
  3. Configure Data Processing.

    1. Enable the data processing switch. In the Data Processing List, click Add Node > Data Embedding to add an embedding processing node.

    2. Configure the data vectorization node.

      Note
      • Throughput depends on the embedding model. Alibaba Cloud Model Studio (Qwen models) enforces a queries per second (QPS) limit. For the Alibaba Cloud PAI model marketplace, performance depends on the resource specifications of the deployed EAS service.

      • For a given set of parameters, embedding models produce deterministic vectors. Data Integration uses a Least Frequently Used (LFU) cache to skip redundant embedding calls for identical input data, improving throughput and reducing cost.

      | Parameter | Description |
      | --- | --- |
      | Model Provider | The embedding model provider. Supported options: Alibaba Cloud DataWorks model service, Alibaba Cloud Model Studio, and Alibaba Cloud PAI model marketplace. |
      | Model Name | The embedding model to use. Select one based on your provider. |
      | Model API Key | The API key for the selected model. For Alibaba Cloud Model Studio, see Obtain a Model Studio API key. For the Alibaba Cloud PAI model marketplace, open Online Debugging on the deployed EAS task and copy the value of the Authorization header. |
      | Model Endpoint | Required when Model Provider is Alibaba Cloud PAI Model Marketplace. Enter the endpoint API address. |
      | Batch Size | The number of records sent to the embedding model per call. Batch processing improves throughput and reduces cost. Default: 10. |
      | Select Fields To Vectorize | The source columns to vectorize. Supports a single field or a concatenation of multiple fields. Also defines the output field name. |
      | Vectorization Output Field | The name of the field that stores the output vector. |
      | Vector Dimension | The dimension of the output vector. The configured embedding model must support the defined vector dimensions. Default: 1024. |
      | Convert NULL To Empty String | Embedding models reject NULL inputs. Enable this option to convert NULL values to empty strings and avoid errors. Disabled by default. |
      | Concatenate Field Name | When enabled, prepends the field name to the field value before vectorization. Requires configuring Field Name Delimiter. Disabled by default. |
      | Skip Empty Fields | When concatenating multiple fields for vectorization, specifies whether to skip empty fields. By default, this option is selected, and empty fields are skipped. |
    3. Preview the output. Click Data Output Preview in the upper-right corner of the vectorization node configuration area, then click Preview to inspect the vectorized results before running the full task.

      Tip

      Click Simulate Run at the top of the offline sync edit page for an alternative preview.

  4. Configure the Destination. The following table describes the key parameters for the Milvus destination.

    | Parameter | Description |
    | --- | --- |
    | Collection | The Milvus collection that receives the vector data. |
    | Partition Key | Optional. If the collection is partitioned, specify the target partition. |
    | Write Mode | upsert: if autoid is disabled, updates an entity by primary key; if autoid is enabled, replaces the primary key with an auto-generated one and inserts the record. insert: inserts data into collections with autoid enabled; Milvus generates the primary key automatically. |
    Note

    Using insert on a collection without autoid enabled causes data duplication.


  5. Configure Destination Field Mapping. After configuring the source, data processing, and destination, the task generates a field mapping automatically. Because the destination has no fixed schema, mappings are created row by row. Click Edit next to a Source Field or Target Field to adjust the mapping order or remove unnecessary fields. For this example, delete the fields that are not needed so that only sentence and sentence_e remain mapped to the destination fields of the same names.

  6. Configure Advanced Configuration. Click Advanced Configuration on the right side of the node configuration page to set task concurrency, sync rate, and dirty data policy.
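The note on the vectorization node mentions an LFU cache, and the Batch Size parameter groups records per model call. The sketch below shows how the two ideas could combine. It is an illustration under stated assumptions, not Data Integration's implementation: `LFUCache`, `embed_with_cache`, and the `embed_batch` callback are hypothetical names, and the eviction rule (drop the least-used key, oldest first on ties) is just one simple choice.

```python
from typing import Callable, Dict, List, Optional, Sequence

Vector = List[float]


class LFUCache:
    """Tiny least-frequently-used cache keyed by input text."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.values: Dict[str, Vector] = {}
        self.hits: Dict[str, int] = {}

    def get(self, key: str) -> Optional[Vector]:
        if key not in self.values:
            return None
        self.hits[key] += 1
        return self.values[key]

    def put(self, key: str, value: Vector) -> None:
        if self.capacity <= 0:
            return
        if key not in self.values and len(self.values) >= self.capacity:
            # Evict the key with the fewest uses (oldest wins ties).
            victim = min(self.hits, key=self.hits.get)
            del self.values[victim], self.hits[victim]
        self.values[key] = value
        self.hits.setdefault(key, 1)


def embed_with_cache(
    texts: Sequence[str],
    embed_batch: Callable[[List[str]], List[Vector]],
    cache: LFUCache,
    batch_size: int = 10,
) -> List[Vector]:
    """Embed texts, calling the model only for inputs that are not cached."""
    resolved: Dict[str, Vector] = {}
    misses: List[str] = []
    for text in dict.fromkeys(texts):  # unique inputs, first-seen order
        cached = cache.get(text)
        if cached is None:
            misses.append(text)
        else:
            resolved[text] = cached
    for start in range(0, len(misses), batch_size):
        batch = misses[start:start + batch_size]
        for text, vector in zip(batch, embed_batch(batch)):
            cache.put(text, vector)
            resolved[text] = vector
    return [resolved[text] for text in texts]


calls: List[List[str]] = []


def fake_embed_batch(batch: List[str]) -> List[Vector]:
    """Hypothetical model call: records each batch and returns toy vectors."""
    calls.append(list(batch))
    return [[float(len(text))] for text in batch]


cache = LFUCache(capacity=100)
vectors = embed_with_cache(["good", "bad", "good", "fine"], fake_embed_batch, cache, batch_size=2)
print(vectors)
print(calls)
```

In this run the duplicate input is embedded only once, and the three unique inputs are sent in two batches because `batch_size` is 2.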

Step 3: Test run

  1. On the right side of the offline sync node edit page, click Run Configuration. Set the Resource Group and Script Parameters, then click Run in the top toolbar to test the pipeline.

  2. Go to Milvus and verify that the destination collection contains the expected data.

Step 4: Configure scheduling and publish

Click Scheduling on the right side of the offline sync task. Set the scheduling configuration parameters for periodic runs, then click Publish in the top toolbar. Follow the on-screen instructions to publish the task.

Code editor configuration

This section shows how to configure the same MaxCompute-to-Milvus pipeline using a JSON script. The code editor supports advanced configuration that the visual interface does not expose.

Step 1: Create an offline sync node

Follow the same steps as Step 1 in the codeless UI configuration.

Step 2: Configure the sync script

  1. In the toolbar, click the icon that switches the node to the code editor.

  2. Enter the JSON configuration for the sync task. All examples use the embedding-transformer step to define the embedding stage between the Reader and Writer. The script for this example is:

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "odps",
                "parameter": {
                    "partition": [
                        "split=dev"
                    ],
                    "datasource": "MaxCompute_Source",
                    "successOnNoPartition": true,
                    "tunnelQuota": "default",
                    "column": [
                        "sentence"
                    ],
                    "enableWhere": false,
                    "table": "test_tb"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "category": "flatmap",
                "stepType": "embedding-transformer",
                "parameter": {
                    "modelProvider": "bailian",
                    "modelName": "text-embedding-v4",
                    "embeddingColumns": {
                        "sourceColumnNames": [
                            "sentence"
                        ],
                        "embeddingColumnName": "sentence_e"
                    },
                    "apiKey": "sk-****",
                    "dimension": 128,
                    "nullAsEmptyString": true
                },
                "displayName": "sentence2emb",
                "description": ""
            },
            {
                "stepType": "milvus",
                "parameter": {
                    "schemaCreateMode": "ignore",
                    "enableDynamicSchema": true,
                    "datasource": "Milvus_Source",
                    "column": [
                        {
                            "name": "sentence",
                            "type": "VarChar",
                            "elementType": "None",
                            "maxLength": "32"
                        },
                        {
                            "name": "sentence_e",
                            "type": "FloatVector",
                            "dimension": "128",
                            "elementType": "None",
                            "maxLength": "65535"
                        }
                    ],
                    "writeMode": "insert",
                    "collection": "Milvus_Collection",
                    "batchSize": 1024,
                    "columnMapping": [
                        {
                            "sourceColName": "sentence",
                            "dstColName": "sentence"
                        },
                        {
                            "sourceColName": "sentence_e",
                            "dstColName": "sentence_e"
                        }
                    ]
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "errorLimit": {
                "record": "0"
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  3. The embedding-transformer step parameters are:

    | Parameter | Description | Required |
    | --- | --- | --- |
    | modelProvider | The embedding model provider. Options: dataworksModelService (DataWorks Large Language Model Service), bailian (Alibaba Cloud Model Studio, supports Qwen models), paiModelGallery (Alibaba Cloud PAI model marketplace, supports BGE-M3 models). | Yes |
    | modelName | The embedding model name. When modelProvider is bailian: text-embedding-v4 or text-embedding-v3. When modelProvider is paiModelGallery: bge-m3. | Yes |
    | apiKey | The API key for the model provider. | Yes |
    | endpoint | The endpoint API address. Required when modelProvider is paiModelGallery. | No |
    | batchSize | Number of records per embedding call. Batch processing improves throughput and reduces cost. Default: 10. | No |
    | embeddingColumns | Defines which source columns to vectorize and the output field name. Supports a single field or multiple concatenated fields. Example: {"sourceColumnNames": ["col1", "col2"], "embeddingColumnName": "my_vector"}. | Yes |
    | appendDelimiter | Delimiter used to join multiple field values before vectorization. Default: \n. | No |
    | skipEmptyValue | When concatenating multiple fields, skip fields with empty values. Default: false. | No |
    | dimension | Output vector dimensions. The chosen model must support the configured value. Default: 1024. | No |
    | nullAsEmptyString | Convert NULL field values to empty strings before embedding. Embedding models reject NULL inputs. Default: false. | No |
    | appendFieldNameEnable | Prepend the field name to the field value before vectorization. When enabled, also configure appendFieldNameDelimiter. Default: false. | No |
    | appendFieldNameDelimiter | Delimiter for concatenating field names. Takes effect only when appendFieldNameEnable is true. | No |
  4. Click Dry Run at the top of the offline sync node edit page. Then, click Start Sampling and Preview to view the vectorized results and confirm that the configuration is correct.

  5. Click Advanced Configuration on the right side of the node configuration page to set task concurrency, sync rate, and dirty data policy.
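How the concatenation-related parameters of the embedding-transformer step interact is easier to see in code. The following Python function is a guess at the semantics based only on the parameter descriptions in this section, not a copy of the transformer's logic. The name `build_embedding_input` is hypothetical, the `": "` field-name delimiter is an assumption (no default is documented for appendFieldNameDelimiter), and treating a NULL as an error when nullAsEmptyString is false is also an assumption.

```python
from typing import Dict, List, Optional


def build_embedding_input(
    record: Dict[str, Optional[str]],
    source_column_names: List[str],
    append_delimiter: str = "\n",
    skip_empty_value: bool = False,
    null_as_empty_string: bool = False,
    append_field_name_enable: bool = False,
    append_field_name_delimiter: str = ": ",  # assumed; no documented default
) -> str:
    """Assemble the text sent to the embedding model for one record."""
    parts: List[str] = []
    for name in source_column_names:
        value = record.get(name)
        if value is None:
            if not null_as_empty_string:
                # Assumption: a NULL input is rejected unless it is converted.
                raise ValueError(f"column {name!r} is NULL and nullAsEmptyString is false")
            value = ""
        if value == "" and skip_empty_value:
            continue
        if append_field_name_enable:
            value = f"{name}{append_field_name_delimiter}{value}"
        parts.append(value)
    return append_delimiter.join(parts)


row = {"sentence": "Fast delivery.", "label": "1", "dataset": None}

# Two fields joined with the default delimiter.
print(build_embedding_input(row, ["sentence", "label"]))

# NULL converted to an empty string, empty fields skipped, field names prepended.
print(build_embedding_input(
    row,
    ["sentence", "dataset", "label"],
    skip_empty_value=True,
    null_as_empty_string=True,
    append_field_name_enable=True,
))
```

Prepending field names can give the model more context when several short columns are concatenated, at the cost of slightly longer inputs.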

Step 3: Test run

  1. Click Run Configuration on the right side of the edit page. Set the Resource Group and Script Parameters, then click Run to test the pipeline.

  2. Go to Milvus and verify that the destination collection contains the expected data.

Step 4: Configure scheduling and publish

Click Scheduling on the right side of the task. Set the scheduling configuration parameters for periodic runs, then click Publish in the top toolbar. Follow the on-screen instructions to publish the task.

Appendix 1: Code editor format

The basic structure of a sync script is:

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "xxx",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "xxx",
            "parameter": {},
            "name": "transformer1",
            "category": "map/flatmap"
        },
        {
            "stepType": "xxx",
            "parameter": {},
            "name": "transformer2",
            "category": "map/flatmap"
        },
        {
            "stepType": "xxx",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {}
}

The steps array must contain at least one Reader and one Writer. Include as many Transformer steps as needed between them. Data flows through steps in the order they appear in the JSON. Each Reader, Transformer, and Writer runs as an independent operator; when concurrency is set to 2, the job runs two parallel data processing streams.
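These ordering rules are simple enough to check before you submit a script. Below is a hedged sketch of such a pre-flight check in Python. `validate_steps` is a hypothetical helper, not a DataWorks API, and it assumes the layout shown in the basic structure: the first step is a reader, the last step is a writer, and every step in between is a `map` or `flatmap` transformer.

```python
import json
from typing import List

TRANSFORMER_CATEGORIES = {"map", "flatmap"}


def validate_steps(script: dict) -> List[str]:
    """Return a list of problems found in the steps array (empty if none)."""
    steps = script.get("steps", [])
    if len(steps) < 2:
        return ["steps must contain at least one Reader and one Writer"]
    problems: List[str] = []
    if steps[0].get("category") != "reader":
        problems.append("first step must have category 'reader'")
    if steps[-1].get("category") != "writer":
        problems.append("last step must have category 'writer'")
    for position, step in enumerate(steps[1:-1], start=1):
        if step.get("category") not in TRANSFORMER_CATEGORIES:
            problems.append(f"step {position} must have category 'map' or 'flatmap'")
    return problems


script = json.loads("""
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {"stepType": "odps", "parameter": {}, "name": "Reader", "category": "reader"},
        {"stepType": "embedding-transformer", "parameter": {}, "name": "embedding", "category": "flatmap"},
        {"stepType": "milvus", "parameter": {}, "name": "Writer", "category": "writer"}
    ],
    "setting": {}
}
""")
print(validate_steps(script))
```

An empty list means the sketch found nothing wrong with the step order. It does not validate the `parameter` objects, which depend on each step type.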


For the full parameter reference for all supported sources and destinations, see Supported data sources and sync solutions.

Appendix 2: OSS-to-Milvus example

This example reads JSONL files from OSS, extracts a nested JSON field, vectorizes the text, and writes the results to Milvus.

{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "oss",
            "parameter": {
                "datasource": "${OSS_Data_Source_Name}",
                "column": [
                    {
                        "name": "chunk_text",
                        "index": 0,
                        "type": "string"
                    }
                ],
                "fieldDelimiter": ",",
                "encoding": "UTF-8",
                "fileFormat": "jsonl",
                "object": [
                    "embedding/chunk1.jsonl"
                ]
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "json-extracting",
            "parameter": {
                "column": [
                    {
                        "name": "text",
                        "fromColumn": "chunk_text",
                        "jsonPath": "$.text",
                        "type": "STRING",
                        "nullOrInvalidDataAction": "DIRTY_DATA"
                    }
                ]
            },
            "name": "jsonextract",
            "category": "flatmap"
        },
        {
            "stepType": "embedding-transformer",
            "parameter": {
                "modelProvider": "bailian",
                "modelName": "text-embedding-v4",
                "apiKey": "${Your_API_Key}",
                "embeddingColumns": {
                    "sourceColumnNames": [
                        "text"
                    ],
                    "embeddingColumnName": "my_vector"
                },
                "batchSize": 8,
                "dimension": 1024
            },
            "name": "embedding",
            "category": "flatmap"
        },
        {
            "stepType": "milvus",
            "parameter": {
                "schemaCreateMode": "ignore",
                "enableDynamicSchema": true,
                "datasource": "${Milvus_Data_Source_Name}",
                "column": [
                    {
                        "name": "my_vector",
                        "type": "FloatVector",
                        "dimension": "1024",
                        "elementType": "None",
                        "maxLength": "65535"
                    },
                    {
                        "name": "text",
                        "type": "VarChar",
                        "elementType": "None",
                        "maxLength": "65535"
                    }
                ],
                "collection": "yunshi_vector_07171130",
                "writeMode": "insert",
                "batchSize": 1024,
                "columnMapping": [
                    {
                        "sourceColName": "my_vector",
                        "dstColName": "my_vector"
                    },
                    {
                        "sourceColName": "text",
                        "dstColName": "text"
                    }
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 1
        }
    }
}

This pipeline uses a json-extracting transformer to parse the text field from the raw JSONL before passing it to embedding-transformer. When your source data contains nested JSON, embeddingColumns.sourceColumnNames must reference flat column names — use a json-extracting step to extract nested fields first.
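To make the role of the json-extracting step concrete, here is a small Python sketch of equivalent logic: parse each JSONL line, pull out the top-level `text` key (the `$.text` path), and set aside lines where the JSON is invalid or the key is missing, loosely mirroring `nullOrInvalidDataAction: DIRTY_DATA`. The sample lines and the `extract_text` helper are made up for illustration, and the real transformer's dirty-data handling may differ.

```python
import json
from typing import List, Tuple


def extract_text(lines: List[str]) -> Tuple[List[dict], List[str]]:
    """Split JSONL lines into flat records and rejected ("dirty") lines."""
    records: List[dict] = []
    dirty: List[str] = []
    for line in lines:
        try:
            text = json.loads(line)["text"]  # equivalent of jsonPath "$.text"
        except (json.JSONDecodeError, KeyError, TypeError):
            dirty.append(line)
            continue
        if not isinstance(text, str):
            # A null or non-string value is treated as invalid in this sketch.
            dirty.append(line)
            continue
        # Keep the raw line and add the extracted flat column.
        records.append({"chunk_text": line, "text": text})
    return records, dirty


sample = [
    '{"text": "Chunk one of the document.", "page": 1}',
    '{"page": 2}',
    'not json',
]
records, dirty = extract_text(sample)
print([record["text"] for record in records])
print(len(dirty))
```

After this step every record exposes a flat `text` column, which is the shape that embeddingColumns.sourceColumnNames expects.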