すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:特徴プラットフォームと特徴エンジニアリング

最終更新日:Nov 07, 2025

FeatureStore の特徴エンジニアリング機能は、レコメンデーション、広告、リスク管理、機械学習などの分野で広く使用されています。これらの機能は、特徴エンジニアリングの複雑さを軽減するように設計されています。一般的な関数を標準化することで、簡単な構成で特徴エンジニアリングを実装できます。このトピックでは、特徴エンジニアリングのプロセスについて詳しく説明します。

前提条件

次の表に記載されている準備が完了していることを確認してください。

サービス

操作

PAI

MaxCompute

DataWorks

1. 準備

生データの準備

このトピックでは、次の 4 つのソーステーブルを使用します:

  • ユーザーテーブル (rec_sln_demo_user_table_preprocess_v1): 性別、年齢、都市、フォロワー数などの基本的なユーザー特徴が含まれます。

  • 行動テーブル (rec_sln_demo_behavior_table_preprocess_v1): 特定の時点でのユーザーによるアイテムのクリックなどの行動データが含まれます。

  • アイテムテーブル (rec_sln_demo_item_table_preprocess_v1): カテゴリ、作成者、累積クリック数、累積いいね数などの基本的なアイテム特徴が含まれます。

  • ワイド行動テーブル (rec_sln_demo_behavior_table_preprocess_wide_v1): このテーブルは、上記の 3 つのテーブルを結合して作成されます。

データテーブルは、すべてのユーザーに表示される pai_online_project ワークスペースに保存されます。データテーブルにはシミュレーションデータのみが保存されます。DataWorks で SQL 文を実行して、pai_online_project ワークスペースから MaxCompute プロジェクトに上記のテーブルのデータを同期する必要があります。次の手順を実行します:

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発 & O&M] > [データ開発] をクリックします。

  3. 作成した DataWorks ワークスペースを選択し、[データ開発に入る] をクリックします。

  4. [新規] にカーソルを合わせ、[新規ノード] > [MaxCompute] > [ODPS SQL] を選択します。表示されるページで、ノードパラメーターを設定します。

    パラメーター

    推奨値

    エンジンインスタンス

    作成した MaxCompute エンジンを選択します。

    ノードタイプ

    ODPS SQL

    パス

    Business Flow/Workflow/MaxCompute

    名前

    カスタム名を入力します。

  5. [確認] をクリックします。

  6. 新しいノードエディターで、次の SQL コマンドを実行して、pai_online_project プロジェクトのユーザー、アイテム、行動、およびワイド行動テーブルから MaxCompute プロジェクトにデータを同期します。リソースグループには、専用スケジューリングリソースグループを選択します。

    • 次の SQL 文を実行して、ユーザーテーブル rec_sln_demo_user_table_preprocess_v1 のデータを同期します:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_v1
      like pai_online_project.rec_sln_demo_user_table_preprocess_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_user_table_preprocess_v1
      WHERE ds >= '20240530' and ds <='20240605';
    • 次の SQL 文を実行して、行動テーブル rec_sln_demo_behavior_table_preprocess_v1 のデータを同期します:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v1
      like pai_online_project.rec_sln_demo_behavior_table_preprocess_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v1
      WHERE ds >= '20240530' and ds <='20240605';
    • 次の SQL 文を実行して、アイテムテーブル rec_sln_demo_item_table_preprocess_v1 のデータを同期します:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_v1
      like pai_online_project.rec_sln_demo_item_table_preprocess_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_item_table_preprocess_v1
      WHERE ds >= '20240530' and ds <='20240605';
    • ワイド行動テーブルを同期します: rec_sln_demo_behavior_table_preprocess_wide_v1

      CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_wide_v1
      like pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1
      STORED AS ALIORC  
      LIFECYCLE 90;
      
      INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_wide_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1
      WHERE ds >= '20240530' and ds <='20240605';

FeatureStore SDK for Python のインストール

Jupyter Notebook を使用して次のコードを実行することをお勧めします。

  • Python3 環境に FeatureStore SDK for Python をインストールします。

    %pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl
  • 必要な機能モジュールをインポートします。

    import os
    from feature_store_py import FeatureStoreClient
    from feature_store_py.fs_datasource import MaxComputeDataSource
    from feature_store_py.feature_engineering import TableTransform, Condition, DayOf, ComboTransform, Feature, AggregationTransform, auto_count_feature_transform, WindowTransform, auto_window_feature_transform

