すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:FeatureStore SDK for Python を使用して推薦システムを構築する

最終更新日:Nov 01, 2025

このトピックでは、FeatureStore SDK for Python を使用して推薦システムを構築し、公開する方法について説明します。

前提条件

このトピックで説明する操作を実行する前に、次の要件が満たされていることを確認してください。

サービス

要件

Platform for AI (PAI)

MaxCompute

Hologres

DataWorks

ステップ 1: データの準備

シミュレーションテーブルからのデータ同期

推薦システムには、ユーザー特徴テーブルアイテム特徴テーブルラベルテーブルシーケンス特徴テーブル、および行動テーブルが必要です。

このトピックでは、MaxCompute プロジェクト pai_online_project のシミュレーションテーブルを例として、FeatureStore SDK for Python の使用方法を説明します。 DataWorks で SQL 文を実行して、pai_online_project プロジェクトからご自身の MaxCompute プロジェクトにシミュレーションテーブルを同期する必要があります。 シミュレーションテーブルを同期するには、次のステップを実行します:

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。

  3. [データ開発] ページで、作成した DataWorks ワークスペースを選択し、[データ開発に進む] をクリックします。

  4. [作成] ボタンにポインターを合わせ、[ノードの作成] > [MaxCompute] > [ODPS SQL] を選択します。 [ノードの作成] ダイアログボックスで、次の表で説明されているノードパラメーターを設定します。

    パラメーター

    説明

    エンジンインスタンス

    作成した MaxCompute エンジンを選択します。

    ノードタイプ

    [ノードタイプ] ドロップダウンリストから ODPS SQL を選択します。

    パス

    [ビジネスフロー] > [ワークフロー] > [MaxCompute] を選択します。

    名前

    ノードの名前を指定します。

  5. [確認] をクリックします。

  6. 作成したノードのタブで、次の SQL 文を実行して、pai_online_project プロジェクトからご自身の MaxCompute プロジェクトに、シミュレーションされたユーザーテーブル、アイテムテーブル、ラベルテーブル、シーケンステーブル、および行動テーブルを同期します。 作成した専用スケジューリングリソースグループをリソースグループとして選択します。

    ユーザーテーブル rec_sln_demo_user_table_preprocess_all_feature_v1 の同期 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_all_feature_v1
    like pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_all_feature_v1 PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
    WHERE ds >= '20231022' and ds <='20231024'

    次のパーティションのデータが同期されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    アイテムテーブル rec_sln_demo_item_table_preprocess_all_feature_v1 の同期 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_all_feature_v1
    like pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_all_feature_v1 PARTITION(ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
    WHERE ds >= '20231022' and ds <='20231024'

    次のパーティションのデータが同期されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    ラベルテーブル rec_sln_demo_label_table の同期 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_label_table
    like pai_online_project.rec_sln_demo_label_table
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_label_table PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_label_table
    WHERE ds >= '20231022' and ds <='20231024'

    次のパーティションのデータが同期されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    シーケンステーブル rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 の同期 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    like pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    WHERE ds >= '20231022' and ds <='20231024'

    次のパーティションのデータが同期されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    行動テーブル rec_sln_demo_behavior_table_preprocess_v3 の同期 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v3
    like pai_online_project.rec_sln_demo_behavior_table_preprocess_v3
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    
    INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v3 PARTITION(ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v3
    WHERE ds >= '20231022' and ds <='20231024'

前述の操作を実行した後、ワークスペースで同期されたテーブルを表示できます。 これらのテーブルは、次のセクションで例として使用されます。

データストアの設定

ほとんどの場合、FeatureStore でオフラインデータストア (MaxCompute プロジェクトなど) とオンラインデータストア (FeatureDB インスタンス、Hologres インスタンス、Tablestore インスタンスなど) を設定する必要があります。 この例では、MaxCompute プロジェクトがオフラインデータストアとして設定され、Hologres インスタンスがオンラインデータストアとして設定されます。

  1. PAI コンソールにログインします。 左側のナビゲーションウィンドウで、[データ準備] > [FeatureStore] を選択します。

  2. [FeatureStore] ページで、ドロップダウンリストからワークスペースを選択し、[FeatureStore に入る] をクリックします。

  3. MaxCompute データストアを設定します。

    1. [ストア] タブで、[ストアの作成] をクリックします。 [ストアの作成] パネルで、次の表で説明されているパラメーターを設定します。

      パラメーター

      説明

      タイプ

      ドロップダウンリストから [MaxCompute] を選択します。

      名前

      データストアの名前を指定します。

      MaxCompute プロジェクト名

      作成した MaxCompute プロジェクトを選択します。

    2. 権限付与文をコピーし、[移動] をクリックして DataWorks でコピーした文を実行します。これにより、Hologres インスタンスに、MaxCompute プロジェクトからデータを同期する権限が付与されます。

      説明

      Hologres インスタンスに権限を付与するには、アカウントに管理者権限があることを確認してください。 詳細については、「コマンドを使用したユーザー権限の管理」または「MaxCompute コンソール (新バージョン) でのユーザー権限の管理」をご参照ください。

    3. [送信] をクリックします。

  4. Hologres データストアを設定します。

    1. [ストア] タブで、[ストアの作成] をクリックします。 [ストアの作成] パネルで、次の表で説明されているパラメーターを設定します。

      パラメーター

      説明

      タイプ

      ドロップダウンリストから [Hologres] を選択します。

      名前

      データストアの名前を指定します。

      インスタンス ID

      作成した Hologres インスタンスを選択します。

      データベース名

      Hologres インスタンスで作成したデータベースを選択します。

    2. [送信] をクリックします。

    3. Hologres インスタンスにアクセスするための権限を付与します。 詳細については、「データストアの作成」をご参照ください。

ステップ 2: FeatureStore SDK for Python の設定

FeatureStore SDK for Python をインストールします。 SDK には Python 3 が必要です。 次のサンプルコードを Jupyter Notebook で実行することを推奨します。

!  pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl

必要な機能モジュールをインポートします。

import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient, build_feature_store_client
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

FeatureStore プロジェクト

FeatureStore では複数のプロジェクトを作成できます。 各プロジェクトは独立しています。 詳細については、「FeatureStore プロジェクトの設定」をご参照ください。 Jupyter Notebook でサンプルコードを実行するには、FeatureStore で特定の設定が必要です。 たとえば、FeatureStore でデータストアを設定する必要があります。 詳細については、「データストアの作成」をご参照ください。

このトピックのサンプルコードでは、offline_datasource_id パラメーターはオフラインデータストアの ID を指定し、online_datasource_id パラメーターはオンラインデータストアの ID を指定します。

次のサンプルコードでは、fs_movie プロジェクトを例として、FeatureStore プロジェクトの設定方法を示します。

access_id = ''
access_ak = ''
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
  raise ValueError("Need to create project : fs_movie")

プロジェクトを取得し、プロジェクト情報を出力します。

project = fs.get_project(cur_project_name)
project.print_summary()

特徴エンティティ

特徴エンティティは、意味的に関連する特徴のコレクションです。 特徴エンティティは、複数の特徴ビューに関連付けることができます。 各エンティティには join ID があります。 join ID を使用して、複数の特徴ビューにまたがる特徴を関連付けることができます。 各特徴ビューには、特徴を取得するために使用できるプライマリキー (インデックス) があります。 プライマリキーは、join ID の名前とは異なる場合があります。

通常、推薦システムには、ユーザーとアイテムの 2 つの特徴エンティティがあります。 次のサンプルコードは、ユーザーエンティティとアイテムエンティティを作成する方法の例を示しています:

  • ユーザーエンティティの作成

    user_entity_name = "user"
    user_join_id = 'user_id'
    user_entity = project.get_entity(user_entity_name)
    if user_entity is None:
     user_entity = project.create_entity(name = user_entity_name, join_id=user_join_id)
    user_entity.print_summary()
    
  • アイテムエンティティの作成

    item_entity_name = "item"
    join_id = 'item_id'
    item_entity = project.get_entity(item_entity_name)
    if item_entity is None:
     item_entity = project.create_entity(name = item_entity_name, join_id=join_id)
    item_entity.print_summary()

特徴ビュー

特徴ビューを作成して、外部ソースから FeatureStore に特徴データを取り込み、一元管理できます。 特徴ビューには、データソース、必要な変換、特徴テーブルスキーマ、オンラインおよびオフラインデータストアなど、特徴を管理するために必要なすべての情報が含まれています。 特徴ビューでは、プライマリキー、イベント時間、パーティションフィールド、特徴エンティティ、存続時間 (TTL) などのメタデータも管理できます。 TTL は、オンラインデータストアで特徴データが利用可能な期間を指定するパラメーターです。 デフォルト値の -1 は、オンラインデータストアがすべての特徴データを保持することを指定します。 正の値は、オンラインデータストアが指定された期間内の最新の特徴データのみを保持することを指定します。

FeatureStore は、バッチ特徴ビュー、ストリーム特徴ビュー、シーケンス特徴ビューの各タイプをサポートしています。

バッチ特徴ビュー

バッチ特徴ビューを使用すると、オフライン特徴をオフラインデータストアに取り込み、オフライン特徴をオンラインデータストアに同期できます。 これにより、リアルタイムで特徴をクエリできます。

  • ユーザーテーブルを登録します。

    1. FeatureStore にユーザーテーブル rec_sln_demo_user_table_preprocess_all_feature_v1 を登録します。

      user_feature_view_name = "user_table_preprocess_all_feature_v1"
      user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1"
      user_feature_view = project.get_feature_view(user_feature_view_name)
      if user_feature_view is None:
       ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name)
       user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity= user_entity_name, primary_key='user_id', register=True)
      print(user_feature_view)
    2. オフラインデータストアからオンラインデータストアに ds=20231023 パーティションのデータを同期します。

      user_task = user_feature_view.publish_table({'ds':'20231023'})
      user_task.wait()
    3. タスクの実行ステータスを表示します。

      user_task.print_summary()
  • アイテムテーブルを登録します。

    1. FeatureStore にアイテムテーブル rec_sln_demo_item_table_preprocess_all_feature_v1 を登録します。

      item_feature_view_name = "item_table_preprocess_all_feature_v1"
      item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1"
      item_feature_view = project.get_feature_view(item_feature_view_name)
      if item_feature_view is None:
        ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name)
        item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online = True, entity= item_entity_name, primary_key='item_id', register=True)
      print(item_feature_view)
    2. オフラインデータストアからオンラインデータストアに ds=20231023 パーティションのデータを同期します。

      item_task = item_feature_view.publish_table({'ds':'20231023'})
      item_task.wait()
    3. タスクの実行ステータスを表示します。

      item_task.print_summary()

