Platform for AI: Feature platform and feature engineering

Last Updated: Nov 28, 2025

The feature engineering capabilities of FeatureStore are widely used in fields such as recommendation, advertising, risk control, and machine learning. These capabilities are designed to reduce the complexity of feature engineering: because common functions are standardized, you can implement feature engineering through simple configurations. This topic describes the feature engineering process in detail.

Prerequisites

Make sure that the preparations for the following services are complete:

  • PAI

  • MaxCompute

  • DataWorks

1. Preparations

Prepare raw data

In this topic, the following four source tables are used:

  • User table (rec_sln_demo_user_table_preprocess_v1): contains basic user features, such as gender, age, city, and number of followers.

  • Behavior table (rec_sln_demo_behavior_table_preprocess_v1): contains behavioral data, such as user clicks on items at specific times.

  • Item table (rec_sln_demo_item_table_preprocess_v1): contains basic item features, such as category, author, cumulative number of clicks, and cumulative number of likes.

  • Wide behavior table (rec_sln_demo_behavior_table_preprocess_wide_v1): This table is formed by joining the preceding three tables.

The tables are stored in the pai_online_project workspace, which is visible to all users, and contain only simulated data. You must execute SQL statements in DataWorks to synchronize the preceding tables from the pai_online_project workspace to your MaxCompute project. Perform the following steps:

  1. Log on to the DataWorks console.

  2. In the navigation pane on the left, click Data Development and O&M > Data Development.

  3. Select the DataWorks workspace that you created and click Go to Data Studio.

  4. Hover over Create and choose Create Node > MaxCompute > ODPS SQL. On the page that appears, configure the following node parameters:

    • Node Type: ODPS SQL

    • Path: Business Flow/Workflow/MaxCompute

    • Name: Enter a custom name.

  5. Click Confirm.

  6. In the new node editor, run the following SQL commands to synchronize data from the user, item, behavior, and wide behavior tables in the pai_online_project project to your MaxCompute project. For the resource group, select your exclusive resource group for scheduling.

    • Execute the following SQL statements to synchronize data in the user table rec_sln_demo_user_table_preprocess_v1:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_v1
      LIKE pai_online_project.rec_sln_demo_user_table_preprocess_v1
      STORED AS ALIORC
      LIFECYCLE 90;

      INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_user_table_preprocess_v1
      WHERE ds >= '20240530' AND ds <= '20240605';
    • Execute the following SQL statements to synchronize data in the behavior table rec_sln_demo_behavior_table_preprocess_v1:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v1
      LIKE pai_online_project.rec_sln_demo_behavior_table_preprocess_v1
      STORED AS ALIORC
      LIFECYCLE 90;

      INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v1
      WHERE ds >= '20240530' AND ds <= '20240605';
    • Execute the following SQL statements to synchronize data in the item table rec_sln_demo_item_table_preprocess_v1:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_v1
      LIKE pai_online_project.rec_sln_demo_item_table_preprocess_v1
      STORED AS ALIORC
      LIFECYCLE 90;

      INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_item_table_preprocess_v1
      WHERE ds >= '20240530' AND ds <= '20240605';
    • Execute the following SQL statements to synchronize data in the wide behavior table rec_sln_demo_behavior_table_preprocess_wide_v1:

      CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_wide_v1
      LIKE pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1
      STORED AS ALIORC
      LIFECYCLE 90;

      INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_wide_v1 PARTITION(ds)
      SELECT *
      FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_wide_v1
      WHERE ds >= '20240530' AND ds <= '20240605';

Install FeatureStore SDK for Python

We recommend that you use Jupyter Notebook to run the following code.

  • Install FeatureStore SDK for Python in the Python3 environment.

    %pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl
  • Import the required functional modules.

    import os
    from feature_store_py import FeatureStoreClient
    from feature_store_py.fs_datasource import MaxComputeDataSource
    from feature_store_py.feature_engineering import (
        TableTransform,
        Condition,
        DayOf,
        ComboTransform,
        Feature,
        AggregationTransform,
        auto_count_feature_transform,
        WindowTransform,
        auto_window_feature_transform,
    )

2. Table and feature transformation process

We recommend that you run the following code in a Jupyter Notebook environment.

  1. Define the table transformation.

    1. Initialize the client.

      access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") # Your AccessKey ID, read from the environment variable.
      access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") # Your AccessKey secret, read from the environment variable.
      project='project_name' # Enter your project name.
      region='cn-hangzhou' # Enter the region in which your project resides. For example, if your project resides in China (Hangzhou), enter cn-hangzhou.
      fs_client = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region=region)
    2. Specify the data source.

      input_table_name = "rec_sln_demo_behavior_table_preprocess_v1"
      ds = MaxComputeDataSource(table=input_table_name, project=project)
    3. Specify the name of the output table for the transformation.

      output_table_name = "rec_sln_demo_v1_fs_test_v1"
    4. Define the table transformation.

      trans_name = "drop_duplicates"  # Name of the table transformation.
      keys = ["user_id", "item_id"]  # Fields for deduplication.
      sort_keys = ["event_unix_time"]  # Sort fields.
      sort_order = ["desc"]  # Order definition.
      tran_i = TableTransform(trans_name, keys, sort_keys, sort_order)
      
  2. Define the feature transformation.

    feature1 = Feature(
        name="page_net_type",
        input=['page', 'net_type'],
        transform=ComboTransform(
            separator='_'
        )
    )
    feature2 = Feature(
        name="trim_playtime",
        type="double",
        transform="playtime/10"
    )
    
  3. Generate the pipeline.

    pipeline = fs_client.create_pipeline(ds, output_table_name).add_table_transform(tran_i).add_feature_transform([feature1, feature2], keep_input_columns=True)
  4. Generate and execute the transformation.

    execute_date = '20240605'
    output_table = pipeline.execute(execute_date, drop_table=True)

    The preceding code involves the following two steps:

    1. Generate transformation configurations. These configurations specify the SQL statements and information required for the transformation, such as inputs, outputs, parameters, and dependencies.

    2. Execute the transformation. The transformation is executed based on the configurations from the previous step, and the results are stored in the output table.

  5. View the results.

    1. View the results in the generated table. The results are rendered directly in the pandas.DataFrame format.

      pd_ret = output_table.to_pandas(execute_date, limit=20)
    2. Display the content of pd_ret.

      pd_ret
    3. View the generated configurations. These include the input table definition, transformation SQL, dependencies, parameters, and output table definition. After being saved, these configurations can be used for debugging and subsequent routine online tasks.

      transform_info = output_table.transform_info
    4. View the content of transform_info.

      transform_info
    5. View the input configurations for the first stage.

      pipeline_config = pipeline.pipeline_config
    6. View the content of pipeline_config.

      pipeline_config

3. Statistical feature transformation

Statistical features are a common data pre-processing method in machine learning and data analytics. They are used to generate more representative and easily interpretable features. These transformations summarize, calculate, and extract information from raw data, which enables models to better understand time trends, periodicity, and anomalies in the data. The advantages are as follows:

  • Capture time trends: In user behavioral data, behavior from the most recent period often has a greater impact on the current state.

  • Reduce noise: Raw data may contain a large amount of noise. Using statistical transformations, you can use aggregate operations to reduce the impact of this noise.

  • Enrich features: Statistical transformations can generate new features, increasing the model's expressive power.

  • Improve model performance: By introducing statistical features, you can significantly improve the model's prediction performance.

  • Enhance interpretability: Statistical features are easier to interpret and understand, which makes problem diagnosis and analysis more convenient.

  • Data compression: In some cases, statistical features can effectively reduce the data dimensionality.

Although the implementation process for statistical features is complex, you can create many statistical features using the simple definitions described below.

Definition and running of a single statistical feature transformation

  1. Define the input and output table names.

    input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_agg_table_name = "rec_sln_demo_behavior_test_agg_v1"
  2. Define a statistical feature.

    feature_agg1 = Feature(
                name="user_avg_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg", # Name of the aggregate function. Optional values are 'avg', 'sum', 'min', 'max'.
                    condition=Condition(field="event", value="expr", operator="<>"), # Set a condition that is met when the value of the "event" field is not equal to "expr". You can understand the specific logic from the generated SQL statement.
                    group_by_keys="user_id", # The key corresponding to group by.
                    window_size=DayOf(1), # The window size, which is 1 day here.
                ),
            )
  3. Create a pipeline and run the statistical feature transformation.

    agg_pipeline = fs_client.create_pipeline(ds_agg, output_agg_table_name).add_feature_transform([feature_agg1])
  4. Generate and execute the transformation.

    execute_date = '20240605'
    print("transform_info = ", agg_pipeline.transform_info)
    output_agg_table = agg_pipeline.execute(execute_date, drop_table=True)
  5. View the content of transform_info.

    transform_info_agg = output_agg_table.transform_info
    transform_info_agg
  6. View the results.

    pd_ret = output_agg_table.to_pandas(execute_date, limit=20)
    pd_ret

Automatic JOIN for statistical feature transformations with different windows

  1. Define the output table name.

    output_agg_table_name_2 = "rec_sln_demo_behavior_test_agg_v2"
  2. Define statistical features for multiple different windows. The defined window sizes are 1, 3, 7, 15, and 30 days.

    feature_agg1 = Feature(
                name="user_avg_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg2 = Feature(
                name="user_avg_praise_count_3d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(3),
                ),
            )
    feature_agg3 = Feature(
                name="user_avg_praise_count_7d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(7),
                ),
            )
    feature_agg4 = Feature(
                name="user_avg_praise_count_15d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(15),
                ),
            )
    feature_agg5 = Feature(
                name="user_avg_praise_count_30d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="avg",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(30),
                ),
            )
  3. Create a pipeline.

    agg_pipeline_2 = fs_client.create_pipeline(ds_agg, output_agg_table_name_2).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5])
  4. Generate and execute the pipeline.

    execute_date = '20240605'
    output_agg_table_2 = agg_pipeline_2.execute(execute_date, drop_table=True)
  5. View the transformation results.

    transform_info_agg_2 = output_agg_table_2.transform_info
    transform_info_agg_2
  6. View the results of the table run.

    pd_ret_2 = output_agg_table_2.to_pandas(execute_date, limit=20)
    pd_ret_2

Automatic merging and type derivation for multiple statistical feature transformation processes

To optimize calculations, multiple features with the same window size are automatically merged and calculated in the same group window block. The calculation process can change data types. For example, `avg` transforms the `bigint` type into the `double` type. Because it is impractical to track the types of many input features, the statistical feature transformation process supports automatic type derivation: you do not need to specify types in advance, and the type of each resulting feature is derived automatically during feature definition.
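
The following minimal sketch (plain Python, not part of the FeatureStore SDK) illustrates the kind of type derivation described above: the output type follows from the aggregate function and the input column type, so you never declare it by hand.

    # Hypothetical sketch, not the SDK implementation: derive an output type
    # from the aggregate function and the input column type.
    def derive_output_type(agg_func: str, input_type: str) -> str:
        if agg_func == "avg":
            return "double"       # avg promotes integer inputs such as bigint to double
        if agg_func == "count":
            return "bigint"       # counts are always integral
        return input_type         # sum, min, and max keep the input type

    print(derive_output_type("avg", "bigint"))    # double
    print(derive_output_type("count", "string"))  # bigint
    print(derive_output_type("sum", "bigint"))    # bigint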

  1. Define the output table name.

    output_agg_table_name_3 = "rec_sln_demo_behavior_test_agg_v3"
  2. Define more features of different types.

    feature_agg6 = Feature(
                name="user_expr_cnt_1d",
                transform=AggregationTransform(
                    agg_func="count",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                )
            )
    feature_agg7 = Feature(
                name="user_expr_item_id_dcnt_1d",
                input=['item_id'],
                transform=AggregationTransform(
                    agg_func="count",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg8 = Feature(
                name="user_sum_praise_count_1d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(1),
                ),
            )
    feature_agg9 = Feature(
                name="user_sum_praise_count_3d",
                input=["praise_count"],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="<>"),
                    group_by_keys="user_id",
                    window_size=DayOf(3),
                ),
            )
  3. Create a pipeline.

    agg_pipeline_3 = fs_client.create_pipeline(ds_agg, output_agg_table_name_3).add_feature_transform([feature_agg1, feature_agg2, feature_agg3, feature_agg4, feature_agg5, feature_agg6, feature_agg7, feature_agg8, feature_agg9])
  4. Generate and execute the pipeline.

    execute_date = '20240605'
    output_agg_table_3 = agg_pipeline_3.execute(execute_date, drop_table=True)
  5. View the transformation results.

    transform_info_agg_3 = output_agg_table_3.transform_info
    transform_info_agg_3
  6. View the results of the table run.

    pd_ret_3 = output_agg_table_3.to_pandas(execute_date, limit=20)
    pd_ret_3

Built-in automatic extension functions to support automatic extension of statistical feature transformations

Manually implementing each statistical feature is complex because of the large number of features, which include different window sizes and numerous combinations of aggregate function calculations. The system provides built-in automatic extension functions. You only need to specify the input features to be counted, and the system automatically generates and completes the definitions for hundreds of statistical features.

  1. Specify the input features to be counted.

    name_prefix = "user_"
    input_list = ["playtime", "duration", "click_count", "praise_count"]
    event_name = 'event'
    event_type = 'expr'
    group_by_key = "user_id"
    count_feature_list = auto_count_feature_transform(name_prefix, input_list, event_name, event_type, group_by_key)
    print("len_count_feature_list = ", len(count_feature_list))
    print("count_feature_list = ", count_feature_list)
  2. Define the output table name and create a pipeline.

    output_agg_table_name_4 = "rec_sln_demo_behavior_test_agg_v4"
    agg_pipeline_4 =fs_client.create_pipeline(ds_agg, output_agg_table_name_4).add_feature_transform(count_feature_list)
  3. Generate and execute the pipeline.

    execute_date = '20240605'
    output_agg_table_4 = agg_pipeline_4.execute(execute_date, drop_table=True)
  4. View the transformation results.

    transform_info_agg_4 = output_agg_table_4.transform_info
    transform_info_agg_4
  5. View the results of the table run.

    pd_ret_4 = output_agg_table_4.to_pandas(execute_date, limit=20)
    pd_ret_4

Support for simultaneous transformation of different group keys

The preceding sections describe processing when all features share the same group key. The system also supports transformation operations on different group keys. The following is an example:

  1. Define the input and output table names.

    input_agg_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_agg = MaxComputeDataSource(table=input_agg_table_name, project=project)
    output_agg_table_name_5 = "rec_sln_demo_behavior_test_agg_v5"
  2. Define features for different group keys.

    feature_agg1 = Feature(
                name="item__sum_follow_cnt_15d",
                input=['follow_cnt'],
                transform=AggregationTransform(
                    agg_func="sum",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="item_id",
                    window_size=DayOf(15),
                )
            )
    feature_agg2 = Feature(
                name="author__max_follow_cnt_15d",
                input=['follow_cnt'],
                transform=AggregationTransform(
                    agg_func="max",
                    condition=Condition(field="event", value="expr", operator="="),
                    group_by_keys="author",
                    window_size=DayOf(15),
                ),
            )
  3. Create a pipeline.

    agg_pipeline_5 = fs_client.create_pipeline(ds_agg, output_agg_table_name_5).add_feature_transform([feature_agg1, feature_agg2])
  4. Generate and execute the pipeline.

    execute_date = '20240605'
    output_agg_table_5 = agg_pipeline_5.execute(execute_date, drop_table=True)
  5. View the transformation results.

    transform_info_agg_5 = output_agg_table_5.transform_info
    transform_info_agg_5

4. WindowTransform feature transformation

The preceding statistical feature transformation is sufficient for common feature engineering scenarios. However, in some large-scale recommendation scenarios, there are more advanced requirements. FeatureStore supports WindowTransform feature transformation, which lets you easily obtain KV features and use daily intermediate tables to optimize the calculation process. This reduces feature calculation time and saves computing costs. The advantages are as follows:

  • Capture complex non-linear interactions: Simple features (such as user age and gender) have difficulty expressing complex user preferences. Feature crossing can help capture more complex non-linear interaction relationships between users and items.

  • Improve prediction accuracy: Cross features can significantly improve the performance of recommendation systems and advertising systems.

  • Reduce storage space: For large-scale user and item collections, directly storing the interaction features of each user-item pair is not feasible. Feature extraction and feature transformation can effectively reduce the number of features that need to be stored.

  • Improve inference efficiency: By pre-calculating and storing cross features, you can quickly find and use these features during real-time inference, which improves the system response speed.

The implementation process of WindowTransform feature transformation is introduced in the following sections:

Simple aggregate function calculation process

Simple aggregate functions include count, sum, max, and min. Their calculation process is relatively straightforward: the data is summarized at the daily level first, and the daily results are then summarized again over multiple days to obtain the final result. This section also introduces daily intermediate tables and the use of user-defined functions (UDFs) (see MaxCompute UDF overview) to obtain the final result. In practice, you run feature engineering in the same way as the conventional feature transformations described earlier: the FeatureStore SDK for Python automatically handles the creation of daily intermediate tables, UDF generation, resource upload, and function registration, so you can complete the feature engineering process without being aware of these details.
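
As a rough illustration (plain Python, not the SDK or the generated SQL), the following sketch shows why simple aggregates such as sum can be computed per day first and then re-aggregated over the window without losing accuracy; the made-up daily values stand in for the daily intermediate table.

    # Hypothetical daily click sums, standing in for the daily intermediate table.
    daily_click_sums = {"20240603": 12, "20240604": 7, "20240605": 9}

    # Re-aggregating the daily partial sums over a 3-day window gives the same
    # result as summing the raw rows for those days directly.
    window_sum_3d = sum(daily_click_sums.values())
    print(window_sum_3d)  # 28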

  1. Define the input and output table names.

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_1 = "rec_sln_demo_behavior_test_window_v1"
  2. Define the WindowTransform features.

    win_feature1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_cnt_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum", # Aggregate function. Optional values are 'sum', 'avg', 'max', 'min'.
            agg_field="click_count", # Perform the aggregate function calculation on this feature.
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
  3. Create a pipeline and run the WindowTransform feature transformation.

    window_pipeline_1 = fs_client.create_pipeline(ds_window_1, output_window_table_name_1).add_feature_transform([win_feature1, win_feature2], keep_input_columns=True)
  4. Generate and execute the transformation.

    This generation process creates a daily intermediate temporary table. Note that `drop_table=True` deletes only the final result table and does not delete the intermediate temporary table.

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_1.transform_info)
    output_window_table_1 = window_pipeline_1.execute(execute_date, drop_table=True)

    In addition, because the statistics span multiple days (the preceding example covers 7 days) and the intermediate temporary table typically calculates data only for the latest partition, the system provides the `backfill_partitions` parameter. Set this parameter to `True` the first time you execute the process so that the system automatically backfills the data for the dependent days. For example, if the statistics cover 7 days of data, the system automatically backfills all 7 days. For subsequent routine runs, set the parameter to `False` to compute data only for the latest day's partition.

    execute_date = '20240605'
    output_window_table_1 = window_pipeline_1.execute(execute_date, backfill_partitions=True)
    • When the `backfill_partitions` parameter is set to `True`, the system automatically backfills the data for the dependent days in the intermediate temporary table. We recommend that you do this the first time you run the process.

    • If the number of days to be counted is large, running the preceding code can take a long time.

  5. View the sink table data.

    window_ret_1 = output_window_table_1.to_pandas(execute_date, limit=50)
    window_ret_1
  6. View the actual calculation process.

    window_pipeline_1.transform_info

    As you can see from the calculation process, the system generates a daily intermediate temporary table named `rec_sln_demo_behavior_table_preprocess_wide_v1_tmp_daily`. This table summarizes the daily results and stores them in a fixed partition, which avoids repeated calculations.

    In addition, a UDF named `count_kv` is used to calculate the final result. This UDF automatically classifies and summarizes the statistical results into a result map, which is stored in a string format. This facilitates subsequent offline and online result inference.

The preceding content introduces the calculation process for simple aggregate functions, using `count` and `sum` as examples. Although this process involves concepts such as daily intermediate temporary tables and UDFs, the core flow is the same as a conventional data transformation operation and does not add operational complexity. Other simple aggregate functions, such as `max` and `min`, are similar.
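
For illustration only, the following sketch (plain Python) shows the general idea of a KV feature value: each input value, such as gender, maps to its aggregated statistic for the group key, and the map is serialized as a string. The exact serialization is produced by the generated UDF (`count_kv`) and may differ from this sketch; the numbers are made up.

    # Hypothetical 7-day click counts per gender for one item (illustration only).
    kv_feature = {"male": 1532, "female": 987}

    # Serialize the map into a string so that it can be stored in a single column.
    kv_string = ",".join(f"{key}:{value}" for key, value in kv_feature.items())
    print(kv_string)  # male:1532,female:987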

avg aggregate function calculation process

Because averaging the results of daily averages leads to inaccurate calculations, the avg aggregate function has a specific calculation process. The correct method is to first calculate the total sum (sum_v) and total count (count_v) for the entire period, and then calculate the average using the formula sum_v/count_v.

Although this aggregate function is documented separately, its complex calculation details are encapsulated in transform_info. Therefore, you do not need to understand the underlying details. You can use this function like a conventional feature to generate the final result.
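
The following minimal sketch (plain Python, with made-up numbers) shows why the two-stage calculation is needed: averaging daily averages weights every day equally, whereas dividing the total sum by the total count weights every record equally.

    # Hypothetical daily (sum_v, count_v) pairs: day 1 has 1 click over 1 event,
    # day 2 has 100 clicks over 10 events.
    daily = [(1, 1), (100, 10)]

    avg_of_daily_avgs = sum(s / c for s, c in daily) / len(daily)  # (1.0 + 10.0) / 2 = 5.5
    sum_v = sum(s for s, _ in daily)    # 101
    count_v = sum(c for _, c in daily)  # 11
    true_avg = sum_v / count_v          # about 9.18

    print(avg_of_daily_avgs, true_avg)  # the two results differ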

  1. Define the input and output table names.

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_2 = "rec_sln_demo_behavior_test_window_v2"
  2. Define the WindowTransform features.

    win_feature1 = Feature(
        name="item__kv_gender_click_avg_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="avg",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_avg_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="avg",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(15),
        ),
    )
  3. Create a pipeline and run the WindowTransform feature transformation.

    window_pipeline_2 = fs_client.create_pipeline(ds_window_1, output_window_table_name_2).add_feature_transform([win_feature1, win_feature2])
  4. Generate and execute the transformation.

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_2.transform_info)
    output_window_table_2 = window_pipeline_2.execute(execute_date, drop_table=True)
  5. View the sink table data.

    window_ret_2 = output_window_table_2.to_pandas(execute_date, limit=50)
    window_ret_2

Function calculation process for multiple group keys

Similarly, WindowTransform supports simultaneous calculations with multiple group keys. The result is then left-joined to the input table. The following is an example:

  1. Define the input and output table names.

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_3 = "rec_sln_demo_behavior_test_window_v3"
  2. Define the WindowTransform features.

    win_feature1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature2 = Feature(
        name="item__kv_gender_click_cnt_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
        ),
    )
    win_feature3 = Feature(
        name="author__kv_gender_click_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="author",
            window_size=DayOf(15),
        ),
    )
    win_feature4 = Feature(
        name="author__kv_gender_click_cnt_15d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="sum",
            agg_field="click_count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="author",
            window_size=DayOf(15),
        ),
    )
  3. Create a pipeline and run the WindowTransform feature transformation.

    window_pipeline_3 = fs_client.create_pipeline(ds_window_1, output_window_table_name_3).add_feature_transform([win_feature1, win_feature2, win_feature3, win_feature4])
  4. Generate and execute the transformation.

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_3.transform_info)
    output_window_table_3 = window_pipeline_3.execute(execute_date, drop_table=True)
  5. View the sink table data.

    window_ret_3 = output_window_table_3.to_pandas(execute_date, limit=50)
    window_ret_3

Built-in automatic extension functions to support automatic extension of WindowTransform features

Similar to statistical feature transformations, manually implementing each WindowTransform statistical feature is complex because of the large number of features, which include different window sizes and numerous combinations of aggregate function calculations. The system provides built-in automatic extension functions. You only need to specify the input features to be counted, and the system automatically generates and completes the definitions for hundreds of statistical features.

  1. Specify the input features to be counted.

    name_prefix = "item"
    input_list = ['gender']
    agg_field = ["click_count"]
    event_name = 'event'
    event_type = 'click'
    group_by_key = "item_id"
    window_size = [7, 15, 30, 45]
    window_transform_feature_list = auto_window_feature_transform(name_prefix, input_list, agg_field, event_name, event_type, group_by_key, window_size)
    print("len_window_transform_feature_list = ", len(window_transform_feature_list))
    print("window_transform_feature_list = ", window_transform_feature_list)
  2. Define the input and output table names and create a pipeline.

    input_window_table_name = "rec_sln_demo_behavior_table_preprocess_wide_v1"
    ds_window_1 = MaxComputeDataSource(table=input_window_table_name, project=project)
    output_window_table_name_4 = "rec_sln_demo_behavior_test_window_v4"
    window_pipeline_4 =fs_client.create_pipeline(ds_window_1, output_window_table_name_4).add_feature_transform(window_transform_feature_list)
  3. Generate and execute the transformation.

    execute_date = '20240605'
    print("transform_info = ", window_pipeline_4.transform_info)
    output_window_table_4 = window_pipeline_4.execute(execute_date, drop_table=True)

JoinTransform transformation

In the preceding feature engineering processes, especially AggregationTransform and WindowTransform, the inputs are behavior tables, and the outputs are also stored in behavior tables. However, in most cases, the final table needed for online use is not a behavior table, but a feature table generated by joining the behavior table with other tables, such as a user table or an item table.

Therefore, `JoinTransform` is introduced to support joining the features from AggregationTransform and WindowTransform with existing user or item tables.

Associate JoinTransform with WindowTransform
  1. Define the WindowTransform input table.

    window_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1'
    ds_window_1 = MaxComputeDataSource(table=window_table_name, project=project)
  2. Define the WindowTransform features.

    win_fea1 = Feature(
        name="item__kv_gender_click_7d",
        input=["gender"],
        transform=WindowTransform(
            agg_func="count",
            condition=Condition(field="event", value="click", operator="="),
            group_by_keys="item_id",
            window_size=DayOf(7),
      )
    )
  3. Create a pipeline.

    Note

    Because other tables need to be joined later, the output table is not specified here.

    win_pipeline_1 = fs_client.create_pipeline(ds_window_1).add_feature_transform([win_fea1])
  4. Define the JoinTransform input and output tables.

    item_table_name = 'rec_sln_demo_item_table_preprocess_v1'
    ds_join_1 = MaxComputeDataSource(table=item_table_name, project=project)
    output_table_name = 'rec_sln_demo_item_table_v1_fs_window_debug_v1'
  5. Create a JoinTransform pipeline and connect it with the WindowTransform pipeline.

    join_pipeline_1 = fs_client.create_pipeline(ds_join_1, output_table_name).merge(win_pipeline_1)
  6. Generate and execute the transformation.

    execute_date = '20240605'
    output_join_table_1 = join_pipeline_1.execute(execute_date, drop_table=True)
  7. View the sink table data.

    join_ret_1 = output_join_table_1.to_pandas(execute_date, limit = 50)
    join_ret_1
  8. View the actual calculation process.

    output_join_table_1.transform_info
Associate JoinTransform with AggregationTransform
  1. Define the AggregationTransform input table.

    agg_table_name = 'rec_sln_demo_behavior_table_preprocess_wide_v1'
    ds_agg_1 = MaxComputeDataSource(table=agg_table_name, project=project)
  2. Define the AggregationTransform features.

    agg_fea1 = Feature(
        name="user_avg_praise_count_1d",
        input=["praise_count"],
        transform=AggregationTransform(
            agg_func="avg",
            condition=Condition(field="event", value="expr", operator="<>"),
            group_by_keys="user_id",
            window_size=DayOf(1),
        ),
    )
  3. Create a pipeline.

    Note

    Because other tables need to be joined later, the output table is not specified here.

    agg_pipeline_1 = fs_client.create_pipeline(ds_agg_1).add_feature_transform([agg_fea1])
  4. Define the JoinTransform input and output tables.

    user_table_name = 'rec_sln_demo_user_table_preprocess_v1'
    ds_join_2 = MaxComputeDataSource(table=user_table_name, project=project)
    output_table_name_2 = 'rec_sln_demo_user_table_v1_fs_window_debug_v1'
  5. Create a JoinTransform pipeline and connect it with the AggregationTransform pipeline.

    join_pipeline_2 = fs_client.create_pipeline(ds_join_2, output_table_name_2).merge(agg_pipeline_1, keep_input_columns=False)
  6. Generate and execute the transformation.

    execute_date = '20240605'
    output_join_table_2 = join_pipeline_2.execute(execute_date, drop_table=True)
  7. View the sink table data.

    join_ret_2 = output_join_table_2.to_pandas(execute_date, limit = 50)
    join_ret_2
  8. View the actual calculation process.

    output_join_table_2.transform_info

References

  • For more information about application scenarios, see Best practices for feature engineering.

  • FeatureStore is suitable for all scenarios that require features, such as recommendation, financial risk control, and user growth. FeatureStore is integrated with common Alibaba Cloud data source engines and recommendation service engines, and provides an efficient and convenient end-to-end platform from feature registration to model development and application. For more information about FeatureStore, see Overview.

  • If you have questions when you use FeatureStore, you can join the DingTalk group 34415007523 for technical assistance.