2. テーブルと特徴の変換プロセス

Jupyter Notebook 環境で次のコードを実行することをお勧めします。

  1. テーブル変換を定義します。

    1. クライアントを初期化します。

      access_key_id=os.environ.get ("ALIBABA_CLOUD_ACCESS_KEY_ID") # AccessKey ID を入力します。
      access_key_secret=os.environ.get ("ALIBABA_CLOUD_ACCESS_KEY_SECRET") # AccessKey Secret を入力します。
      project='project_name' # プロジェクト名を入力します。
      region='cn-hangzhou' # プロジェクトが存在するリージョンを入力します。たとえば、プロジェクトが中国 (杭州) にある場合は、cn-hangzhou と入力します。
      fs_client = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region=region)
    2. データソースを指定します。

      input_table_name = "rec_sln_demo_behavior_table_preprocess_v1"
      ds = MaxComputeDataSource(table=input_table_name, project=project)
    3. 変換の出力テーブルの名前を指定します。

      output_table_name = "rec_sln_demo_v1_fs_test_v1"
    4. テーブル変換を定義します。

      trans_name = "drop_duplicates"  # テーブル変換の名前。
      keys = ["user_id", "item_id"]  # 重複排除のフィールド。
      sort_keys = ["event_unix_time"]  # ソートフィールド。
      sort_order = ["desc"]  # 順序定義。
      tran_i = TableTransform(trans_name, keys, sort_keys, sort_order)
      
  2. 特徴変換を定義します。

    feature1 = Feature(
        name="page_net_type",
        input=['page', 'net_type'],
        transform=ComboTransform(
            separator='_'
        )
    )
    feature2 = Feature(
        name="trim_playtime",
        type="double",
        transform="playtime/10"
    )
    
  3. パイプラインを生成します。

    pipeline = fs_client.create_pipeline(ds, output_table_name).add_table_transform(tran_i).add_feature_transform([feature1, feature2], keep_input_columns=True)
  4. 変換を生成して実行します。

    execute_date = '20240605'
    output_table = pipeline.execute(execute_date, drop_table=True)

    上記のコードには、次の 2 つのステップが含まれます:

    1. 変換構成を生成します。これらの構成は、SQL 文と、入力、出力、パラメーター、依存関係など、変換に必要な情報を指定します。

    2. 変換を実行します。変換は前のステップの構成に基づいて実行され、結果は出力テーブルに保存されます。

  5. 結果を表示します。

    1. 生成されたテーブルで結果を表示します。結果は pandas.DataFrame 形式で直接レンダリングされます。

      pd_ret = output_table.to_pandas(execute_date, limit=20)
    2. pd_ret の内容を表示します。

      pd_ret
    3. 生成された構成を表示します。これらには、入力テーブル定義、変換 SQL、依存関係、パラメーター、および出力テーブル定義が含まれます。保存後、これらの構成はデバッグおよび後続の定期的なオンラインタスクに使用できます。

      transform_info = output_table.transform_info
    4. transform_info の内容を表示します。

      transform_info
    5. 最初のステージの入力構成を表示します。

      pipeline_config = pipeline.pipeline_config
    6. pipeline_config の内容を表示します。

      pipeline_config

3. 統計的特徴変換

統計的特徴は、機械学習とデータ分析における一般的なデータ前処理手法です。より代表的で解釈しやすい特徴を生成するために使用されます。これらの変換は、生データから情報を要約、計算、抽出し、モデルがデータの時間的傾向、周期性、異常をよりよく理解できるようにします。利点は次のとおりです:

  • 時間的傾向の把握: ユーザーの行動データでは、直近の期間の行動が現在の状態に大きな影響を与えることがよくあります。

  • ノイズの低減: 生データには大量のノイズが含まれている場合があります。統計的変換を使用すると、集約操作を使用してこのノイズの影響を低減できます。

  • 特徴の拡充: 統計的変換は新しい特徴を生成し、モデルの表現力を高めます。

  • モデルパフォーマンスの向上: 統計的特徴を導入することで、モデルの予測パフォーマンスを大幅に向上させることができます。

  • 解釈可能性の向上: 統計的特徴は解釈と理解が容易であり、問題の診断と分析がより便利になります。

  • データ圧縮: 場合によっては、統計的特徴がデータの次元を効果的に削減できます。

