This tutorial walks you through building and publishing an end-to-end recommendation system on Platform for AI (PAI) FeatureStore using the FeatureStore Python SDK. You will:
- Prepare data: synchronize sample tables and configure offline and online data sources.
- Define feature engineering objects: create a project, entities, and three types of feature views using the SDK.
- Export a training set and train a ranking model with EasyRec.
- Publish the model: schedule data sync jobs, deploy an Elastic Algorithm Service (EAS) inference service, and configure the PAI-Rec recommendation engine.
Prerequisites
Before you begin, ensure that you have:
- Platform for AI (PAI): An activated PAI instance with a default workspace. Store your AccessKey ID and AccessKey secret as environment variables. For setup instructions, see Activate PAI and create a default workspace and Create an AccessKey pair.
- MaxCompute: An activated MaxCompute instance with a project created. See Activate MaxCompute and Create a MaxCompute project.
- FeatureDB: An activated FeatureDB instance. See Create an online data source: FeatureDB.
- DataWorks: An activated DataWorks instance with the following configured:
  - A DataWorks workspace. See Create a workspace.
  - An exclusive resource group for scheduling attached to the workspace. See Use exclusive resource groups for scheduling.
  - A MaxCompute data source configured for the PAI workspace. See Configure a data source.
  - A MaxCompute compute engine attached to the PAI workspace. See Attach a MaxCompute Engine.
  - A Hologres compute engine attached to the PAI workspace. See Attach a Hologres computing engine.
Step 1: Prepare data
Synchronize data tables
A recommendation system typically requires five types of data tables: a user feature table, an item feature table, a label table, a sequence feature table, and a behavior table.
This tutorial uses pre-built sample tables from the pai_online_project MaxCompute project. Run the following SQL commands in DataWorks to copy these tables into your own MaxCompute project.
- Log on to the DataWorks console.
- In the left navigation pane, click Data Development & O&M > Data Development.
- Select your DataWorks workspace and click Enter Data Development.
- Hover over New and choose New Node > MaxCompute > ODPS SQL. In the dialog, configure the node:

  | Parameter | Value |
  |---|---|
  | Engine instance | Select your MaxCompute engine |
  | Node type | ODPS SQL |
  | Path | Business Flow/Workflow/MaxCompute |
  | Name | Any name you choose |
- Click Confirm.
- In the node editor, run the following SQL commands. Select your exclusive resource group for each command.
  - Synchronize the user table (rec_sln_demo_user_table_preprocess_all_feature_v1):
  - Synchronize the item table (rec_sln_demo_item_table_preprocess_all_feature_v1):
  - Synchronize the label table (rec_sln_demo_label_table):
  - Synchronize the sequence feature table (rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3):
  - Synchronize the behavior table (rec_sln_demo_behavior_table_preprocess_v3):
After all five commands run successfully, the synchronized tables are available in your workspace.
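One way to produce a copy command per table is to generate the SQL from the table list. The following is a minimal sketch, not the tutorial's official commands: it assumes every sample table is partitioned by ds and that a CREATE TABLE ... LIKE followed by an INSERT OVERWRITE over a partition range is acceptable. The helper name build_copy_sql and the partition range 20231022 to 20231024 are hypothetical.

```python
SOURCE_PROJECT = "pai_online_project"  # sample project named in this tutorial

TABLES = [
    "rec_sln_demo_user_table_preprocess_all_feature_v1",
    "rec_sln_demo_item_table_preprocess_all_feature_v1",
    "rec_sln_demo_label_table",
    "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3",
    "rec_sln_demo_behavior_table_preprocess_v3",
]


def build_copy_sql(table, start_ds, end_ds):
    """Return hypothetical MaxCompute SQL that copies a range of ds partitions."""
    return (
        f"CREATE TABLE IF NOT EXISTS {table} LIKE {SOURCE_PROJECT}.{table};\n"
        f"INSERT OVERWRITE TABLE {table} PARTITION (ds)\n"
        f"SELECT * FROM {SOURCE_PROJECT}.{table}\n"
        f"WHERE ds >= '{start_ds}' AND ds <= '{end_ds}';"
    )


# The partition range is an assumption chosen to cover the dates used later.
statements = [build_copy_sql(t, "20231022", "20231024") for t in TABLES]
print(statements[0])
```

Each generated statement can be pasted into the ODPS SQL node and adjusted to the partitions you actually need.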
Configure data sources
FeatureStore needs two data sources: an offline data source (MaxCompute, for training data) and an online data source (FeatureDB, Hologres, or TableStore, for real-time feature serving). This tutorial uses MaxCompute and FeatureDB.
- Log on to the PAI console. In the left navigation pane, click Data Preparation > FeatureStore.
- Select a workspace and click Enter FeatureStore.
- On the Data Source tab, click Create Store, configure the MaxCompute data source, and click Submit.

  | Parameter | Value |
  |---|---|
  | Type | MaxCompute |
  | Name | Any name you choose |
  | MaxCompute project name | Select your MaxCompute project |
- Click Create Store again, configure the FeatureDB data source, and click Submit.

  | Parameter | Value |
  |---|---|
  | Type | FeatureDB |
  | Name | feature_db (fixed; custom names are not supported) |
  | Username | Set a username |
  | Password | Set a password |
  | VPC high-speed connection (optional) | Enables direct FeatureDB access through a PrivateLink connection from within a VPC, improving read/write performance and reducing latency |
  | VPC | Select the VPC where your online FeatureStore service is located |
  | Zone and vSwitch | Select vSwitches in at least two zones for high availability |
Step 2: Create feature engineering objects using the SDK
Install the FeatureStore Python SDK. A Python 3 environment is required. Run all code in this step in a Jupyter Notebook environment.
```shell
pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl
```
Import the required modules:
```python
import sys
import os
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import (
    MaxComputeDataSource, LabelInput, TrainingSetOutput
)
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import (
    LabelInputConfig, PartitionConfig, FeatureViewConfig,
    TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
)
import logging

logger = logging.getLogger("fs_demo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
```
Create a project
A FeatureStore project is an isolated namespace that groups entities, feature views, and models. Each project is linked to the data sources you configured in Step 1 through its offline_datasource_id and online_datasource_id.
This tutorial uses a project named fs_demo. The following code loads the project and raises an error if it does not exist yet:
```python
access_id = os.environ.get("ACCESS_KEY_ID")      # AccessKey ID from environment variable
access_ak = os.environ.get("ACCESS_KEY_SECRET")  # AccessKey secret from environment variable
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError(f"Project '{cur_project_name}' not found. Create the project in the PAI console first.")
print(project)
```
Create feature entities
A feature entity represents a domain object (such as a user or an item) and acts as the join key that links features from multiple feature views together. Each entity has a join_id — the logical identifier used for joining. When you create a feature view, you also specify a primary_key, which is the physical column name in the underlying data table used to look up features. The join_id and primary_key can differ in name.
In a recommendation system, all features belong to one of two entities: user or item.
Create the user entity:
```python
user_entity_name = "user"
user_join_id = 'user_id'  # Logical join key; used to associate features across views
user_entity = project.get_entity(user_entity_name)
if user_entity is None:
    user_entity = project.create_entity(name=user_entity_name, join_id=user_join_id)
user_entity.print_summary()
```
Create the item entity:
```python
item_entity_name = "item"
item_join_id = 'item_id'
item_entity = project.get_entity(item_entity_name)
if item_entity is None:
    item_entity = project.create_entity(name=item_entity_name, join_id=item_join_id)
item_entity.print_summary()
```
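The difference between join_id and primary_key can be pictured with plain Python dictionaries. This is a minimal sketch that does not use the FeatureStore SDK: the two in-memory "views", their column names (user_id, uid), and the lookup helper are all hypothetical and only illustrate how one logical join key can map to differently named physical columns.

```python
# Hypothetical stand-ins for two feature views that belong to the same entity.
views = {
    "profile": {
        "primary_key": "user_id",  # physical column name in this table
        "rows": [{"user_id": "u1", "age": 30}, {"user_id": "u2", "age": 41}],
    },
    "stats": {
        "primary_key": "uid",      # a different physical column, same logical key
        "rows": [{"uid": "u1", "clicks_7d": 12}, {"uid": "u2", "clicks_7d": 3}],
    },
}


def lookup(join_value):
    """Collect features for one join_id value across all views."""
    merged = {}
    for view in views.values():
        pk = view["primary_key"]
        for row in view["rows"]:
            if row[pk] == join_value:
                # Keep the feature columns and drop the per-view key column.
                merged.update({k: v for k, v in row.items() if k != pk})
    return merged


print(lookup("u1"))
```

The caller only supplies the logical key value; each view resolves it against its own primary key column.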
Register feature views
A feature view is the core abstraction in FeatureStore. It defines the data source, the feature schema (names and types), and the storage target (online store, offline store, or both). FeatureStore supports three types of feature views:
| Type | Use case |
|---|---|
| Batch FeatureView | Offline or T-1 day features synchronized from a data warehouse |
| Sequence FeatureView | User behavior sequences combining T-1 day historical data and real-time events |
| Stream FeatureView | Real-time features written directly to the online store |
Register batch feature views
A Batch FeatureView loads data from the offline store (MaxCompute) and can publish it to the online store for real-time serving. This type is suited for features that are updated daily, such as user profiles and item metadata.
Register the user feature view:
```python
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
if user_feature_view is None:
    ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name)
    user_feature_view = project.create_batch_feature_view(
        name=user_feature_view_name,
        datasource=ds,
        online=True,            # Enables publishing data to the online store
        entity=user_entity_name,
        primary_key='user_id',  # Physical column in the table used to look up user features
        register=True
    )
print(user_feature_view)
```
Publish the 20231023 partition to the online store:
```python
user_task = user_feature_view.publish_table({'ds': '20231023'})
user_task.wait()
user_task.print_summary()
```
Register the item feature view:
```python
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1"
item_feature_view = project.get_feature_view(item_feature_view_name)
if item_feature_view is None:
    ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name)
    item_feature_view = project.create_batch_feature_view(
        name=item_feature_view_name,
        datasource=ds,
        online=True,
        entity=item_entity_name,
        primary_key='item_id',
        register=True
    )
print(item_feature_view)
```
Publish the 20231023 partition to the online store:
```python
item_task = item_feature_view.publish_table({'ds': '20231023'})
item_task.wait()
item_task.print_summary()
```
Register a Sequence FeatureView
A Sequence FeatureView captures user behavior sequences for real-time recommendation. When a sequence query runs online, FeatureStore automatically merges data from two sources:
- B1 (T-1 day behavior table): synchronized from the offline behavior table (A1), with deduplication applied automatically.
- B2 (real-time day-T behavior table): written in real time via API or streaming tools such as Flink.
During registration, you provide the offline sequence table (F1) and the offline behavior table (A1). FeatureStore manages the online tables and the synchronization process automatically.
```python
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
    seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
    behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
    ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
    # Field names in the behavior table
    event_time = 'event_unix_time'
    item_id = 'item_id'
    event = 'event'
    # Define which sequence features to build and their online names
    # deduplication_method=1: deduplicate by [user_id, item_id, event]
    # deduplication_method=2: deduplicate by [user_id, item_id, event, event_time]
    sequence_feature_config_list = [
        SequenceFeatureConfig(
            offline_seq_name='click_seq_50_seq',  # Column name in the offline sequence table
            seq_event='click',                    # Filter behavior records to this event type
            online_seq_name='click_seq_50',       # Name used when serving the sequence online
            seq_len=50                            # Max sequence length; longer sequences are truncated
        )
    ]
    seq_table_config = SequenceTableConfig(
        table_name=seq_table_name,
        primary_key='user_id',
        event_time='event_unix_time'
    )
    seq_feature_view = project.create_sequence_feature_view(
        seq_feature_view_name,
        datasource=ds,
        event_time=event_time,
        item_id=item_id,
        event=event,
        deduplication_method=1,
        sequence_feature_config=sequence_feature_config_list,
        sequence_table_config=seq_table_config,
        entity=user_entity_name
    )
print(seq_feature_view)
```
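The online merge of the T-1 table (B1) and the real-time table (B2) can be pictured with a few lines of plain Python. This is a minimal sketch, not FeatureStore's actual implementation: it assumes the merged sequence is ordered newest first and that deduplication by [user_id, item_id, event] keeps the most recent occurrence. The merge_sequence helper and the sample events are hypothetical.

```python
def merge_sequence(offline_events, realtime_events, seq_event, seq_len):
    """Merge T-1 and day-T behavior records into one item sequence."""
    seen = set()
    merged = []
    # Assumption: newest events come first in the served sequence.
    ordered = sorted(offline_events + realtime_events,
                     key=lambda e: e["event_unix_time"], reverse=True)
    for ev in ordered:
        if ev["event"] != seq_event:
            continue  # keep only the configured event type, e.g. 'click'
        key = (ev["user_id"], ev["item_id"], ev["event"])
        if key in seen:
            continue  # duplicate of a more recent record
        seen.add(key)
        merged.append(ev["item_id"])
    return merged[:seq_len]  # truncate to the configured maximum length


b1 = [
    {"user_id": "u1", "item_id": "i1", "event": "click", "event_unix_time": 100},
    {"user_id": "u1", "item_id": "i2", "event": "click", "event_unix_time": 200},
    {"user_id": "u1", "item_id": "i3", "event": "praise", "event_unix_time": 250},
]
b2 = [
    {"user_id": "u1", "item_id": "i1", "event": "click", "event_unix_time": 300},
]
print(merge_sequence(b1, b2, seq_event="click", seq_len=50))
```

With this sample data the real-time click on i1 replaces its older T-1 occurrence, and the praise event is filtered out.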
Publish the 20231023 partition. The days_to_load=30 parameter tells FeatureStore to check the previous 30 days of partitions and backfill any missing data automatically:
```python
seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()
seq_task.print_summary()
```
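To see which partitions a 30-day backfill window covers, the ds values can be computed with the standard library. This is a rough sketch under the assumption that the window is the days_to_load days immediately before the published partition; the exact boundaries FeatureStore uses are not stated here, and previous_partitions is a hypothetical helper.

```python
from datetime import datetime, timedelta


def previous_partitions(ds, days_to_load):
    """Return the ds values for the days_to_load days before ds, oldest first."""
    end = datetime.strptime(ds, "%Y%m%d")
    return [
        (end - timedelta(days=n)).strftime("%Y%m%d")
        for n in range(days_to_load, 0, -1)
    ]


window = previous_partitions("20231023", 30)
print(window[0], window[-1], len(window))
```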
Register a Stream FeatureView
A Stream FeatureView writes data directly to the online store and syncs it back to the offline store, making it suitable for real-time features such as current item prices or click counts.
Register the label table
Register the label table used to build training samples:
```python
label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
    label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)
```
Retrieve online features
To verify that offline-to-online synchronization is working correctly, retrieve features directly from the online store:
```python
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
# Retrieve specific features for two users
ret_features = user_feature_view.get_online_features(
    join_ids={'user_id': ['169898460', '148811946']},
    features=['user_id', 'gender', 'age', 'city']
)
print("Online features:", ret_features)
```
Build the training set
A training set joins label data with features from multiple feature views using a point-in-time join — for each label record, FeatureStore looks up the feature values that were valid at the label's event time. This prevents future feature values from leaking into training data.
```python
label_table_name = 'rec_sln_demo_label_table'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
# Select all features from the user and item views; select the click sequence from the sequence view
user_feature_selector = FeatureSelector("user_table_preprocess_all_feature_v1", '*')
item_feature_selector = FeatureSelector("item_table_preprocess_all_feature_v1", '*')
seq_feature_selector = FeatureSelector("wide_seq_feature_v3", ['click_seq_50_seq'])
train_set = project.create_training_set(
    label_table_name=label_table_name,
    train_set_output=train_set_output,
    feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector]
)
print("Training set:", train_set)
```
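The point-in-time lookup behind a training set can be sketched independently of the SDK. The example below is illustrative only: it assumes a feature's history is a list of (timestamp, value) pairs sorted by timestamp and that a value is valid from its timestamp onward. The point_in_time_lookup helper and the sample price history are hypothetical.

```python
from bisect import bisect_right


def point_in_time_lookup(history, event_time):
    """Return the latest value whose timestamp is <= event_time, or None."""
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, event_time)
    return history[idx - 1][1] if idx > 0 else None


# Hypothetical feature history for one item: (unix time, price)
price_history = [(100, 9.9), (200, 12.5), (300, 11.0)]

labels = [
    {"item_id": "i1", "event_unix_time": 250, "is_click": 1},
    {"item_id": "i1", "event_unix_time": 50, "is_click": 0},
]
for label in labels:
    label["price"] = point_in_time_lookup(price_history, label["event_unix_time"])
print(labels)
```

The label at time 250 receives the price written at time 200, never the later value from time 300, which is the leakage the join is designed to prevent.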
Create a model
Register a model that links to the training set. The model tracks which feature views and training set table the model was built from.
```python
model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
    cur_model = project.create_model(model_name, train_set)
print("Training set table name:", cur_model.train_set_table_name)
```
Step 3: Export the training set and train the model
Export the training set
Export the sample table by specifying the label partition and the partition (or event time) for each feature view. FeatureStore performs point-in-time joins across all views to produce a consistent snapshot.
```python
cur_day = '20231024'
pre_day = '20231023'
# Label table: use today's partition
label_partitions = PartitionConfig(name='ds', value=cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)
# User and item features: use yesterday's T-1 partition
feature_view_user_config = FeatureViewConfig(
    name='user_table_preprocess_all_feature_v1',
    partition_config=PartitionConfig(name='ds', value=pre_day)
)
feature_view_item_config = FeatureViewConfig(
    name='item_table_preprocess_all_feature_v1',
    partition_config=PartitionConfig(name='ds', value=pre_day)
)
# Sequence features: use today's partition with exact event time matching
feature_view_seq_config = FeatureViewConfig(
    name='wide_seq_feature_v3',
    partition_config=PartitionConfig(name='ds', value=cur_day),
    event_time='event_unix_time',
    equal=True
)
feature_view_config_list = [
    feature_view_user_config,
    feature_view_item_config,
    feature_view_seq_config
]
train_set_output_config = TrainSetOutputConfig(
    partition_config=PartitionConfig(name='ds', value=cur_day)
)
model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("Export task summary:", task.task_summary)
```
Train the model
EasyRec is an open-source recommendation system framework that integrates with FeatureStore. Use the exported fs_demo_fs_rank_v2_training_set table as input to train a ranking model.
For questions about EasyRec, join the DingTalk group (ID: 32260796).
Step 4: Publish the model
Schedule data synchronization jobs
Before serving live traffic, set up scheduled jobs to keep the online store up to date by regularly publishing new offline partitions.
- Log on to the DataWorks console.
- In the left navigation pane, click Data Development & O&M > Data Development.
- Select your DataWorks workspace and click Enter Data Development.
After synchronization completes, verify the latest features in the online store (FeatureDB in this tutorial).
Deploy an EAS model service
The EAS inference service receives scoring requests from the recommendation engine. The EasyRec processor bundles the FeatureStore C++ SDK, which retrieves features with low latency and passes them to the model for scoring.
- Log on to the DataWorks console and navigate to Data Development.
- Create a new PyODPS 3 node and copy in the following deployment script:
```python
import os
import json

# `o` (the MaxCompute entry object) and `args` (scheduling parameters) are
# provided by the PyODPS 3 node at run time.
config = {
    "name": "fs_demo_v1",
    "metadata": {
        "cpu": 4,
        "rpc.max_queue_size": 256,
        "rpc.enable_jemalloc": 1,
        "gateway": "default",
        "memory": 16000
    },
    # args['dt'] matches the scheduling parameter name dt configured for this node
    "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['dt']}/export/final_with_fg",
    "model_config": {
        "access_key_id": f"{o.account.access_id}",
        "access_key_secret": f"{o.account.secret_access_key}",
        "region": "cn-beijing",
        "fs_project": "fs_demo",
        "fs_model": "fs_rank_v2",
        "fs_entity": "item",
        "load_feature_from_offlinestore": True,
        "steady_mode": True,
        "period": 2880,
        "outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
        "fg_mode": "tf"
    },
    "processor": "easyrec-1.8",
    "processor_type": "cpp"
}
with open("echo.json", "w") as f:
    json.dump(config, f)
# Run for the first deployment only
os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
# For scheduled updates, comment out the line above and uncomment the line below:
# os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
```
Configure scheduling:
| Category | Parameter | Suggested value |
|---|---|---|
| Scheduling Parameters | Parameter Name | dt |
| Scheduling Parameters | Parameter Value | $[yyyymmdd-1] |
| Resource Properties | Resource Group for Scheduling | Select your exclusive resource group |
| Scheduling Dependencies | | Select the corresponding training task and item_table_preprocess_all_feature_v1 |
Run the node to complete the first deployment and verify the service status.
After the first deployment succeeds, comment out the create command and uncomment the modify command, then submit the node for scheduled execution.
(Optional) View the deployed service on the Inference service tab of the Elastic Algorithm Service (EAS) page. See Deploy a custom inference service.
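Instead of commenting lines in and out, the switch between the create and modify commands can be driven by a single argument. The following is a minimal sketch that only reuses the eascmd path and flags shown in the deployment script; the build_eascmd_args helper and its parameters are hypothetical, and the credentials below are placeholders.

```python
EASCMD = "/home/admin/usertools/tools/eascmd"  # path used in the deployment script


def build_eascmd_args(access_id, access_key, endpoint, config_path,
                      service_name=None):
    """Build the eascmd argument list.

    Without service_name the first-deployment 'create' command is built;
    with service_name the scheduled-update 'modify' command is built.
    """
    base = [EASCMD, "-i", access_id, "-k", access_key, "-e", endpoint]
    if service_name is None:
        return base + ["create", config_path]
    return base + ["modify", service_name, "-s", config_path]


endpoint = "pai-eas.cn-beijing.aliyuncs.com"
first = build_eascmd_args("<id>", "<secret>", endpoint, "echo.json")
update = build_eascmd_args("<id>", "<secret>", endpoint, "echo.json",
                           service_name="fs_demo_v1")
print(first[-2:])
print(update[-4:])
```

The resulting list can be passed to subprocess.run(args, check=True), which avoids shell quoting issues and raises an error if the command fails.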
Configure PAI-Rec
PAI-Rec is a recommendation engine that integrates the FeatureStore Go SDK and connects directly to both FeatureStore and EAS. Configure it in three parts.
1. FeatureStoreConfs: connects PAI-Rec to your FeatureStore project.
```json
"FeatureStoreConfs": {
    "pairec-fs": {
        "RegionId": "cn-beijing",
        "AccessId": "${AccessKey}",
        "AccessKey": "${AccessSecret}",
        "ProjectName": "fs_demo"
    }
}
```
2. FeatureConfs: specifies which features PAI-Rec retrieves and from which model.
```json
"FeatureConfs": {
    "recreation_rec": {
        "AsynLoadFeature": true,
        "FeatureLoadConfs": [
            {
                "FeatureDaoConf": {
                    "AdapterType": "featurestore",
                    "FeatureStoreName": "pairec-fs",        // Must match the key in FeatureStoreConfs
                    "FeatureKey": "user:uid",
                    "FeatureStoreModelName": "fs_rank_v2",  // Model name registered in FeatureStore
                    "FeatureStoreEntityName": "user",       // Entity whose features to retrieve
                    "FeatureStore": "user"
                }
            }
        ]
    }
}
```
3. AlgoConfs: connects PAI-Rec to the EAS scoring service.
To get the Url and Auth values: on the Elastic Algorithm Service (EAS) page, click the service name, then on the Overview tab, click View endpoint information. See EAS FAQ for more configuration details.
```json
"AlgoConfs": [
    {
        "Name": "fs_demo_v1",  // Must match the deployed EAS service name
        "Type": "EAS",
        "EasConf": {
            "Processor": "EasyRec",
            "Timeout": 300,
            "ResponseFuncName": "easyrecMutValResponseFunc",
            "Url": "eas_url_xxx",
            "EndpointType": "DIRECT",
            "Auth": "eas_token"
        }
    }
]
```
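Because the three parts refer to each other by name, a quick consistency check before deploying can catch typos. This is a minimal sketch and not part of PAI-Rec: the check_references helper is hypothetical, and it only verifies that each FeatureStoreName used in FeatureConfs matches a key in FeatureStoreConfs.

```python
def check_references(conf):
    """Return a list of problems found in cross-section name references."""
    problems = []
    stores = conf.get("FeatureStoreConfs", {})
    for scene, feature_conf in conf.get("FeatureConfs", {}).items():
        for load_conf in feature_conf.get("FeatureLoadConfs", []):
            dao = load_conf.get("FeatureDaoConf", {})
            if dao.get("AdapterType") != "featurestore":
                continue
            name = dao.get("FeatureStoreName")
            if name not in stores:
                problems.append(f"{scene}: unknown FeatureStoreName {name!r}")
    return problems


# Trimmed copy of the configuration shown in this section.
config = {
    "FeatureStoreConfs": {"pairec-fs": {"ProjectName": "fs_demo"}},
    "FeatureConfs": {
        "recreation_rec": {
            "FeatureLoadConfs": [
                {"FeatureDaoConf": {"AdapterType": "featurestore",
                                    "FeatureStoreName": "pairec-fs"}}
            ]
        }
    },
}
print(check_references(config))
```

An empty list means every reference resolved; any entry names the scene and the unresolved store key.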
What's next
- To manage features across multiple recommendation scenarios, see Configure a FeatureStore project.
- To use additional data sources with FeatureStore, see Create a new data source.
- For FeatureStore SDK documentation, see FeatureStore SDK.
- If you have a self-built recommendation system, FeatureStore provides Python, Go, C++, and Java SDKs. Contact the team through the DingTalk group (ID: 32260796) to discuss integration options.