シーケンス特徴ビュー

シーケンス特徴ビューを使用すると、シーケンス特徴をオフラインで書き込み、リアルタイムのシーケンス特徴をオンラインで読み取ることができます。 推薦システムを構築する際に、シミュレートされたオフラインシーケンステーブル (F1 テーブル) を FeatureStore に登録できます。 シミュレートされたテーブルのデータは、その後オンラインログに置き換えられます。 リアルタイムのシーケンス特徴をクエリするために、システムは当日のオンライン行動テーブル (B2 テーブル) と前日のオンライン行動テーブル (B1 テーブル) からデータを取得します。 これら 2 つのテーブルから取得されたデータは連結されてシーケンス特徴を生成し、スコアリングのためにモデルに送信されます。

B1 テーブルは、前日のオフライン行動テーブル (A1 テーブル) からデータを同期します。 重複データは、同期中に FeatureStore によって自動的にフィルター処理されます。 B2 テーブルには、リアルタイムの行動特徴が含まれています。 API 操作を呼び出すか、Realtime Compute for Apache Flink を使用して、リアルタイムデータを B2 テーブルに書き込むことができます。

シーケンス特徴ビューを作成すると、FeatureStore は F1、A1、B1、および B2 テーブルを一元的に管理します。

オフラインデータストアの F1 テーブルと A1 テーブルのみをシーケンス特徴ビューに登録する必要があります。 FeatureStore は、同期と重複排除によってオンライン行動テーブル (B1 テーブルと B2 テーブル) を自動的に作成します。

  1. シーケンス特徴ビューを作成します。

    seq_feature_view_name = "wide_seq_feature_v3"
    seq_feature_view = project.get_feature_view(seq_feature_view_name)
    if seq_feature_view is None:
      seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
      behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
      ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
      event_time='event_unix_time ' # 行動テーブルのイベント時間フィールドの名前。
      item_id = 'item_id' # 行動テーブルの item_id フィールドの名前。
      event = 'event' # 行動テーブルのイベントフィールドの名前。
      # deduplication_method = 1 は、user_id、item_id、および event フィールドに基づいてデータを重複排除することを指定します。
      # deduplication_method = 2 は、user_id、item_id、event、および event_time フィールドに基づいてデータを重複排除することを指定します。
      sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)]
      # offline_seq_name パラメーターは、オフラインシーケンステーブルのシーケンス特徴フィールドの名前を指定します。 seq_event パラメーターは、イベントフィールドの名前を指定します。 online_seq_name パラメーターは、FeatureStore SDK for Go を使用して取得されるオンラインシーケンス特徴のフィールド名を指定します。
      # seq_len パラメーターは、シーケンス特徴の最大長を指定します。 制限を超えたシーケンスは切り捨てられます。
      seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time')
      seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
                                                  event_time=event_time, item_id=item_id, event=event, deduplication_method=1,
                                                  sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name)
    # seq_feature_view.print_summary()
    print(seq_feature_view)
  2. オフラインデータストアからオンラインデータストアに行動テーブル rec_sln_demo_behavior_table_preprocess_v3 の ds=20231023 パーティションのデータを同期します。 同期中に、FeatureStore はデータが欠落している場合に自動的にチェックし、既存データをロードします。days_to_load パラメーターは、FeatureStore がデータの可用性をチェックする過去の日数を指定します。 このパラメーターのデフォルト値は 30 です。

    seq_task = seq_feature_view.publish_table({'ds':'20231023'}, days_to_load=30)
    seq_task.wait()
  3. タスクの実行ステータスを表示します。

    seq_task.print_summary()