統計的特徴の実装プロセスは複雑ですが、以下で説明する簡単な定義を使用して多くの統計的特徴を作成できます。

単一の統計的特徴変換の定義と実行

  1. 入力テーブルと出力テーブルの名前を定義します。

    input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_agg_table_name = "rec_sln_demo_behavior_test_agg_v1"
  2. 統計的特徴を定義します。

    feature_agg1 = Feature(
                name="user_avg_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg", # 集計関数の名前。オプション値は 'avg'、'sum'、'min'、'max' です。
                    condition=Condition(field="event", value="expr", operator="<>"), # "event" フィールドの値が "expr" と等しくない場合に満たされる条件を設定します。生成された SQL 文から具体的なロジックを理解できます。
                    group_by_keys="user_id", # group by に対応するキー。
                    window_size=DayOf(1), # ウィンドウサイズ。ここでは 1 日です。
                ),
            )
  3. パイプラインを作成し、統計的特徴変換を実行できます。

    agg_pipeline = fs_client.create_pipeline(ds_agg, output_agg_table_name).add_feature_transform([feature_agg1])
  4. 変換を生成して実行します。

    execute_date = '20240605'
    print("transform_info = ", agg_pipeline.transform_info)
    output_agg_table = agg_pipeline.execute(execute_date, drop_table=True)
  5. transform_info の内容を表示します。

    transform_info_agg = output_agg_table.transform_info
    transform_info_agg
  6. 結果を表示します。

    pd_ret = output_agg_table.to_pandas(execute_date, limit=20)
    pd_ret

異なるウィンドウを持つ統計的特徴変換の自動 JOIN

  1. 出力テーブル名を定義します。

    output_agg_table_name_2 = "rec_sln_demo_behavior_test_agg_v2"
  2. 複数の異なるウィンドウに対して統計的特徴を定義します。定義されたウィンドウサイズは 1、3、7、15、および 30 日です。

    feature_agg1 = Feature(
                name="user_avg_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg2 = Feature(
                name="user_avg_praise_count_3d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(3),
                ),
            )
    feature_agg3 = Feature(
                name="user_avg_praise_count_7d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(7),
                ),
            )
    feature_agg4 = Feature(
                name="user_avg_praise_count_15d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(15),
                ),
            )
    feature_agg5 = Feature(
                name="user_avg_praise_count_30d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(30),
                ),
            )
  3. パイプラインを作成します。

    agg_pipeline_2 = fs_client.create_pipeline(ds_agg, output_agg_table_name_2).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5])
  4. パイプラインを生成して実行します。

    execute_date = '20240605'
    output_agg_table_2 = agg_pipeline_2.execute(execute_date, drop_table=True)
  5. 変換結果を表示します。

    transform_info_agg_2 = output_agg_table_2.transform_info
    transform_info_agg_2
  6. テーブル実行の結果を表示します。

    pd_ret_2 = output_agg_table_2.to_pandas(execute_date, limit=20)
    pd_ret_2

複数の統計的特徴変換プロセスの自動マージと型派生

計算を最適化するために、同じウィンドウサイズを持つ複数の特徴は、同じグループウィンドウブロックで自動的にマージおよび計算されます。計算プロセスには型の変更が含まれます。たとえば、`avg` は `bigint` 型を `double` 型に変換します。入力特徴の多数の型を覚えるのは困難です。したがって、統計的特徴変換プロセスは自動型派生をサポートしており、事前に型を指定する必要はありません。結果として得られる特徴の型は、特徴定義中に自動的に派生します。

  1. 出力テーブル名を定義します。

    output_agg_table_name_3 = "rec_sln_demo_behavior_test_agg_v3"
  2. 異なる型の特徴をさらに定義します。

    feature_agg6 = Feature(
                name="user_expr_cnt_1d",
                transform=AggregationTransform(
                    agg_func="count",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                )
            )
    feature_agg7 = Feature(
                name="user_expr_item_id_dcnt_1d",
                input=['item_id'],
                transform=AggregationTransform(
                    agg_func="count",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg8 = Feature(
                name="user_sum_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg9 = Feature(
                name="user_sum_praise_count_3d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(3),
                ),
            )
  3. パイプラインを作成します。

    agg_pipeline_3 = fs_client.create_pipeline(ds_agg, output_agg_table_name_3).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5, feature_agg6, feature_agg7, feature_agg8, feature_agg9])
  4. パイプラインを生成して実行します。

    execute_date = '20240605'
    output_agg_table_3 = agg_pipeline_3.execute(execute_date, drop_table=True)
  5. 変換結果を表示します。

    transform_info_agg_3 = output_agg_table_3.transform_info
    transform_info_agg_3
  6. テーブル実行の結果を表示します。

    pd_ret_3 = output_agg_table_3.to_pandas(execute_date, limit=20)
    pd_ret_3

