このトピックでは、Python 用 FeatureStore SDK を使用して、オフラインモデルのトレーニングとオンライン推論のために特徴を統合および提供する方法について説明します。
背景情報
FeatureStore は、機械学習モデルの特徴を作成、共有、および管理するための中心となるものです。効率的なチームワークを促進し、オフラインとオンラインの特徴間の一貫性を保証し、オンライン特徴への迅速なアクセスを提供します。レコメンデーションシステムなど、特徴に依存するさまざまなユースケースに最適です。 FeatureStore は、オンラインおよびオフラインの特徴テーブルの構築とメンテナンスを自動化し、データの一貫性を確保し、冗長なストレージを排除してリソースの費用を削減します。わずか 1 行のコードで、SQL を使用したトレーニングテーブルのエクスポートや Hologres へのデータのインポートなどの複雑なタスクを実行できます。
FeatureStore は、特徴作成からモデル開発までのプロセス全体をシームレスに統合します。オフラインタスクには MaxCompute をサポートし、オンライン操作には FeatureDB、Hologres、および Tablestore をサポートしています。これらのプラットフォームについて学習する必要なく、コンソールまたは Python SDK を介して必要なすべての操作を実行できます。これにより、チームの生産性が向上し、オフライン環境とオンライン環境全体の一貫性が確保されます。さらに、FeatureStore は、効率的な EasyRec と緊密に統合されており、効率的な 特徴生成 (FG) とモデルトレーニング、および直接オンラインデプロイメントを実現します。これにより、最先端のレコメンデーションシステムを迅速に開発できます。
サービスの使用中に問題が発生した場合は、DingTalk グループ ( 32260796 ) に参加してください。
前提条件
始める前に
DSW インスタンス でこのトピックのコードを実行することをお勧めします。
Python 3 環境に Python SDK をインストールします。
! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl機密情報の漏洩のリスクを最小限に抑えるために、Alibaba Cloud アカウントの
AccessKey IDとAccessKey Secretを環境変数として送信することをお勧めします。トップメニューバーから [ターミナル] をクリックします。
次のコマンドを実行します。
YOUR_AccessKey_IDを実際の AccessKey ID に置き換えます。echo "export AccessKeyID='YOUR_AccessKey_ID'" >> ~/.bashrc source ~/.bashrc次のコマンドを実行します。
YOUR_Access_Key_Secretを実際の AccessKey Secret に置き換えます。echo "export AccessKeySecret='YOUR_Access_Key_Secret'" >> ~/.bashrc source ~/.bashrc
必要なモジュールをインポートします。
import unittest import sys import os from os.path import dirname, join, abspath from feature_store_py.fs_client import FeatureStoreClient from feature_store_py.fs_project import FeatureStoreProject from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput from feature_store_py.fs_type import FSTYPE from feature_store_py.fs_schema import OpenSchema, OpenField from feature_store_py.fs_feature_view import FeatureView from feature_store_py.fs_features import FeatureSelector from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig import logging logger = logging.getLogger("foo") logger.addHandler(logging.StreamHandler(stream=sys.stdout))
サンプルデータセット
このトピックでは、オープンソースのデータセットである Moviedata を例として使用します。Movie、User、および Rating が使用され、レコメンデーションプロセスにおけるアイテム、ユーザー、およびラベルテーブルに対応します。
FeatureStore プロジェクトの構成
FeatureStore では、複数の独立したプロジェクトを作成できます。ノートブックを実行するには、まず FeatureStore で データストアを構成する必要があります。
ここで、offline_datasource_id はオフラインデータストアの ID です。online_datasource_id はオンラインデータストアの ID です。
次の例では、プロジェクト名は fs_movie です。
# Alibaba Cloud アカウントの access_key_id を入力します
access_id = os.getenv("AccessKeyID")
# Alibaba Cloud アカウントの access_key_secret を入力します
access_ak = os.getenv("AccessKeySecret")
# FeatureStore をアクティブ化したリージョンを入力します。この例では、中国 (杭州) リージョンが使用されています
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
# 特徴プラットフォームプロジェクトの名前を入力します。この例では、fs_movie が使用されています
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
raise ValueError("プロジェクトを作成する必要があります : fs_movie")プロジェクト情報を取得して出力します。
project = fs.get_project(cur_project_name)
print(project)特徴エンティティの構成
特徴エンティティは、意味的に関連する特徴のセットです。各特徴エンティティは、複数の特徴ビューに関連付けることができます。各エンティティには、さまざまな特徴ビューからの特徴を関連付けるための JoinId があります。各特徴ビューには、特徴を取得するためのプライマリキーがありますが、プライマリキーの名前は JoinId と異なる場合があります。
movie、user、rating の 3 つのエンティティを作成します。
cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
entity_movie = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
entity_movie.print_summary()cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
entity_user = project.create_entity(name = cur_entity_name_user, join_id=join_id)
entity_user.print_summary()cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
entity_ratings = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
entity_ratings.print_summary()特徴ビューの構成
FeatureStore は、特徴データを管理および編成します。外部データは、特徴ビューを通じてプラットフォームに導入されます。特徴ビューは、データストア、前処理または変換操作、データ構造、ストレージの場所、および特徴メタデータ管理を定義します。これには、プライマリキー、イベント時間、パーティションキー、特徴エンティティ、および存続時間 ( TTL ) 設定が含まれます。デフォルト値 -1 は、オンラインデータストアがすべての特徴データを保持することを指定します。正の値は、オンラインデータストアが指定された期間内の最新の機能データのみを保持することを指定します。
特徴ビューには次のタイプがあります。
BatchFeatureView: オフラインまたは T-1 日の特徴。オフラインデータはオフラインデータストアに挿入され、リアルタイムクエリのためにオンラインデータストアに同期できます。
StreamFeatureView: リアルタイムの特徴。データはオフラインデータストアに直接書き込まれ、オンラインデータストアに同期されます。
Sequence FeatureView: シーケンス特徴。オフライン書き込みとオンラインリアルタイム読み取りが可能です。
BatchFeatureView
CSV ファイルに格納されているデータの場合、CSV ファイルの URL を指定して MaxCompute にアップロードします。特徴ビューのスキーマを手動で作成する必要があります。
path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)schema は、各フィールドの名前とタイプを指定します。
movie_schema = OpenSchema(
OpenField(name='movie_id', type='STRING'),
OpenField(name='name', type='STRING'),
OpenField(name='alias', type='STRING'),
OpenField(name='actors', type='STRING'),
OpenField(name='cover', type='STRING'),
OpenField(name='directors', type='STRING'),
OpenField(name='double_score', type='STRING'),
OpenField(name='double_votes', type='STRING'),
OpenField(name='genres', type='STRING'),
OpenField(name='imdb_id', type='STRING'),
OpenField(name='languages', type='STRING'),
OpenField(name='mins', type='STRING'),
OpenField(name='official_site', type='STRING'),
OpenField(name='regions', type='STRING'),
OpenField(name='release_date', type='STRING'),
OpenField(name='slug', type='STRING'),
OpenField(name='story', type='STRING'),
OpenField(name='tags', type='STRING'),
OpenField(name='year', type='STRING'),
OpenField(name='actor_ids', type='STRING'),
OpenField(name='director_ids', type='STRING'),
OpenField(name='dt', type='STRING')
)
print(movie_schema)バッチ特徴ビューを作成します。
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()MaxCompute テーブルにデータを書き込みます。
cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()現在のタスクに関する情報を表示します。
print(cur_task.task_summary)データをオンラインデータストアに同期します。
cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()print(cur_task.task_summary)特徴ビューを取得します。
batch_feature_view = project.get_feature_view(feature_view_movie_name)特徴ビュー情報を出力します。
batch_feature_view.print_summary()user テーブルと ratings テーブルを順番にインポートします。
users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)user_schema = OpenSchema(
OpenField(name='user_md5', type='STRING'),
OpenField(name='user_nickname', type='STRING'),
OpenField(name='ds', type='STRING')
)
print(user_schema)feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()print(cur_task.task_summary)batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)ratings_schema = OpenSchema(
OpenField(name='rating_id', type='STRING'),
OpenField(name='user_md5', type='STRING'),
OpenField(name='movie_id', type='STRING'),
OpenField(name='rating', type='STRING'),
OpenField(name='rating_time', type='STRING'),
OpenField(name='dt', type='STRING')
)feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()print(cur_task.task_summary)batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()StreamFeatureView
スキーマは次のように定義できます。
online_schema = OpenSchema(
OpenField(name='id', type='STRING'),
OpenField(name='count_value', type='INT64'),
OpenField(name='metric_value', type='DOUBLE')
)次の SQL 文をコピーして、プロジェクト内の MaxCompute または DataWorks で実行できます。この文はテストデータを生成します。このデータはテスト目的のみのものであり、特定の意味はありません。
CREATE TABLE IF NOT EXISTS online_stream_test_t1 (
id STRING COMMENT 'ID',
count_value BIGINT COMMENT 'カウント値',
metric_value DOUBLE COMMENT 'メトリック値'
)
PARTITIONED BY (
ds string COMMENT 'データのタイムスタンプ'
)
LIFECYCLE 365
;
INSERT INTO TABLE online_stream_test_t1 PARTITION (ds='20250815')
SELECT
CONCAT('str_', CAST(id AS STRING)) AS id,
CAST(FLOOR(RAND() * 1000000) AS BIGINT) AS count_value,
ROUND(RAND() * 1000, 2) AS metric_value
FROM (
SELECT SEQUENCE(1, 1000) AS id_list
) tmp
LATERAL VIEW EXPLODE(id_list) table_tmp AS id;文が正常に実行されると、online_stream_test_t1 リアルタイム特徴テーブルが作成され、ds=20250815 パーティションのデータがこのテーブルに同期されます。
新しい stream_feature_view を作成できます。
feature_view_rating_name_stream = "feature_view_online_stream"
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
if stream_feature_view is None:
stream_feature_view = project.create_stream_feature_view(name=feature_view_rating_name_stream, schema=online_schema,
online=True, entity=cur_entity_name_user,
primary_key='id', event_time='count_value')StreamFeatureView の event_time フィールドには特別な目的があります。このフィールドが構成されている場合、その値は期限切れのデータをクリーンアップするために使用されます。詳細については、「リアルタイム特徴データの存続時間を設定する」をご参照ください。
特徴ビューに関する情報を出力できます。
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
stream_feature_view.print_summary()データをオンラインストアに同期できます。
# offline_datasource_id を FeatureStore プロジェクトのオフラインストア ID に変更します。
# table_name は、オンラインストアにプッシュするオフライン特徴テーブルです。
stream_task = stream_feature_view.publish_table(partitions={'ds': '20250815'}, mode='Merge', offline_to_online=True,
publish_config={'offline_datasource_id': project.offline_datasource_id,
'table_name': 'online_stream_test_t1'})
stream_task.wait()
print(stream_task.task_summary)Sequence FeatureView
ソースデータテーブルは、パブリック読み取り権限を持つ pai_online_project にあります。プロジェクト内の MaxCompute または DataWorks で次の SQL 文を実行して、シーケンス特徴データテーブルを自分のプロジェクトにコピーできます。
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
like pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
SELECT *
FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
WHERE ds >= '20231022' and ds <='20231024'文が正常に実行されると、rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 シーケンス特徴テーブルが作成され、ds=20231022、ds=20231023、および ds=20231024 パーティションのデータがこのテーブルに同期されます。
新しい seq_feature_view を作成できます。
user_entity_name = "user"
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
event_time = 'event_unix_time' # 動作テーブルのイベント時間フィールドの名前。
item_id = 'item_id' # 動作テーブルの item_id フィールドの名前。
event = 'event' # 動作テーブルのイベントフィールドの名前。
# deduplication_method = 1 は、['user_id', 'item_id', 'event'] に基づいて重複が削除されることを示します。
# deduplication_method = 2 は、['user_id', 'item_id', 'event', 'event_time'] に基づいて重複が削除されることを示します。
sequence_feature_config_list = [
SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50',
seq_len=50)]
# offline_seq_name は、オフラインシーケンスリストのシーケンス特徴フィールドの名前です。seq_event は、動作フィールドの名前です。online_seq_name は、FeatureStore オンライン Go SDK によってクエリされたときにユーザーの動作シーケンス item_id に割り当てられる名前です。
# seq_len はシーケンスの長さです。この長さより長いシーケンスは切り捨てられます。
seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id',
event_time='event_unix_time')
seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
event_time=event_time, item_id=item_id, event=event,
deduplication_method=1,
sequence_feature_config=sequence_feature_config_list,
sequence_table_config=seq_table_config,
entity=user_entity_name)特徴ビューに関する情報を出力できます。
seq_feature_view.print_summary()データをオンラインストアに同期できます。
seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()
seq_task.print_summary()ラベルテーブルを登録できます。
label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
label_table = project.create_label_table(datasource=ds, event_time='rating_time')オフラインデータストアの構成
オフラインデータストアは、オフライン特徴を格納するためのデータウェアハウスです。オフライン特徴は、Apache Spark を使用して MaxCompute または Hadoop 分散ファイルシステム ( HDFS ) に書き込まれます。オフラインデータストアは、モデルトレーニング用のトレーニングセットを生成し、バッチ予測用の特徴を提供するために使用されます。
オンラインデータストアの構成
オンラインデータストアは、リアルタイム特徴を格納するためのデータウェアハウスです。オンライン推論のために最新の特徴に低レイテンシでアクセスできます。FeatureDB、Hologres、および Tablestore がサポートされています。
オンライン特徴の取得
特徴ビューの観点からオンライン特徴を取得します。現在は FeatureDB のサポートを優先しています。( FeatureDB 関連のドキュメントについては、FeatureDBをご参照ください )
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.list_feature_view_online_features(join_ids=['26357307'])
print("ret_features1 = ", ret_features_1)ret_features_2 = batch_feature_view.list_feature_view_online_features(join_ids=['30444960', '3317352'])
print("ret_features2 = ", ret_features_2)featureSelector の構成
FeatureSelector は、オンラインおよびオフラインデータストアから取得する特徴の範囲を定義します。特徴ビューを指定して、そこから特徴を抽出できます。
feature_view_name = 'feature_view_movie'
# 特定の特徴を取得します
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
# すべての特徴を取得します
feature_selector = FeatureSelector(feature_view_name, '*')
# 取得する特徴のエイリアスを構成します
feature_selector = FeatureSelector(
feature_view='user1',
features = ['f1','f2', 'f3'],
alias={"f1":"f1_1"} # f1 フィールドのエイリアスとして f1_1 フィールドを指定します
)サンプルテーブル ( トレーニングセット ) の構成
FeatureStore は、ラベルと特徴を含む、モデルトレーニング用のサンプルテーブルを生成できます。モデルトレーニング用のラベルを準備し、モデルが特徴ビューからフェッチする必要のある特徴を定義する必要があります。ラベルは、プライマリキーに基づくポイントインタイム結合を使用して特徴に関連付けられます。
label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actors', 'regions','tags'])
feature_view_user_name = 'feature_view_users'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)モデルのトレーニング
FeatureStore によって生成された train_set を使用してモデルをトレーニングし、トレーニングされたモデルを推論サービスとしてデプロイします。
model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)サンプルテーブルのエクスポート
モデルトレーニング用のサンプルテーブルをエクスポートするには、ラベルテーブルと、各特徴ビューのイベント時間とパーティションを指定します。
label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)指定された条件に基づいてサンプルテーブルをエクスポートします。
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()print(task.summary)