A MaxCompute PyFG (Python Feature Generation) job generates features in offline batches and supports complex ODPS 2.0 data types, including lists, maps, floats, and integers. It uses a configuration file and command-line parameters to determine whether to bin the generated features.
The end-to-end workflow has four stages:
1. Set up the runtime environment — install or configure the pyfg package on your resource group or local machine.
2. Upload resource files — upload fg.json and any operator-specific resource files to your MaxCompute project.
3. Create the output table — run a PyOdps3 node to create the output table before running the main task.
4. Run the offline task — run the Feature Generation (FG) job and verify the output.
Prerequisites
Before you begin, make sure you have:
- A DataWorks workspace connected to a MaxCompute project
- The fg.json configuration file generated by Configure features
- Permissions to create and manage resources and tables in your MaxCompute project
Set up the runtime environment
Choose a setup method based on your DataWorks version and whether you need the latest version of pyfg:
| Method | When to use | DataWorks version | Latest pyfg |
|---|---|---|---|
| General-purpose resource group image | Default choice; simplest setup | Any | Not guaranteed |
| Install pyfg via pip | Image not available or pyfg is outdated | Older versions | Yes |
| Customize a resource group image | Full control over the image environment | New version | Yes |
Method 1: Use a general-purpose resource group image
In the DataWorks console, go to Scheduling Configuration > Resource Properties. Select a general-purpose resource group and choose the latest dataworks_pairec_task_pod image.
The dataworks_pairec_task_pod image release may lag behind pyfg updates, so the pyfg package bundled in the image might not be the latest version. To check the required version, see the script generated by Configure features. If you need the latest version, use Method 3 instead.
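To confirm which pyfg version the image bundles, you can print the installed package version from a PyOdps3 node. A minimal sketch, assuming the package is distributed under the name pyfg102 (matching the import name used in the scripts below):
```python
# Minimal sketch: print the pyfg version installed in the image.
# Assumes the pip distribution name is pyfg102 (matching the import name).
import pkg_resources

print(pkg_resources.get_distribution('pyfg102').version)
```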
Method 2: Install pyfg via pip (older versions of DataWorks)
Log in to the DataWorks console, create an exclusive resource group for scheduling, and then use O&M Assistant to install pyfg.
Navigate to DataWorks > Management Center > Resource Group List > O&M Assistant and run the following command:
```
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg102-1.0.2-cp37-cp37m-linux_x86_64.whl
```
Method 3: Customize a resource group image (new version of DataWorks)
For full control over the image environment, build a custom image that includes the pyfg version you need. For instructions, see Custom images.
Upload resource files
Upload the FG configuration file in JSON format to your MaxCompute project.
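You can upload fg.json in the DataWorks console, or programmatically from a PyOdps3 node through the PyODPS entry object o. A minimal sketch, assuming fg.json is readable from the node's working directory:
```python
# Minimal sketch: upload (or refresh) fg.json as a MaxCompute file resource.
# Assumes fg.json is readable from the local working directory.
resource_name = 'fg.json'

if o.exist_resource(resource_name):
    o.delete_resource(resource_name)  # Remove the stale copy first
with open('fg.json', 'rb') as f:
    o.create_resource(resource_name, 'file', file_obj=f)
```
Because deleting and re-uploading a shared resource can disrupt tasks that are currently running (see How it works), refresh resources only when no dependent tasks are active.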
Some feature operators require additional resource files. Upload these files to your MaxCompute project before running the task.
| Feature operator | Description | Resource file parameter |
|---|---|---|
| text_normalizer | Text normalization | Stop word file: stop_char_file |
| tokenize_feature | Text tokenization feature | Vocabulary configuration file: vocab_file |
| bm25_feature | Text relevance feature | Term frequency configuration file: term_doc_freq_file |
| custom_feature | Custom operator | Operator configuration file: operator_lib_file |
Create the output table
Create the output table before running the main FG task. When multiple backfill tasks run concurrently, each task checks whether the output table exists. If the table does not exist, multiple tasks may attempt to create it at the same time, causing task failures. Creating the table in advance eliminates this risk.
In DataWorks, create a PyOdps3 node and run the following script. The script reads fg.json, uploads any required resources, and creates the output table.
```python
from pyfg102 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    force_delete_output_table=True,  # Drop and recreate the output table if it exists
    force_update_resource=True)      # Upload the latest resources before creating the table
fg_task.create_output_table(o)
```
Before running the script, configure the following parameters in Scheduling Configuration: input_table, output_table, fg_json_file, and partition_value.
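For reference, scheduling parameters are declared as space-separated key=value pairs in the node's Scheduling Configuration. A hypothetical example, using placeholder table names and the built-in $bizdate variable for the partition:
```
input_table=my_project.user_behavior_input output_table=my_project.user_behavior_fg fg_json_file=fg.json partition_value=$bizdate
```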
Run the offline task
In DataWorks, create a PyOdps3 node and run the following script. The script processes each partition of the input table and writes feature results to the corresponding partition in the output table. If the output table does not exist, the script creates it automatically based on fg.json.
```python
from pyfg102 import run_on_odps

fg_task = run_on_odps.FgTask(
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    batch_size=128,                   # Number of records to process per batch
    force_delete_output_table=False,  # Keep the existing output table
    force_update_resource=False)      # Use already-uploaded resources
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)
```
Before running the script, configure the following parameters in Scheduling Configuration: input_table, output_table, fg_json_file, and partition_value.
To run the task from a local machine, install pyfg and PyODPS first.
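A minimal local-run sketch, with placeholder credentials, endpoint, and table names; the FgTask arguments mirror the PyOdps3 script above, with literal values in place of the args dictionary:
```python
# Minimal local-run sketch; all values below are placeholders.
from odps import ODPS
from pyfg102 import run_on_odps

# Build the MaxCompute entry object that a PyOdps3 node normally injects as `o`.
o = ODPS('<access_key_id>', '<access_key_secret>', '<project_name>',
         endpoint='<maxcompute_endpoint>')

fg_task = run_on_odps.FgTask(
    'user_behavior_input',  # Placeholder input table
    'user_behavior_fg',     # Placeholder output table
    'fg.json',
    '20240101',             # Partition value to process
    batch_size=128)
fg_task.run(o)
```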
Parameters
| Parameter | Default | Description |
|---|---|---|
| input_table | None | The input table. |
| output_table | None | The output table. Created automatically if it does not exist. |
| fg_json_file | None | The FG configuration file in JSON format. |
| partition_value | None | The input table partition to process. Results are written to the corresponding partition in the output table. |
| schema | None | The MaxCompute schema. For more information, see Schema operations. |
| batch_size | 128 | The number of records to process per batch. |
| memory | 1024 | The memory to allocate to the task node, in MB. |
| force_delete_output_table | False | If True, drops the existing output table before the task runs. |
| force_update_resource | False | If True, updates resources before the task runs. Do not set this to True in production: when multiple tasks run concurrently, the original resource is deleted before the new one is uploaded, which can disrupt other running tasks. |
| output_merged_str | False | If True, merges output strings into a single string feature in RTP format. |
| debug | False | If True, enables debug mode and prints all updated resource content to the logs. |
| sql_setting | None | MaxCompute SQL parameters, set via fg_task.add_sql_setting(). Call the method multiple times to add multiple settings. For available parameters, see Flag parameters. |
| fg_setting | None | FG parameters, set via fg_task.add_fg_setting() as key-value pairs. You can add multiple configuration items. Available since v0.4.0. For more information, see Global configuration. |
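For illustration, sql_setting and fg_setting are both attached through method calls rather than constructor arguments. A sketch, assuming pyfg v0.4.0 or later; the fg_setting key shown is a hypothetical placeholder (see Flag parameters and Global configuration for valid keys):
```python
# Sketch: attach SQL and FG settings before running the task.
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)  # MaxCompute SQL flag
fg_task.add_fg_setting('<fg_config_key>', '<value>')          # Hypothetical FG key-value setting (v0.4.0+)
fg_task.run(o)
```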
How it works
You submit SQL tasks to MaxCompute from a gateway machine in an exclusive resource group where pyfg is installed, or from a local machine with PyODPS installed.
Each SQL task uses custom user-defined functions (UDFs) that depend on several resources: the FG shared library, configuration files such as fg.json, dictionaries, custom operator libraries, and UDF code files (.py). All these resources are uploaded to the MaxCompute cluster and stored in the MaxCompute distributed file system. When a task runs, each worker downloads the required resources and loads them into memory.
Some resources are shared across tasks — including FG shared libraries and UDF code files. When force_update_resource=True, the original resource is deleted before the new one is uploaded, creating a time interval during which other running tasks can be affected.
What's next
- Feature configuration — generate the fg.json file used by the FG task
- Built-in feature operators — reference for all built-in operators and their resource file parameters
- Custom feature operators — build and register custom operators
- Flag parameters — full list of MaxCompute SQL flag parameters