Kemampuan rekayasa fitur dari FeatureStore banyak digunakan di bidang seperti rekomendasi, periklanan, pengendalian risiko, dan pembelajaran mesin. Kemampuan ini dirancang untuk mengurangi kompleksitas rekayasa fitur dengan menstandarkan fungsi-fungsi umum, sehingga Anda dapat menerapkan rekayasa fitur melalui konfigurasi sederhana. Topik ini menjelaskan secara rinci proses rekayasa fitur.
Prasyarat
Pastikan bahwa persiapan yang dijelaskan dalam tabel berikut sudah selesai.
Layanan | Operasi |
PAI |
|
MaxCompute |
|
DataWorks |
|
1. Persiapan
Siapkan data mentah
Dalam topik ini, empat tabel sumber berikut digunakan:
Tabel pengguna (rec_sln_demo_user_table_preprocess_v1): berisi fitur dasar pengguna, seperti jenis kelamin, usia, kota, dan jumlah pengikut.
Tabel perilaku (rec_sln_demo_behavior_table_preprocess_v1): berisi data perilaku, seperti klik pengguna pada item pada waktu tertentu.
Tabel item (rec_sln_demo_item_table_preprocess_v1): berisi fitur dasar item, seperti kategori, penulis, jumlah total klik, dan jumlah total suka.
Tabel perilaku lebar (rec_sln_demo_behavior_table_preprocess_wide_v1): Tabel ini terbentuk dengan menggabungkan tiga tabel sebelumnya.
Tabel data disimpan dalam ruang kerja pai_online_project yang terlihat oleh semua pengguna. Tabel data hanya menyimpan data simulasi. Anda harus menjalankan pernyataan SQL di DataWorks untuk menyinkronkan data dalam tabel-tabel sebelumnya dari ruang kerja pai_online_project ke proyek MaxCompute Anda. Langkah-langkahnya adalah sebagai berikut:
Masuk ke Konsol DataWorks.
Di panel navigasi di sebelah kiri, klik Data Development & O&M > Data Development.
Pilih ruang kerja DataWorks yang Anda buat dan klik Enter Data Development.
Arahkan mouse ke New, lalu pilih New Node > MaxCompute > ODPS SQL. Di halaman yang muncul, konfigurasikan parameter node.
Parameter
Nilai yang Disarankan
Engine Instance
Pilih mesin MaxCompute yang Anda buat.
Node Type
ODPS SQL
Path
Business Flow/Workflow/MaxCompute
Name
Masukkan nama kustom.
Klik Confirm.
Di editor node baru, jalankan perintah SQL berikut untuk menyinkronkan data dari tabel pengguna, item, perilaku, dan tabel perilaku lebar di proyek pai_online_project ke proyek MaxCompute Anda. Untuk grup sumber daya, pilih grup sumber daya eksklusif Anda untuk penjadwalan.
Jalankan pernyataan SQL berikut untuk menyinkronkan data dalam tabel pengguna rec_sln_demo_user_table_preprocess_v1:
CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_v1 like pai_online_project.rec_sln_demo_user_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_user_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';Jalankan pernyataan SQL berikut untuk menyinkronkan data dalam tabel perilaku rec_sln_demo_behavior_table_preprocess_v1:
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v1 like pai_online_project.rec_sln_demo_behavior_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';Jalankan pernyataan SQL berikut untuk menyinkronkan data dalam tabel item rec_sln_demo_item_table_preprocess_v1:
CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_v1 like pai_online_project.rec_sln_demo_item_table_preprocess_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_item_table_preprocess_v1 WHERE ds >= '20240530' and ds <='20240605';Sinkronkan tabel perilaku lebar: rec_sln_demo_behavior_table_preprocess_wide_v1
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_wide_v1 like pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1 STORED AS ALIORC LIFECYCLE 90; INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_wide_v1 PARTITION(ds) SELECT * FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1 WHERE ds >= '20240530' and ds <='20240605';
Instal SDK FeatureStore untuk Python
Disarankan menggunakan Jupyter Notebook untuk menjalankan kode berikut.
Instal SDK FeatureStore untuk Python di lingkungan Python3.
%pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whlImpor modul fungsional yang diperlukan.
import os from feature_store_py import FeatureStoreClient from feature_store_py.fs_datasource import MaxComputeDataSource from feature_store_py.feature_engineering import TableTransform, Condition, DayOf, ComboTransform, Feature, AggregationTransform, auto_count_feature_transform, WindowTransform, auto_window_feature_transform
2. Proses transformasi tabel dan fitur
Disarankan menjalankan kode berikut di lingkungan Jupyter Notebook.
Definisikan transformasi tabel.
Inisialisasi klien.
access_key_id=os.environ.get ("ALIBABA_CLOUD_ACCESS_KEY_ID") # Masukkan ID AccessKey Anda. access_key_secret=os.environ.get ("ALIBABA_CLOUD_ACCESS_KEY_SECRET") # Masukkan rahasia AccessKey Anda. project='project_name' # Masukkan nama proyek Anda. region='cn-hangzhou' # Masukkan wilayah tempat proyek Anda berada. Misalnya, jika proyek Anda berada di China (Hangzhou), masukkan cn-hangzhou. fs_client = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region=region)Tentukan sumber data.
input_table_name = "rec_sln_demo_behavior_table_preprocess_v1" ds = MaxComputeDataSource(table=input_table_name, project=project)Tentukan nama tabel output untuk transformasi.
output_table_name = "rec_sln_demo_v1_fs_test_v1"Definisikan transformasi tabel.
trans_name = "drop_duplicates" # Nama transformasi tabel. keys = ["user_id", "item_id"] # Bidang untuk deduplikasi. sort_keys = ["event_unix_time"] # Bidang sortir. sort_order = ["desc"] # Definisi urutan. tran_i = TableTransform(trans_name, keys, sort_keys, sort_order)
Definisikan transformasi fitur.
feature1 = Feature( name="page_net_type", input=['page', 'net_type'], transform=ComboTransform( separator='_' ) ) feature2 = Feature( name="trim_playtime", type="double", transform="playtime/10" )Hasilkan pipeline.
pipeline = fs_client.create_pipeline(ds, output_table_name).add_table_transform(tran_i).add_feature_transform([feature1, feature2], keep_input_columns=True)Hasilkan dan jalankan transformasi.
execute_date = '20240605' output_table = pipeline.execute(execute_date, drop_table=True)Kode sebelumnya melibatkan dua langkah berikut:
Hasilkan konfigurasi transformasi. Konfigurasi ini menentukan pernyataan SQL dan informasi yang diperlukan untuk transformasi, seperti input, output, parameter, dan dependensi.
Jalankan transformasi. Transformasi dijalankan berdasarkan konfigurasi dari langkah sebelumnya, dan hasilnya disimpan di tabel output.
Lihat hasilnya.
Lihat hasilnya di tabel yang dihasilkan. Hasilnya langsung dirender dalam format pandas.DataFrame.
pd_ret = output_table.to_pandas(execute_date, limit=20)Tampilkan isi pd_ret.
pd_retLihat konfigurasi yang dihasilkan. Ini termasuk definisi tabel input, transformasi SQL, dependensi, parameter, dan definisi tabel output. Setelah disimpan, konfigurasi ini dapat digunakan untuk debugging dan tugas online rutin selanjutnya.
transform_info = output_table.transform_infoLihat isi transform_info.
transform_infoLihat konfigurasi input untuk tahap pertama.
pipeline_config = pipeline.pipeline_configLihat isi pipeline_config.
pipeline_config
3. Transformasi fitur statistik
Fitur statistik adalah metode pra-pemrosesan data umum dalam pembelajaran mesin dan analitik data. Mereka digunakan untuk menghasilkan fitur yang lebih representatif dan mudah diinterpretasikan. Transformasi ini merangkum, menghitung, dan mengekstrak informasi dari data mentah, memungkinkan model untuk lebih memahami tren waktu, periodisitas, dan anomali dalam data. Keuntungannya adalah sebagai berikut:
Tangkap tren waktu: Dalam data perilaku pengguna, perilaku dari periode terbaru sering kali memiliki dampak lebih besar pada kondisi saat ini.
Mengurangi noise: Data mentah mungkin mengandung sejumlah besar noise. Dengan menggunakan transformasi statistik, Anda dapat menggunakan operasi agregat untuk mengurangi dampak noise ini.
Memperkaya fitur: Transformasi statistik dapat menghasilkan fitur baru, meningkatkan kemampuan ekspresif model.
Tingkatkan performa model: Dengan memperkenalkan fitur statistik, Anda dapat secara signifikan meningkatkan performa prediksi model.
Tingkatkan interpretabilitas: Fitur statistik lebih mudah diinterpretasikan dan dipahami, yang membuat diagnosis masalah dan analisis lebih nyaman.
Kompresi data: Dalam beberapa kasus, fitur statistik dapat secara efektif mengurangi dimensi data.
Meskipun implementasi fitur statistik kompleks, Anda dapat membuat banyak fitur statistik menggunakan definisi sederhana yang dijelaskan di bawah ini.
Definisi dan pelaksanaan transformasi fitur statistik tunggal
Definisikan nama tabel input dan output.
input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project) output_agg_table_name = "rec_sln_demo_behavior_test_agg_v1"Definisikan fitur statistik.
feature_agg1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", # Nama fungsi agregat. Nilai opsional adalah 'avg', 'sum', 'min', 'max'. condition=Condition(field="event", value="expr", operator="<>"), # Tetapkan kondisi yang terpenuhi ketika nilai bidang "event" tidak sama dengan "expr". Anda dapat memahami logika spesifik dari pernyataan SQL yang dihasilkan. group_by_keys="user_id", # Kunci yang sesuai dengan group by. window_size=DayOf(1), # Ukuran jendela, yang satu hari di sini. ), )Anda dapat membuat pipeline dan menjalankan transformasi fitur statistik.
agg_pipeline = fs_client.create_pipeline(ds_agg, output_agg_table_name).add_feature_transform([feature_agg1])Hasilkan dan jalankan transformasi.
execute_date = '20240605' print("transform_info = ", agg_pipeline.transform_info) output_agg_table = agg_pipeline.execute(execute_date, drop_table=True)Lihat isi transform_info.
transform_info_agg = output_agg_table.transform_info transform_info_aggLihat hasilnya.
pd_ret = output_agg_table.to_pandas(execute_date, limit=20) pd_ret
Gabungan otomatis untuk transformasi fitur statistik dengan jendela berbeda
Definisikan nama tabel output.
output_agg_table_name_2 = "rec_sln_demo_behavior_test_agg_v2"Definisikan fitur statistik untuk beberapa jendela berbeda. Ukuran jendela yang didefinisikan adalah 1, 3, 7, 15, dan 30 hari.
feature_agg1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg2 = Feature( name="user_avg_praise_count_3d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(3), ), ) feature_agg3 = Feature( name="user_avg_praise_count_7d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(7), ), ) feature_agg4 = Feature( name="user_avg_praise_count_15d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(15), ), ) feature_agg5 = Feature( name="user_avg_praise_count_30d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(30), ), )Buat pipeline.
agg_pipeline_2 = fs_client.create_pipeline(ds_agg, output_agg_table_name_2).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5])Hasilkan dan jalankan pipeline.
execute_date = '20240605' output_agg_table_2 = agg_pipeline_2.execute(execute_date, drop_table=True)Lihat hasil transformasi.
transform_info_agg_2 = output_agg_table_2.transform_info transform_info_agg_2Lihat hasil run tabel.
pd_ret_2 = output_agg_table_2.to_pandas(execute_date, limit=20) pd_ret_2
Penggabungan otomatis dan derivasi tipe untuk beberapa proses transformasi fitur statistik
Untuk mengoptimalkan perhitungan, beberapa fitur dengan ukuran jendela yang sama digabungkan dan dihitung secara otomatis dalam blok jendela grup yang sama. Proses perhitungan melibatkan perubahan tipe. Sebagai contoh, `avg` mengubah tipe `bigint` menjadi tipe `double`. Sulit untuk mengingat banyak tipe fitur input. Oleh karena itu, proses transformasi fitur statistik mendukung derivasi tipe otomatis, yang berarti Anda tidak perlu menentukan tipe terlebih dahulu. Tipe fitur yang dihasilkan secara otomatis diturunkan selama definisi fitur.
Definisikan nama tabel output.
output_agg_table_name_3 = "rec_sln_demo_behavior_test_agg_v3"Definisikan lebih banyak fitur dari tipe berbeda.
feature_agg6 = Feature( name="user_expr_cnt_1d", transform=AggregationTransform( agg_func="count", condition=Condition(field="event", value="expr", operator="="), group_by_keys="user_id", window_size=DayOf(1), ) ) feature_agg7 = Feature( name="user_expr_item_id_dcnt_1d", input=['item_id'], transform=AggregationTransform( agg_func="count", condition=Condition(field="event", value="expr", operator="="), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg8 = Feature( name="user_sum_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), ) feature_agg9 = Feature( name="user_sum_praise_count_3d", input=["praise_count"], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(3), ), )Buat pipeline.
agg_pipeline_3 = fs_client.create_pipeline(ds_agg, output_agg_table_name_3).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5, feature_agg6, feature_agg7, feature_agg8, feature_agg9])Hasilkan dan jalankan pipeline.
execute_date = '20240605' output_agg_table_3 = agg_pipeline_3.execute(execute_date, drop_table=True)Lihat hasil transformasi.
transform_info_agg_3 = output_agg_table_3.transform_info transform_info_agg_3Lihat hasil run tabel.
pd_ret_3 = output_agg_table_3.to_pandas(execute_date, limit=20) pd_ret_3
Fungsi ekstensi otomatis bawaan untuk mendukung ekstensi otomatis transformasi fitur statistik
Mengimplementasikan setiap fitur statistik secara manual kompleks karena jumlah fitur yang besar, termasuk ukuran jendela yang berbeda dan banyak kombinasi perhitungan fungsi agregat. Sistem menyediakan fungsi ekstensi otomatis bawaan. Anda hanya perlu menentukan fitur input yang akan dihitung, dan sistem secara otomatis menghasilkan dan menyelesaikan definisi untuk ratusan fitur statistik.
Tentukan fitur input yang akan dihitung.
name_prefix = "user_" input_list = ["playtime", "duration", "click_count", "praise_count"] event_name = 'event' event_type = 'expr' group_by_key = "user_id" count_feature_list = auto_count_feature_transform(name_prefix, input_list, event_name, event_type, group_by_key) print("len_count_feature_list = ", len(count_feature_list)) print("count_feature_list = ", count_feature_list)Definisikan nama tabel output dan buat pipeline.
output_agg_table_name_4 = "rec_sln_demo_behavior_test_agg_v4" agg_pipeline_4 =fs_client.create_pipeline(ds_agg, output_agg_table_name_4).add_feature_transform(count_feature_list)Hasilkan dan jalankan pipeline.
execute_date = '20240605' output_agg_table_4 = agg_pipeline_4.execute(execute_date, drop_table=True)Lihat hasil transformasi.
transform_info_agg_4 = output_agg_table_4.transform_info transform_info_agg_4Lihat hasil run tabel.
pd_ret_4 = output_agg_table_4.to_pandas(execute_date, limit=20) pd_ret_4
Dukungan untuk transformasi berbeda pada kunci grup yang berbeda
Bagian sebelumnya menjelaskan metode pemrosesan ketika semua kunci grup sama. Sistem juga mendukung operasi transformasi pada kunci grup yang berbeda. Berikut adalah contohnya:
Definisikan nama tabel output.
input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project) output_agg_table_name_5 = "rec_sln_demo_behavior_test_agg_v5"Definisikan fitur untuk kunci grup yang berbeda.
feature_agg1 = Feature( name="item__sum_follow_cnt_15d", input=['follow_cnt'], transform=AggregationTransform( agg_func="sum", condition=Condition(field="event", value="expr", operator="="), group_by_keys="item_id", window_size=DayOf(1), ) ) feature_agg2 = Feature( name="author__max_follow_cnt_15d", input=['follow_cnt'], transform=AggregationTransform( agg_func="max", condition=Condition(field="event", value="expr", operator="="), group_by_keys="author", window_size=DayOf(15), ), )Buat pipeline.
agg_pipeline_5 = fs_client.create_pipeline(ds_agg, output_agg_table_name_5).add_feature_transform([feature_agg1, feature_agg2])Hasilkan dan jalankan pipeline.
execute_date = '20240605' output_agg_table_5 = agg_pipeline_5.execute(execute_date, drop_table=True)Lihat hasil transformasi.
transform_info_agg_5 = output_agg_table_5.transform_info transform_info_agg_5
4. Transformasi fitur WindowTransform
Transformasi fitur statistik yang disebutkan sebelumnya sudah cukup untuk skenario rekayasa fitur umum. Namun, dalam beberapa skenario rekomendasi berskala besar, ada persyaratan yang lebih canggih. FeatureStore mendukung transformasi fitur WindowTransform, yang memungkinkan Anda dengan mudah mendapatkan fitur KV dan menggunakan tabel antara harian untuk mengoptimalkan proses perhitungan. Ini mengurangi waktu perhitungan fitur dan menghemat biaya komputasi. Keuntungannya adalah sebagai berikut:
Tangkap interaksi non-linear kompleks: Fitur sederhana (seperti usia pengguna dan jenis kelamin) sulit mengekspresikan preferensi pengguna yang kompleks. Penyilangan fitur dapat membantu menangkap hubungan interaksi non-linear yang lebih kompleks antara pengguna dan item.
Tingkatkan akurasi prediksi: Fitur silang dapat secara signifikan meningkatkan performa sistem rekomendasi dan sistem periklanan.
Kurangi ruang penyimpanan: Untuk koleksi pengguna dan item berskala besar, menyimpan fitur interaksi untuk setiap pasangan pengguna-item tidaklah memungkinkan. Ekstraksi fitur dan transformasi fitur dapat secara efektif mengurangi jumlah fitur yang perlu disimpan.
Tingkatkan efisiensi inferensi: Dengan menghitung dan menyimpan fitur silang terlebih dahulu, Anda dapat dengan cepat menemukan dan menggunakan fitur-fitur ini selama inferensi real-time, yang meningkatkan kecepatan respons sistem.
Proses implementasi transformasi fitur WindowTransform diperkenalkan di bagian berikut:
Proses perhitungan fungsi agregat sederhana
Fungsi agregat sederhana mencakup count, sum, max, dan min. Proses perhitungan untuk fungsi agregat ini relatif sederhana. Setelah ringkasan tingkat harian, ringkasan lebih lanjut selama beberapa hari dilakukan untuk mendapatkan hasil akhir. Selain itu, bagian ini juga memperkenalkan tabel antara harian dan penggunaan Fungsi yang Ditentukan Pengguna (UDF) (Ikhtisar UDF MaxCompute) untuk mendapatkan hasil perhitungan akhir. Dalam proses perhitungan aktual, proses untuk pengguna menjalankan rekayasa fitur sama dengan transformasi fitur konvensional yang dijelaskan sebelumnya. SDK Python FeatureStore secara otomatis menangani pembuatan tabel antara harian, pembuatan UDF, pengunggahan sumber daya, dan pendaftaran fungsi otomatis. Anda dapat menerapkan proses rekayasa fitur tanpa menyadari detail-detail ini.
Definisikan nama tabel input dan output.
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_agg_table_name, project=project) output_window_table_name_1 = "rec_sln_demo_behavior_test_window_v1"Definisikan fitur WindowTransform.
win_feature1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_cnt_7d", input=["gender"], transform=WindowTransform( agg_func="sum", # Fungsi agregat. Nilai opsional adalah 'sum', 'avg', 'max', 'min'. agg_field="click_count", # Lakukan perhitungan fungsi agregat pada fitur ini. condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), )Buat pipeline dan jalankan transformasi fitur WindowTransform.
window_pipeline_1 = fs_client.create_pipeline(ds_window_1, output_window_table_name_1).add_feature_transform([win_feature1, win_feature2], keep_input_columns=True)Hasilkan dan jalankan transformasi.
Proses generasi ini membuat tabel antara sementara harian. Pada titik ini, `DROP TABLE` hanya menghapus hasil akhir dan tidak menghapus tabel antara sementara.
execute_date = '20240605' print("transform_info = ", window_pipeline_1.transform_info) output_window_table_1 = window_pipeline_1.execute(execute_date, drop_table=True)Selain itu, karena melibatkan statistik untuk beberapa hari (misalnya, contoh sebelumnya menghitung data selama 7 hari), tabel antara sementara biasanya hanya menghitung data untuk partisi terbaru. Untuk alasan ini, sistem menyediakan parameter `backfill_partitions`. Saat Anda menjalankan proses untuk pertama kali, Anda dapat mengatur parameter ini ke `True`, dan sistem secara otomatis mengisi ulang data untuk hari-hari yang bergantung. Misalnya, jika penghitungan melibatkan data selama 7 hari, sistem secara otomatis menyelesaikan 7 hari data tersebut. Untuk rutinitas selanjutnya, Anda dapat mengatur parameter ke `False` untuk menyelesaikan data hanya untuk partisi hari terbaru.
execute_date = '20240506' output_window_table_1 = window_pipeline_1.execute(execute_date, backfill_partitions=True)Ketika parameter `backfill_partitions` diatur ke `True`, sistem secara otomatis menyelesaikan data untuk hari-hari yang bergantung dari tabel antara sementara. Kami sarankan Anda melakukan ini selama rutinitas pertama kali.
Jika jumlah hari yang harus dihitung besar, menjalankan kode sebelumnya membutuhkan waktu lama.
Lihat data tabel sink.
window_ret_1 = output_window_table_1.to_pandas(execute_date, limit=50) window_ret_1Lihat proses perhitungan aktual.
window_pipeline_1.transform_infoSeperti yang bisa Anda lihat dari proses perhitungan, sistem menghasilkan tabel antara sementara harian bernama `rec_sln_demo_behavior_table_preprocess_wide_v1_tmp_daily`. Tabel ini merangkum hasil harian dan menyimpannya di partisi tetap, yang menghindari perhitungan berulang.
Selain itu, UDF bernama `count_kv` digunakan untuk menghitung hasil akhir. UDF ini secara otomatis mengklasifikasikan dan merangkum hasil statistik ke dalam peta hasil, yang disimpan dalam format string. Ini memudahkan inferensi hasil offline dan online selanjutnya.
Konten sebelumnya memperkenalkan proses perhitungan untuk fungsi agregat sederhana, menggunakan `count` dan `sum` sebagai contoh. Meskipun proses ini melibatkan konsep seperti tabel antara sementara harian dan UDF, alur inti sama dengan operasi transformasi data konvensional dan tidak menambah kompleksitas operasional. Fungsi agregat sederhana lainnya, seperti `max` dan `min`, serupa.
Proses perhitungan fungsi agregat avg
Karena merata-ratakan hasil rata-rata harian mengarah pada perhitungan yang tidak akurat, fungsi agregat avg memiliki proses perhitungan khusus. Metode yang benar adalah pertama-tama menghitung total sum (sum_v) dan total count (count_v) untuk seluruh periode, kemudian menghitung rata-rata menggunakan rumus sum_v/count_v.
Meskipun fungsi agregat ini didokumentasikan secara terpisah, detail perhitungan kompleksnya dienkapsulasi dalam transform_info. Oleh karena itu, Anda tidak perlu memahami detail dasarnya. Anda dapat menggunakan fungsi ini seperti fitur konvensional untuk menghasilkan hasil akhir.
Definisikan nama tabel input dan output.
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_2 = "rec_sln_demo_behavior_test_window_v2"Anda dapat mendefinisikan fitur WindowTransform.
win_feature1 = Feature( name="item__kv_gender_click_avg_7d", input=["gender"], transform=WindowTransform( agg_func="avg", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_avg_15d", input=["gender"], transform=WindowTransform( agg_func="avg", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(15), ), )Buat pipeline dan jalankan transformasi fitur WindowTransform.
window_pipeline_2 = fs_client.create_pipeline(ds_window_1, output_window_table_name_2).add_feature_transform([win_feature1, win_feature2])Hasilkan dan jalankan transformasi.
execute_date = '20240605' print("transform_info = ", window_pipeline_2.transform_info) output_window_table_2 = window_pipeline_2.execute(execute_date, drop_table=True)Lihat data tabel sink.
window_ret_2 = output_window_table_2.to_pandas(execute_date, limit=50) window_ret_2
Proses perhitungan fungsi untuk beberapa kunci grup
Demikian pula, WindowTransform mendukung perhitungan simultan dengan beberapa kunci grup. Hasilnya kemudian digabungkan ke tabel input. Berikut adalah contohnya:
Definisikan nama tabel input dan output.
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_3 = "rec_sln_demo_behavior_test_window_v3"Definisikan fitur WindowTransform.
win_feature1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature2 = Feature( name="item__kv_gender_click_cnt_7d", input=["gender"], transform=WindowTransform( agg_func="sum", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ), ) win_feature3 = Feature( name="author__kv_gender_click_15d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="author", window_size=DayOf(7), ), ) win_feature4 = Feature( name="author__kv_gender_click_cnt_15d", input=["gender"], transform=WindowTransform( agg_func="sum", agg_field="click_count", condition=Condition(field="event", value="click", operator="="), group_by_keys="author", window_size=DayOf(7), ), )Buat pipeline dan jalankan transformasi fitur WindowTransform.
window_pipeline_3 = fs_client.create_pipeline(ds_window_1, output_window_table_name_3).add_feature_transform([win_feature1, win_feature2, win_feature3, win_feature4])Hasilkan dan jalankan transformasi.
execute_date = '20240605' print("transform_info = ", window_pipeline_3.transform_info) output_window_table_3 = window_pipeline_3.execute(execute_date, drop_table=True)Lihat data tabel sink.
window_ret_3 = output_window_table_3.to_pandas(execute_date, limit=50) window_ret_3
Fungsi ekstensi otomatis bawaan untuk mendukung ekstensi otomatis fitur WindowTransform
Menyerupai transformasi fitur statistik, mengimplementasikan setiap fitur statistik WindowTransform secara manual kompleks karena jumlah fitur yang besar, termasuk ukuran jendela yang berbeda dan banyak kombinasi perhitungan fungsi agregat. Sistem menyediakan fungsi ekstensi otomatis bawaan. Anda hanya perlu menentukan fitur input yang akan dihitung, dan sistem secara otomatis menghasilkan dan menyelesaikan definisi untuk ratusan fitur statistik.
Tentukan fitur input yang akan dihitung.
name_prefix = "item" input_list = ['gender'] agg_field = ["click_count"] event_name = 'event' event_type = 'click' group_by_key = "item_id" window_size = [7, 15, 30, 45] window_transform_feature_list = auto_window_feature_transform(name_prefix, input_list, agg_field, event_name, event_type, group_by_key, window_size) print("len_window_transform_feature_list = ", len(window_transform_feature_list)) print("window_transform_feature_list = ", window_transform_feature_list)Definisikan nama tabel output dan buat pipeline.
input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1" ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project) output_window_table_name_4 = "rec_sln_demo_behavior_test_window_v4" window_pipeline_4 =fs_client.create_pipeline(ds_window_1, output_window_table_name_4).add_feature_transform(window_transform_feature_list)Hasilkan dan jalankan transformasi.
execute_date = '20240605' print("transform_info = ", window_pipeline_4.transform_info) output_window_table_4 = window_pipeline_4.execute(execute_date, drop_table=True)
Transformasi JoinTransform
Dalam proses rekayasa fitur sebelumnya, terutama untuk AggregationTransform dan WindowTransform, inputnya adalah tabel perilaku, dan hasil output juga disimpan dalam tabel perilaku. Namun, dalam kebanyakan kasus, tabel akhir yang dibutuhkan untuk penggunaan online bukanlah tabel perilaku, tetapi tabel fitur yang dihasilkan dengan menggabungkan tabel perilaku dengan tabel lain, seperti tabel pengguna atau tabel item.
Oleh karena itu, `JoinTransform` diperkenalkan untuk mendukung penggabungan fitur dari AggregationTransform dan WindowTransform dengan tabel pengguna atau item yang ada.
Hubungkan JoinTransform dengan WindowTransform
Definisikan tabel input WindowTransform.
window_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1' ds_window_1 = MaxComputeDataSource(table=window_table_name, project=project)Definisikan fitur WindowTransform.
win_fea1 = Feature( name="item__kv_gender_click_7d", input=["gender"], transform=WindowTransform( agg_func="count", condition=Condition(field="event", value="click", operator="="), group_by_keys="item_id", window_size=DayOf(7), ) )Buat pipeline.
CatatanKarena tabel lain perlu digabungkan nanti, tabel output tidak ditentukan di sini.
win_pipeline_1 = fs_client.create_pipeline(ds_window_1).add_feature_transform([win_fea1])Definisikan tabel input dan output JoinTransform.
item_table_name = 'rec_sln_demo_item_table_preprocess_v1' ds_join_1 = MaxComputeDataSource(table=item_table_name, project=project) output_table_name = 'rec_sln_demo_item_table_v1_fs_window_debug_v1'Buat pipeline JoinTransform dan hubungkannya dengan pipeline WindowTransform.
join_pipeline_1 = fs_client.create_pipeline(ds_join_1, output_table_name).merge(win_pipeline_1)Hasilkan dan jalankan transformasi.
execute_date = '20240605' output_join_table_1 = join_pipeline_1.execute(execute_date, drop_table=True)Lihat data tabel sink.
join_ret_1 = output_join_table_1.to_pandas(execute_date, limit = 50) join_ret_1Lihat proses perhitungan aktual.
output_join_table_1.transform_info
Hubungkan JoinTransform dengan AggregationTransform
Definisikan tabel input AggregationTransform.
agg_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1' ds_agg_1 = MaxComputeDataSource(table=agg_table_name, project=project)Definisikan fitur AggregationTransform.
agg_fea1 = Feature( name="user_avg_praise_count_1d", input=["praise_count"], transform=AggregationTransform( agg_func="avg", condition=Condition(field="event", value="expr", operator="<>"), group_by_keys="user_id", window_size=DayOf(1), ), )Buat pipeline.
CatatanKarena tabel lain perlu digabungkan nanti, tabel output tidak ditentukan di sini.
agg_pipeline_1 = fs_client.create_pipeline(ds_agg_1).add_feature_transform([agg_fea1])Definisikan tabel input dan output JoinTransform.
user_table_name = 'rec_sln_demo_user_table_preprocess_v1' ds_join_2 = MaxComputeDataSource(table=user_table_name, project=project) output_table_name_2 = 'rec_sln_demo_user_table_v1_fs_window_debug_v1'Buat pipeline JoinTransform dan hubungkannya dengan pipeline AggregationTransform.
join_pipeline_2 = fs_client.create_pipeline(ds_join_2, output_table_name_2).merge(agg_pipeline_1, keep_input_columns=False)Hasilkan dan jalankan transformasi.
execute_date = '20240605' output_join_table_2 = join_pipeline_2.execute(execute_date, drop_table=True)Lihat data tabel sink.
join_ret_2 = output_join_table_2.to_pandas(execute_date, limit = 50) join_ret_2Lihat proses perhitungan aktual.
output_join_table_2.transform_info
Referensi
Untuk informasi lebih lanjut tentang skenario aplikasi, lihat Praktik terbaik untuk rekayasa fitur.
FeatureStore cocok untuk semua skenario yang memerlukan fitur, seperti skenario rekomendasi, pengendalian risiko keuangan, dan skenario pertumbuhan pengguna. FeatureStore terintegrasi dengan mesin sumber data umum dan mesin layanan rekomendasi Alibaba Cloud. FeatureStore menyediakan platform end-to-end yang efisien dan nyaman dari pendaftaran fitur hingga pengembangan model dan aplikasi. Untuk informasi lebih lanjut tentang FeatureStore, lihat Ikhtisar.
Jika Anda memiliki pertanyaan saat menggunakan FeatureStore, Anda dapat bergabung dengan grup DingTalk 34415007523 untuk bantuan teknis.