Topik ini menjelaskan cara menggunakan FeatureStore SDK untuk Python guna mengintegrasikan dan menyajikan fitur untuk pelatihan model offline serta inferensi online.
Informasi latar belakang
FeatureStore merupakan pusat untuk membuat, berbagi, dan mengelola fitur yang digunakan dalam model pembelajaran mesin. Alat ini mendukung kerja tim yang efisien, memastikan konsistensi antara fitur offline dan online, serta memberikan akses cepat ke fitur secara real-time. FeatureStore sangat cocok untuk berbagai kasus penggunaan yang bergantung pada fitur, seperti Sistem rekomendasi. Dengan mengotomatiskan pembuatan dan pemeliharaan tabel fitur online dan offline, FeatureStore menjamin konsistensi data serta menghilangkan penyimpanan redundan, sehingga mengurangi biaya sumber daya. Hanya dengan satu baris kode, Anda dapat menyelesaikan tugas-tugas kompleks seperti mengekspor tabel pelatihan menggunakan SQL atau mengimpor data ke Hologres.
FeatureStore mengintegrasikan seluruh proses, mulai dari pembuatan fitur hingga pengembangan model, secara mulus. FeatureStore mendukung MaxCompute untuk tugas offline serta FeatureDB, Hologres, dan Tablestore untuk operasi online. Semua operasi yang diperlukan dapat dilakukan melalui konsol atau Python SDK tanpa harus mempelajari platform-platform tersebut, sehingga meningkatkan produktivitas tim dan memastikan konsistensi di lingkungan offline maupun online. Selain itu, FeatureStore menawarkan integrasi mendalam dengan EasyRec untuk generasi fitur (FG) dan pelatihan model yang efisien, serta penyebaran online langsung. Hal ini memungkinkan pengembangan cepat sistem rekomendasi mutakhir.
Jika Anda mengalami masalah saat menggunakan layanan ini, bergabunglah dengan grup DingTalk kami (32260796).
Prasyarat
Sebelum Anda mulai
Kami merekomendasikan agar Anda menjalankan kode dalam topik ini pada contoh DSW.
Instal Python SDK dalam lingkungan Python 3.
! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whlUntuk meminimalkan risiko kebocoran informasi sensitif, kami merekomendasikan Anda mentransmisikan
ID AccessKeydanRahasia AccessKeyAkun Alibaba Cloud sebagai variabel lingkungan.Klik Terminal dari bilah menu atas.
Jalankan perintah berikut. Ganti
YOUR_AccessKey_IDdengan ID AccessKey Anda yang sebenarnya.echo "export AccessKeyID='YOUR_AccessKey_ID'" >> ~/.bashrc source ~/.bashrcJalankan perintah berikut. Ganti
YOUR_Access_Key_Secretdengan Rahasia AccessKey Anda yang sebenarnya.echo "export AccessKeySecret='YOUR_Access_Key_Secret'" >> ~/.bashrc source ~/.bashrc
Impor modul yang diperlukan.
import unittest import sys import os from os.path import dirname, join, abspath from feature_store_py.fs_client import FeatureStoreClient from feature_store_py.fs_project import FeatureStoreProject from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput from feature_store_py.fs_type import FSTYPE from feature_store_py.fs_schema import OpenSchema, OpenField from feature_store_py.fs_feature_view import FeatureView from feature_store_py.fs_features import FeatureSelector from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig import logging logger = logging.getLogger("foo") logger.addHandler(logging.StreamHandler(stream=sys.stdout))
Dataset contoh
Topik ini menggunakan Moviedata, dataset open-source sebagai contoh. Movie, User, dan Rating digunakan, sesuai dengan tabel item, user, dan label dalam proses rekomendasi.
Konfigurasikan Proyek FeatureStore
Anda dapat membuat beberapa proyek independen di FeatureStore. Untuk menjalankan notebook, Anda harus terlebih dahulu mengonfigurasi penyimpanan data di FeatureStore.
Di sini, offline_datasource_id adalah ID dari toko data offline. online_datasource_id adalah ID dari toko data online.
Dalam contoh berikut, nama proyeknya adalah fs_movie.
# Masukkan access_key_id akun Alibaba Cloud Anda
access_id = os.getenv("AccessKeyID")
# Masukkan access_key_secret akun Alibaba Cloud Anda
access_ak = os.getenv("AccessKeySecret")
# Masukkan wilayah tempat Anda mengaktifkan FeatureStore. Dalam contoh ini, wilayah China (Hangzhou) digunakan
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
# Masukkan nama proyek platform fitur Anda. Dalam contoh ini, fs_movie digunakan
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
raise ValueError("Perlu membuat proyek : fs_movie")Peroleh dan cetak informasi proyek.
project = fs.get_project(cur_project_name)
print(project)Konfigurasikan entitas fitur
Entitas fitur adalah sekumpulan fitur yang saling terkait secara semantik. Setiap entitas fitur dapat dikaitkan dengan beberapa Tampilan fitur. Setiap entitas memiliki JoinId untuk mengaitkan fitur dari berbagai Tampilan fitur. Setiap Tampilan fitur memiliki kunci utama untuk pengambilan fitur, tetapi nama kunci utama bisa berbeda dari JoinId.
Buat tiga entitas: movie, user, dan rating.
cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
entity_movie = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
entity_movie.print_summary()cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
entity_user = project.create_entity(name = cur_entity_name_user, join_id=join_id)
entity_user.print_summary()cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
entity_ratings = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
entity_ratings.print_summary()Konfigurasikan Tampilan fitur
FeatureStore mengelola dan mengatur data fitur. Data eksternal diperkenalkan ke platform melalui Tampilan fitur, yang mendefinisikan penyimpanan data, operasi pra-pemrosesan atau transformasi, struktur data, lokasi penyimpanan, dan manajemen metadata fitur. Ini mencakup kunci utama, waktu kejadian, kunci partisi, entitas fitur, dan pengaturan TTL (Time to Live). Nilai default -1 menentukan bahwa penyimpanan data online menyimpan semua data fitur. Nilai positif menentukan bahwa penyimpanan data online hanya menyimpan data fitur terbaru dalam periode waktu tertentu.
Tampilan fitur memiliki jenis-jenis berikut:
BatchFeatureView: Fitur offline atau hari T-1. Data offline disuntikkan ke penyimpanan data offline dan dapat disinkronkan ke penyimpanan data online untuk kueri real-time.
StreamFeatureView: Fitur real-time. Data ditulis langsung ke penyimpanan data offline dan disinkronkan ke penyimpanan data online.
Sequence FeatureView: Fitur urutan, memungkinkan penulisan offline dan pembacaan online real-time.
BatchFeatureView
Untuk data yang disimpan dalam file CSV, unggah ke MaxCompute dengan menentukan URL file CSV. Anda harus membuat skema untuk Tampilan fitur secara manual.
path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)schema menentukan nama dan tipe setiap bidang.
movie_schema = OpenSchema(
OpenField(name='movie_id', type='STRING'),
OpenField(name='name', type='STRING'),
OpenField(name='alias', type='STRING'),
OpenField(name='actors', type='STRING'),
OpenField(name='cover', type='STRING'),
OpenField(name='directors', type='STRING'),
OpenField(name='double_score', type='STRING'),
OpenField(name='double_votes', type='STRING'),
OpenField(name='genres', type='STRING'),
OpenField(name='imdb_id', type='STRING'),
OpenField(name='languages', type='STRING'),
OpenField(name='mins', type='STRING'),
OpenField(name='official_site', type='STRING'),
OpenField(name='regions', type='STRING'),
OpenField(name='release_date', type='STRING'),
OpenField(name='slug', type='STRING'),
OpenField(name='story', type='STRING'),
OpenField(name='tags', type='STRING'),
OpenField(name='year', type='STRING'),
OpenField(name='actor_ids', type='STRING'),
OpenField(name='director_ids', type='STRING'),
OpenField(name='dt', type='STRING')
)
print(movie_schema)Buat Tampilan fitur batch.
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()Tulis data ke tabel MaxCompute.
cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()Lihat informasi tentang tugas saat ini.
print(cur_task.task_summary)Sinkronkan data ke penyimpanan data online.
cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()print(cur_task.task_summary)Dapatkan Tampilan fitur.
batch_feature_view = project.get_feature_view(feature_view_movie_name)Cetak informasi Tampilan fitur.
batch_feature_view.print_summary()Impor tabel user dan ratings secara berurutan.
users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)user_schema = OpenSchema(
OpenField(name='user_md5', type='STRING'),
OpenField(name='user_nickname', type='STRING'),
OpenField(name='ds', type='STRING')
)
print(user_schema)feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()print(cur_task.task_summary)batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)ratings_schema = OpenSchema(
OpenField(name='rating_id', type='STRING'),
OpenField(name='user_md5', type='STRING'),
OpenField(name='movie_id', type='STRING'),
OpenField(name='rating', type='STRING'),
OpenField(name='rating_time', type='STRING'),
OpenField(name='dt', type='STRING')
)feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()print(cur_task.task_summary)batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()StreamFeatureView
Anda dapat mendefinisikan skema sebagai berikut.
online_schema = OpenSchema(
OpenField(name='id', type='STRING'),
OpenField(name='count_value', type='INT64'),
OpenField(name='metric_value', type='DOUBLE')
)Anda dapat menyalin pernyataan SQL berikut dan menjalankannya di MaxCompute atau DataWorks dalam proyek Anda. Pernyataan ini menghasilkan data uji. Data tersebut hanya untuk tujuan pengujian dan tidak memiliki arti khusus.
CREATE TABLE IF NOT EXISTS online_stream_test_t1 (
id STRING COMMENT 'ID',
count_value BIGINT COMMENT 'Nilai hitungan',
metric_value DOUBLE COMMENT 'Nilai metrik'
)
PARTITIONED BY (
ds string COMMENT 'Cap waktu data'
)
LIFECYCLE 365
;
INSERT INTO TABLE online_stream_test_t1 PARTITION (ds='20250815')
SELECT
CONCAT('str_', CAST(id AS STRING)) AS id,
CAST(FLOOR(RAND() * 1000000) AS BIGINT) AS count_value,
ROUND(RAND() * 1000, 2) AS metric_value
FROM (
SELECT SEQUENCE(1, 1000) AS id_list
) tmp
LATERAL VIEW EXPLODE(id_list) table_tmp AS id;Setelah pernyataan berhasil dijalankan, tabel fitur real-time online_stream_test_t1 dibuat, dan data dari partisi ds=20250815 disinkronkan ke tabel ini.
Anda dapat membuat stream_feature_view baru.
feature_view_rating_name_stream = "feature_view_online_stream"
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
if stream_feature_view is None:
stream_feature_view = project.create_stream_feature_view(name=feature_view_rating_name_stream, schema=online_schema,
online=True, entity=cur_entity_name_user,
primary_key='id', event_time='count_value')Bidang event_time dalam StreamFeatureView memiliki tujuan khusus. Saat bidang ini dikonfigurasikan, nilainya digunakan untuk membersihkan data yang kedaluwarsa. Untuk informasi lebih lanjut, lihat Atur masa hidup untuk data fitur real-time.
Anda dapat mencetak informasi tentang tampilan fitur.
stream_feature_view = project.get_feature_view(feature_view_rating_name_stream)
stream_feature_view.print_summary()Anda dapat menyinkronkan data ke toko online.
# Ubah offline_datasource_id menjadi ID toko offline Proyek FeatureStore Anda.
# table_name adalah tabel fitur offline yang akan didorong ke toko online.
stream_task = stream_feature_view.publish_table(partitions={'ds': '20250815'}, mode='Merge', offline_to_online=True,
publish_config={'offline_datasource_id': project.offline_datasource_id,
'table_name': 'online_stream_test_t1'})
stream_task.wait()
print(stream_task.task_summary)Sequence FeatureView
Tabel data sumber terletak di pai_online_project, yang memiliki izin baca publik. Anda dapat menjalankan pernyataan SQL berikut di MaxCompute atau DataWorks dalam proyek Anda untuk menyalin tabel fitur urutan ke proyek Anda sendiri.
CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
like pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
STORED AS ALIORC
LIFECYCLE 90;
INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
SELECT *
FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
WHERE ds >= '20231022' and ds <='20231024'Setelah pernyataan berhasil dijalankan, tabel fitur urutan rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 dibuat, dan data dari partisi ds=20231022, ds=20231023, dan ds=20231024 disinkronkan ke tabel ini.
Anda dapat membuat seq_feature_view baru.
user_entity_name = "user"
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_view = project.get_feature_view(seq_feature_view_name)
if seq_feature_view is None:
seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
event_time = 'event_unix_time' # Nama bidang waktu peristiwa dalam tabel perilaku.
item_id = 'item_id' # Nama bidang item_id dalam tabel perilaku.
event = 'event' # Nama bidang peristiwa dalam tabel perilaku.
# deduplication_method = 1 menunjukkan bahwa duplikat dihapus berdasarkan ['user_id', 'item_id', 'event'].
# deduplication_method = 2 menunjukkan bahwa duplikat dihapus berdasarkan ['user_id', 'item_id', 'event', 'event_time'].
sequence_feature_config_list = [
SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50',
seq_len=50)]
# offline_seq_name adalah nama bidang fitur urutan dalam daftar urutan offline. seq_event adalah nama bidang perilaku. online_seq_name adalah nama yang diberikan untuk urutan item_ids perilaku pengguna saat di-query oleh FeatureStore online Go SDK.
# seq_len adalah panjang urutan. Urutan yang lebih panjang dari panjang ini dipotong.
seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id',
event_time='event_unix_time')
seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
event_time=event_time, item_id=item_id, event=event,
deduplication_method=1,
sequence_feature_config=sequence_feature_config_list,
sequence_table_config=seq_table_config,
entity=user_entity_name)Anda dapat mencetak informasi tentang tampilan fitur.
seq_feature_view.print_summary()Anda dapat menyinkronkan data ke toko online.
seq_task = seq_feature_view.publish_table({'ds': '20231023'}, days_to_load=30)
seq_task.wait()
seq_task.print_summary()Anda dapat mendaftarkan tabel label.
label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
label_table = project.create_label_table(datasource=ds, event_time='rating_time')Konfigurasikan penyimpanan data offline
Penyimpanan data offline adalah gudang data untuk menyimpan fitur offline. Fitur offline ditulis ke MaxCompute atau Hadoop Distributed File System (HDFS) menggunakan Apache Spark. Penyimpanan data offline digunakan untuk menghasilkan set pelatihan untuk pelatihan model dan menyajikan fitur untuk prediksi batch.
Konfigurasikan penyimpanan data online
Penyimpanan data online adalah gudang data untuk menyimpan fitur real-time. Ini memungkinkan akses latensi rendah ke fitur terbaru untuk inferensi online. FeatureDB, Hologres, dan Tablestore didukung.
Ambil fitur online
Ambil fitur online dari perspektif tampilan fitur, saat ini memprioritaskan dukungan untuk FeatureDB. (Untuk dokumentasi terkait FeatureDB, silakan merujuk ke: FeatureDB)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.list_feature_view_online_features(join_ids=['26357307'])
print("ret_features1 = ", ret_features_1)ret_features_2 = batch_feature_view.list_feature_view_online_features(join_ids=['30444960', '3317352'])
print("ret_features2 = ", ret_features_2)Konfigurasikan featureSelector
FeatureSelector mendefinisikan jangkauan fitur yang akan diambil dari penyimpanan data online dan offline. Anda dapat menentukan Tampilan fitur untuk mengekstraksi fitur darinya.
feature_view_name = 'feature_view_movie'
# Ambil fitur spesifik
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
# Ambil semua fitur
feature_selector = FeatureSelector(feature_view_name, '*')
# Konfigurasikan alias untuk fitur yang ingin Anda ambil
feature_selector = FeatureSelector(
feature_view='user1',
features = ['f1','f2', 'f3'],
alias={"f1":"f1_1"} # Tentukan f1_1 sebagai alias untuk bidang f1
)Konfigurasikan tabel sampel (set pelatihan)
FeatureStore dapat menghasilkan tabel sampel untuk pelatihan model, yang mencakup label dan fitur. Anda harus menyiapkan label untuk pelatihan model dan mendefinisikan fitur yang dibutuhkan model untuk diambil dari Tampilan fitur. Label dihubungkan dengan fitur menggunakan penggabungan berdasarkan waktu menggunakan kunci utama.
label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actors', 'regions','tags'])
feature_view_user_name = 'feature_view_users'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)Pelatihan model
Pelatihan model menggunakan train_set yang dihasilkan oleh FeatureStore dan sebarkan model yang dilatih sebagai layanan inferensi.
model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)Ekspor tabel sampel
Untuk mengekspor tabel sampel untuk pelatihan model, tentukan tabel label dan waktu kejadian serta partisi setiap Tampilan fitur.
label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)Ekspor tabel sampel berdasarkan kondisi yang ditentukan.
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()print(task.summary)