
Platform For AI: FeatureStore SDK for Python

Last Updated: Aug 26, 2025

This topic describes how to use the FeatureStore SDK for Python to integrate and serve features for offline model training and online inference.

Background information

FeatureStore is a hub for creating, sharing, and managing features for machine learning models. It facilitates efficient teamwork, guarantees consistency across offline and online features, and offers rapid access to online features. It is ideal for a variety of use cases that depend on features, such as recommendation systems. FeatureStore automates the construction and maintenance of online and offline feature tables, ensuring data consistency and eliminating redundant storage to cut down on resource expenses. With just a single line of code, you can perform complex tasks such as exporting training tables using SQL or importing data into Hologres.

FeatureStore seamlessly integrates the entire process from feature creation to model development. It supports MaxCompute for offline tasks, along with FeatureDB, Hologres, and Tablestore for online operations. You can perform all necessary operations through the console or Python SDK without the need to learn about these platforms. This enhances team productivity and ensures consistency across offline and online environments. Moreover, FeatureStore offers deep integration with EasyRec for efficient feature generation (FG) and model training, along with direct online deployment. This enables rapid development of state-of-the-art recommendation systems.

If you experience any issues while using the service, join our DingTalk group (32260796).

Prerequisites

We recommend that you run the code in this topic on a DSW instance.

  1. Install the Python SDK in a Python 3 environment.

    ! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl
  2. To minimize the risk of leaking sensitive information, we recommend that you pass the AccessKey ID and AccessKey secret of your Alibaba Cloud account as environment variables. (An optional verification snippet follows these steps.)

    1. Click Terminal from the top menu bar.

    2. Run the following command. Replace YOUR_AccessKey_ID with your actual AccessKey ID.

      echo "export AccessKeyID='YOUR_AccessKey_ID'" >> ~/.bashrc
      source ~/.bashrc
    3. Run the following command. Replace YOUR_Access_Key_Secret with your actual AccessKey Secret.

      echo "export AccessKeySecret='YOUR_Access_Key_Secret'" >> ~/.bashrc
      source ~/.bashrc
  3. Import the necessary modules.

    import unittest
    import sys
    import os
    from os.path import dirname, join, abspath
    from feature_store_py.fs_client import FeatureStoreClient
    from feature_store_py.fs_project import FeatureStoreProject
    from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
    from feature_store_py.fs_type import FSTYPE
    from feature_store_py.fs_schema import OpenSchema, OpenField
    from feature_store_py.fs_feature_view import FeatureView
    from feature_store_py.fs_features import FeatureSelector
    from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
    import logging
    logger = logging.getLogger("foo")
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
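
Optionally, you can verify that the credentials are available in the environment before you construct the client. This check uses only the standard library:

# Optional sanity check: make sure the AccessKey environment variables are set
# before constructing the FeatureStoreClient.
for var in ("AccessKeyID", "AccessKeySecret"):
    if not os.getenv(var):
        raise EnvironmentError(f"Environment variable {var} is not set. See the steps above.")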

Sample dataset

This topic uses Moviedata, an open-source movie dataset, as an example. The Movie, User, and Rating tables are used, corresponding to the item, user, and label tables in a typical recommendation pipeline.

Configure FeatureStore project

You can create multiple independent projects in FeatureStore. Before you run this notebook, you must configure the data stores for your project in FeatureStore.

In this topic, offline_datasource_id refers to the ID of the offline data store, and online_datasource_id refers to the ID of the online data store.

In the following example, the project name is fs_movie.

# Read the AccessKey ID of your Alibaba Cloud account from the environment
access_id = os.getenv("AccessKeyID")
# Read the AccessKey secret of your Alibaba Cloud account from the environment
access_ak = os.getenv("AccessKeySecret")
# Enter the region where you activated FeatureStore. In this example, the China (Hangzhou) region is used
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
# Enter the name of your FeatureStore project. In this example, fs_movie is used
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError("Project fs_movie does not exist. Create it first.")

Obtain and print the project information.

project = fs.get_project(cur_project_name)
print(project)

Configure feature entities

A feature entity is a set of semantically related features. Each feature entity can be associated with multiple feature views. Each entity has a JoinId for associating features from various feature views. Each feature view has a primary key for feature retrieval, but the primary key's name can be different from the JoinId.

Create three entities: movie, user, and rating.

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
    entity_movie = project.create_entity(name=cur_entity_name_movie, join_id=join_id)
entity_movie.print_summary()

cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
    entity_user = project.create_entity(name=cur_entity_name_user, join_id=join_id)
entity_user.print_summary()

cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
    entity_ratings = project.create_entity(name=cur_entity_name_ratings, join_id=join_id)
entity_ratings.print_summary()

Configure feature view

FeatureStore manages and organizes feature data. External data is introduced to the platform through feature views, which define the data store, pre-processing or transformation operations, data structure, storage location, and feature metadata, including the primary key, event time, partition key, feature entity, and time to live (TTL) settings. The default TTL of -1 specifies that the online data store retains all feature data. A positive value specifies that the online data store retains only the feature data written within the most recent period of that length; an example with a positive TTL appears after the first batch feature view below.

Feature views have the following types:

  • BatchFeatureView: Offline or T-1 day features. Offline data is injected into the offline data store and can be synchronized to the online data store for real-time queries.

  • StreamFeatureView: Real-time features. Data is written directly into the online data store and synchronized to the offline data store.

  • Sequence FeatureView: Sequence features, which are written offline and read online in real time.

BatchFeatureView

If your data is stored in a CSV file, you can upload it to MaxCompute by specifying the CSV file's URL. In this case, you must manually define the schema of the feature view.

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

The schema specifies the name and type of each field.

movie_schema = OpenSchema(
    OpenField(name='movie_id', type='STRING'),
    OpenField(name='name', type='STRING'),
    OpenField(name='alias', type='STRING'),
    OpenField(name='actors', type='STRING'),
    OpenField(name='cover', type='STRING'),
    OpenField(name='directors', type='STRING'),
    OpenField(name='double_score', type='STRING'),
    OpenField(name='double_votes', type='STRING'),
    OpenField(name='genres', type='STRING'),
    OpenField(name='imdb_id', type='STRING'),
    OpenField(name='languages', type='STRING'),
    OpenField(name='mins', type='STRING'),
    OpenField(name='official_site', type='STRING'),
    OpenField(name='regions', type='STRING'),
    OpenField(name='release_date', type='STRING'),
    OpenField(name='slug', type='STRING'),
    OpenField(name='story', type='STRING'),
    OpenField(name='tags', type='STRING'),
    OpenField(name='year', type='STRING'),
    OpenField(name='actor_ids', type='STRING'),
    OpenField(name='director_ids', type='STRING'),
    OpenField(name='dt', type='STRING')
)
print(movie_schema)

Create a batch feature view.

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, schema=movie_schema, online=True, entity=cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()
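
If you want the online store to keep only recently written data, pass a positive ttl instead of -1. The following is a minimal sketch: the view name is hypothetical, and you should confirm the TTL unit in your FeatureStore version before relying on this setting.

# Hypothetical sketch: a batch feature view that retains only recent data online.
# The ttl value below is an assumption for illustration; confirm the unit in
# your FeatureStore version before using it.
recent_feature_view = project.get_feature_view("feature_view_movie_recent")
if recent_feature_view is None:
  recent_feature_view = project.create_batch_feature_view(name="feature_view_movie_recent", schema=movie_schema, online=True, entity=cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=7 * 24 * 3600)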

Write data to the MaxCompute table.

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

View information about the current task.

print(cur_task.task_summary)

Synchronize the data to the online data store.

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

Get the feature view.

batch_feature_view = project.get_feature_view(feature_view_movie_name)

Print the feature view information.

batch_feature_view.print_summary()

Import the user and rating tables in the same way.

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = OpenSchema(
  OpenField(name='user_md5', type='STRING'),
  OpenField(name='user_nickname', type='STRING'),
  OpenField(name='ds', type='STRING')
)
print(user_schema)
feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, schema=user_schema, online=True, entity=cur_entity_name_user, primary_key='user_md5', ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = OpenSchema(
  OpenField(name='rating_id', type='STRING'),
  OpenField(name='user_md5', type='STRING'),
  OpenField(name='movie_id', type='STRING'),
  OpenField(name='rating', type='STRING'),
  OpenField(name='rating_time', type='STRING'),
  OpenField(name='dt', type='STRING')
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, schema=ratings_schema, online=True, entity=cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

StreamFeatureView

You can define the schema as follows.

online_schema = OpenSchema(
    OpenField(name='id', type='STRING'),
    OpenField(name='count_value', type='INT64'),
    OpenField(name='metric_value', type='DOUBLE')
)

You can copy the following SQL statement and execute it in MaxCompute or DataWorks within your project. This statement generates test data. The data is for testing purposes only and has no specific meaning.

CREATE TABLE IF NOT EXISTS online_stream_test_t1 (
  id STRING  COMMENT 'ID',
  count_value BIGINT  COMMENT 'Count value',
  metric_value DOUBLE COMMENT 'Metric value'
)
PARTITIONED BY (
    ds string COMMENT 'Data timestamp'
)
LIFECYCLE 365;

INSERT INTO TABLE online_stream_test_t1 PARTITION (ds='20250815')
SELECT
    CONCAT('str_', CAST(id AS STRING)) AS id,
    CAST(FLOOR(RAND() * 1000000) AS BIGINT) AS count_value,
    ROUND(RAND() * 1000, 2) AS metric_value
FROM (
    SELECT SEQUENCE(1, 1000) AS id_list
) tmp
LATERAL VIEW EXPLODE(id_list) table_tmp AS id;

After the statement is executed, the online_stream_test_t1 real-time feature table is created and test data is written to its ds=20250815 partition.

You can then create a stream feature view.

feature_view_rating_name_stream = "feature_view_online_stream"
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
if stream_feature_view is None:
    stream_feature_view = project.create_stream_feature_view(name=feature_view_rating_name_stream, schema=online_schema,
                                                            online=True, entity=cur_entity_name_user,
                                                            primary_key='id', event_time='count_value')

The event_time field in a StreamFeatureView has a special purpose. When this field is configured, its value is used to clean up expired data. For more information, see Real-time feature lifecycle.
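
Note that the example above sets event_time to count_value, an integer count column, purely for demonstration. If your table contains a genuine event timestamp, point event_time at that field so that expiration is based on real event times. The following is a hypothetical sketch; the schema, view name, and field names are assumptions.

# Hypothetical sketch: a stream feature view whose event_time refers to a real
# timestamp field. All names below are assumptions for illustration.
stream_schema_ts = OpenSchema(
    OpenField(name='id', type='STRING'),
    OpenField(name='metric_value', type='DOUBLE'),
    OpenField(name='event_unix_time', type='INT64')  # event timestamp in seconds
)
ts_view = project.get_feature_view("feature_view_online_stream_ts")
if ts_view is None:
    ts_view = project.create_stream_feature_view(name="feature_view_online_stream_ts", schema=stream_schema_ts, online=True, entity=cur_entity_name_user, primary_key='id', event_time='event_unix_time')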

You can print the information about the feature view.

stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
stream_feature_view.print_summary()

You can sync the data to the online store.

# offline_datasource_id is the ID of the offline store of your FeatureStore project.
# table_name is the offline feature table whose data is pushed to the online store.
stream_task = stream_feature_view.publish_table(partitions={'ds': '20250815'}, mode='Merge', offline_to_online=True,
                                                publish_config={'offline_datasource_id': project.offline_datasource_id,
                                                                'table_name': 'online_stream_test_t1'})
stream_task.wait()
print(stream_task.task_summary)
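
As a quick check, you can try querying a few synchronized rows from the online store. Whether this call is supported for stream feature views depends on your online store; the join_id values below follow the 'str_<n>' pattern generated by the test SQL above and are otherwise hypothetical.

# Optional check (assumption: online retrieval is supported for this view).
ret = stream_feature_view.list_feature_view_online_features(join_ids=['str_1', 'str_2'])
print(ret)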

Sequence FeatureView

The source data table is located in the pai_online_project, which has public read permissions. You can execute the following SQL statement in MaxCompute or DataWorks within your project to copy the sequence feature data table to your own project.

CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
like pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
STORED AS ALIORC  
LIFECYCLE 90;

INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
SELECT *
FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
WHERE ds >= '20231022' AND ds <= '20231024';

After the statement is successfully executed, the rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 sequence feature table is created, and data from the ds=20231022, ds=20231023, and ds=20231024 partitions is synced to this table.

You can then create a sequence feature view.

user_entity_name = "user"
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
    seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
    behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
    ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
    event_time = 'event_unix_time'  # The name of the event time field in the behavior table.
    item_id = 'item_id'  # The name of the item_id field in the behavior table.
    event = 'event'  # The name of the event field in the behavior table.
    # deduplication_method = 1 indicates that duplicates are removed based on ['user_id', 'item_id', 'event'].
    # deduplication_method = 2 indicates that duplicates are removed based on ['user_id', 'item_id', 'event', 'event_time'].
    sequence_feature_config_list = [
        SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50',
                              seq_len=50)]
    # offline_seq_name is the name of the sequence feature field in the offline sequence table.
    # seq_event is the name of the behavior event.
    # online_seq_name is the name under which the user's behavior sequence item_ids
    # are returned when queried by the FeatureStore online Go SDK.
    # seq_len is the maximum sequence length. Longer sequences are truncated.
    seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id',
                                           event_time='event_unix_time')
    seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
                                                            event_time=event_time, item_id=item_id, event=event,
                                                            deduplication_method=1,
                                                            sequence_feature_config=sequence_feature_config_list,
                                                            sequence_table_config=seq_table_config,
                                                            entity=user_entity_name)

You can print the information about the feature view.

seq_feature_view.print_summary()

You can sync the data to the online store.

seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()

seq_task.print_summary()

You can register the label table.

label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
  label_table = project.create_label_table(datasource=ds, event_time='rating_time')

Configure offline data store

An offline data store is a data warehouse for storing offline features. Offline features are written to MaxCompute or Hadoop Distributed File System (HDFS) using Apache Spark. Offline data stores are used to generate training sets for model training and to serve features for batch predictions.
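
In the SDK, the offline store that is configured for a project is addressed by its data source ID, as in the earlier examples. The following minimal sketch reuses the label table name registered above:

# Reference the project's offline store by its data source ID.
offline_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id,
                                  table='fs_movie_feature_view_ratings_offline')
print(offline_ds)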

Configure online data store

An online data store is a data warehouse for storing real-time features. It enables low-latency access to the latest features for online inference. FeatureDB, Hologres, and Tablestore are supported.
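
The online store is selected when the project is created. The following minimal sketch inspects the configured stores; note that the online_datasource_id attribute name is an assumption inferred from the configuration section above.

# Print the project to inspect its configured online and offline stores.
print(project)
print("offline store id:", project.offline_datasource_id)
# Assumption: an online_datasource_id attribute exists symmetrically to
# offline_datasource_id. Uncomment to try it.
# print("online store id:", project.online_datasource_id)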

Retrieve online features

You can retrieve online features at the feature view level. FeatureDB is currently the preferred online store for this operation. For more information, see FeatureDB overview.

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.list_feature_view_online_features(join_ids=['26357307'])
print("ret_features1 = ", ret_features_1)
ret_features_2 = batch_feature_view.list_feature_view_online_features(join_ids=['30444960', '3317352'])
print("ret_features2 = ", ret_features_2)

Configure FeatureSelector

FeatureSelector defines the range of features to retrieve from online and offline data stores. You can specify a feature view to extract features from it.

feature_view_name = 'feature_view_movie'
# Retrieve specific features from the movie feature view
feature_selector = FeatureSelector(feature_view_name, ['name', 'actors'])

# Retrieve all features
feature_selector = FeatureSelector(feature_view_name, '*')

# Configure an alias for a feature that you want to retrieve
feature_selector = FeatureSelector(
    feature_view='user1',
    features=['f1', 'f2', 'f3'],
    alias={"f1": "f1_1"}  # Return the f1 field under the alias f1_1
)

Configure sample table (training set)

FeatureStore can generate a sample table for model training, which includes labels and features. You must prepare the labels for model training and define the features that the model needs to fetch from the feature views. The labels are associated with the features by using point-in-time joins based on the primary keys.

label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actors', 'regions', 'tags'])
feature_view_user_name = 'feature_view_users'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output=train_set_output, feature_selectors=[feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

Train model

Train a model using the train_set generated by FeatureStore and deploy the trained model as an inference service.

model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

Export sample table

To export a sample table for model training, specify the label table and the event time and partition of each feature view.

label_partitions = PartitionConfig(name='dt', value='20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-01 00:00:00')

movie_partitions = PartitionConfig(name='dt', value='20220830')
feature_view_movie_config = FeatureViewConfig(name='feature_view_movie', partition_config=movie_partitions)

user_partitions = PartitionConfig(name='ds', value='20220830')
feature_view_user_config = FeatureViewConfig(name='feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name='dt', value='20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

Export the sample table based on the specified conditions.

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print(task.summary)
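
After the export finishes, the sample table is available in the offline store under cur_model.train_set_table_name. As one hypothetical way to inspect it, you could read a few rows of the exported partition with PyODPS; the MaxCompute project name and endpoint below are placeholders.

# Hypothetical inspection with PyODPS. Project name and endpoint are placeholders.
from odps import ODPS

o = ODPS(access_id, access_ak, project='your_maxcompute_project',
         endpoint='http://service.cn-hangzhou.maxcompute.aliyun.com/api')
t = o.get_table(cur_model.train_set_table_name)
with t.open_reader(partition='dt=20220831') as reader:
    for record in reader[:5]:  # print the first five rows as a sanity check
        print(record)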