統計的特徴変換の自動拡張をサポートする組み込み自動拡張関数

各統計的特徴を手動で実装するのは、特徴の数が多く、異なるウィンドウサイズや集計関数計算の多数の組み合わせが含まれるため、複雑です。このシステムは、組み込みの自動拡張関数を提供します。カウントする入力特徴を指定するだけで、システムが数百の統計的特徴の定義を自動的に生成および完了します。

  1. カウントする入力特徴を指定します。

    name_prefix = "user_"
    input_list = ["playtime", "duration", "click_count", "praise_count"]
    event_name = 'event'
    event_type = 'expr'
    group_by_key = "user_id"
    count_feature_list = auto_count_feature_transform(name_prefix, input_list, event_name, event_type, group_by_key)
    print("len_count_feature_list = ", len(count_feature_list))
    print("count_feature_list = ", count_feature_list)
  2. 出力テーブル名を定義し、パイプラインを作成します。

    output_agg_table_name_4 = "rec_sln_demo_behavior_test_agg_v4"
    agg_pipeline_4 =fs_client.create_pipeline(ds_agg, output_agg_table_name_4).add_feature_transform(count_feature_list)
  3. パイプラインを生成して実行します。

    execute_date = '20240605'
    output_agg_table_4 = agg_pipeline_4.execute(execute_date, drop_table=True)
  4. 変換結果を表示します。

    transform_info_agg_4 = output_agg_table_4.transform_info
    transform_info_agg_4
  5. テーブル実行の結果を表示します。

    pd_ret_4 = output_agg_table_4.to_pandas(execute_date, limit=20)
    pd_ret_4

異なるグループキーの同時変換のサポート

前のセクションでは、すべてのグループキーが同じ場合の処理方法について説明しました。このシステムは、異なるグループキーに対する変換操作もサポートしています。以下に例を示します:

  1. 出力テーブル名を定義します。

    input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_agg_table_name_5 = "rec_sln_demo_behavior_test_agg_v5"
  2. 異なるグループキーの特徴を定義します。

    feature_agg1 = Feature(
                name="item__sum_follow_cnt_15d",
                input=['follow_cnt'],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="item_id",
                    window_size=DayOf(1),
                )
            )
    feature_agg2 = Feature(
                name="author__max_follow_cnt_15d",
                input=['follow_cnt'],
                transform=AggregationTransform(
                    agg_func="max",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="author",
                    window_size=DayOf(15),
                ),
            )
  3. パイプラインを作成します。

    agg_pipeline_5 = fs_client.create_pipeline(ds_agg, output_agg_table_name_5).add_feature_transform([feature_agg1, feature_agg2])
  4. パイプラインを生成して実行します。

    execute_date = '20240605'
    output_agg_table_5 = agg_pipeline_5.execute(execute_date, drop_table=True)
  5. 変換結果を表示します。

    transform_info_agg_5 = output_agg_table_5.transform_info
    transform_info_agg_5

4. WindowTransform 特徴変換

前述の統計的特徴変換は、一般的な特徴エンジニアリングシナリオには十分です。ただし、一部の大規模なレコメンデーションシナリオでは、より高度な要件があります。FeatureStore は WindowTransform 特徴変換をサポートしており、KV 特徴を簡単に取得し、日次中間テーブルを使用して計算プロセスを最適化できます。これにより、特徴計算時間が短縮され、計算コストが節約されます。利点は次のとおりです:

  • 複雑な非線形相互作用の把握: 単純な特徴 (ユーザーの年齢や性別など) では、複雑なユーザーの好みを表現するのが困難です。特徴交差は、ユーザーとアイテム間のより複雑な非線形相互作用関係を把握するのに役立ちます。

  • 予測精度の向上: 交差特徴は、レコメンデーションシステムや広告システムのパフォーマンスを大幅に向上させることができます。

  • ストレージスペースの削減: 大規模なユーザーとアイテムのコレクションの場合、各ユーザーとアイテムのペアの相互作用特徴を直接保存することは現実的ではありません。特徴抽出と特徴変換により、保存する必要のある特徴の数を効果的に削減できます。

  • 推論効率の向上: 交差特徴を事前に計算して保存することで、リアルタイム推論中にこれらの特徴を迅速に見つけて使用でき、システムの応答速度が向上します。