ストリーム特徴ビュー

ストリーム特徴ビューを使用すると、リアルタイム特徴をオンラインデータストアに書き込み、データをオフラインデータストアに同期できます。 これは、商品の価格や売上など、特徴がリアルタイムで更新されるシナリオで機能します。

ラベルテーブルの登録

label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
  label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)

オンライン特徴の取得

オンライン特徴を取得してデータを分析し、オンラインデータストアとオフラインデータストアで特徴が一致しているかどうかを確認できます。

user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.get_online_features(join_ids={'user_id':['169898460', '148811946']}, features=['user_id', 'gender', 'age', 'city'])
print("ret_features = ", ret_features_1)

トレーニングデータセット

FeatureStore を使用して、モデルトレーニング用のトレーニングデータセットを生成できます。 トレーニングデータセットには、ラベルと特徴が含まれています。 モデルトレーニング用のラベルを準備し、モデルが特徴ビューからフェッチする必要がある特徴を定義する必要があります。 ラベルは、プライマリキーに基づいてポイントインタイム結合を使用して特徴に関連付けられます。

# Specify the label table.
label_table_name = 'rec_sln_demo_label_table'

output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector=FeatureSelector (user_feature_view_name, '*') # '*' Specifies all features.
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)

モデル特徴

FeatureStore によって生成されたトレーニングデータセットでモデルをトレーニングし、トレーニング済みのモデルを PAI の推論サービスとしてデプロイできます。

