本文為您介紹通過FeatureStore整合資料特徵並進行模型離線訓練,以及後續的上線服務作業流程。
背景資訊
特徵平台是一個用於生產、共用和管理機器學習模型特徵的存放庫,支援多人多團隊高效協作,確保離線與線上特徵的一致性,並提供快速的線上特徵訪問能力。它適用於各類需要特徵的情境,典型情境如推薦情境。通過自動構建和管理線上與離線特徵表,保證資料一致性,同時避免重複儲存,降低資源成本。複雜操作如SQL匯出訓練表或資料匯入Hologres等,均可通過一行程式碼完成,大幅節省時間成本。
目前,特徵平台已封裝從特徵到模型的完整流程,離線支援MaxCompute,線上支援FeatureDB、Hologres和TableStore等平台,開發人員無需深入瞭解各平台細節,所有操作可通過控制台或Python SDK完成,從而提升團隊效率並規避離線與線上不一致等問題。此外,平台已深度整合EasyRec,可高效完成特徵產生(FG)與模型訓練,並直接部署到線上,協助使用者快速搭建前沿推薦系統,取得良好效果。
如在使用過程中遇到任何問題,請通過DingTalk群(32260796)搜尋加入諮詢。
前提條件
準備工作
本文中的代碼建議在DSW執行個體中運行。
在Python 3環境下,執行以下命令安裝特徵平台的Python SDK。
! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl為了降低敏感資訊泄露的風險,建議通過環境變數的方式傳入阿里雲賬戶的
AccessKey ID和AccessKey Secret。單擊頂部功能表列的Terminal,進入終端介面。
執行以下命令,將
YOUR_AccessKey_ID替換為您的實際AccessKey ID。echo "export AccessKeyID='YOUR_AccessKey_ID'" >> ~/.bashrc source ~/.bashrc執行以下命令,將
YOUR_Access_Key_Secret替換為您的實際AccessKey Secret。echo "export AccessKeySecret='YOUR_Access_Key_Secret'" >> ~/.bashrc source ~/.bashrc
執行以下命令,匯入所需的功能模組。
import unittest import sys import os from os.path import dirname, join, abspath from feature_store_py.fs_client import FeatureStoreClient from feature_store_py.fs_project import FeatureStoreProject from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput from feature_store_py.fs_type import FSTYPE from feature_store_py.fs_schema import OpenSchema, OpenField from feature_store_py.fs_feature_view import FeatureView from feature_store_py.fs_features import FeatureSelector from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig import logging logger = logging.getLogger("foo") logger.addHandler(logging.StreamHandler(stream=sys.stdout))
資料集介紹
開源電影資料集樣本Moviedata,主要使用Movie、User和Rating三份資料,分別對應推薦流程中的物料表、使用者表和label表。
配置特徵專案
您可以通過特徵平台建立多重專案空間,每個專案空間是獨立的。運行notebook需要FeatureStore服務端配合運行,開通特徵平台後需要配置資料來源。
其中,offline_datasource_id指離線資料來源ID,online_datasource_id指線上資料來源ID。
此處以專案名稱是fs_movie為例進行說明。
# 輸入您阿里雲賬戶的access_key_id
access_id = os.getenv("AccessKeyID")
# 輸入您阿里雲賬戶的access_key_secret
access_ak = os.getenv("AccessKeySecret")
# 輸入您開通特徵平台所在地區,此處以華東1(杭州)為例
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
# 輸入您特徵平台的專案名,此處以fs_movie為例
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
raise ValueError("Need to create project : fs_movie")運行以下代碼擷取當前專案並列印其資訊。
project = fs.get_project(cur_project_name)
print(project)配置特徵實體(FeatureEntity)
特徵實體描述了一組相關的特徵集合。多個特徵視圖可以關聯一個特徵實體,每個實體都會有一個JoinId,通過JoinId可以關聯多個特徵視圖特徵。每一個特徵視圖都有一個主鍵(索引鍵)來擷取它的特徵資料,但是索引鍵名稱可以和JoinId不同。
參考如下樣本,建立Movie、User和Rating三個實體。
cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
entity_movie = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
entity_movie.print_summary()cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
entity_user = project.create_entity(name = cur_entity_name_user, join_id=join_id)
entity_user.print_summary()cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
entity_ratings = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
entity_ratings.print_summary()配置特徵視圖(FeatureView)
特徵平台用於管理和組織特徵資料,外部資料需要通過特徵視圖進入特徵平台。特徵視圖定義了資料來源(DataSource)、預先處理或轉換操作(如特徵工程/Transformation)、特徵資料結構(包含特徵名稱和類型在內的特徵schema)、資料存放區位置(OnlineStore/OfflineStore),並提供特徵元資訊管理,如主鍵、事件時間、分區鍵、特徵實體以及有效期間設定ttl(預設-1,表示永久有效,正數則表示線上查詢時會取ttl內的最新特徵資料)。
特徵視圖分為三種類型:
BatchFeatureView:離線特徵或T-1天特徵。將離線資料注入到特徵平台的離線資料來源中,並可以根據需求同步至線上資料來源以支援即時查詢。
StreamFeatureView:即時特徵。直接將資料寫入離線資料來源,並同時同步到線上資料來源。
Sequence FeatureView:序列特徵。支援離線寫入序列特徵,以及查詢和讀取即時序列特徵。
BatchFeatureView
如果資料存在於CSV檔案中,通過URL下載寫入到MaxCompute,定義的FeatureView的schema需要手動建立。
path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)schema定義了欄位的名稱和類型。
movie_schema = OpenSchema(
OpenField(name='movie_id', type='STRING'),
OpenField(name='name', type='STRING'),
OpenField(name='alias', type='STRING'),
OpenField(name='actores', type='STRING'),
OpenField(name='cover', type='STRING'),
OpenField(name='directors', type='STRING'),
OpenField(name='double_score', type='STRING'),
OpenField(name='double_votes', type='STRING'),
OpenField(name='genres', type='STRING'),
OpenField(name='imdb_id', type='STRING'),
OpenField(name='languages', type='STRING'),
OpenField(name='mins', type='STRING'),
OpenField(name='official_site', type='STRING'),
OpenField(name='regions', type='STRING'),
OpenField(name='release_data', type='STRING'),
OpenField(name='slug', type='STRING'),
OpenField(name='story', type='STRING'),
OpenField(name='tags', type='STRING'),
OpenField(name='year', type='STRING'),
OpenField(name='actor_ids', type='STRING'),
OpenField(name='director_ids', type='STRING'),
OpenField(name='dt', type='STRING')
)
print(movie_schema)建立batch_feature_view。
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()資料寫入MaxCompute表。
cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()查看當前task的資訊。
print(cur_task.task_summary)資料同步到OnlineStore中。
cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()print(cur_task.task_summary)擷取對應的FeatureView。
batch_feature_view = project.get_feature_view(feature_view_movie_name)列印該FeatureView的資訊。
batch_feature_view.print_summary()我們按此順序,依次匯入users表,ratings表。
users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)user_schema = OpenSchema(
OpenField(name='user_md5', type='STRING'),
OpenField(name='user_nickname', type='STRING'),
OpenField(name='ds', type='STRING')
)
print(user_schema)feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()print(cur_task.task_summary)batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)ratings_schema = OpenSchema(
OpenField(name='rating_id', type='STRING'),
OpenField(name='user_md5', type='STRING'),
OpenField(name='movie_id', type='STRING'),
OpenField(name='rating', type='STRING'),
OpenField(name='rating_time', type='STRING'),
OpenField(name='dt', type='STRING')
)feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()print(cur_task.task_summary)batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()StreamFeatureView
自訂如下schema。
online_schema = OpenSchema(
OpenField(name='id', type='STRING'),
OpenField(name='count_value', type='INT64'),
OpenField(name='metric_value', type='DOUBLE')
)複製下面的 SQL 放在您的 project 中的 MaxCompute 或者 DataWorks 執行,產生一份測試資料(該資料無具體含義,僅供測試功能使用)。
CREATE TABLE IF NOT EXISTS online_stream_test_t1 (
id STRING COMMENT '標識ID',
count_value BIGINT COMMENT '統計時間',
metric_value DOUBLE COMMENT '目標值'
)
PARTITIONED BY (
ds string COMMENT '業務日期'
)
LIFECYCLE 365
;
INSERT INTO TABLE online_stream_test_t1 PARTITION (ds='20250815')
SELECT
CONCAT('str_', CAST(id AS STRING)) AS id,
CAST(FLOOR(RAND() * 1000000) AS BIGINT) AS count_value,
ROUND(RAND() * 1000, 2) AS metric_value
FROM (
SELECT SEQUENCE(1, 1000) AS id_list
) tmp
LATERAL VIEW EXPLODE(id_list) table_tmp AS id;運行成功即建立即時特徵表online_stream_test_t1並成功同步ds=20250815分區的資料至表中。
建立stream_feature_view。
feature_view_rating_name_stream = "feature_view_online_stream"
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
if stream_feature_view is None:
stream_feature_view = project.create_stream_feature_view(name=feature_view_rating_name_stream, schema=online_schema,
online=True, entity=cur_entity_name_user,
primary_key='id', event_time='count_value')即時FeatureView中配置的event_time(事件時間)欄位有特殊含義,即配置事件時間欄位後會根據該欄位的值來清理到期資料,具體參考:即時特徵資料生命週期設定。
列印該Featureview資訊。
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
stream_feature_view.print_summary()資料同步到OnlineStore中。
# offline_datasource_id更改為您的fs專案的離線資料來源ID
# table_name即離線要推送到線上的特徵表
stream_task = stream_feature_view.publish_table(partitions={'ds': '20250815'}, mode='Merge', offline_to_online=True,
publish_config={'offline_datasource_id': project.offline_datasource_id,
'table_name': 'online_stream_test_t1'})
stream_task.wait()
print(stream_task.task_summary)Sequence FeatureView
資料表放在有公開讀取許可權的 pai_online_project 中,可以根據下面的操作,將下面的 SQL 放在對應 project 中的 MaxCompute 或者 DataWorks 執行,將序列特徵資料表複製到自己的 MaxCompute 或 Dataworks project 中。
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
like pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
SELECT *
FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
WHERE ds >= '20231022' and ds <='20231024'運行成功即建立序列特徵表rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3並成功同步ds=20231022、20231023、20231024這三個分區的資料至表中。
建立seq_feature_view。
user_entity_name = "user"
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
event_time = 'event_unix_time' # 行為表中的 event time 欄位名
item_id = 'item_id' # 行為表中的 item_id 欄位名
event = 'event' # 行為表中的 event 欄位名
# deduplication_method = 1, 表示 ['user_id', 'item_id', 'event'] 去重
# deduplication_method = 2, 表示 ['user_id', 'item_id', 'event', 'event_time'] 去重
sequence_feature_config_list = [
SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50',
seq_len=50)]
# offline_seq_name 指的是離線序列表中序列特徵欄位名,seq_event 指的是行為欄位名,online_seq_name 指的是特徵平台線上 Go SDK 查出該 user 對應的行為序列 item_ids
# 後,會以該名稱命名。 seq_len 指的是序列長度,大於該長度的序列會被截斷。
seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id',
event_time='event_unix_time')
seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
event_time=event_time, item_id=item_id, event=event,
deduplication_method=1,
sequence_feature_config=sequence_feature_config_list,
sequence_table_config=seq_table_config,
entity=user_entity_name)列印該Featureview資訊。
seq_feature_view.print_summary()資料同步到OnlineStore中。
seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()
seq_task.print_summary()label表註冊。
label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
label_table = project.create_label_table(datasource=ds, event_time='rating_time')配置離線資料來源(Offlinestore)
離線特徵資料存放區的資料倉儲,在MaxCompute或DS上的HDFS,通過Spark進行資料寫入。通過離線資料來源可以產生樣本資料TrainingSet,用於模型訓練;也可以產生batch prediction資料,用於批量預測。
配置線上資料來源(Onlinestore)
線上預測時,需要低延遲擷取特徵資料,線上資料來源提供線上特徵資料的儲存。目前優先支援FeatureDB、Hologres和Tablestore。
擷取線上特徵
從特徵視圖的角度擷取線上特徵,目前優先支援FeatureDB。(FeatureDB相關文檔可參考:FeatureDB概述)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.list_feature_view_online_features(join_ids=['26357307'])
print("ret_features1 = ", ret_features_1)
ret_features_2 = batch_feature_view.list_feature_view_online_features(join_ids=['30444960', '3317352'])
print("ret_features2 = ", ret_features_2)配置FeatureSelector
從離線資料來源或線上資料來源擷取特徵時,需要明確指出應該擷取哪些特徵。可以從特徵視圖的角度選擇特徵。
feature_view_name = 'feature_view_movie'
# 選擇部分特徵
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
#選擇全部特徵
feature_selector = FeatureSelector(feature_view_name, '*')
# 支援別名
feature_selector = FeatureSelector(
feature_view='user1',
features = ['f1','f2', 'f3'],
alias={"f1":"f1_1"} # 欄位別名,最終會產出 f1_1 的欄位名稱
)配置樣本表(TrainingSet)
訓練模型時,首先要構造樣本表,樣本表由Label資料和特徵資料群組成。在與FeatureStore互動時,Label資料需要由客戶提供,並且需要定義要擷取的特徵名稱,然後根據主鍵進行point-in-time join(存在event_time的情況下)。
label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)訓練模型(Model)
訓練模型並部署成服務後,進行業務預測。其中,訓練樣本可以從上文的train_set獲得。
model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)匯出樣本表
實際訓練時,需要匯出樣本表。指定Label表以及各個特徵視圖的分區、event_time。
label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)根據指定的條件,匯出樣本表。
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()print(task.summary)