WindowTransform 特徴変換の実装プロセスについては、次のセクションで説明します:

単純な集計関数の計算プロセス

単純な集計関数には、count、sum、max、min が含まれます。これらの集計関数の計算プロセスは比較的簡単です。日次レベルの集計後、複数日にわたるさらなる集計が実行され、最終結果が取得されます。さらに、このセクションでは、日次中間テーブルとユーザー定義関数 (UDF) (MaxCompute UDF の概要) を使用して最終的な計算結果を取得する方法も紹介します。実際の計算プロセスでは、ユーザーが特徴エンジニアリングを実行するプロセスは、前述の従来の特徴変換と同じです。FeatureStore Python SDK は、日次中間テーブルの作成、UDF の生成、リソースのアップロード、および関数の自動登録を自動的に処理します。これらの詳細を意識することなく、特徴エンジニアリングプロセスを実装できます。

  1. 入力テーブルと出力テーブルの名前を定義します。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_window_table_name_1 = "rec_sln_demo_behavior_test_window_v1"
  2. WindowTransform 特徴を定義します。

    win_feature1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_cnt_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum", # 集計関数。オプション値は 'sum'、'avg'、'max'、'min' です。
            agg_field="click_count", # この特徴に対して集計関数計算を実行します。
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
  3. パイプラインを作成し、WindowTransform 特徴変換を実行します。

    window_pipeline_1 = fs_client.create_pipeline(ds_window_1, output_window_table_name_1).add_feature_transform([win_feature1, win_feature2], keep_input_columns=True)
  4. 変換を生成して実行します。

    この生成プロセスでは、日次の一時中間テーブルが作成されます。この時点で、`DROP TABLE` は最終結果のみを削除し、中間の一時テーブルは削除しません。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_1.transform_info)
    output_window_table_1 = window_pipeline_1.execute(execute_date, drop_table=True)

    さらに、複数日の統計が関係するため (たとえば、上記の例では 7 日間のデータをカウントします)、中間の一時テーブルは通常、最新のパーティションのデータのみを計算します。このため、システムは `backfill_partitions` パラメーターを提供します。プロセスを初めて実行するときに、このパラメーターを `True` に設定すると、システムは依存する日のデータを自動的にバックフィルします。たとえば、カウントに 7 日間のデータが含まれる場合、システムは自動的に 7 日間のデータを完了します。その後の定期的な実行では、パラメーターを `False` に設定して、最新の日のパーティションのデータのみを完了できます。

    execute_date = '20240506'
    output_window_table_1 = window_pipeline_1.execute(execute_date, backfill_partitions=True)
    • `backfill_partitions` パラメーターが `True` に設定されている場合、システムは一時中間テーブルの依存する日のデータを自動的に完了します。最初の定期実行時にこれを行うことをお勧めします。

    • カウントする日数が多い場合、上記のコードの実行には時間がかかります。

  5. 結果テーブルのデータを表示します。

    window_ret_1 = output_window_table_1.to_pandas(execute_date, limit=50)
    window_ret_1
  6. 実際の計算プロセスを表示します。

    window_pipeline_1.transform_info

    計算プロセスからわかるように、システムは `rec_sln_demo_behavior_table_preprocess_wide_v1_tmp_daily` という名前の日次中間一時テーブルを生成します。このテーブルは、日次結果を要約し、固定パーティションに保存することで、繰り返し計算を回避します。

    さらに、`count_kv` という名前の UDF を使用して最終結果を計算します。この UDF は、統計結果を自動的に分類および要約して結果マップにし、文字列形式で保存します。これにより、その後のオフラインおよびオンラインの結果推論が容易になります。

上記の内容では、`count` と `sum` を例として、単純な集計関数の計算プロセスを紹介しました。このプロセスには、日次中間一時テーブルや UDF などの概念が含まれますが、コアフローは従来のデータ変換操作と同じであり、操作の複雑さは増しません。`max` や `min` などの他の単純な集計関数も同様です。