model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

ステップ 3: トレーニングデータセットのエクスポートとモデルのトレーニング

FeatureStore からモデルトレーニング用のトレーニングデータセットをエクスポートできます。

トレーニングデータセットのエクスポート

ラベルテーブルと、各特徴ビューのイベント時間およびパーティションを指定します。

cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)

user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)

item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)

seq_partitions = PartitionConfig(name = 'ds', value = cur_day)
feature_view_seq_config = FeatureViewConfig(name = 'wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)


model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)

モデルのトレーニング

EasyRec は、FeatureStore とシームレスに接続してモデルのトレーニング、エクスポート、公開を行うことができるオープンソースの推薦システムフレームワークです。 fs_demo_fs_rank_v1_trainning_set トレーニングデータセットでモデルをトレーニングするには、EasyRec を使用することを推奨します。

  • EasyRec のオープンソースコードの詳細については、「EasyRec」をご参照ください。

  • EasyRec の詳細については、「What is EasyRec?」をご参照ください。

  • EasyRec を使用したモデルのトレーニング方法の詳細については、「train_config」をご参照ください。

EasyRec に関するその他の質問がある場合は、DingTalk グループ (ID: 32260796) に参加して技術サポートを受けてください。

ステップ 4: モデルの公開

モデルをトレーニングしてエクスポートした後、モデルをデプロイして公開できます。 自己管理の推薦システムを使用している場合は、FeatureStore SDK for Python、FeatureStore SDK for Go、FeatureStore SDK for C++、または FeatureStore SDK for Java を使用して、推薦システムを FeatureStore に接続できます。 推薦システムを FeatureStore に接続する方法に関する技術サポートについては、DingTalk グループ (ID 32260796) に参加してください。 FeatureStore は、他の Alibaba Cloud サービスとシームレスに統合されています。 Alibaba Cloud サービスを使用して、推薦システムを迅速に構築および公開できます。

