A MaxCompute PyFG job generates features in offline batches and supports complex ODPS 2.0 data types, such as lists, maps, floats, and integers. The job uses a configuration file and command-line parameters to determine whether to bin the generated features.
Method 1: Use a general-purpose resource group image
In the DataWorks console, navigate to the Scheduling Configuration - Resource Properties section. Select a general-purpose resource group and the latest dataworks_pairec_task_pod image.
Note: The release of the dataworks_pairec_task_pod image may lag behind pyfg updates. Therefore, the pyfg package included in the latest image may not be the latest version. For the specific version, see the script generated as described in Configure features. You can follow Method 3 to customize the resource group image to use the latest version of pyfg.
Method 2: Install dependency packages (for older versions of DataWorks)
Log on to the DataWorks console, create an exclusive resource group for scheduling, and then use O&M Assistant to install the pyfg package.
To install the pyfg package in a DataWorks exclusive resource group for scheduling, navigate to DataWorks > Management Center > Resource Group List > <a href="https://dataworks.console.aliyun.com/resource/runcommand" id="963b1e2ebd2ts">O&M Assistant</a>.
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg101-1.0.1-cp37-cp37m-linux_x86_64.whl
Method 3: Use a custom resource group image (for newer versions of DataWorks)
For more information, see Custom images.
Upload resource files
Upload the FG configuration file in JSON format to your MaxCompute project.
Some FG operators require additional resource files. You must manually upload these files to your MaxCompute project.
| Feature operator | Resource file | Configuration item |
| --- | --- | --- |
| Text normalization | Stop word file | |
| Text tokenization feature | Vocabulary configuration file | |
| Text relevance feature | Term frequency configuration file | |
| Custom operator | Operator configuration file | |
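The resource files above, together with fg.json itself, can also be uploaded programmatically. The following is a minimal sketch using the PyODPS resource API; it assumes `o` is a PyODPS entry object, and the file names in the usage comment are placeholders:

```python
# Hedged sketch: upload FG resource files as MaxCompute file resources
# with PyODPS. The paths passed in are placeholders for your own files.
def upload_fg_resources(o, paths):
    """Create (or replace) each local file as a file resource in the project."""
    for path in paths:
        name = path.split("/")[-1]
        if o.exist_resource(name):  # replace any stale copy first
            o.delete_resource(name)
        with open(path, "rb") as f:
            o.create_resource(name, "file", file_obj=f)

# Example call (placeholder file names):
# upload_fg_resources(o, ["fg.json", "stop_words.txt", "vocab.txt"])
```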
Create an output table
In DataWorks, create a PyODPS 3 node and run the following script. This script creates an output table and other required resources based on the contents of the fg.json file.
from pyfg101 import run_on_odps
fg_task = run_on_odps.FgTask(
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    force_delete_output_table=True,
    force_update_resource=True)
fg_task.create_output_table(o)
Before you run the script, configure the following parameters in the scheduling configuration: input_table, output_table, fg_json_file, and partition_value.
The fg_task.run(o) method automatically creates an output table if it does not exist. However, we recommend that you create the table in advance to prevent conflicts that might cause task failures during concurrent data backfills.
Run an FG offline task
Create a PyODPS 3 node in DataWorks and run the following script to run the FG offline task based on the contents of fg.json.
from pyfg101 import run_on_odps
fg_task = run_on_odps.FgTask(
    args['input_table'],
    args['output_table'],
    args['fg_json_file'],
    args['partition_value'],
    batch_size=128,
    force_delete_output_table=False,
    force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)
Before you run the script, set the following parameters in the scheduling configuration: input_table, output_table, fg_json_file, and partition_value.
Alternatively, if PyODPS is installed on your local machine, you can install the pyfg package there and submit tasks directly from your machine.
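A local submission might look like the following sketch. It assumes the pyodps and pyfg101 packages are installed locally and that credentials come from environment variables; the environment variable names and the placeholder arguments in the usage comment are illustrative:

```python
# Hedged sketch of a local FG task submission. Assumes `pip install pyodps`
# and a locally installed pyfg wheel; credential variable names are examples.
import os

def build_odps_entry():
    """Create a MaxCompute entry object from environment variables."""
    from odps import ODPS
    return ODPS(
        os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"],
        os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"],
        project=os.environ["ODPS_PROJECT"],
        endpoint=os.environ["ODPS_ENDPOINT"],
    )

def submit_fg_task(o, input_table, output_table, fg_json_file, partition_value):
    """Submit the FG offline task from a local machine."""
    from pyfg101 import run_on_odps
    fg_task = run_on_odps.FgTask(
        input_table, output_table, fg_json_file, partition_value,
        batch_size=128)
    fg_task.run(o)

# Example call (placeholders):
# submit_fg_task(build_odps_entry(), 'my_input', 'my_output', 'fg.json', '20240101')
```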
Parameters
| Parameter | Default value | Description |
| --- | --- | --- |
| input_table | None | The input table. |
| output_table | None | The output table. It is created automatically. |
| fg_json_file | None | The FG configuration file in JSON format. |
| partition_value | None | The partition of the input table to use as input for FG. The results are saved to the corresponding partition of the output table. |
| schema | None | The MaxCompute schema. For more information, see Schema operations. |
| batch_size | 128 | The number of records to process in a batch. |
| memory | 1024 | The amount of memory used by the task node, in MB. |
| force_delete_output_table | False | Specifies whether to delete the output table. If set to True, the output table is deleted before the task runs. |
| force_update_resource | False | Specifies whether to update resources. If set to True, resources are updated before the task runs. Do not always set this to True, because it can cause concurrency conflicts. |
| output_merged_str | False | Specifies whether to merge strings. If set to True, strings are automatically merged to output a large string feature in RTP format. |
| debug | False | Specifies whether to run in debug mode. If set to True, the content of all updated resources is printed. |
| sql_setting | None | The key-value settings for the SQL task, such as odps.stage.mapper.split.size. |
| fg_setting | None | The key-value settings for FG. |
Modify the default parameter values in the example code as needed.
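One convenient pattern is to keep the defaults from the table above in a single place and override them per task. This is a pure-Python sketch; the helper name is illustrative, not part of pyfg:

```python
# Defaults copied from the parameter table; None-valued required parameters
# (input_table, output_table, fg_json_file, partition_value) are passed
# positionally to FgTask and are therefore omitted here.
FG_TASK_DEFAULTS = {
    "batch_size": 128,
    "memory": 1024,
    "force_delete_output_table": False,
    "force_update_resource": False,
    "output_merged_str": False,
    "debug": False,
}

def fg_task_kwargs(**overrides):
    """Merge per-task overrides into the documented defaults."""
    unknown = set(overrides) - set(FG_TASK_DEFAULTS)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    return {**FG_TASK_DEFAULTS, **overrides}
```

The resulting dictionary can be unpacked into the FgTask constructor, for example `run_on_odps.FgTask(..., **fg_task_kwargs(batch_size=256))`.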
Additional information
The pyfg package is installed on a gateway machine in an exclusive resource group. This machine can submit SQL tasks to MaxCompute. Alternatively, you can install the pyfg package on any machine that has the pyodps tool and then use that machine to submit tasks to a MaxCompute cluster.
Custom User-Defined Functions (UDFs) in SQL tasks require several resources, such as the FG shared library, configuration files like fg.json, dictionaries, custom operator libraries, and UDF code files (.py). You must upload all these resources to the MaxCompute cluster, where they are stored in a distributed file system. When a task runs, each worker downloads these resources from the distributed file system and loads them into memory.
Some resources, such as the FG shared library and UDF code files, are shared among multiple tasks. When you set force_update_resource=True, the original resources are deleted before new ones are uploaded. This process creates a time gap that might affect other running tasks.
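One way to honor this advice is to pass force_update_resource=True only when fg.json has actually changed since the last run. A minimal sketch, assuming a local state file records the checksum of the last uploaded configuration:

```python
import hashlib
import os

# Hedged sketch: update shared resources only when fg.json changes, instead
# of always passing force_update_resource=True. The state-file path is an
# assumption for illustration.
def fg_json_changed(fg_json_path, state_path=".fg_json.md5"):
    """Return True if fg.json differs from the copy recorded on the last run."""
    with open(fg_json_path, "rb") as f:
        digest = hashlib.md5(f.read()).hexdigest()
    previous = None
    if os.path.exists(state_path):
        with open(state_path) as f:
            previous = f.read().strip()
    with open(state_path, "w") as f:
        f.write(digest)
    return digest != previous

# Usage: force_update_resource=fg_json_changed('fg.json')
```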