avg 集計関数の計算プロセス

日次平均の結果を平均化すると不正確な計算になるため、avg 集計関数には特定の計算プロセスがあります。正しい方法は、まず期間全体の合計 (sum_v) と合計カウント (count_v) を計算し、次に数式 sum_v/count_v を使用して平均を計算することです。

この集計関数は個別に文書化されていますが、その複雑な計算の詳細は transform_info にカプセル化されています。したがって、基になる詳細を理解する必要はありません。この関数を従来の特徴のように使用して、最終結果を生成できます。

  1. 入力テーブルと出力テーブルの名前を定義します。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_2 = "rec_sln_demo_behavior_test_window_v2"
  2. WindowTransform 特徴を定義できます。

    win_feature1 = Feature(
        name="item__kv_gender_click_avg_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="avg",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_avg_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="avg",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(15),
        ),
    )
  3. パイプラインを作成し、WindowTransform 特徴変換を実行します。

    window_pipeline_2 = fs_client.create_pipeline(ds_window_1, output_window_table_name_2).add_feature_transform([win_feature1, win_feature2])
  4. 変換を生成して実行します。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_2.transform_info)
    output_window_table_2 = window_pipeline_2.execute(execute_date, drop_table=True)
  5. 結果テーブルのデータを表示します。

    window_ret_2 = output_window_table_2.to_pandas(execute_date, limit=50)
    window_ret_2

複数のグループキーの関数計算プロセス

同様に、WindowTransform は複数のグループキーによる同時計算をサポートしています。結果は入力テーブルに左結合されます。以下に例を示します:

  1. 入力テーブルと出力テーブルの名前を定義します。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_3 = "rec_sln_demo_behavior_test_window_v3"
  2. WindowTransform 特徴を定義します。

    win_feature1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_cnt_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature3 = Feature(
        name="author__kv_gender_click_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="author",
            window_size=DayOf(7),
        ),
    )
    win_feature4 = Feature(
        name="author__kv_gender_click_cnt_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="author",
            window_size=DayOf(7),
        ),
    )
  3. パイプラインを作成し、WindowTransform 特徴変換を実行します。

    window_pipeline_3 = fs_client.create_pipeline(ds_window_1, output_window_table_name_3).add_feature_transform([win_feature1, win_feature2, win_feature3, win_feature4])
  4. 変換を生成し、実行します。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_3.transform_info)
    output_window_table_3 = window_pipeline_3.execute(execute_date, drop_table=True)
  5. 結果テーブルのデータを表示します。

    window_ret_3 = output_window_table_3.to_pandas(execute_date, limit=50)
    window_ret_3

WindowTransform 特徴の自動拡張をサポートする組み込み自動拡張関数

統計的特徴変換と同様に、各 WindowTransform 統計的特徴を手動で実装するのは、特徴の数が多く、異なるウィンドウサイズや集計関数計算の多数の組み合わせが含まれるため、複雑です。このシステムは、組み込みの自動拡張関数を提供します。カウントする入力特徴を指定するだけで、システムが数百の統計的特徴の定義を自動的に生成および完了します。

  1. カウントする入力特徴を指定します。

    name_prefix = "item"
    input_list = ['gender']
    agg_field = ["click_count"]
    event_name = 'event'
    event_type = 'click'
    group_by_key = "item_id"
    window_size = [7, 15, 30, 45]
    window_transform_feature_list = auto_window_feature_transform(name_prefix, input_list, agg_field, event_name, event_type, group_by_key, window_size)
    print("len_window_transform_feature_list = ", len(window_transform_feature_list))
    print("window_transform_feature_list = ", window_transform_feature_list)
  2. 出力テーブル名を定義し、パイプラインを作成します。

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_4 = "rec_sln_demo_behavior_test_window_v4"
    window_pipeline_4 =fs_client.create_pipeline(ds_window_1, output_window_table_name_4).add_feature_transform(window_transform_feature_list)
  3. 変換を生成して実行します。

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_4.transform_info)
    output_window_table_4 = window_pipeline_4.execute(execute_date, drop_table=True)

JoinTransform 変換

前述の特徴エンジニアリングプロセス、特に AggregationTransform と WindowTransform では、入力は行動テーブルであり、出力結果も行動テーブルに保存されます。ただし、ほとんどの場合、オンラインで使用するために必要な最終テーブルは行動テーブルではなく、行動テーブルをユーザーテーブルやアイテムテーブルなどの他のテーブルと結合して生成された特徴テーブルです。