この例では、Alibaba Cloud サービスを使用してモデルを公開します。

データ同期ノードの設定

モデルを公開する前に、オフラインデータストアからオンラインデータストアに定期的にデータを同期できるように、データ同期ノードを設定する必要があります。 データ同期ノードを設定するには、次のステップを実行します:

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。

  3. [データ開発] ページで、作成した DataWorks ワークスペースを選択し、[データ開発に進む] をクリックします。

  4. ユーザーテーブルを定期的に同期します。

    1. [作成] ボタンにポインターを合わせ、[ノードの作成] > [MaxCompute] > [PyODPS 3] を選択します。

    2. [ノードの作成] ダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

    3. 次のコードをコードエディタにコピーして、user_table_preprocess_all_feature_v1 特徴ビューから定期的にデータを同期します:

      user_table_preprocess_all_feature_v1 特徴ビューからのデータ同期 (クリックして詳細を表示)

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'user_table_preprocess_all_feature_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite')
      task.wait()
      task.print_summary()
    4. ページの右側にある [プロパティ] をクリックします。 [プロパティ] パネルで、次の表で説明されているスケジューリングパラメーターを設定します。

      パラメーター

      説明

      スケジューリングパラメーター

      パラメーター名

      dt

      パラメーター値

      $[yyyymmdd-1]

      リソースグループ

      リソースグループ

      作成した専用スケジューリングリソースグループを選択します。

      依存関係

      作成したユーザーテーブルを選択します。

    5. ノードが設定およびテストされた後、ノード設定を保存して送信します。

    6. ノードのデータをバックフィルします。 詳細については、このトピックの「シミュレーションテーブルからのデータ同期」セクションをご参照ください。

  5. アイテムテーブルを同期します。

    1. [作成] ボタンにポインターを合わせ、[ノードの作成] > [MaxCompute] > [PyODPS 3] を選択します。

    2. [ノードの作成] ダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

    3. 次のコードをコードエディタにコピーします:

      item_table_preprocess_all_feature_v1 特徴ビューからのデータ同期 (クリックして詳細を表示)

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'item_table_preprocess_all_feature_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite')
      task.wait()
      task.print_summary()
    4. ページの右側にある [プロパティ] をクリックします。 [プロパティ] パネルで、次の表で説明されているスケジューリングパラメーターを設定します。

      パラメーター

      説明

      スケジューリングパラメーター

      パラメーター名

      dt

      パラメーター値

      $[yyyymmdd-1]

      リソースグループ

      リソースグループ

      作成した専用スケジューリングリソースグループを選択します。

      依存関係

      作成したアイテムテーブルを選択します。

    5. ノードが設定およびテストされた後、ノード設定を保存して送信します。

    6. ノードのデータをバックフィルします。 詳細については、このトピックの「シミュレーションテーブルからのデータ同期」セクションをご参照ください。

  6. リアルタイムシーケンステーブルを同期します。

    1. [作成] ボタンにポインターを合わせ、[ノードの作成] > [MaxCompute] > [PyODPS 3] を選択します。

    2. [ノードの作成] ダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

    3. 次のコードをコードエディタにコピーします:

      wide_seq_feature_v3 特徴ビューからのデータ同期 (クリックして詳細を表示)

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing')
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'wide_seq_feature_v3'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day},days_to_load=30)
      task.wait()
      task.print_summary()
    4. ページの右側にある [プロパティ] をクリックします。 [プロパティ] パネルで、次の表で説明されているスケジューリングパラメーターを設定します。

      パラメーター

      説明

      スケジューリングパラメーター

      パラメーター名

      dt

      パラメーター値

      $[yyyymmdd-1]

      リソースグループ

      リソースグループ

      作成した専用スケジューリングリソースグループを選択します。

      依存関係

      作成したアイテムテーブルを選択します。

    5. ノードが設定およびテストされた後、ノード設定を保存して送信します。

    6. ノードのデータをバックフィルします。 詳細については、このトピックの「シミュレーションテーブルからのデータ同期」セクションをご参照ください。

  7. データが同期された後、Hologres データストアで同期された最新の特徴を表示できます。

