目前在特徵平台(FeatureStore)中支援的特徵生產功能在推薦、廣告、風控以及機器學習等領域都有廣泛的應用。該功能旨在降低特徵生產的複雜度,通過將特徵生產中通用常見的功能固定下來,通過配置的方式即可實現特徵生產。本文為您介紹特徵生產的詳細過程。
前提條件
在開始執行操作前,請確認您已完成以下準備工作:
依賴產品 | 具體操作 |
人工智慧平台PAI |
|
雲原生MaxCompute |
|
巨量資料開發治理平台DataWorks |
|
一、準備工作
準備未經處理資料
本文需要用到的四張原始表分別為:
使用者表(rec_sln_demo_user_table_preprocess_v1):包含一些基礎的使用者特徵,例如性別、年齡、城市和關注數等。
行為表(rec_sln_demo_behavior_table_preprocess_v1):包含一些行為特徵,例如某時使用者點擊某個物品等。
物品表(rec_sln_demo_item_table_preprocess_v1):包含一些基礎的物品特徵,例如類別、作者、累計點擊數和累計點贊數等。
行為寬表(rec_sln_demo_behavior_table_preprocess_wide_v1):該表由前三張表串連而成。
資料表存放在有公開讀取許可權的pai_online_project中,其資料均為類比資料產生。您需要在DataWorks中執行SQL命令,將上表資料從pai_online_project專案同步到您的MaxCompute專案中。具體操作步驟如下:
登入DataWorks控制台。
在左側導覽列單擊資料開發與營運 > 資料開發。
選擇已建立的DataWorks工作空間後,單擊進入資料開發。
滑鼠移至上方至建立,選擇建立節點 > MaxCompute > ODPS SQL,在彈出的頁面中配置節點參數。
參數
取值建議
引擎執行個體
選擇已建立的MaxCompute引擎。
節點類型
ODPS SQL
路徑
商務程序/Workflow/MaxCompute
名稱
可自訂名稱。
單擊確認。
在建立節點地區運行以下SQL命令,將使用者表、物品表、行為表以及行為寬表資料從pai_online_project專案同步到您的MaxCompute專案中。資源群組選擇已建立的獨享資源群組。
同步處理的使用者表:rec_sln_demo_user_table_preprocess_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_v1 like pai_online_project.rec_sln_demo_user_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_user_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';同步行為表:rec_sln_demo_behavior_table_preprocess_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v1 like pai_online_project.rec_sln_demo_behavior_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';同步物品表:rec_sln_demo_item_table_preprocess_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_v1 like pai_online_project.rec_sln_demo_item_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_item_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';同步行為寬表:rec_sln_demo_behavior_table_preprocess_wide_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_wide_v1 like pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_wide_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1 WHERE ds >= '20240530' and ds <='20240605';
安裝FeatureStore Python SDK
以下代碼均建議在Jupyter Notebook環境下運行。
安裝特徵平台Python SDK,要求在Python3環境下運行。
%pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl匯入需要的功能模組:
import os from feature_store_py import FeatureStoreClient from feature_store_py.fs_datasource import MaxComputeDataSource from feature_store_py.feature_engineering import TableTransform, Condition, DayOf, ComboTransform, Feature, AggregationTransform, auto_count_feature_transform, WindowTransform, auto_window_feature_transform
二、表變換和特徵變換操作流程
以下代碼均建議在Jupyter Notebook環境下運行。
定義表變換。
初始化Client。
access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") # 填入您的Access Key ID access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") # 填入您的Access Key Secret project = 'project_name' # 填入您的專案名 region = 'cn-hangzhou' # 填入您的專案所在地區,例如華東1(杭州)為cn-hangzhou fs_client = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region=region)指定資料來源。
input_table_name = "rec_sln_demo_behavior_table_preprocess_v1" ds = MaxComputeDataSource(table=input_table_name, project=project)指定變換後的輸出表名稱。
output_table_name = "rec_sln_demo_v1_fs_test_v1"定義表變換。
trans_name = "drop_duplicates" # 表變換名稱 keys = ["user_id", "item_id"] # 去重欄位 sort_keys = ["event_unix_time"] # 排序欄位 sort_order = ["desc"] # 順序定義 tran_i = TableTransform(trans_name, keys, sort_keys, sort_order)
定義特徵變換。
feature1 = Feature( name="page_net_type", input=['page', 'net_type'], transform=ComboTransform( separator='_' ) ) feature2 = Feature( name="trim_playtime", type="double", transform="playtime/10" )產生pipeline流程。
pipeline = fs_client.create_pipeline(ds, output_table_name).add_table_transform(tran_i).add_feature_transform([feature1, feature2], keep_input_columns=True)產生與執行變換。
execute_date = '20240605' output_table = pipeline.execute(execute_date, drop_table=True)執行上述代碼共分以下兩個步驟:
產生變換配置。該配置除了指定SQL外,還指定了變換所需的各種資訊,包括輸入輸出、參數和依賴等。
執行。根據第一個步驟產生的配置執行,將變換後的結果存放在輸出表中。
查看結果。
查看產生表的結果,該結果直接以pandas.DataFrame的形式呈現。
pd_ret = output_table.to_pandas(execute_date, limit=20)展示pd_ret的內容。
pd_ret查看產生的配置,該配置包含輸入表定義、變換SQL、依賴、參數以及輸出表定義等。該配置既適用於Debug調試,也適用於儲存後進行後續的上線例行任務等。
transform_info = output_table.transform_info查看transform_info的內容。
transform_info查看第一階段的輸入配置。
pipeline_config = pipeline.pipeline_config查看pipeline_config的內容。
pipeline_config
三、統計類型特徵變換
統計特徵普遍存在於特徵生產情境中,是機器學習和資料分析中常見的一種資料預先處理方法,通常用於產生更具代表性和更容易解釋的特徵。這些變換通過對未經處理資料進行匯總、計算和抽取,使得模型能夠更好地理解資料的時間趨勢、周期性和異常情況。具體優勢如下:
捕捉時間趨勢:在使用者行為資料中,最近一段時間的行為往往對目前狀態有更大的影響。
降低噪音:未經處理資料中可能包含大量噪音。通過統計變換,您可以通過彙總操作來減少這些噪音的影響。
豐富特徵:統計變換可以產生新特徵,增加模型的表達能力。
提高模型效能:通過引入統計特徵,您可以顯著提高模型的預測效能。
增強解釋性:統計特徵更加易於解釋和理解,使得問題的診斷和分析更為方便。
資料壓縮:在某些情況下,統計特徵可以有效減少資料維度。
雖然統計特徵的具體實現過程較為複雜,但通過如下介紹的簡單的統計特徵定義,就可以生產出大量的統計特徵。
單個統計類型特徵變化定義與運行
定義輸入和輸出表名。
input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project) output_agg_table_name = "rec_sln_demo_behavior_test_agg_v1"定義一個統計類型特徵。
feature_agg1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", # 彙總函式名稱,可選的有 'avg', 'sum', 'min', 'max' condition=Condition(field="event", value="expr", operator="<>"), # 設定條件判斷,表示當欄位 "event" 的值不等於 "expr" 時成立。具體邏輯可結合產生的SQL語句來進一步理解。 group_by_keys="user_id", # group by 對應的 key window_size=DayOf(1), # 視窗大小,這裡是 1 天 ), )建立pipeline,運行統計類型特徵變換。
agg_pipeline = fs_client.create_pipeline(ds_agg, output_agg_table_name).add_feature_transform([feature_agg1])生產與執行變換。
execute_date = '20240605' print("transform_info = ", agg_pipeline.transform_info) output_agg_table = agg_pipeline.execute(execute_date, drop_table=True)查看transform_info內容。
transform_info_agg = output_agg_table.transform_info transform_info_agg查看結果。
pd_ret = output_agg_table.to_pandas(execute_date, limit=20) pd_ret
多個不同視窗統計類型特徵變換自動JOIN
定義輸出表名。
output_agg_table_name_2 = "rec_sln_demo_behavior_test_agg_v2"定義多個不同視窗的統計類型特徵,定義的視窗大小分為別1、3、7、15和30。
feature_agg1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg2 = Feature( name="user_avg_praise_count_3d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(3), ), ) feature_agg3 = Feature( name="user_avg_praise_count_7d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(7), ), ) feature_agg4 = Feature( name="user_avg_praise_count_15d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(15), ), ) feature_agg5 = Feature( name="user_avg_praise_count_30d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(30), ), )建立pipeline。
agg_pipeline_2 = fs_client.create_pipeline(ds_agg, output_agg_table_name_2).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5])執行pipeline生產過程。
execute_date = '20240605' output_agg_table_2 = agg_pipeline_2.execute(execute_date, drop_table=True)查看變換結果。
transform_info_agg_2 = output_agg_table_2.transform_info transform_info_agg_2查看錶運行結果。
pd_ret_2 = output_agg_table_2.to_pandas(execute_date, limit=20) pd_ret_2
多個統計特徵變換過程支援自動歸併和類型自動推導
為了最佳化計算過程,多個特徵同視窗大小時會自動歸併,在同一個group視窗塊中計算完成。 整個計算過程會涉及類型的改變,例如avg會將bigint類型變換為double類型,而且輸入特徵的類型眾多比較難記住。因此在整個統計特徵的變換過程中,會支援類型自動推導,即不需要預先指定類型,特徵定義過程中會自動推匯出結果特徵的類型。
定義輸出表名。
output_agg_table_name_3 = "rec_sln_demo_behavior_test_agg_v3"定義更多不同類型的特徵。
feature_agg6 = Feature( name="user_expr_cnt_1d", transform=AggregationTransform( agg_func="count", condition=Condition(field="event", value="expr", operator="="), group_by_keys="user_id", window_size=DayOf(1), ) ) feature_agg7 = Feature( name="user_expr_item_id_dcnt_1d", input=['item_id'], transform=AggregationTransform( agg_func="count", condition=Condition(field="event", value="expr", operator="="), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg8 = Feature( name="user_sum_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg9 = Feature( name="user_sum_praise_count_3d", input=["praise_count"], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(3), ), )建立pipeline。
agg_pipeline_3 = fs_client.create_pipeline(ds_agg, output_agg_table_name_3).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5, feature_agg6, feature_agg7, feature_agg8, feature_agg9])執行pipeline生產過程。
execute_date = '20240605' output_agg_table_3 = agg_pipeline_3.execute(execute_date, drop_table=True)查看變換結果。
transform_info_agg_3 = output_agg_table_3.transform_info transform_info_agg_3查看錶運行結果。
pd_ret_3 = output_agg_table_3.to_pandas(execute_date, limit=20) pd_ret_3
內建自動擴充函數,支援對統計特徵變換自動擴充
因統計特徵較多,包括不同的視窗大小和眾多彙總函式計算群組合,手動實現每個特徵比較複雜。系統內建了自動擴充函數,僅需指定要統計的輸入特徵,即可自動產生並完成數百種統計特徵的定義。
指定要統計的輸入特徵。
name_prefix = "user_" input_list = ["playtime", "duration", "click_count", "praise_count"] event_name = 'event' event_type = 'expr' group_by_key = "user_id" count_feature_list = auto_count_feature_transform(name_prefix, input_list, event_name, event_type, group_by_key) print("len_count_feature_list = ", len(count_feature_list)) print("count_feature_list = ", count_feature_list)定義輸出表名,並建立pipeline。
output_agg_table_name_4 = "rec_sln_demo_behavior_test_agg_v4" agg_pipeline_4 =fs_client.create_pipeline(ds_agg, output_agg_table_name_4).add_feature_transform(count_feature_list)執行pipeline生產過程。
execute_date = '20240605' output_agg_table_4 = agg_pipeline_4.execute(execute_date, drop_table=True)查看變換結果。
transform_info_agg_4 = output_agg_table_4.transform_info transform_info_agg_4查看錶運行結果。
pd_ret_4 = output_agg_table_4.to_pandas(execute_date, limit=20) pd_ret_4
支援不同的group key同時變換
以上內容闡述了當所有group key均相同時的處理方法,此外,系統還支援對不同group key進行轉換操作。具體樣本如下:
定義輸出表名。
input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project) output_agg_table_name_5 = "rec_sln_demo_behavior_test_agg_v5"定義不同group key的特徵。
feature_agg1 = Feature( name="item__sum_follow_cnt_15d", input=['follow_cnt'], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="="), group_by_keys="item_id", window_size=DayOf(1), ) ) feature_agg2 = Feature( name="author__max_follow_cnt_15d", input=['follow_cnt'], transform=AggregationTransform( agg_func="max", condition=Condition(field="event", value="expr", operator="="), group_by_keys="author", window_size=DayOf(15), ), )建立pipeline。
agg_pipeline_5 = fs_client.create_pipeline(ds_agg, output_agg_table_name_5).add_feature_transform([feature_agg1, feature_agg2])執行pipeline生產過程。
execute_date = '20240605' output_agg_table_5 = agg_pipeline_5.execute(execute_date, drop_table=True)查看變換結果。
transform_info_agg_5 = output_agg_table_5.transform_info transform_info_agg_5
四、WindowTransform視窗變換特徵
上述統計類型特徵變換基本能滿足常規特徵生產情境。但在某些大規模推薦等情境中,還會面臨更高層次要求。FeatureStore支援WindowTransform視窗變換特徵,您可以方便地得到KV特徵,還可以利用天級中間表的形式來最佳化計算過程,進而達到降低特徵計算時間、節省計算成本的目的。具體優勢如下:
捕捉複雜非線性互動:簡單的特徵(如使用者的年齡、性別等)難以表達使用者的複雜偏好。特徵交叉可以協助捕捉使用者和物品之間更複雜的非線性互動關係。
提升預測準確性:交叉特徵可以顯著提升推薦系統和廣告系統的效能。
減少儲存空間:對於大規模的使用者和物品集合,直接儲存每一對使用者和物品的互動特徵是不現實的。特徵抽取和特徵轉換能有效減少需要儲存的特徵數量。
提升推理效率:通過預先計算並儲存交叉特徵,在即時推理時可以快速尋找並利用這些特徵,從而提升系統響應速度。
按照以下幾個維度為您介紹WindowTransform視窗變換特徵的實現過程:
簡單彙總函式計算過程
簡單彙總函式包括count、sum、max和min 。這些彙總函式的計算過程比較直接,即通過天層級匯總後,再進一步進行多天的匯總即可得到最終的結果。此外,本文還將介紹天級中間表,以及引入了UDF(MaxCompute UDF概述)以得到最終的計算結果。在實際的計算過程中,使用者執行特徵生產的過程還是和上述常規特徵變換一樣,Feature Store Python SDK會自動處理天級中間表的建立、UDF的產生、資源上傳以及函數自動註冊等,使用者無需感知這些細節即可實現特徵生產過程。
定義輸入和輸出表名。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_agg_table_name, project=project) output_window_table_name_1 = "rec_sln_demo_behavior_test_window_v1"定義WindowTransform特徵。
win_feature1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_cnt_7d", input=["gender"], transform=WindowTransform( agg_func="sum", # 彙總函式,可選的有 'sum', 'avg', 'max', 'min' agg_field="click_count", # 對該特徵進資料列彙總函式的計算 condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), )建立pipeline,運行WindowTransform類型特徵變換。
window_pipeline_1 = fs_client.create_pipeline(ds_window_1, output_window_table_name_1).add_feature_transform([win_feature1, win_feature2], keep_input_columns=True)生產與執行變換。
該生產過程有天級臨時中間表產生,此時的drop table只會刪掉最終的結果,不會刪除中間的暫存資料表。
execute_date = '20240605' print("transform_info = ", window_pipeline_1.transform_info) output_window_table_1 = window_pipeline_1.execute(execute_date, drop_table=True)另外,因涉及多天的統計,例如上述樣本是統計7天的資料,中間的暫存資料表一般只會計算最新分區的資料。為此系統在執行特徵生產過程中提供了一個參數 backfill_partitions,當第一次執行時可以將其設定為True,系統會自動補全依賴的天數。例如本次統計是涉及7天的資料,系統會自動將7天的資料補全,這樣在後面例行運行時,還可以再設定為False,只需要補全最新一天分區的資料即可。
execute_date = '20240506' output_window_table_1 = window_pipeline_1.execute(execute_date, backfill_partitions=True)當設定backfill_partitions參數為True時,系統會自動補全臨時中間表的依賴天數的資料,建議在第一次例行運行時運行。
當統計天數較多時,運行上述代碼的時間較長。
查看結果表資料。
window_ret_1 = output_window_table_1.to_pandas(execute_date, limit=50) window_ret_1查看實際的計算過程。
window_pipeline_1.transform_info通過計算過程可以看出,系統會產生天層級的中間暫存資料表 rec_sln_demo_behavior_table_preprocess_wide_v1_tmp_daily ,該表會將天級的結果匯總儲存在固定的分區中,這樣會避免重複計算。
另外在計算最終結果時,還會使用到一個UDF:count_kv,該UDF會自動將統計結果分類匯總到結果map中,以String的形式存在,方便後續進行離線和線上結果推理。
上述內容介紹了簡單彙總函式計算過程,以count、sum為例執行了整個生產過程,該生產過程雖然涉及天級的中間暫存資料表、UDF等概念,但核心流程和常規的資料變換操作一樣,未增加操作複雜度。其它簡單彙總函式max和min同理。
avg彙總函式計算過程
因為在一天中計算avg結果後,再通過多天來匯總結果,會導致計算結果不準確,因此單獨介紹avg彙總函式計算過程。正確的計算方法是先計算多天的 sum_v 累加值和 count_v 計數結果,然後再通過sum_v/count_v得出多天的avg值。
儘管該彙總函式被單獨介紹,其複雜的計算細節已被封裝在transform_info中。因此,使用者無須深入瞭解這些細節,而只需像處理常規特徵那樣進行操作,即可順利產生最終結果。
定義輸入和輸出表名。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_2 = "rec_sln_demo_behavior_test_window_v2"定義WindowTransform特徵。
win_feature1 = Feature( name="item__kv_gender_click_avg_7d", input=["gender"], transform=WindowTransform( agg_func="avg", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_avg_15d", input=["gender"], transform=WindowTransform( agg_func="avg", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(15), ), )建立pipeline,運行WindowTransform類型特徵變換。
window_pipeline_2 = fs_client.create_pipeline(ds_window_1, output_window_table_name_2).add_feature_transform([win_feature1, win_feature2])生產與執行變換。
execute_date = '20240605' print("transform_info = ", window_pipeline_2.transform_info) output_window_table_2 = window_pipeline_2.execute(execute_date, drop_table=True)查看結果表資料。
window_ret_2 = output_window_table_2.to_pandas(execute_date, limit=50) window_ret_2
多種group key的Function Compute過程
同樣,WindowTransform支援多種group key來同時計算,結果最終left join在輸入表中。具體樣本如下所示:
定義輸入和輸出表名。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_3 = "rec_sln_demo_behavior_test_window_v3"定義WindowTransform特徵。
win_feature1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_cnt_7d", input=["gender"], transform=WindowTransform( agg_func="sum", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature3 = Feature( name="author__kv_gender_click_15d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="author", window_size=DayOf(7), ), ) win_feature4 = Feature( name="author__kv_gender_click_cnt_15d", input=["gender"], transform=WindowTransform( agg_func="sum", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="author", window_size=DayOf(7), ), )建立pipeline,運行WindowTransform類型特徵變換。
window_pipeline_3 = fs_client.create_pipeline(ds_window_1, output_window_table_name_3).add_feature_transform([win_feature1, win_feature2, win_feature3, win_feature4])生產與執行變換。
execute_date = '20240605' print("transform_info = ", window_pipeline_3.transform_info) output_window_table_3 = window_pipeline_3.execute(execute_date, drop_table=True)查看結果表資料。
window_ret_3 = output_window_table_3.to_pandas(execute_date, limit=50) window_ret_3
內建自動擴充函數,支援對WindowTransform特徵自動擴充
和統計類型特徵變換類似,WindowTransform統計特徵較多,包括不同的視窗大小和眾多彙總函式計算群組合,手動實現每個特徵比較複雜。系統內建了自動擴充函數,僅需指定要統計的輸入特徵,即可自動產生並完成數百種統計特徵的定義。
指定要統計的輸入特徵。
name_prefix = "item" input_list = ['gender'] agg_field = ["click_count"] event_name = 'event' event_type = 'click' group_by_key = "item_id" window_size = [7, 15, 30, 45] window_transform_feature_list = auto_window_feature_transform(name_prefix, input_list, agg_field, event_name, event_type, group_by_key, window_size) print("len_window_transform_feature_list = ", len(window_transform_feature_list)) print("window_transform_feature_list = ", window_transform_feature_list)定義輸出表名,建立pipeline。
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_4 = "rec_sln_demo_behavior_test_window_v4" window_pipeline_4 =fs_client.create_pipeline(ds_window_1, output_window_table_name_4).add_feature_transform(window_transform_feature_list)生產與執行變換。
execute_date = '20240605' print("transform_info = ", window_pipeline_4.transform_info) output_window_table_4 = window_pipeline_4.execute(execute_date, drop_table=True)
JoinTransform聯結轉換
上述特徵生產過程,尤其是AggregationTransform和WindowTransform的輸入均為行為表,輸出結果也是存放於一個行為表中。而在大部分情況下,最終上線需要的不是一個行為表,而是要和別的表進行串連,產生特徵表,例如user表或item表。
因此引入joinTransform,支援將AggregationTransform和WindowTransform的特徵和已有的user表或item 表串連起來。
JoinTransform和WindowTransform進行關聯
定義WindowTransform輸入表。
window_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1' ds_window_1 = MaxComputeDataSource(table=window_table_name, project=project)定義WindowTransform特徵。
win_fea1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ) )建立pipeline。
說明因後續需要join其他表,此處未指定輸出表。
win_pipeline_1 = fs_client.create_pipeline(ds_window_1).add_feature_transform([win_fea1])定義JoinTransform輸入和輸出表。
item_table_name = 'rec_sln_demo_item_table_preprocess_v1' ds_join_1 = MaxComputeDataSource(table=item_table_name, project=project) output_table_name = 'rec_sln_demo_item_table_v1_fs_window_debug_v1'建立JoinTransform pipeline,和Window Transform pipeline串連在一起。
join_pipeline_1 = fs_client.create_pipeline(ds_join_1, output_table_name).merge(win_pipeline_1)生產和執行變換。
execute_date = '20240605' output_join_table_1 = join_pipeline_1.execute(execute_date, drop_table=True)查看結果表資料。
join_ret_1 = output_join_table_1.to_pandas(execute_date, limit = 50) join_ret_1查看實際的計算過程。
output_join_table_1.transform_info
JoinTransform和AggregationTransform進行關聯
定義AggregationTransform輸入表。
agg_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1' ds_agg_1 = MaxComputeDataSource(table=agg_table_name, project=project)定義AggregationTransform特徵。
agg_fea1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), )建立pipeline。
說明因後續需要join其他表,此處未指定輸出表。
agg_pipeline_1 = fs_client.create_pipeline(ds_agg_1).add_feature_transform([agg_fea1])定義JoinTransform輸入和輸出表。
user_table_name = 'rec_sln_demo_user_table_preprocess_v1' ds_join_2 = MaxComputeDataSource(table=user_table_name, project=project) output_table_name_2 = 'rec_sln_demo_user_table_v1_fs_window_debug_v1'建立JoinTransform pipeline,和AggregationTransform串連在一起。
join_pipeline_2 = fs_client.create_pipeline(ds_join_2, output_table_name_2).merge(agg_pipeline_1, keep_input_columns=False)生產和執行變換。
execute_date = '20240605' output_join_table_2 = join_pipeline_2.execute(execute_date, drop_table=True)查看結果表資料。
join_ret_2 = output_join_table_2.to_pandas(execute_date, limit = 50) join_ret_2查看實際的計算過程。
output_join_table_2.transform_info
相關文檔
參考特徵生產最佳實務,瞭解實際應用情境。
阿里雲特徵平台(FeatureStore)基本適用於所有需要特徵的情境,比如推薦情境、金融風控情境、使用者增長情境。同時,FeatureStore與阿里雲常用資料來源引擎、建議服務引擎完成對接,可為您提供端到端高效便捷的一站式從特徵註冊管理到模型開發應用的全流程操作平台。更多關於FeatureStore的資訊,請參見FeatureStore概述。
如果您在配置或使用過程中有任何問題,可以搜尋DingTalk群號:34415007523,進入答疑群聯絡技術人員進行諮詢。