
Platform for AI: Use FeatureStore SDK for Python to build a recommendation system

Last Updated: Nov 14, 2024

This topic describes how to use FeatureStore SDK for Python to build and publish a recommendation system.

Prerequisites

Before you perform the operations described in this topic, make sure that the following services are activated and configured:

  • Platform for AI (PAI)

  • MaxCompute

  • Hologres

  • DataWorks

Step 1: Prepare data

Synchronize data from simulated tables

A recommendation system requires the following tables: user feature table, item feature table, label table, sequence feature table, and behavior table.

This topic uses the simulated tables in the MaxCompute project pai_online_project as examples to show how to use FeatureStore SDK for Python. You must execute SQL statements in DataWorks to synchronize the simulated tables from the pai_online_project project to your MaxCompute project. To synchronize the simulated tables, perform the following steps:

  1. Log on to the DataWorks console.

  2. In the left-side navigation pane, choose Data Development and Governance > DataStudio.

  3. On the DataStudio page, select the DataWorks workspace that you created and click Go to DataStudio.

  4. Move the pointer over the Create button and choose Create Node > MaxCompute > ODPS SQL. In the Create Node dialog box, configure the node parameters described in the following table.

    • Engine Instance: Select the MaxCompute engine that you created.

    • Node Type: Select ODPS SQL from the drop-down list.

    • Path: Choose Business Flow > Workflow > MaxCompute.

    • Name: Specify a name for the node.

  5. Click Confirm.

  6. On the tab of the node that you created, run the following SQL statements to synchronize the simulated user table, item table, label table, sequence table, and behavior table from the pai_online_project project to your MaxCompute project. Select the exclusive resource group that you created as the resource group.

    Synchronize the user table rec_sln_demo_user_table_preprocess_all_feature_v1

    CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_all_feature_v1
    LIKE pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
    STORED AS ALIORC
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_all_feature_v1 PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
    WHERE ds >= '20231022' AND ds <= '20231024';

    The data in the following partitions is synchronized:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    Synchronize the item table rec_sln_demo_item_table_preprocess_all_feature_v1

    CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_all_feature_v1
    LIKE pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
    STORED AS ALIORC
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_all_feature_v1 PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
    WHERE ds >= '20231022' AND ds <= '20231024';

    The data in the following partitions is synchronized:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    Synchronize the label table rec_sln_demo_label_table

    CREATE TABLE IF NOT EXISTS rec_sln_demo_label_table
    LIKE pai_online_project.rec_sln_demo_label_table
    STORED AS ALIORC
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_label_table PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_label_table
    WHERE ds >= '20231022' AND ds <= '20231024';

    The data in the following partitions is synchronized:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    Synchronize the sequence table rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3

    CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    LIKE pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    STORED AS ALIORC
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    WHERE ds >= '20231022' AND ds <= '20231024';

    The data in the following partitions is synchronized:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    Synchronize the behavior table rec_sln_demo_behavior_table_preprocess_v3

    CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v3
    LIKE pai_online_project.rec_sln_demo_behavior_table_preprocess_v3
    STORED AS ALIORC
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v3 PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v3
    WHERE ds >= '20231022' AND ds <= '20231024';

After you perform the preceding operations, you can view the synchronized tables in your workspace. These tables are used as examples in the following sections.
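
To confirm that the partitions were synchronized, you can list them with PyODPS. The following is a minimal sketch, assuming that it runs in a DataWorks PyODPS 3 node, where the MaxCompute entry object o is preconfigured:

# List the partitions of one of the synchronized tables; expect
# ds=20231022, ds=20231023, and ds=20231024. The entry object `o` is
# provided by DataWorks in PyODPS nodes (an assumption of this sketch).
table_name = 'rec_sln_demo_user_table_preprocess_all_feature_v1'
for partition in o.get_table(table_name).partitions:
    print(partition.name)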

Configure data stores

In most cases, you need to configure an offline data store, such as a MaxCompute project, and an online data store, such as a Hologres instance, a GraphCompute instance, or a Tablestore instance, in FeatureStore. In this example, a MaxCompute project is configured as an offline data store, and a Hologres instance is configured as an online data store.

  1. Log on to the PAI console. In the left-side navigation pane, choose Data Preparation > FeatureStore.

  2. On the FeatureStore page, select a workspace from the drop-down list and click Enter FeatureStore.

  3. Configure a MaxCompute data store.

    1. On the Store tab, click Create Store. In the Create Store panel, configure the parameters described in the following table.

      • Type: Select MaxCompute from the drop-down list.

      • Name: Specify a name for the data store.

      • MaxCompute Project Name: Select the MaxCompute project that you created.

    2. Copy the authorization statement, click Go to, and execute the copied statement in DataWorks. This authorizes the Hologres instance to synchronize data from the MaxCompute project.

      Note

      To grant permissions to the Hologres instance, make sure that your account has admin permissions. For more information, see Manage user permissions by using commands or Manage user permissions in the MaxCompute console (new version).

    3. Click Submit.

  4. Configure a Hologres data store.

    1. On the Store tab, click Create Store. In the Create Store panel, configure the parameters described in the following table.

      • Type: Select Hologres from the drop-down list.

      • Name: Specify a name for the data store.

      • Instance ID: Select the Hologres instance that you created.

      • Database Name: Select the database that you created in the Hologres instance.

    2. Click Submit.

    3. Grant the permissions that are required to access the Hologres instance. For more information, see Configure data stores.

Step 2: Configure FeatureStore SDK for Python

Install FeatureStore SDK for Python. The SDK requires Python 3. We recommend that you run the following sample code in Jupyter Notebook.

!pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl

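To verify the installation, you can run a quick sanity check that imports the package in the same kernel; the check only confirms that the import succeeds:

# Sanity check: succeeds only if the wheel was installed into the kernel
# that runs this notebook.
import feature_store_py
print("feature_store_py imported successfully")
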
Import the required functional modules.

import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient, build_feature_store_client
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

FeatureStore project

You can create multiple projects in FeatureStore. Each project is independent. For more information, see Configure FeatureStore projects. To run the sample code in Jupyter Notebook, specific configurations are required in FeatureStore. For example, you must configure data stores in FeatureStore. For more information, see Configure data stores.

In the sample code in this topic, the offline_datasource_id parameter specifies the ID of the offline data store, and the online_datasource_id parameter specifies the ID of the online data store.

The following sample code uses the fs_demo project as an example to show how to configure a FeatureStore project.

access_id = ''  # Replace with your AccessKey ID.
access_ak = ''  # Replace with your AccessKey secret.
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
  raise ValueError("Need to create project: fs_demo")

Obtain the project and print the project information.

project = fs.get_project(cur_project_name)
project.print_summary()

Feature entity

A feature entity is a collection of semantically related features. A feature entity can be associated with multiple feature views. Each entity has a join ID. You can use join IDs to associate features across multiple feature views. Each feature view has a primary key (index) that can be used to retrieve features. The primary key can be different from the name of the join ID.

Typically, a recommendation system has two feature entities: user and item. The following sample code provides an example on how to create user and item entities:

  • Create user entity

    user_entity_name = "user"
    user_join_id = 'user_id'
    user_entity = project.get_entity(user_entity_name)
    if user_entity is None:
        user_entity = project.create_entity(name=user_entity_name, join_id=user_join_id)
    user_entity.print_summary()
    
  • Create item entity

    item_entity_name = "item"
    join_id = 'item_id'
    item_entity = project.get_entity(item_entity_name)
    if item_entity is None:
        item_entity = project.create_entity(name=item_entity_name, join_id=join_id)
    item_entity.print_summary()

Feature view

You can create feature views to ingest feature data from external sources into FeatureStore for centralized management. A feature view contains all information required to manage features, including the data source, required transformations, feature table schema, and online and offline data stores. A feature view also allows you to manage the metadata, including primary keys, event time, partition fields, feature entities, and time to live (TTL). TTL is a parameter that specifies the period of time during which feature data is available in the online data store. The default value of -1 specifies that the online data store retains all feature data. A positive value specifies that the online data store retains only the most recent feature data within the specified time period.

FeatureStore supports the following types of feature views: batch feature view, stream feature view, and sequence feature view.

Batch feature view

A batch feature view allows you to ingest offline features into the offline data store and synchronize offline features to the online data store. This way, you can query features in real time.

  • Register the user table.

    1. Register the user table rec_sln_demo_user_table_preprocess_all_feature_v1 in FeatureStore.

      user_feature_view_name = "user_table_preprocess_all_feature_v1"
      user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1"
      user_feature_view = project.get_feature_view(user_feature_view_name)
      if user_feature_view is None:
          ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name)
          user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity=user_entity_name, primary_key='user_id', register=True)
      print(user_feature_view)
    2. Synchronize data in the ds=20231023 partition from the offline data store to the online data store.

      user_task = user_feature_view.publish_table({'ds':'20231023'})
      user_task.wait()
    3. View the task running status.

      user_task.print_summary()
  • Register the item table.

    1. Register the item table rec_sln_demo_item_table_preprocess_all_feature_v1 in FeatureStore.

      item_feature_view_name = "item_table_preprocess_all_feature_v1"
      item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1"
      item_feature_view = project.get_feature_view(item_feature_view_name)
      if item_feature_view is None:
          ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name)
          item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online=True, entity=item_entity_name, primary_key='item_id', register=True)
      print(item_feature_view)
    2. Synchronize data in the ds=20231023 partition from the offline data store to the online data store.

      item_task = item_feature_view.publish_table({'ds':'20231023'})
      item_task.wait()
    3. View the task running status.

      item_task.print_summary()

Sequence feature view

A sequence feature view allows you to write sequence features offline and read real-time sequence features online. When you build a recommendation system, you can register a simulated offline sequence table (F1 table) in FeatureStore. The data in the simulated table is subsequently replaced by online logs. To query real-time sequence features, the system retrieves data from the online behavior table of the current day (B2 table) and the online behavior table of the previous day (B1 table). The data retrieved from these two tables is concatenated to generate sequence features, which are then sent to a model for scoring.

The B1 table synchronizes data from the offline behavior table of the previous day (A1 table). The duplicated data is automatically filtered by FeatureStore during synchronization. The B2 table contains the real-time behavior features. You can call API operations or use Realtime Compute for Apache Flink to write real-time data to the B2 table.

After you create a sequence feature view, FeatureStore manages the F1, A1, B1, and B2 tables in a centralized manner.

You need to register only the F1 and A1 tables in the offline data store to the sequence feature view. FeatureStore automatically creates the online behavior tables (B1 and B2 tables) by synchronization and deduplication.
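
The following standalone pandas sketch illustrates, outside the SDK, how an online sequence feature is conceptually assembled: the synchronized behavior of the previous day (B1) and the real-time behavior of the current day (B2) are concatenated, deduplicated, sorted by event time, and truncated to the configured length. The rows and values are hypothetical; FeatureStore performs this logic internally.

import pandas as pd

# Hypothetical behavior rows. Field names mirror the schema in this topic.
b1 = pd.DataFrame({'user_id': [1, 1], 'item_id': [10, 11],
                   'event': ['click', 'click'], 'event_unix_time': [100, 200]})  # B1: previous day
b2 = pd.DataFrame({'user_id': [1, 1], 'item_id': [11, 12],
                   'event': ['click', 'click'], 'event_unix_time': [200, 300]})  # B2: current day

seq_len = 50  # Maximum sequence length; longer sequences are truncated.
merged = pd.concat([b1, b2], ignore_index=True)
# deduplication_method=1: deduplicate based on user_id, item_id, and event.
merged = merged.drop_duplicates(subset=['user_id', 'item_id', 'event'])
# Most recent events first, truncated to seq_len events per user.
merged = merged.sort_values('event_unix_time', ascending=False)
click_seq = merged[merged['event'] == 'click'].groupby('user_id').head(seq_len)
print(click_seq)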

  1. Create a sequence feature view.

    seq_feature_view_name = "wide_seq_feature_v3"
    seq_feature_view = project.get_feature_view(seq_feature_view_name)
    if seq_feature_view is None:
      seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
      behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
      ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
      event_time = 'event_unix_time'  # The name of the event time field in the behavior table.
      item_id = 'item_id' # The name of the item_id field in the behavior table.
      event = 'event' # The name of the event field in the behavior table.
      # deduplication_method = 1 specifies to deduplicate data based on the user_id, item_id, and event fields.
      # deduplication_method = 2 specifies to deduplicate data based on the user_id, item_id, event, and event_time fields.
      sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)]
      # The offline_seq_name parameter specifies the name of the sequence feature field in the offline sequence table. The seq_event parameter specifies the name of the event field. The online_seq_name parameter specifies the field name for the online sequence features retrieved by using FeatureStore SDK for Go. 
      # The seq_len parameter specifies the maximum length of the sequence feature. Sequences that exceed the limit are truncated. 
      seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time')
      seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
                                                  event_time=event_time, item_id=item_id, event=event, deduplication_method=1,
                                                  sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name)
    # seq_feature_view.print_summary()
    print(seq_feature_view)
  2. Synchronize data in the ds=20231023 partition of the behavior table rec_sln_demo_behavior_table_preprocess_v3 from the offline data store to the online data store. During synchronization, FeatureStore automatically checks and loads historical data if the data is missing. The days_to_load parameter specifies the number of previous days for which FeatureStore checks the data availability. The default value of this parameter is 30.

    seq_task = seq_feature_view.publish_table({'ds':'20231023'}, days_to_load=30)
    seq_task.wait()
  3. View the task running status.

    seq_task.print_summary()

Stream feature view

A stream feature view allows you to write real-time features to the online data store and synchronize the data to the offline data store. This suits scenarios in which features are updated in real time, such as the price and sales volume of an item.

Register the label table

label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
  label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)

Retrieve online features

You can retrieve online features to analyze data and check whether features are consistent in the online and offline data stores.

user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.get_online_features(join_ids={'user_id':['169898460', '148811946']}, features=['user_id', 'gender', 'age', 'city'])
print("ret_features = ", ret_features_1)

Training dataset

You can use FeatureStore to generate a training dataset for model training. A training dataset contains labels and features. You must prepare the labels for model training and define the features that the model needs to fetch from the feature views. The labels are associated with the features by using point-in-time joins based on the primary keys.

# Specify the label table.
label_table_name = 'rec_sln_demo_label_table'

output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector = FeatureSelector(user_feature_view_name, '*')  # '*' selects all features.
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output=train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)

Model features

You can train a model on the training dataset generated by FeatureStore and deploy the trained model as an inference service in PAI.

model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

Step 3: Export a training dataset and train a model

You can export a training dataset from FeatureStore for model training.

Export a training dataset

Specify the label table and the event time and partition of each feature view.

cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name='ds', value=cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)

user_partitions = PartitionConfig(name='ds', value=pre_day)
feature_view_user_config = FeatureViewConfig(name='user_table_preprocess_all_feature_v1',
                                             partition_config=user_partitions)

item_partitions = PartitionConfig(name='ds', value=pre_day)
feature_view_item_config = FeatureViewConfig(name='item_table_preprocess_all_feature_v1',
                                             partition_config=item_partitions)

seq_partitions = PartitionConfig(name='ds', value=cur_day)
feature_view_seq_config = FeatureViewConfig(name='wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name='ds', value=cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)

Train a model

EasyRec is an open source recommendation system framework that can be seamlessly connected to FeatureStore to train, export, and publish models. We recommend that you use EasyRec to train your model on the training dataset that is generated in the preceding step. The name of the training dataset table is printed by cur_model.train_set_table_name.

  • For more information about the open source code of EasyRec, see EasyRec.

  • For more information about EasyRec, see What is EasyRec?

  • For more information about how to use EasyRec to train models, see train_config.

If you have other questions about EasyRec, join the DingTalk group (ID: 32260796) for technical support.

Step 4: Publish a model

After you train and export your model, you can deploy and publish the model. If you use a self-managed recommendation system, you can use FeatureStore SDK for Python, Go, C++, or Java to connect your recommendation system to FeatureStore. You can join the DingTalk group (ID: 32260796) for technical support on how to connect your recommendation system to FeatureStore. FeatureStore is also seamlessly integrated with other Alibaba Cloud services, which you can use to quickly build and publish a recommendation system.

In this example, Alibaba Cloud services are used to publish a model.

Configure data synchronization nodes

Before you publish a model, you must configure data synchronization nodes so that they can regularly synchronize data from the offline data store to the online data store. To configure a data synchronization node, perform the following steps:

  1. Log on to the DataWorks console.

  2. In the left-side navigation pane, choose Data Development and Governance > DataStudio.

  3. On the DataStudio page, select the DataWorks workspace that you created and click Go to DataStudio.

  4. Synchronize the user table on a regular basis.

    1. Move the pointer over the Create button and choose Create Node > MaxCompute > PyODPS 3.

    2. In the Create Node dialog box, configure the node parameters and click Confirm.

    3. Copy the following code to the code editor to synchronize data from the user_table_preprocess_all_feature_v1 feature view on a regular basis:

      Synchronize data from the user_table_preprocess_all_feature_v1 feature view

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'user_table_preprocess_all_feature_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite')
      task.wait()
      task.print_summary()
    4. Click Properties on the right side of the page. In the Properties panel, configure the scheduling parameters that are described in the following table.

      • Scheduling Parameter: Set Parameter Name to dt and Parameter Value to $[yyyymmdd-1], which resolves to the previous day in the yyyymmdd format (see the sketch after this procedure).

      • Resource Group: Select the exclusive resource group that you created.

      • Dependencies: Select the user table that you created.

    5. After the node is configured and tested, save and submit the node configurations.

    6. Backfill data for the node. For more information, see the Synchronize data from simulated tables section of this topic.

  5. Synchronize the item table.

    1. Move the pointer over the Create button and choose Create Node > MaxCompute > PyODPS 3.

    2. In the Create Node dialog box, configure the node parameters and click Confirm.

    3. Copy the following code to the code editor:

      Synchronize data from the item_table_preprocess_all_feature_v1 feature view

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'item_table_preprocess_all_feature_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite')
      task.wait()
      task.print_summary()
    4. Click Properties on the right side of the page. In the Properties panel, configure the scheduling parameters that are described in the following table.

      • Scheduling Parameter: Set Parameter Name to dt and Parameter Value to $[yyyymmdd-1].

      • Resource Group: Select the exclusive resource group that you created.

      • Dependencies: Select the item table that you created.

    5. After the node is configured and tested, save and submit the node configurations.

    6. Backfill data for the node. For more information, see the Synchronize data from simulated tables section of this topic.

  6. Synchronize the real-time sequence table.

    1. Move the pointer over the Create button and choose Create Node > MaxCompute > PyODPS 3.

    2. In the Create Node dialog box, configure the node parameters and click Confirm.

    3. Copy the following code to the code editor:

      Synchronize data from the wide_seq_feature_v3 feature view

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'wide_seq_feature_v3'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds': cur_day}, days_to_load=30)
      task.wait()
      task.print_summary()
    4. Click Properties on the right side of the page. In the Properties panel, configure the scheduling parameters that are described in the following table.

      • Scheduling Parameter: Set Parameter Name to dt and Parameter Value to $[yyyymmdd-1].

      • Resource Group: Select the exclusive resource group that you created.

      • Dependencies: Select the behavior table that you created.

    5. After the node is configured and tested, save and submit the node configurations.

    6. Backfill data for the node. For more information, see the Synchronize data from simulated tables section of this topic.

  7. After the data is synchronized, you can view the latest features that are synchronized in the Hologres data store.
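
The dt scheduling parameter in the preceding nodes resolves to the previous day in the yyyymmdd format. The following standalone sketch shows the equivalent computation in plain Python, which can help when you test a node manually:

from datetime import date, timedelta

# Equivalent of the DataWorks scheduling expression $[yyyymmdd-1]:
# the day before the run date, formatted as yyyymmdd.
dt = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
print(dt)  # For example, '20231023' if run on October 24, 2023.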

Create and deploy a model service by using EAS

You can use Elastic Algorithm Service (EAS) to deploy a model service. The model service receives requests from the recommendation engine, scores the items based on the requests, and then returns the scores. The EasyRec processor integrates FeatureStore SDK for C++ to retrieve features at low latency. After the EasyRec processor retrieves features by using the SDK, it passes the features to the model for inference and returns the scores to the recommendation engine.

To deploy a model service, perform the following steps:

  1. Log on to the DataWorks console.

  2. In the left-side navigation pane, choose Data Development and Governance > DataStudio.

  3. Select the DataWorks workspace that you created and click Go to DataStudio.

  4. Move the pointer over the Create button and choose Create Node > MaxCompute > PyODPS 3.

  5. In the Create Node dialog box, configure the node parameters and click Confirm.

  6. Copy the following code to the code editor:

    import os
    import json
    config = {
      "name": "fs_demo_v1",
      "metadata": {
        "cpu": 4,
        "rpc.max_queue_size": 256,
        "rpc.enable_jemalloc": 1,
        "gateway": "default",
        "memory": 16000
      },
      "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg",
      "model_config": {
        "access_key_id": f'{o.account.access_id}',
        "access_key_secret": f'{o.account.secret_access_key}',
        "region": "cn-beijing",
        "fs_project": "fs_demo",
        "fs_model": "fs_rank_v2",
        "fs_entity": "item",
        "load_feature_from_offlinestore": True,
        "steady_mode": True,
        "period": 2880,
        "outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
        "fg_mode": "tf"
      },
      "processor": "easyrec-1.8",
      "processor_type": "cpp"
    }
    
    with open("echo.json", "w") as output_file:
        json.dump(config, output_file)
    
    # Run the following line of code for the first deployment:
    os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
    
    # Run the following line of code for routine updates:
    # os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
  7. Click Properties on the right side of the page. In the Properties panel, configure the scheduling parameters that are described in the following table.

    • Scheduling Parameter: Set Parameter Name to dt and Parameter Value to $[yyyymmdd-1].

    • Resource Group: Select the exclusive resource group that you created.

    • Dependencies: Select the training job and the item_table_preprocess_all_feature_v1 feature view.

  8. After the node is configured and tested, run the node to view the deployment status. A status-check sketch follows this procedure.

  9. After the deployment is complete, comment out the line that runs the eascmd create command and uncomment the line that runs the eascmd modify command so that subsequent scheduled runs update the existing service.

  10. You can view the deployed service on the Inference Service tab of the Elastic Algorithm Service (EAS) page in the PAI console. For more information, see Deploy a model service in the PAI console.
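
To check the service state from a PyODPS node after the deployment, you can describe the service with the same client. This is a hedged sketch that assumes the eascmd client path and endpoint used in the deployment code above:

import os

# Describe the deployed service fs_demo_v1; `o` is the DataWorks-provided
# MaxCompute entry object, as in the deployment code above.
os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} "
          f"-k {o.account.secret_access_key} "
          f"-e pai-eas.cn-beijing.aliyuncs.com desc fs_demo_v1")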

Configure PAI-Rec

PAI-Rec is a recommendation engine service that integrates FeatureStore SDK for Go and can be seamlessly integrated with FeatureStore and EAS.

To configure PAI-Rec, perform the following steps:

  1. Configure the FeatureStoreConfs parameter.

    • RegionId: the ID of the region in which FeatureStore is activated. In this example, cn-beijing is used.

    • ProjectName: the name of the project that you created in FeatureStore. In this example, fs_demo is used.

        "FeatureStoreConfs": {
            "pairec-fs": {
                "RegionId": "cn-beijing",
                "AccessId": "${AccessKey}",
                "AccessKey": "${AccessSecret}",
                "ProjectName": "fs_demo"
            }
        },
  2. Configure the FeatureConfs parameter.

    • FeatureStoreName: Set this parameter to pairec-fs, which is specified in the FeatureStoreConfs parameter.

    • FeatureStoreModelName: the name of the model feature that you created. In this example, fs_rank_v2 is used.

    • FeatureStoreEntityName: the name of the feature entity that you created. In this example, user is used. These settings enable PAI-Rec to retrieve user features in the fs_rank_v2 model by using FeatureStore SDK for Go.

        "FeatureConfs": {
            "recreation_rec": {
                "AsynLoadFeature": true,
                "FeatureLoadConfs": [
                    {
                        "FeatureDaoConf": {
                            "AdapterType": "featurestore",
                            "FeatureStoreName": "pairec-fs",
                            "FeatureKey": "user:uid",
                            "FeatureStoreModelName": "fs_rank_v1",
                            "FeatureStoreEntityName": "user",
                            "FeatureStore": "user"
                        }
                    }
                ]
            }
        },
  3. Configure the AlgoConfs parameter.

    The AlgoConfs parameter specifies the scoring service in EAS to which PAI-Rec connects.

    • Name: the name of the service that you deployed by using EAS.

    • Url and Auth: the URL and token that are used to access the service that you deployed by using EAS. You can click the service name on the Elastic Algorithm Service (EAS) page, and then click View Endpoint Information on the Service Details tab to obtain the URL and token. For more information, see FAQ about EAS.

        "AlgoConfs": [
            {
                "Name": "fs_demo_v1",
                "Type": "EAS",
                "EasConf": {
                    "Processor": "EasyRec",
                    "Timeout": 300,
                    "ResponseFuncName": "easyrecMutValResponseFunc",
                    "Url": "eas_url_xxx",
                    "EndpointType": "DIRECT",
                    "Auth": "eas_token"
                }
            }
        ],