EAS を使用したモデルサービスの作成とデプロイ

Elastic Algorithm Service (EAS) を使用してモデルサービスをデプロイできます。 モデルサービスは、推薦エンジンからリクエストを受け取り、リクエストに基づいてアイテムをスコアリングし、スコアを返します。 EasyRec プロセッサーは、FeatureStore SDK for C++ を統合して、低レイテンシーで特徴を取得します。 EasyRec プロセッサーは、SDK を使用して特徴を取得した後、特徴をモデルに渡して推論を行い、スコアを推薦エンジンに返します。

モデルサービスをデプロイするには、次のステップを実行します:

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。

  3. 作成した DataWorks ワークスペースを選択し、[データ開発に進む] をクリックします。

  4. [作成] ボタンにポインターを合わせ、[ノードの作成] > [MaxCompute] > [PyODPS 3] を選択します。

  5. [ノードの作成] ダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

  6. 次のコードをコードエディタにコピーします:

    import os
    import json
    config = {
      "name": "fs_demo_v1",
      "metadata": {
        "cpu": 4,
        "rpc.max_queue_size": 256,
        "rpc.enable_jemalloc": 1,
        "gateway": "default",
        "memory": 16000
      },
      "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg",
      "model_config": {
        "access_key_id": f'{o.account.access_id}',
        "access_key_secret": f'{o.account.secret_access_key}',
        "region": "cn-beijing",
        "fs_project": "fs_demo",
        "fs_model": "fs_rank_v2",
        "fs_entity": "item",
        "load_feature_from_offlinestore": True,
        "steady_mode": True,
        "period": 2880,
        "outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
        "fg_mode": "tf"
      },
      "processor": "easyrec-1.8",
      "processor_type": "cpp"
    }
    
    with open("echo.json", "w") as output_file:
        json.dump(config, output_file)
    
    # 最初のデプロイメントでは、次のコード行を実行します:
    os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
    
    # 定期的な更新では、次のコード行を実行します:
    # os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
  7. ページの右側にある [プロパティ] をクリックします。 [プロパティ] パネルで、次の表で説明されているスケジューリングパラメーターを設定します。

    パラメーター

    説明

    スケジューリングパラメーター

    パラメーター名

    dt

    パラメーター値

    $[yyyymmdd-1]

    リソースグループ

    リソースグループ

    作成した専用スケジューリングリソースグループを選択します。

    依存関係

    トレーニングジョブと item_table_preprocess_all_feature_v1 特徴ビューを選択します。

  8. ノードが設定およびテストされた後、ノードを実行してデプロイメントステータスを表示します。

  9. デプロイメントが完了したら、コードの 34 行目をコメントアウトし、37 行目のコメントを解除して、ジョブを定期的に実行します。

  10. デプロイされたサービスは、PAI コンソールの [Elastic Algorithm Service (EAS)] ページの [推論サービス] タブで表示できます。 詳細については、「カスタムデプロイメント」をご参照ください。

