【DSW Gallery】Feature platform

Dataset introduction
We use an open source movie dataset; its official website is http://moviedata.csuldw.com
We mainly use the movie data, user data, and rating data. These three tables correspond to the item table, user table, and label table in a recommendation pipeline.
We will show how to use the feature store to integrate features from the three datasets, train a model offline, and then serve it online.
Project
A project creates an isolated project space. You can create several projects, and each one is independent. A project holds the basic configuration, and each project corresponds to one offlinestore and one onlinestore.
Running this notebook requires a feature store server. After purchasing a pairec configuration center instance, you can find the service interface address (host) and token in the configuration center.
host = ""
token = ""
fs = FeatureStoreClient(host, token)
cur_project_name = "fs_movie_7"
offline_datasource_id = 38
online_datasource_id = 1
project = fs.get_project(cur_project_name)
if project is None:
    project = fs.create_project(cur_project_name, offline_datasource_id, online_datasource_id)
Get the corresponding project
project = fs.get_project(cur_project_name)
Print information about the project
project.print_summary()
FeatureEntity
A FeatureEntity describes a collection of related features. Multiple FeatureViews can be associated with one FeatureEntity. Each entity has a JoinId, through which features from multiple FeatureViews can be joined. Each FeatureView also has a primary key (index key) used to look up its feature data; the name of the index key may differ from the name defined by the JoinId. Here we create three entities: movie, user, and rating.
cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_movie)
if entity_id is None:
    entity_id = project.create_entity(name=cur_entity_name_movie, join_id=join_id)
print("entity_id = ", entity_id)
Get the corresponding entity
feature_entity = project.get_entity(cur_entity_name_movie)
Print the information of the entity
feature_entity.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_id = None
entity_id = project.get_entity(cur_entity_name_user)
if entity_id is None:
    entity_id = project.create_entity(name=cur_entity_name_user, join_id=join_id)
print("entity_id = ", entity_id)
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_ratings)
if entity_id is None:
    entity_id = project.create_entity(name=cur_entity_name_ratings, join_id=join_id)
print("entity_id = ", entity_id)
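The three cells above repeat the same get-or-create pattern. A minimal sketch of how it could be factored into a helper is shown below. It relies only on the two calls used in this notebook, get_entity and create_entity, and assumes get_entity returns None when the entity does not exist. FakeProject is a hypothetical in-memory stand-in so the sketch runs without a feature store server; it is not part of the SDK.

```python
class FakeProject:
    """Hypothetical in-memory stand-in for the project object (illustration only)."""

    def __init__(self):
        self._entities = {}

    def get_entity(self, name):
        # Assumption: a missing entity is reported as None.
        return self._entities.get(name)

    def create_entity(self, name, join_id):
        self._entities[name] = {"name": name, "join_id": join_id}
        return self._entities[name]


def get_or_create_entity(project, name, join_id):
    """Return the existing entity, creating it only when it is missing."""
    entity = project.get_entity(name)
    if entity is None:
        entity = project.create_entity(name=name, join_id=join_id)
    return entity


fake_project = FakeProject()
entities = {
    name: get_or_create_entity(fake_project, name, join_id)
    for name, join_id in [
        ("movie_data", "movie_id"),
        ("user_data", "user_md5"),
        ("rating_data", "rating_id"),
    ]
}
print(sorted(entities))
```

Calling the helper a second time with the same name returns the entity that already exists instead of creating a new one.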
FeatureView
FeatureStore (FS) is a feature management platform. External data enters FS through a FeatureView, which specifies where the data comes from (DataSource), what transformations it needs before entering FS (feature engineering / Transformation), the feature schema (feature names and types), where the data is stored (OnlineStore / OfflineStore), and the feature metadata (primary key, event time, partition key, FeatureEntity, ttl).
There are two types of FeatureView: BatchFeatureView and StreamFeatureView. A BatchFeatureView ingests offline data into FS, and a StreamFeatureView supports writing real-time features. A BatchFeatureView writes data to the OfflineStore, and you can then choose to synchronize it to the OnlineStore. A StreamFeatureView writes data to the OnlineStore and then synchronizes it to the OfflineStore; in practice, the same data is written to both at the same time.
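To make the list of FeatureView metadata concrete, the sketch below collects it in a plain dataclass and checks that the primary key, event time, and partition keys all appear in the schema. FeatureViewSpec and its validate method are hypothetical names introduced for illustration; they are not SDK classes, and the SDK may perform different checks. The values mirror the ratings view created later in this notebook.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FeatureViewSpec:
    """Hypothetical container for FeatureView metadata (not an SDK class)."""

    name: str
    schema: Dict[str, str]  # feature name -> type name
    entity: str
    primary_key: str
    partitions: List[str] = field(default_factory=list)
    event_time: Optional[str] = None
    ttl: Optional[int] = None
    online: bool = True

    def validate(self):
        # Every key column must be declared in the schema.
        required = [self.primary_key, *self.partitions]
        if self.event_time is not None:
            required.append(self.event_time)
        missing = [col for col in required if col not in self.schema]
        if missing:
            raise ValueError(f"columns missing from schema: {missing}")


ratings_spec = FeatureViewSpec(
    name="feature_view_ratings",
    schema={
        "rating_id": "STRING",
        "user_md5": "STRING",
        "movie_id": "STRING",
        "rating": "STRING",
        "rating_time": "STRING",
        "dt": "STRING",
    },
    entity="rating_data",
    primary_key="rating_id",
    partitions=["dt"],
    event_time="rating_time",
)
ratings_spec.validate()
print(ratings_spec.name, "is consistent")
```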
BatchFeatureView
There are two ways to write feature data from a DataSource into FS.
1. Write data directly
When a UrlDataSource is written to the MaxCompute offlinestore, the schema of the FeatureView must be defined manually.
path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)
The schema defines the names and types of the fields.
movie_schema = Schema(
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='name', type=FSTYPE.STRING),
    Field(name='alias', type=FSTYPE.STRING),
    Field(name='actores', type=FSTYPE.STRING),
    Field(name='cover', type=FSTYPE.STRING),
    Field(name='directors', type=FSTYPE.STRING),
    Field(name='double_score', type=FSTYPE.STRING),
    Field(name='double_votes', type=FSTYPE.STRING),
    Field(name='genres', type=FSTYPE.STRING),
    Field(name='imdb_id', type=FSTYPE.STRING),
    Field(name='languages', type=FSTYPE.STRING),
    Field(name='mins', type=FSTYPE.STRING),
    Field(name='official_site', type=FSTYPE.STRING),
    Field(name='regions', type=FSTYPE.STRING),
    Field(name='release_data', type=FSTYPE.STRING),
    Field(name='slug', type=FSTYPE.STRING),
    Field(name='story', type=FSTYPE.STRING),
    Field(name='tags', type=FSTYPE.STRING),
    Field(name='year', type=FSTYPE.STRING),
    Field(name='actor_ids', type=FSTYPE.STRING),
    Field(name='director_ids', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
print(movie_schema)
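Because the schema is written by hand, it is easy for it to drift from the file. The sketch below is one way to check a delimited file against a list of field names before writing it, using only the Python standard library. It assumes the partition column (ds or dt) is supplied at write time and is not a column in the file. The function read_rows and the two sample rows are made up for illustration; the function is not part of the SDK, and the delimiter and omit_header parameters only mirror the UrlDataSource arguments used above.

```python
import csv
import io


def read_rows(text, field_names, delimiter=",", omit_header=True):
    """Parse delimited text and map each row onto the given field names."""
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    if omit_header and rows:
        rows = rows[1:]  # drop the header line, as omit_header=True implies
    records = []
    for line_no, row in enumerate(rows, start=1):
        if len(row) != len(field_names):
            raise ValueError(
                f"data row {line_no}: expected {len(field_names)} columns, got {len(row)}"
            )
        records.append(dict(zip(field_names, row)))
    return records


# Made-up sample in the shape of the users table (partition column excluded).
sample_csv = "USER_MD5,USER_NICKNAME\nabc123,alice\ndef456,bob\n"
user_fields = ["user_md5", "user_nickname"]

records = read_rows(sample_csv, user_fields)
print(records)
```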
Create the batch_feature_view
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, owner='yancheng', schema=movie_schema, online=True, entity=cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()
Write the data to the MaxCompute (mc) table
cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()
View information about the current task
print(cur_task.task_summary)
Synchronize the data to the onlinestore
cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
Get the corresponding FeatureView
batch_feature_view = project.get_feature_view(feature_view_movie_name)
Print the information of the FeatureView
batch_feature_view.print_summary()
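The write-then-publish flow above can be pictured with a small in-memory model: writing stores rows under a partition in the offline store, and publishing copies one partition into a key-value online store indexed by the primary key. This is a conceptual sketch only. TinyFeatureView is a hypothetical class, its method names merely echo the SDK calls used above, and it says nothing about how the real service is implemented. The rows are made-up sample data.

```python
class TinyFeatureView:
    """Hypothetical in-memory model of a batch FeatureView (not the SDK)."""

    def __init__(self, primary_key):
        self.primary_key = primary_key
        self.offline = {}  # partition value -> list of rows
        self.online = {}   # primary key value -> row

    def write_table(self, rows, partition):
        # Offline write: rows are stored under their partition.
        self.offline[partition] = list(rows)

    def publish_table(self, partition):
        # Publish: copy one offline partition into the online key-value store.
        for row in self.offline[partition]:
            self.online[row[self.primary_key]] = row

    def get_online_features(self, keys, features):
        # Online lookup: fetch selected features for keys that are present.
        return [
            {name: self.online[key][name] for name in features}
            for key in keys
            if key in self.online
        ]


view = TinyFeatureView(primary_key="movie_id")
rows = [
    {"movie_id": "m1", "name": "Movie A", "regions": "US"},
    {"movie_id": "m2", "name": "Movie B", "regions": "FR"},
]
view.write_table(rows, partition="20220830")
before = view.get_online_features(["m1"], ["name", "regions"])
view.publish_table(partition="20220830")
after = view.get_online_features(["m1"], ["name", "regions"])
print("before publish:", before)
print("after publish:", after)
```

In this model, data written offline is not visible to online lookups until its partition has been published.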
Following the same steps, we import the users table and then the ratings table.
users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = Schema(
    Field(name='user_md5', type=FSTYPE.STRING),
    Field(name='user_nickname', type=FSTYPE.STRING),
    Field(name='ds', type=FSTYPE.STRING)
)
print(user_schema)
feature_view_user_name = "feature_view_users_1"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, owner='yancheng', schema=user_schema, online=True, entity=cur_entity_name_user, primary_key='user_md5', ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = Schema(
    Field(name='rating_id', type=FSTYPE.STRING),
    Field(name='user_md5', type=FSTYPE.STRING),
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='rating', type=FSTYPE.STRING),
    Field(name='rating_time', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, owner='yancheng', schema=ratings_schema, online=True, entity=cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()
Offline store
In our system, offline feature data is stored in MaxCompute or HDFS on DS, and data is written through Spark. From the offlinestore we can generate training set data (samples) for model training, as well as batch prediction data for batch prediction.
Online store
Online prediction requires low-latency access to feature data, and the onlinestore stores online feature data for that purpose. We currently prioritize support for Hologres and Redis.
Getting online features
We can get online features from the perspective of a FeatureView.
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)
FeatureSelector
When we get features from the offlinestore or onlinestore, we need to state explicitly which features to get. Features are selected from the perspective of a FeatureView.
feature_view_name = 'feature_view_movie'
# Select some features
feature_selector = FeatureSelector(feature_view_name, ['name', 'regions'])
# select all features
feature_selector = FeatureSelector(feature_view_name, '*')
# support aliases
feature_selector = FeatureSelector(
    feature_view='user1',
    features=['f1', 'f2', 'f3'],
    alias={"f1": "f1_1"}  # Field alias: f1 is output under the name f1_1
)
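The three selection modes above (a list of names, '*' for everything, and aliases) can be sketched as a plain function over a dictionary row. apply_selector is a hypothetical helper written for illustration; it shows the intended effect on the output columns and is not how the SDK resolves a FeatureSelector internally.

```python
def apply_selector(row, features, alias=None):
    """Pick features from a row; '*' selects all, alias renames output fields."""
    alias = alias or {}
    names = list(row) if features == "*" else list(features)
    return {alias.get(name, name): row[name] for name in names}


row = {"f1": 1, "f2": 2, "f3": 3, "f4": 4}

some = apply_selector(row, ["f1", "f2"])
everything = apply_selector(row, "*")
renamed = apply_selector(row, ["f1", "f2", "f3"], alias={"f1": "f1_1"})

print(some)
print(everything)
print(renamed)
```

Aliases are useful when two FeatureViews expose a feature with the same name and both end up in one sample table.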
Training Set
To train a model, we first construct a sample table, which is composed of label data and feature data. When working with FS, the customer provides the label data and defines the feature names to fetch; FS then joins the features to the labels on the primary key, using a point-in-time join when event_time exists.
label_ds = MaxComputeDataSource(data_source_id=offline_datasource_id, table='fs_movie_6_feature_view_ratings_offline')
output_ds = MaxComputeDataSource(data_source_id=offline_datasource_id)
label_input = LabelInput(label_ds, event_time='rating_time')
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users_1'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_input, train_set_output, [feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)
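A point-in-time join attaches to each label row the most recent feature values that were known at the label's event time, so that later feature values do not leak into training samples. The sketch below illustrates the idea in plain Python. The function point_in_time_join, the event_time column on the feature rows, and all sample values are assumptions made for illustration. Whether the service treats a feature timestamp equal to the label time as visible is also an assumption here (this sketch includes it).

```python
from bisect import bisect_right


def point_in_time_join(labels, feature_rows, key, label_time, feature_time):
    """For each label, attach the latest feature row with time <= label time."""
    by_key = {}
    for row in feature_rows:
        by_key.setdefault(row[key], []).append(row)
    for rows in by_key.values():
        rows.sort(key=lambda r: r[feature_time])

    joined = []
    for label in labels:
        rows = by_key.get(label[key], [])
        times = [r[feature_time] for r in rows]
        idx = bisect_right(times, label[label_time])
        out = dict(label)
        if idx > 0:
            match = rows[idx - 1]
            # Copy feature columns only, not the join key or the feature timestamp.
            out.update(
                {k: v for k, v in match.items() if k not in (key, feature_time)}
            )
        joined.append(out)
    return joined


# Made-up data; ISO-formatted dates compare correctly as strings.
user_features = [
    {"user_md5": "u1", "event_time": "2022-08-01", "user_nickname": "old_name"},
    {"user_md5": "u1", "event_time": "2022-08-20", "user_nickname": "new_name"},
]
ratings = [
    {"rating_id": "r1", "user_md5": "u1", "rating_time": "2022-08-10", "rating": "4"},
    {"rating_id": "r2", "user_md5": "u1", "rating_time": "2022-08-25", "rating": "5"},
    {"rating_id": "r3", "user_md5": "u2", "rating_time": "2022-08-25", "rating": "3"},
]

samples = point_in_time_join(
    ratings, user_features, key="user_md5",
    label_time="rating_time", feature_time="event_time",
)
for sample in samples:
    print(sample)
```

Rating r1 picks up the nickname that was valid on 2022-08-10, r2 picks up the later one, and r3 has no matching user so it keeps only its label columns.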
Model
From the offlinestore's perspective, the end goal is to train a model and deploy it as a service for business prediction. The training samples come from the TrainingSet above; the model is then trained and finally deployed as a service.
model_name = "fs_model_movie_rating_3"
owner = 'yancheng'
deploy_config = EASDeployConfig(ak_id='',region='',config='')
cur_model = project.get_model(model_name)
if cur_model is None:
    cur_model = project.create_model(model_name, owner, train_set, deploy_config)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)
Export the sample table
For actual training, we need to export the sample table.
Specify the partition and event_time for the label table and for each feature view.
label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users_1', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
Export the sample table according to the specified conditions
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task = ", task.task_summary)