したがって、AggregationTransform および WindowTransform からの特徴を既存のユーザーテーブルまたはアイテムテーブルと結合するために `JoinTransform` が導入されています。

JoinTransform と WindowTransform の関連付け
  1. WindowTransform 入力テーブルを定義します。

    window_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1'
    ds_window_1 = MaxComputeDataSource(table=window_table_name, project=project)
  2. WindowTransform 特徴を定義します。

    win_fea1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
      )
    )
  3. パイプラインを作成します。

    説明

    後で他のテーブルを結合する必要があるため、ここでは出力テーブルは指定されていません。

    win_pipeline_1 = fs_client.create_pipeline(ds_window_1).add_feature_transform([win_fea1])
  4. JoinTransform の入力テーブルと出力テーブルを定義します。

    item_table_name = 'rec_sln_demo_item_table_preprocess_v1'
    ds_join_1 = MaxComputeDataSource(table=item_table_name, project=project)
    output_table_name = 'rec_sln_demo_item_table_v1_fs_window_debug_v1'
  5. JoinTransform パイプラインを作成し、WindowTransform パイプラインと接続します。

    join_pipeline_1 = fs_client.create_pipeline(ds_join_1, output_table_name).merge(win_pipeline_1)
  6. 変換を生成して実行します。

    execute_date = '20240605'
    output_join_table_1 = join_pipeline_1.execute(execute_date, drop_table=True)
  7. 結果テーブルのデータを表示します。

    join_ret_1 = output_join_table_1.to_pandas(execute_date, limit = 50)
    join_ret_1
  8. 実際の計算プロセスを表示します。

    output_join_table_1.transform_info
JoinTransform と AggregationTransform の関連付け
  1. AggregationTransform 入力テーブルを定義します。

    agg_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1'
    ds_agg_1 = MaxComputeDataSource(table=agg_table_name, project=project)
  2. AggregationTransform 特徴を定義します。

    agg_fea1 = Feature(
        name="user_avg_praise_count_1d",
        input=["praise_count"],
        transform=AggregationTransform(
            agg_func="avg",
            condition=Condition(field="event", value="expr", operator="<>"),
            group_by_keys="user_id",
            window_size=DayOf(1),
        ),
    )
  3. パイプラインを作成します。

    説明

    後で他のテーブルを結合する必要があるため、ここでは出力テーブルは指定されていません。

    agg_pipeline_1 = fs_client.create_pipeline(ds_agg_1).add_feature_transform([agg_fea1])
  4. JoinTransform の入力テーブルと出力テーブルを定義します。

    user_table_name = 'rec_sln_demo_user_table_preprocess_v1'
    ds_join_2 = MaxComputeDataSource(table=user_table_name, project=project)
    output_table_name_2 = 'rec_sln_demo_user_table_v1_fs_window_debug_v1'
  5. JoinTransform パイプラインを作成し、AggregationTransform パイプラインと接続します。

    join_pipeline_2 = fs_client.create_pipeline(ds_join_2, output_table_name_2).merge(agg_pipeline_1, keep_input_columns=False)
  6. 変換を生成して実行します。

    execute_date = '20240605'
    output_join_table_2 = join_pipeline_2.execute(execute_date, drop_table=True)
  7. 結果テーブルのデータを表示します。

    join_ret_2 = output_join_table_2.to_pandas(execute_date, limit = 50)
    join_ret_2
  8. 実際の計算プロセスを表示します。

    output_join_table_2.transform_info

リファレンス

  • アプリケーションシナリオの詳細については、「特徴エンジニアリングのベストプラクティス」をご参照ください。

  • FeatureStore は、レコメンデーション、金融リスク管理、ユーザーグロースシナリオなど、特徴を必要とするすべてのシナリオに適しています。FeatureStore は、Alibaba Cloud の一般的なデータソースエンジンおよびレコメンデーションサービスエンジンと統合されています。FeatureStore は、特徴の登録からモデル開発、アプリケーションまで、効率的で便利なエンドツーエンドのプラットフォームを提供します。FeatureStore の詳細については、「概要」をご参照ください。

  • FeatureStore の使用中に質問がある場合は、DingTalk グループ 34415007523 に参加して技術支援を受けることができます。