Machine learning datasets often contain text fields -- product reviews, descriptions, user queries -- that models cannot consume directly. PAI FeatureStore integrates with large language model (LLM) APIs to convert text fields into dense vector representations (embeddings) as part of the feature pipeline. Both offline batch and real-time streaming workflows are supported.
FeatureStore works with any embedding model, whether hosted on Alibaba Cloud or self-hosted. All examples in this guide use the Qwen3 text-embedding-v3 model.
Workflow overview
The end-to-end embedding pipeline has six stages:
- Prepare data -- Create a MaxCompute table with the text field to embed.
- Register the LLM -- Add the embedding model's API endpoint and credentials to FeatureStore.
- Create a feature view -- Define a feature view that maps the text field to an embedding output through LLM-based feature production.
- Synchronize data -- Populate the online store (FeatureDB) with embedding vectors.
- Export for training -- Create model features, export training samples, and configure TorchEasyRec.
- Deploy -- Serve the model through EAS and configure the PAI-Rec engine.
Prerequisites
Before you begin, make sure you have:
- An API key from Alibaba Cloud Model Studio
- An online store (FeatureDB) configured as a data source
Note that FeatureStore LLM embedding supports only FeatureDB as the online store.
Prepare the data table
The following example uses a product review table. The review_text field contains the raw text to embed.
Create the MaxCompute table and insert sample data. Run these statements in DataWorks.
```sql
create table if not exists item_fea_review_text_v1(
    item_id bigint COMMENT 'item_id'
    ,review_text string COMMENT 'review_text'
)
COMMENT 'item_fea_review_text_v1'
PARTITIONED BY (
    ds string COMMENT 'data timestamp'
)
LIFECYCLE 365
;

-- For a partitioned table, insert into the latest partition
INSERT INTO TABLE item_fea_review_text_v1 PARTITION (ds='20250509')
VALUES (901, 'Good Quality Dog Food,I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.'); -- Adjust values according to actual field types

-- Verify the inserted data
select *
from item_fea_review_text_v1
where ds = '20250509';
```
Register model call information
Register the embedding model's API connection details so FeatureStore can call it during feature production.
- Log on to the PAI console, and in the left-side navigation pane, choose Data Preparation > FeatureStore.
- Select a workspace and click Enter FeatureStore.
- On the Feature Production tab, click Create LLM Call Information and configure the following parameters. Keep other parameters at their default values. For parameter details, see Synchronous API details.

  | Parameter | Description |
  |---|---|
  | Name | Enter a custom name. |
  | base url | Set to `https://dashscope-intl.aliyuncs.com/compatible-mode/v1`. |
  | Model Name | Select text-embedding-v3. Then, set Concurrency to 30, Maximum Input Tokens to 8192, and BatchSize to 10 (10 is the recommended maximum parallelism). |
  | API Key | Enter an API key from Alibaba Cloud Model Studio. |

- Click OK.
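Registration only stores the call information; you can verify the endpoint and API key independently by calling the OpenAI-compatible embeddings API that DashScope's compatible mode exposes. A minimal sketch -- the helper names are illustrative and not part of any FeatureStore SDK:

```python
import json
import urllib.request

def build_embedding_request(base_url, api_key, texts, model="text-embedding-v3"):
    """Build an HTTP request for an OpenAI-compatible /embeddings endpoint.

    Illustrative helper -- not part of the FeatureStore SDK.
    """
    url = base_url.rstrip("/") + "/embeddings"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = {"model": model, "input": texts}
    return url, headers, json.dumps(body).encode("utf-8")

def fetch_embeddings(base_url, api_key, texts):
    """POST the request and return one vector per input text (requires network access)."""
    url, headers, data = build_embedding_request(base_url, api_key, texts)
    req = urllib.request.Request(url, data=data, headers=headers, method="POST")
    with urllib.request.urlopen(req) as resp:
        payload = json.load(resp)
    return [item["embedding"] for item in payload["data"]]

# Inspect the request without making a network call:
url, headers, data = build_embedding_request(
    "https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
    "sk-xxx",  # replace with your Model Studio API key
    ["Good Quality Dog Food"],
)
```

Calling `fetch_embeddings` with a valid key should return one vector per input text; a non-2xx response usually indicates a wrong base url, model name, or API key.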
Register a feature view
Define how FeatureStore transforms the raw text field into an embedding vector.
- On the Feature View tab, click Create Feature View and configure the following parameters. Keep other parameters at their default values.

  | Parameter | Description |
  |---|---|
  | View Name | Enter a custom name. |
  | Type | Select Real Time. |
  | Feature Entity | Select item. |
  | Feature Field | Choose Table and add two fields: (1) item_id, INT64, select Primary Key; (2) review_text, ARRAY\<FLOAT\>, click Feature Production and configure Transformation Type: LLMEmbedding; LLM Call Information: the registered LLM call information; Field Name: review_text. The input field name must match the original field name, which maps to the feature production input field in the MaxCompute table for offline use or in Flink SQL / SDK for online use. |

- Click Submit.
- Click the feature view name to view its schema in JSON format.
Synchronize data
After registering the feature view, choose a synchronization strategy to populate FeatureDB with embedding vectors.
| Strategy | When to use | Data flow |
|---|---|---|
| Real-time sync (DataHub + Flink) | Real-time embedding extraction as data arrives | DataHub topic -> Flink SQL -> FeatureStore Connector -> FeatureDB |
| Offline sync (PyODPS) | Initial full sync or subsequent incremental sync | MaxCompute table -> FeatureStore SDK -> FeatureDB |
| Pre-extracted embeddings | Embeddings already computed outside FeatureStore | MaxCompute table (with embedding column) -> FeatureStore SDK -> FeatureDB (no LLM call) |
Real-time synchronization
Stream text data through DataHub and Flink. FeatureStore extracts embeddings in real time as records arrive.
Step 1: Create a DataHub topic
- Log on to the DataHub console and create a project.
- Click the project name to open the project details page.
- Click Create Topic and configure the following parameters. Keep other parameters at their default values.

  | Parameter | Description |
  |---|---|
  | Name | Enter a custom name. |
  | Schema Details | Add two fields: item_id, BIGINT, clear the Allow Null checkbox; review_text, STRING, select the Allow Null checkbox. |

- Click Create.
- Click the created topic to open the topic details page.
- On the Subscription List tab, click Subscribe, fill in the relevant information, and click Create.
Step 2: Write Flink SQL to bridge DataHub and FeatureStore
The Flink SQL job has three parts: a source table reading from DataHub, a sink table writing to the FeatureStore Connector, and an INSERT statement linking the two.
```sql
-- Source table: reads raw text records from the DataHub topic
CREATE TEMPORARY TABLE item_fea_embedding_debug_v1_dh
(
    item_id bigint
    ,review_text string
)
WITH (
    'connector' = 'datahub'
    ,'subId' = '1747297545688WFPX0'
    ,'endPoint' = 'http://dh-cn-beijing.aliyuncs.com'
    ,'project' = 'fs_test'
    ,'topic' = 'feature_store_llm_embedding_test_v1'
    ,'accessId' = 'xxx'
    ,'accessKey' = 'xxx'
)
;

-- Sink table: writes to FeatureDB through the FeatureStore Connector
CREATE TEMPORARY TABLE item_fea_embedding_debug_v1
(
    item_id bigint
    ,review_text string
)
WITH (
    'connector' = 'featurestore'
    ,'region_id' = 'cn-beijing'
    ,'project' = 'fs_demo_featuredb'
    ,'feature_view' = 'item_fea_embedding_debug_v1'
    ,'username' = 'xxx' -- Replace with your FeatureDB username
    ,'password' = 'xxx' -- Replace with your FeatureDB password
    ,'aliyun_access_id' = 'xxx'
    ,'aliyun_access_key' = 'xxx'
)
;

-- Link the source to the sink
INSERT INTO item_fea_embedding_debug_v1
SELECT
    item_id
    ,review_text
FROM item_fea_embedding_debug_v1_dh
;
```
The FeatureStore Connector automatically invokes the registered LLM to extract embeddings before writing to FeatureDB.
Step 3: Verify online features
Use the FeatureStore SDK to read features from the online store and confirm the real-time pipeline works.
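Whatever read path you use to fetch a vector back from FeatureDB, a quick sanity check confirms the embedding looks valid: correct length, float elements, non-zero norm. A sketch, assuming text-embedding-v3's default 1024-dimension output (adjust `expected_dim` if you configured a different size); the function name is illustrative:

```python
import math

def check_embedding(vec, expected_dim=1024):
    """Sanity-check a vector read back from the online store.

    expected_dim=1024 assumes text-embedding-v3's default output size;
    adjust if you configured a different dimension.
    """
    if len(vec) != expected_dim:
        return False, f"expected {expected_dim} dims, got {len(vec)}"
    if not all(isinstance(x, float) for x in vec):
        return False, "non-float element found"
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        return False, "zero vector -- embedding likely missing"
    return True, f"ok (L2 norm {norm:.4f})"

ok, msg = check_embedding([0.1] * 1024)
```

A zero vector or a dimension mismatch usually means the LLM call failed or the feature production mapping points at the wrong input field.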
Offline synchronization
Use PyODPS to publish MaxCompute table data to the online store. FeatureStore handles embedding extraction during the publish process.
Full sync
For initial deployments, synchronize the entire offline table to FeatureDB. Copy the following code to a PyODPS 3 node in DataWorks and run it.
```python
from feature_store_py.fs_client import FeatureStoreClient
from odps.accounts import StsAccount

# `dt` is supplied by the DataWorks scheduling parameters
cur_day = args['dt']
print('cur_day = ', cur_day)

# Reuse the credentials of the current PyODPS session
access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
sts_token = None
if isinstance(o.account, StsAccount):
    sts_token = o.account.sts_token

endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
fs = FeatureStoreClient(access_key_id=access_key_id,
                        access_key_secret=access_key_secret,
                        security_token=sts_token,
                        endpoint=endpoint)

project = fs.get_project('fs_demo_featuredb')
batch_feature_view = project.get_feature_view('item_fea_embedding_debug_v1')

# Publish the partition; FeatureStore extracts embeddings during the publish
task = batch_feature_view.publish_table(
    partitions={'ds': cur_day},
    mode='Merge',
    offline_to_online=True,
    publish_config={'offline_datasource_id': 19,
                    'table_name': 'item_fea_review_text_v1'})
task.wait()
```
The publish_table() method triggers LLM embedding extraction and writes the results to FeatureDB.
| Parameter | Description |
|---|---|
| `mode='Merge'` | Upserts records into the online store. |
| `offline_to_online=True` | Enables the offline-to-online sync pipeline. |
| `publish_config` | Specifies the MaxCompute data source ID and table name. |
Incremental sync
After the initial full sync, synchronize only new records on a daily schedule. This involves two steps: computing the delta and publishing it.
Step 1: Compute the new item set. Run this SQL in DataWorks to identify items added since the previous day.
```sql
-- Temporary table holding only the items added since the previous partition
CREATE TABLE IF NOT EXISTS item_fea_review_text_new_tmp_v1 (
    item_id BIGINT COMMENT 'item_id',
    review_text STRING COMMENT 'review_text'
)
PARTITIONED BY (
    ds string COMMENT 'data timestamp'
)
LIFECYCLE 365
;

-- Insert items present today but absent from yesterday's partition
INSERT INTO TABLE item_fea_review_text_new_tmp_v1 PARTITION (ds='${bdp.system.bizdate}')
SELECT
    t1.item_id,
    t1.review_text
FROM
    item_fea_review_text_v1 t1
WHERE
    t1.ds = '${bdp.system.bizdate}'
    AND NOT EXISTS (
        SELECT 1
        FROM item_fea_review_text_v1 t2
        WHERE t2.ds = TO_CHAR(DATEADD(TO_DATE('${bdp.system.bizdate}','yyyymmdd'), -1, 'dd'),'yyyymmdd')
        AND t1.item_id = t2.item_id
    );
```
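The partition arithmetic in the NOT EXISTS clause is the part most worth double-checking: it formats yesterday's date back into the `yyyymmdd` partition format. An equivalent of the `TO_CHAR(DATEADD(...))` expression in Python, as an illustrative sketch:

```python
from datetime import datetime, timedelta

def previous_partition(ds: str) -> str:
    """Return the previous day's partition value for a yyyymmdd string,
    mirroring TO_CHAR(DATEADD(TO_DATE(ds,'yyyymmdd'), -1, 'dd'),'yyyymmdd')."""
    day = datetime.strptime(ds, "%Y%m%d")
    return (day - timedelta(days=1)).strftime("%Y%m%d")

print(previous_partition("20250509"))  # -> 20250508
```

Note that month and year boundaries are handled correctly by the date arithmetic, e.g. `previous_partition("20250101")` yields `20241231`.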
Step 2: Publish the incremental table. Copy the following code to a PyODPS 3 node in DataWorks and run it. The only difference from the full sync script is the table name in publish_config.
```python
from feature_store_py.fs_client import FeatureStoreClient
from odps.accounts import StsAccount

cur_day = args['dt']
print('cur_day = ', cur_day)

access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
sts_token = None
if isinstance(o.account, StsAccount):
    sts_token = o.account.sts_token

endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
fs = FeatureStoreClient(access_key_id=access_key_id,
                        access_key_secret=access_key_secret,
                        security_token=sts_token,
                        endpoint=endpoint)

project = fs.get_project('fs_demo_featuredb')
batch_feature_view = project.get_feature_view('item_fea_embedding_debug_v1')

# Same as the full sync, but publishing the incremental table
task = batch_feature_view.publish_table(
    partitions={'ds': cur_day},
    mode='Merge',
    offline_to_online=True,
    publish_config={'offline_datasource_id': 19,
                    'table_name': 'item_fea_review_text_new_tmp_v1'})
task.wait()
```
Synchronize pre-extracted embeddings
If embeddings are already computed outside FeatureStore, synchronize them directly without re-extraction. The source table must include a review_text_embedding field of type ARRAY<FLOAT>.
Create and populate the embedding table:
```sql
create table if not exists item_fea_review_text_embedding_debug_v1(
    item_id bigint COMMENT 'item_id'
    ,review_text_embedding array<float> COMMENT 'review_text_embedding'
)
COMMENT 'item_fea_review_text_embedding_debug_v1'
PARTITIONED BY (
    ds string COMMENT 'data timestamp'
)
LIFECYCLE 365
;

-- For a partitioned table, insert into the latest partition
INSERT INTO TABLE item_fea_review_text_embedding_debug_v1 PARTITION (ds='20250509')
VALUES (902, cast(array(1,2,3,4,5,6,7.0,8.1,9.0) as array<float>)); -- Adjust values according to actual field types

-- Verify the inserted data
select *
from item_fea_review_text_embedding_debug_v1
where ds = '20250509';
```
Publish the pre-extracted embeddings to FeatureDB. Set 'transform': False in publish_config to skip LLM embedding extraction.
```python
from feature_store_py.fs_client import FeatureStoreClient
from odps.accounts import StsAccount

cur_day = args['dt']
print('cur_day = ', cur_day)

access_key_id = o.account.access_id
access_key_secret = o.account.secret_access_key
sts_token = None
if isinstance(o.account, StsAccount):
    sts_token = o.account.sts_token

endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
fs = FeatureStoreClient(access_key_id=access_key_id,
                        access_key_secret=access_key_secret,
                        security_token=sts_token,
                        endpoint=endpoint)

project = fs.get_project('fs_demo_featuredb')
batch_feature_view = project.get_feature_view('item_fea_embedding_debug_v1')

# 'transform': False skips LLM extraction and writes the vectors as-is
task = batch_feature_view.publish_table(
    partitions={'ds': cur_day},
    mode='Merge',
    offline_to_online=True,
    publish_config={'offline_datasource_id': 19,
                    'table_name': 'item_fea_review_text_embedding_debug_v1',
                    'transform': False})
task.wait()
task.print_summary()
```
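If the vectors are produced in Python (for example, by a batch job calling the embedding API), small samples can be loaded through INSERT statements like the one shown for the embedding table. A hypothetical helper that renders a vector as the `cast(array(...) as array<float>)` literal; for production volumes, write through PyODPS or Tunnel instead of building SQL literals:

```python
def to_array_float_literal(vec):
    """Render a Python vector as a MaxCompute ARRAY<FLOAT> SQL literal.

    Illustrative helper, not part of any SDK. Suitable for small test
    inserts only; bulk loads should go through PyODPS/Tunnel.
    """
    values = ",".join(repr(float(x)) for x in vec)
    return f"cast(array({values}) as array<float>)"

lit = to_array_float_literal([1, 2, 7.0])
# lit == "cast(array(1.0,2.0,7.0) as array<float>)"
```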
Export training samples
- Create model features -- Define model features that combine embeddings with other views. See Configure FeatureStore projects.
- Export training samples -- Export labeled training data including embedding features. See Export a training dataset and train a model.
- Configure Feature Generator and training -- Use the `create_fg_json` script in TorchEasyRec to generate `fg.json` from the config. See Configure Feature Generator and training. For embedding features, see TorchEasyRec features.
Deploy model
- Deploy EAS services -- Deploy the trained model as an online inference service through EAS. See Create and deploy EAS model services.
- Configure the PAI-Rec engine -- Connect the deployed model to the PAI-Rec recommendation engine. See Configure PAI-REC.