This topic describes how to use the FeatureStore Python software development kit (SDK) to build and publish an end-to-end recommendation system on Platform for AI (PAI) FeatureStore.
Prerequisites
Before you begin, ensure that you have completed the following preparations.
Product dependencies: Platform for AI (PAI), MaxCompute, FeatureDB, and DataWorks.
Step 1. Prepare data
Synchronize data tables
For a recommendation scenario, you generally need to prepare the following data tables: a user feature table, an item feature table, a label table, a sequence feature table, and a behavior table.
To simplify this tutorial, we have prepared sample user, item, label, sequence feature, and behavior tables in the pai_online_project MaxCompute project. You need to run SQL commands in DataWorks to synchronize these tables from the pai_online_project project to your own MaxCompute project. The procedure is as follows:
Log on to the DataWorks console.
In the navigation pane on the left, click Data Development & O&M > Data Development.
Select your DataWorks workspace and click Enter Data Development.
Hover over New, and choose New Node > MaxCompute > ODPS SQL. In the dialog box that appears, configure the node parameters.
Parameter | Suggested value
Engine Instance | Select your MaxCompute engine.
Node Type | ODPS SQL
Path | Business Flow/Workflow/MaxCompute
Name | You can specify a custom name.
Click Confirm.
In the node editor, run the following SQL commands to synchronize the user, item, label, sequence feature, and behavior tables from the pai_online_project project to your MaxCompute project. For the resource group, select the exclusive resource group that you created.
Synchronize the user table: rec_sln_demo_user_table_preprocess_all_feature_v1
Synchronize the item table: rec_sln_demo_item_table_preprocess_all_feature_v1
Synchronize the label table: rec_sln_demo_label_table
Synchronize the sequence feature table: rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
Synchronize the behavior table: rec_sln_demo_behavior_table_preprocess_v3
After you complete these steps, the synchronized tables are available in your workspace. These five tables are used as examples in the following sections.
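To confirm that the five tables were synchronized, you can optionally check for them from any Python environment by using PyODPS. The following is a minimal sketch; the AccessKey pair, project name, and endpoint are placeholders that you must replace with your own values.
from odps import ODPS
# Placeholders: replace with your AccessKey pair, MaxCompute project, and endpoint.
o = ODPS('<access_key_id>', '<access_key_secret>',
         project='<your_maxcompute_project>',
         endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api')
tables = [
    'rec_sln_demo_user_table_preprocess_all_feature_v1',
    'rec_sln_demo_item_table_preprocess_all_feature_v1',
    'rec_sln_demo_label_table',
    'rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3',
    'rec_sln_demo_behavior_table_preprocess_v3',
]
for name in tables:
    print(name, o.exist_table(name))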
Configure data sources
You typically need to configure two data sources for FeatureStore: an offline data source (MaxCompute) and an online data source (FeatureDB, Hologres, or TableStore). This topic uses MaxCompute and FeatureDB as examples.
Log on to the PAI console. In the navigation pane on the left, click Data Preparation > FeatureStore.
Select a workspace and click Enter FeatureStore.
Configure a MaxCompute data source.
On the Store tab, click Create Store. In the dialog box that appears, configure the MaxCompute data source parameters.
Parameter | Suggested value
Type | MaxCompute
Name | Enter a custom name.
MaxCompute Project Name | Select your MaxCompute project.
After the configuration is complete, click Submit.
Configure a FeatureDB data source.
If you have already created a FeatureDB data source, you can skip this step.
On the Store tab, click Create Store. In the dialog box that appears, configure the FeatureDB data source parameters.
Parameter | Suggested value
Type | FeatureDB. If this is your first time using FeatureDB, follow the on-screen instructions to activate it.
Name | Custom names are not supported. The default value is feature_db.
Username | Set a username.
Password | Set a password.
VPC High-speed Connection (Optional) | After the configuration is successful, the FeatureStore SDK in a VPC can directly access FeatureDB through a PrivateLink connection. This improves data read and write performance and reduces access latency.
VPC | Select the VPC where your online FeatureStore service is located.
Zone and vSwitch | Select the vSwitches in the zones where your online service machines are located. We recommend that you select vSwitches in at least two zones to ensure high availability and stability for your services.
After the configuration is complete, click Submit.
Step 2. Create resources by using the FeatureStore Python SDK
Install the FeatureStore Python SDK. The SDK requires Python 3. We recommend that you run all of the following code in a Jupyter Notebook.
! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl
You can then import the required modules.
import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
Feature engineering project
You can create multiple independent projects in FeatureStore. For more information, see Configure a FeatureStore project. Running the following notebook code requires a provisioned FeatureStore environment: after you activate FeatureStore, you must also configure data sources. For more information, see Create a new data source.
The `offline_datasource_id` and `online_datasource_id` parameters specify the offline and online data source IDs, respectively.
This topic uses a project named fs_demo as an example.
access_id = ''
access_ak = ''
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError("Need to create project: fs_demo")
Run the following code to retrieve the current project and print its information.
project = fs.get_project(cur_project_name)
print(project)
Feature entity (FeatureEntity)
A feature entity describes a set of related features. A single feature entity can be associated with multiple feature views. Each entity has a JoinId that is used to associate features from these views. Each feature view has a primary key (index key) to retrieve its feature data, but the name of the index key can be different from the name defined by the JoinId.
In a recommendation system, features are typically associated with only two feature entities: user and item. This means features belong to either the user or the item. This topic provides an example of creating user and item feature entities.
Create a user entity
user_entity_name = "user"
user_join_id = 'user_id'
user_entity = project.get_entity(user_entity_name)
if user_entity is None:
    user_entity = project.create_entity(name=user_entity_name, join_id=user_join_id)
user_entity.print_summary()
Create an item entity
item_entity_name = "item"
join_id = 'item_id'
item_entity = project.get_entity(item_entity_name)
if item_entity is None:
    item_entity = project.create_entity(name=item_entity_name, join_id=join_id)
item_entity.print_summary()
Feature view (FeatureView)
FeatureStore is a platform for managing and organizing feature data. External data is ingested into FeatureStore through a feature view. A feature view defines the data source, the required pre-processing or transformation operations, the data structure of the features (feature names and types), and the storage location (online store or offline store). It also manages feature metadata, such as primary keys, event times, partition keys, feature entities, and time-to-live (TTL) settings. A TTL of -1 (the default) indicates that the data is stored permanently. A positive value means that online queries retrieve only the latest feature data within the specified TTL.
There are three types of feature views: Batch FeatureView (for offline features or T-1 day features), Stream FeatureView (for real-time features), and Sequence FeatureView (for sequence features).
Batch FeatureView (offline feature view)
A Batch FeatureView injects offline data into the offline store of FeatureStore and can synchronize the data to the online store to support real-time queries. This type of view is typically used for offline features or T-1 day features.
Register the user-side offline feature table
Register the rec_sln_demo_user_table_preprocess_all_feature_v1 table with FeatureStore.
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
if user_feature_view is None:
    ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name)
    user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity=user_entity_name, primary_key='user_id', register=True)
print(user_feature_view)
Synchronize data from the 20231023 partition of the rec_sln_demo_user_table_preprocess_all_feature_v1 table in the offline store to the online store.
user_task = user_feature_view.publish_table({'ds':'20231023'})
user_task.wait()
View the task running status.
user_task.print_summary()
Register the item-side offline feature table
Register the rec_sln_demo_item_table_preprocess_all_feature_v1 table with FeatureStore.
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1"
item_feature_view = project.get_feature_view(item_feature_view_name)
if item_feature_view is None:
    ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name)
    item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online=True, entity=item_entity_name, primary_key='item_id', register=True)
print(item_feature_view)
Synchronize data from the 20231023 partition of the rec_sln_demo_item_table_preprocess_all_feature_v1 table in the offline store to the online store.
item_task = item_feature_view.publish_table({'ds':'20231023'})
item_task.wait()
View the task running status.
item_task.print_summary()
Sequence FeatureView (real-time sequence view)
A Sequence FeatureView supports writing offline sequence features and querying real-time sequence features. In a typical recommendation scenario, the offline sequence feature table (F1) is initially generated from simulated data and can later be replaced by online logs. During an online real-time sequence query, data is queried from two online behavior tables: the T-1 day behavior table (B1) and the day T real-time behavior table (B2). B2 contains features that are updated in real time. After data is queried from the B1 and B2 tables, a feature sequence is constructed for the user. This sequence is then combined with other features and sent to the model for scoring.
The online T-1 day behavior table (B1) is typically synchronized from the offline T-1 day behavior table (A1). During synchronization, FeatureStore automatically performs operations such as deduplication. You must write data to the online day T behavior table (B2) using API operations or other Alibaba Cloud products, such as Flink.
Therefore, when you register a real-time feature view, FeatureStore simultaneously manages four tables: the offline sequence table (F1), the offline T-1 day behavior table (A1), the online T-1 day behavior table (B1), and the online day T behavior table (B2).
During registration, you only need to provide the offline sequence table (F1) and the offline T-1 day behavior table (A1). FeatureStore handles the online behavior tables and the synchronization and deduplication process.
Register a real-time feature view.
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
    seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
    behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
    ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
    event_time = 'event_unix_time'  # The name of the event time field in the behavior table.
    item_id = 'item_id'  # The name of the item_id field in the behavior table.
    event = 'event'  # The name of the event field in the behavior table.
    # deduplication_method = 1 deduplicates based on ['user_id', 'item_id', 'event'].
    # deduplication_method = 2 deduplicates based on ['user_id', 'item_id', 'event', 'event_time'].
    # offline_seq_name is the name of the sequence feature field in the offline sequence table. seq_event is the name of the behavior event.
    # online_seq_name is the name under which the FeatureStore online Go SDK retrieves the user's item_id sequence.
    # seq_len is the maximum sequence length. Longer sequences are truncated.
    sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)]
    seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time')
    seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds, event_time=event_time, item_id=item_id, event=event, deduplication_method=1, sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name)
# seq_feature_view.print_summary()
print(seq_feature_view)
Synchronize data from the 20231023 partition of the rec_sln_demo_behavior_table_preprocess_v3 table in the offline store to the online store. During synchronization, the system automatically checks the partitions of the previous N days and backfills any missing data. You can specify N by using the days_to_load parameter. The default value is 30, which is sufficient for most scenarios.
seq_task = seq_feature_view.publish_table({'ds':'20231023'}, days_to_load=30)
seq_task.wait()
View the task running status.
seq_task.print_summary()
Stream FeatureView (real-time feature view)
A Stream FeatureView writes data directly to the online store and simultaneously synchronizes it to the offline store. This is ideal for scenarios that require real-time feature updates, such as updating product prices and sales volumes.
Register the label table
label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)
Get online features
You can retrieve online features to troubleshoot data consistency between the offline and online stores, to perform data analytics, and for other purposes. Hologres is the recommended data source for such queries.
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.get_online_features(join_ids={'user_id':['169898460', '148811946']}, features=['user_id', 'gender', 'age', 'city'])
print("ret_features = ", ret_features_1)TrainingSet
When you train a model, you must first construct a sample table. The sample table consists of label data and feature data. To interact with FeatureStore, you must provide the label data and define the names of the features that you want to retrieve. A point-in-time join is then performed based on the primary key and event time, if an event time exists.
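FeatureStore performs this point-in-time join internally. The following standalone pandas sketch only illustrates the semantics, with hypothetical column values: for each label row, the join keeps the most recent feature row whose event time is not later than the label's event time, which prevents leakage of future feature values.
import pandas as pd
# Hypothetical label rows and feature snapshots.
labels = pd.DataFrame({'user_id': [1, 1, 2],
                       'event_unix_time': [100, 200, 150],
                       'is_click': [0, 1, 1]})
features = pd.DataFrame({'user_id': [1, 1, 2],
                         'event_unix_time': [90, 180, 100],
                         'city': ['beijing', 'shanghai', 'hangzhou']})
# For each label row, take the latest feature row with an event time
# that is less than or equal to the label's event time.
train = pd.merge_asof(labels.sort_values('event_unix_time'),
                      features.sort_values('event_unix_time'),
                      on='event_unix_time', by='user_id')
print(train)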
# Specify the label table.
label_table_name = 'rec_sln_demo_label_table'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector = FeatureSelector(user_feature_view_name, '*') # '*' selects all features.
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output=train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)Model features
After you train a model and deploy it as a service, you can use it for business prediction. The training samples can be obtained from the train_set mentioned previously.
model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)Step 3. Export the sample table and train the model
For actual training, you need to export the sample table.
Export the sample table
Specify the label table and the partitions and event_time for each feature view.
cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)
user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)
item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)
seq_partitions = PartitionConfig(name = 'ds', value = cur_day)
feature_view_seq_config = FeatureViewConfig(name = 'wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)Train the model
EasyRec is an open source recommendation system framework that seamlessly integrates with FeatureStore for model training, exporting, and publishing. We recommend that you use the fs_demo_fs_rank_v2_training_set table as input to train a model with EasyRec.
For the EasyRec open source code, see EasyRec.
For the EasyRec documentation, see EasyRec Introduction.
For documentation related to EasyRec training, see EasyRec Training.
If you have questions about EasyRec, join the DingTalk group (32260796) to contact us.
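Before you start training, you can optionally inspect the exported sample table with PyODPS to confirm its schema and row count, as shown in the following sketch. The credentials, endpoint, and partition value are placeholders to adapt.
from odps import ODPS
o = ODPS('<access_key_id>', '<access_key_secret>',
         project='<your_maxcompute_project>',
         endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api')
t = o.get_table('fs_demo_fs_rank_v2_training_set')
print(t.schema)
# Count the rows in the exported partition, for example ds=20231024.
with t.open_reader(partition='ds=20231024') as reader:
    print(reader.count)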
Step 4. Publish the model
After you train and export the model, you can deploy and publish it. If you have a self-built recommendation system, FeatureStore provides Python, Go, C++, and Java SDKs that can integrate with your system. You can also contact us through the DingTalk group (32260796) to discuss specific solutions. If you use Alibaba Cloud products, you can seamlessly integrate them with FeatureStore to quickly build and publish a recommendation system.
This topic uses Alibaba Cloud products as an example to describe how to publish a model.
Schedule data synchronization nodes
Before publishing, you must schedule the data synchronization nodes to regularly synchronize data from the offline store to the online store.
Log on to the DataWorks console.
In the navigation pane on the left, click Data Development & O&M > Data Development.
Select your DataWorks workspace and click Enter Data Development.
Schedule the user table synchronization.
Hover over New, and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following content into the script to complete the scheduled synchronization of user_table_preprocess_all_feature_v1.
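The node script can look like the following minimal sketch, which reuses the SDK calls from Step 2. The region, project name, and feature view name follow the earlier examples and are assumptions to adapt; dt is the scheduling parameter that you configure in the next step, and o is the MaxCompute entry object that DataWorks provides in PyODPS nodes.
import os
# Install the FeatureStore SDK in the node runtime if it is not preinstalled.
os.system("pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl")
from feature_store_py.fs_client import FeatureStoreClient
ds = args['dt']  # Resolves to the previous day, for example 20231023.
fs = FeatureStoreClient(access_key_id=o.account.access_id,
                        access_key_secret=o.account.secret_access_key,
                        region='cn-beijing')
project = fs.get_project('fs_demo')
feature_view = project.get_feature_view('user_table_preprocess_all_feature_v1')
# Publish the partition for the scheduled day from the offline store to the online store.
task = feature_view.publish_table({'ds': ds})
task.wait()
task.print_summary()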
In the right-side pane, click Scheduling Configuration. In the dialog box that appears, configure the scheduling parameters.
Parameter | Suggested value
Scheduling Parameters > Parameter Name | dt
Scheduling Parameters > Parameter Value | $[yyyymmdd-1]
Resource Properties > Resource Group for Scheduling | Select your exclusive resource group for scheduling.
Scheduling Dependencies | Select the user table that you created.
After you configure and test the node, save and submit the node configuration.
Perform a data backfill operation. For more information, see Synchronize data tables.
Schedule the item table synchronization.
Hover over New, and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following content into the script.
In the right-side pane, click Scheduling Configuration. In the dialog box that appears, configure the scheduling parameters.
Parameter | Suggested value
Scheduling Parameters > Parameter Name | dt
Scheduling Parameters > Parameter Value | $[yyyymmdd-1]
Resource Properties > Resource Group for Scheduling | Select your exclusive resource group for scheduling.
Scheduling Dependencies | Select the item table that you created.
After you configure and test the node, save and submit the node configuration.
Perform a data backfill operation. For more information, see Synchronize data tables.
Schedule the real-time sequence behavior table synchronization.
Hover over New, and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following content into the script.
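The script follows the same sketch as the user table node, adapted to the sequence feature view and the days_to_load backfill parameter described in Step 2. The names and region are assumptions to adapt.
import os
os.system("pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl")
from feature_store_py.fs_client import FeatureStoreClient
ds = args['dt']
fs = FeatureStoreClient(access_key_id=o.account.access_id,
                        access_key_secret=o.account.secret_access_key,
                        region='cn-beijing')
project = fs.get_project('fs_demo')
seq_feature_view = project.get_feature_view('wide_seq_feature_v3')
# Missing partitions from the previous days are backfilled automatically, up to days_to_load.
task = seq_feature_view.publish_table({'ds': ds}, days_to_load=30)
task.wait()
task.print_summary()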
In the right-side pane, click Scheduling Configuration. In the dialog box that appears, configure the scheduling parameters.
Parameter | Suggested value
Scheduling Parameters > Parameter Name | dt
Scheduling Parameters > Parameter Value | $[yyyymmdd-1]
Resource Properties > Resource Group for Scheduling | Select your exclusive resource group for scheduling.
Scheduling Dependencies | Select the behavior table that you created.
After you configure and test the node, save and submit the node configuration.
Perform a data backfill operation. For more information, see Synchronize data tables.
After the synchronization is complete, you can view the latest synchronized features in the online store.
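You can also spot-check a few keys through the SDK by reusing the get_online_features call from Step 2. In this sketch, the credentials are placeholders and the IDs are the sample values used earlier.
from feature_store_py.fs_client import FeatureStoreClient
fs = FeatureStoreClient(access_key_id='<access_key_id>', access_key_secret='<access_key_secret>', region='cn-beijing')
project = fs.get_project('fs_demo')
user_feature_view = project.get_feature_view('user_table_preprocess_all_feature_v1')
ret = user_feature_view.get_online_features(join_ids={'user_id': ['169898460', '148811946']},
                                            features=['user_id', 'gender', 'age', 'city'])
print('ret_features = ', ret)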
Create and deploy an EAS model service
The model service receives requests from the recommendation engine, scores the candidate item set, and returns the scores. The EasyRec processor includes the FeatureStore C++ SDK, which provides low-latency, high-performance feature retrieval. The processor retrieves features through the FeatureStore C++ SDK, sends them to the model for inference, and returns the resulting scores to the recommendation engine.
The procedure for deploying the model service is as follows.
Log on to the DataWorks console.
In the navigation pane on the left, click Data Development & O&M > Data Development.
Select your DataWorks workspace and click Enter Data Development.
Hover over New, and choose New Node > MaxCompute > PyODPS 3.
In the dialog box that appears, configure the node parameters and click Confirm.
Copy the following content into the script.
import os
import json
config = {
    "name": "fs_demo_v1",
    "metadata": {
        "cpu": 4,
        "rpc.max_queue_size": 256,
        "rpc.enable_jemalloc": 1,
        "gateway": "default",
        "memory": 16000
    },
    "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg",
    "model_config": {
        "access_key_id": f'{o.account.access_id}',
        "access_key_secret": f'{o.account.secret_access_key}',
        "region": "cn-beijing",
        "fs_project": "fs_demo",
        "fs_model": "fs_rank_v2",
        "fs_entity": "item",
        "load_feature_from_offlinestore": True,
        "steady_mode": True,
        "period": 2880,
        "outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
        "fg_mode": "tf"
    },
    "processor": "easyrec-1.8",
    "processor_type": "cpp"
}
with open("echo.json", "w") as output_file:
    json.dump(config, output_file)
# Run this line for the first deployment.
os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
# Run the following line for scheduled updates instead.
# os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
In the right-side pane, click Scheduling Configuration. In the dialog box that appears, configure the scheduling parameters.
Parameter | Suggested value
Scheduling Parameters > Parameter Name | dt
Scheduling Parameters > Parameter Value | $[yyyymmdd-1]
Resource Properties > Resource Group for Scheduling | Select your exclusive resource group for scheduling.
Scheduling Dependencies | Select the corresponding training task and item_table_preprocess_all_feature_v1.
After you configure and test the node, run it to view the deployment status.
After the deployment is complete, comment out the eascmd create command, uncomment the eascmd modify command, and submit the task for scheduled execution.
(Optional) You can view the deployed service on the Inference Service tab of the Elastic Algorithm Service (EAS) page. For more information, see Custom deployment.
Configure PAI-Rec
PAI-Rec is a recommendation engine service that integrates the FeatureStore Go SDK. It can seamlessly connect with FeatureStore and EAS.
The configuration procedure is as follows.
Configure FeatureStoreConfs.
RegionId: Change this to the region where your service is located. This topic uses cn-beijing as an example.
ProjectName: The name of your FeatureStore project. In this example, the project name is fs_demo.
"FeatureStoreConfs": { "pairec-fs": { "RegionId": "cn-beijing", "AccessId": "${AccessKey}", "AccessKey": "${AccessSecret}", "ProjectName": "fs_demo" } },Configure FeatureConfs.
FeatureStoreName: Keep this consistent with the pairec-fs setting in the preceding FeatureStoreConfs step.
FeatureStoreModelName: The name of your model feature. In this example, the model feature name is fs_rank_v2.
FeatureStoreEntityName: The name of your feature entity, user. This indicates that the PAI-Rec engine uses the FeatureStore Go SDK to retrieve the user features of the fs_rank_v2 model.
"FeatureConfs": {
    "recreation_rec": {
        "AsynLoadFeature": true,
        "FeatureLoadConfs": [
            {
                "FeatureDaoConf": {
                    "AdapterType": "featurestore",
                    "FeatureStoreName": "pairec-fs",
                    "FeatureKey": "user:uid",
                    "FeatureStoreModelName": "fs_rank_v2",
                    "FeatureStoreEntityName": "user",
                    "FeatureStore": "user"
                }
            }
        ]
    }
},
Configure AlgoConfs.
This configuration tells PAI-Rec which EAS model scoring service to connect to.
Name: Must be the same as the name of the deployed EAS service.
Url and Auth: The endpoint information provided by the EAS service. On the Elastic Algorithm Service (EAS) page, click the service name. Then, on the Overview tab, click View Endpoint Information to obtain the URL and token. For more configuration details, see EAS FAQ.
"AlgoConfs": [ { "Name": "fs_demo_v1", "Type": "EAS", "EasConf": { "Processor": "EasyRec", "Timeout": 300, "ResponseFuncName": "easyrecMutValResponseFunc", "Url": "eas_url_xxx", "EndpointType": "DIRECT", "Auth": "eas_token" } } ],