Use FG in MaxCompute tasks

Last Updated: Mar 31, 2026

A MaxCompute PyFG (Python Feature Generation) job generates complex features in offline batches and supports ODPS 2.0 data types such as lists, maps, floats, and integers. A configuration file and command-line parameters determine whether the generated features are binned.

The end-to-end workflow has four stages:

  1. Set up the runtime environment — install or configure the pyfg package on your resource group or local machine.

  2. Upload resource files — upload fg.json and any operator-specific resource files to your MaxCompute project.

  3. Create the output table — run a PyOdps3 node to create the output table before running the main task.

  4. Run the offline task — run the Feature Generation (FG) job and verify the output.

Prerequisites

Before you begin, make sure you have:

  • A DataWorks workspace connected to a MaxCompute project

  • The fg.json configuration file generated by Configure features

  • Permissions to create and manage resources and tables in your MaxCompute project

Set up the runtime environment

Choose a setup method based on your DataWorks version and whether you need the latest version of pyfg:

| Method | When to use | DataWorks version | Latest pyfg |
| --- | --- | --- | --- |
| General-purpose resource group image | Default choice; simplest setup | Any | Not guaranteed |
| Install pyfg via pip | Image not available or pyfg is outdated | Older versions | Yes |
| Customize a resource group image | Full control over the image environment | New version | Yes |

Method 1: Use a general-purpose resource group image

In the DataWorks console, go to Scheduling Configuration > Resource Properties. Select a general-purpose resource group and choose the latest dataworks_pairec_task_pod image.

The dataworks_pairec_task_pod image release may lag behind pyfg updates, so the pyfg package bundled in the image might not be the latest version. To check the required version, see the script generated by Configure features. If you need the latest version, use Method 3 instead.
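To inspect which pyfg version the image actually provides, you can run a short check in a PyOdps3 node. This is an illustrative sketch; it assumes the pyfg102 module exposes a __version__ attribute, which may not be present in every build:

import pyfg102

# Print the bundled pyfg version if the module exposes one,
# and the install path as a fallback clue.
print(getattr(pyfg102, '__version__', 'no __version__ attribute'))
print(pyfg102.__file__)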

Method 2: Install pyfg via pip (older versions of DataWorks)

Log in to the DataWorks console, create an exclusive resource group for scheduling, and then use O&M Assistant to install pyfg.

Navigate to DataWorks > Management Center > Resource Group List > O&M Assistant and run the following command:

/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg102-1.0.2-cp37-cp37m-linux_x86_64.whl

Method 3: Customize a resource group image (new version of DataWorks)

For full control over the image environment, build a custom image that includes the pyfg version you need. For instructions, see Custom images.

Upload resource files

Upload the FG configuration file in JSON format to your MaxCompute project.
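For orientation, a minimal fg.json contains a top-level features list. The example below is a hypothetical sketch (the feature names, types, and expressions are placeholders); use the file generated by Configure features rather than writing one by hand:

{
  "features": [
    {"feature_name": "user_id", "feature_type": "id_feature", "expression": "user:user_id"},
    {"feature_name": "price", "feature_type": "raw_feature", "expression": "item:price"}
  ]
}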

Some feature operators require additional resource files. Upload these files to your MaxCompute project before running the task.

| Feature operator | Description | Resource file parameter |
| --- | --- | --- |
| text_normalizer | Text normalization | Stop word file: stop_char_file |
| tokenize_feature | Text tokenization feature | Vocabulary configuration file: vocab_file |
| bm25_feature | Text relevance feature | Term frequency configuration file: term_doc_freq_file |
| custom_feature | Custom operator | Operator configuration file: operator_lib_file |
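You can upload a resource file in the DataWorks console, or from a PyOdps3 node with the standard PyODPS resource API. A minimal sketch, assuming a local stop word file named stop_char.txt (a hypothetical name; o is the ODPS entry object that DataWorks provides):

# Upload a file resource to the MaxCompute project so FG tasks can reference it.
with open('stop_char.txt', 'rb') as f:
    o.create_resource('stop_char.txt', 'file', file_obj=f)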

Create the output table

Create the output table before running the main FG task. When multiple backfill tasks run concurrently, each task checks whether the output table exists. If the table does not exist, multiple tasks may attempt to create it at the same time, causing task failures. Creating the table in advance eliminates this risk.

In DataWorks, create a PyOdps3 node and run the following script. The script reads fg.json and creates the output table and any required resources.

from pyfg102 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    force_delete_output_table=True,   # Drop and recreate the output table if it exists
    force_update_resource=True)       # Upload the latest resources before creating the table
fg_task.create_output_table(o)

Before running the script, configure the following parameters in Scheduling Configuration: input_table, output_table, fg_json_file, and partition_value.
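In Scheduling Configuration, parameters are entered as space-separated key=value pairs. A hypothetical example (the project and table names are placeholders; $bizdate is a built-in DataWorks scheduling variable):

input_table=my_project.raw_sample output_table=my_project.sample_fg fg_json_file=fg.json partition_value=$bizdate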

Run the offline task

In DataWorks, create a PyOdps3 node and run the following script. The script processes each partition of the input table and writes feature results to the corresponding partition in the output table. If the output table does not exist, the script creates it automatically based on fg.json.

from pyfg102 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    batch_size=128,                    # Number of records to process per batch
    force_delete_output_table=False,   # Keep the existing output table
    force_update_resource=False)       # Use already-uploaded resources
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)

Before running the script, configure the following parameters in Scheduling Configuration: input_table, output_table, fg_json_file, and partition_value.

To run the task from a local machine, install pyfg and PyODPS first.
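A minimal local-run sketch, assuming both packages are installed; the credentials, endpoint, and table names below are placeholders:

from odps import ODPS
from pyfg102 import run_on_odps

# When running outside DataWorks, build the ODPS entry object yourself.
o = ODPS(
    'your-access-key-id',
    'your-access-key-secret',
    project='your_project',
    endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api',
)

fg_task = run_on_odps.FgTask(
    'your_project.raw_sample',   # input_table (placeholder)
    'your_project.sample_fg',    # output_table (placeholder)
    'fg.json',                   # fg_json_file
    '20250101',                  # partition_value (placeholder)
)
fg_task.run(o)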

Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| input_table | None | The input table. |
| output_table | None | The output table. Created automatically if it does not exist. |
| fg_json_file | None | The FG configuration file in JSON format. |
| partition_value | None | The input table partition to process. Results are written to the corresponding partition in the output table. |
| schema | None | The MaxCompute schema. For more information, see Schema operations. |
| batch_size | 128 | The number of records to process per batch. |
| memory | 1024 | The memory to allocate to the task node, in MB. |
| force_delete_output_table | False | If True, drops the existing output table before the task runs. |
| force_update_resource | False | If True, updates resources before the task runs. Do not set this to True in production: when multiple tasks run concurrently, the original resource is deleted before the new one is uploaded, which can disrupt other running tasks. |
| output_merged_str | False | If True, merges output strings into a single string feature in RTP format. |
| debug | False | If True, enables debug mode and prints all updated resource content to the logs. |
| sql_setting | None | MaxCompute SQL parameters, set via fg_task.add_sql_setting(). Call the method multiple times to add multiple settings. For available parameters, see Flag parameters. |
| fg_setting | None | FG parameters, set via fg_task.add_fg_setting() as key-value pairs. You can add multiple configuration items. Available since v0.4.0. For more information, see Global configuration. |
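As an illustration of the two setting hooks, the sketch below assumes add_fg_setting takes a key and a value, mirroring add_sql_setting; the FG key shown is hypothetical, so check Global configuration for the keys your pyfg version supports:

# MaxCompute SQL flag (see Flag parameters).
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
# Global FG setting; 'time_zone' is a hypothetical key.
fg_task.add_fg_setting('time_zone', 'Asia/Shanghai')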

How it works

You submit SQL tasks to MaxCompute from a gateway machine in an exclusive resource group where pyfg is installed, or from a local machine with PyODPS installed.

Each SQL task uses custom user-defined functions (UDFs) that depend on several resources: the FG shared library, configuration files such as fg.json, dictionaries, custom operator libraries, and UDF code files (.py). All these resources are uploaded to the MaxCompute cluster and stored in the MaxCompute distributed file system. When a task runs, each worker downloads the required resources and loads them into memory.

Some resources are shared across tasks — including FG shared libraries and UDF code files. When force_update_resource=True, the original resource is deleted before the new one is uploaded, creating a time interval during which other running tasks can be affected.