PAI-Rec の設定

PAI-Rec は、FeatureStore SDK for Go を統合し、FeatureStore および EAS とシームレスに統合できる推薦エンジンサービスです。

PAI-Rec を設定するには、次のステップを実行します:

  1. FeatureStoreConfs パラメーターを設定します。

    • RegionId: FeatureStore が有効化されているリージョンの ID。 この例では、cn-beijing が使用されます。

    • ProjectName: FeatureStore で作成したプロジェクトの名前。 この例では、fs_demo が使用されます。

        "FeatureStoreConfs": {
            "pairec-fs": {
                "RegionId": "cn-beijing",
                "AccessId": "${AccessKey}",
                "AccessKey": "${AccessSecret}",
                "ProjectName": "fs_demo"
            }
        },
  2. FeatureConfs パラメーターを設定します。

    • FeatureStoreName: このパラメーターを、FeatureStoreConfs パラメーターで指定された pairec-fs に設定します。

    • FeatureStoreModelName: 作成したモデル特徴の名前。 この例では、fs_rank_v1 が使用されます。

    • FeatureStoreEntityName: 作成した特徴エンティティの名前。 この例では、user が使用されます。 このパラメーター設定により、PAI-Rec は FeatureStore SDK for Go を使用して fs_rank_v1 モデルのユーザー特徴を取得できます。

        "FeatureConfs": {
            "recreation_rec": {
                "AsynLoadFeature": true,
                "FeatureLoadConfs": [
                    {
                        "FeatureDaoConf": {
                            "AdapterType": "featurestore",
                            "FeatureStoreName": "pairec-fs",
                            "FeatureKey": "user:uid",
                            "FeatureStoreModelName": "fs_rank_v1",
                            "FeatureStoreEntityName": "user",
                            "FeatureStore": "user"
                        }
                    }
                ]
            }
        },
  3. AlgoConfs パラメーターを設定します。

    AlgoConfs パラメーターは、PAI-Rec が接続する EAS のスコアリングサービスを指定します。

    • Name: EAS を使用してデプロイしたサービスの名前。

    • Url および Auth: EAS を使用してデプロイしたサービスにアクセスするために使用される URL とトークン。 [Elastic Algorithm Service (EAS)] ページでサービス名をクリックし、[基本情報] タブの [エンドポイント情報を表示] をクリックして、URL とトークンを取得できます。 詳細については、「EAS のよくある質問」をご参照ください。

        "AlgoConfs": [
            {
                "Name": "fs_demo_v1",
                "Type": "EAS",
                "EasConf": {
                    "Processor": "EasyRec",
                    "Timeout": 300,
                    "ResponseFuncName": "easyrecMutValResponseFunc",
                    "Url": "eas_url_xxx",
                    "EndpointType": "DIRECT",
                    "Auth": "eas_token"
                }
            }
        ],