MaxCompute PyFg Job可以離線批量產生複雜特徵,支援ODPS2.0的複雜類型(list、map、float、int等),根據設定檔和命令列參數決定是否對產生的特徵做分箱操作。
方式一:使用通用資源群組鏡像
在DataWorks的調度配置-資源屬性區塊選擇一個通用的資源群組,選擇最新的dataworks_pairec_task_pod鏡像。
注意:dataworks_pairec_task_pod鏡像的發布相對於pyfg的版本更新可能比較滯後,最新的鏡像內包含的pyfg包可能不是最新版本(具體版本請參考推薦方案定製-特徵配置產生的指令碼)。可按照下面描述的方式三自訂資源群組鏡像,使用最新的pyfg。
方式二:安裝依賴包(適用於老版DataWorks)
登入DataWorks控制台,建立獨享調度資源群組,使用營運助手安裝pyfg的包。
在DataWorks獨享資源群組中安裝pyfg包,頁面路徑為:DataWorks->管理中心->資源群組列表->營運助手,樣本如下:
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade --force-reinstall http://tzrec.oss-cn-beijing.aliyuncs.com/third_party/pyfg101-1.0.1-cp37-cp37m-linux_x86_64.whl方式三:自訂資源群組鏡像(適用於新版DataWorks)
參考文檔:自訂鏡像。
上傳資源檔
上傳FG的設定檔(JSON格式)到MaxCompute的專案空間中。
某些FG運算元需要額外的資源檔,使用者需要手動上傳到MaxCompute的專案空間中。
特徵運算元 | 描述 | 資源檔配置項 |
文本歸一化 | 停用詞檔案 | |
文本分詞特徵 | 詞彙表設定檔 | |
文本相關性特徵 | 詞頻設定檔 | |
自訂運算元 | 運算元設定檔 |
建立輸出表
可以在DataWorks裡建立一個PyOdps3節點,執行下面的指令碼。程式會根據fg.json的內容建立輸出表,同時建立後續執行任務時需要使用的各種資源。
from pyfg101 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
force_delete_output_table=True,
force_update_resource=True)
fg_task.create_output_table(o)執行之前需要在調度配置裡配置好參數:input_table、output_table、fg_json_file、partition_value。
雖然fg_task.run(o)本身也會在沒有輸出表時自動建立輸出表,但還是建議先調用該介面預先建立好輸出表,防止在並發補資料時引起衝突導致任務失敗。
執行 FG 離線任務
在DataWorks裡建立一個PyOdps3節點,執行下面的指令碼,程式會根據fg.json的內容自動建立輸出表。
from pyfg101 import run_on_odps
fg_task = run_on_odps.FgTask(
args['input_table'],
args['output_table'],
args['fg_json_file'],
args['partition_value'],
batch_size=128,
force_delete_output_table=False,
force_update_resource=False)
fg_task.add_sql_setting('odps.stage.mapper.split.size', 256)
fg_task.run(o)
執行之前需要在調度配置裡配置好參數:input_table、output_table、fg_json_file、partition_value。
如果本地安裝了PyODPS,pyfg也可以安裝在本地,在本地提交任務。
參數說明
參數 | 預設值 | 說明 |
input_table | 無 | 輸入表。 |
output_table | 無 | 輸出表,會自動建立。 |
fg_json_file | 無 | FG設定檔,JSON格式。 |
partition_value | 無 | 指定輸入表的partition分區作為FG的輸入,結果儲存在輸出表的partition分區中。 |
schema | 無 | 指定[MaxCompute的Schema](Schema操作) |
batch_size | 128 | 批量處理的記錄數。 |
memory | 1024 | 任務節點使用的記憶體大小,單位:M |
force_delete_output_table | False | 是否刪除輸出表,設定為True時會先自動刪除輸出表,再運行任務。 |
force_update_resource | False | 是否更新資源,設定為True時會先自動更新資源,再運行任務;不要一直設定為True,會有並發衝突的問題。 |
output_merged_str | False | 是否合并字串,設定為 True 時會自動合并字串,輸出RTP格式的大String特徵 |
debug | False | 是否是調試模型,設定為True時會列印所有更新的資源的內容。 |
sql_setting | 無 |
|
fg_setting | 無 |
|
直接按照樣本修改代碼中的參數預設值即可。
補充說明
pyfg的包是裝在專有資源群組的一台機器上(gateway);這台機器可以提交SQL任務到MaxCompute。pyfg的包也可以裝在自己的任何一台機器上,這台機器通過pyodps這個工具就可以提交任務到MaxCompute叢集。
SQL任務裡的自訂UDF需要一些資源,包括FG的共用庫,各種設定檔(如fg.json、詞典、自訂OP的lib等),以及UDF自己的代碼檔案(.py),這些資源都需要上傳到MaxCompute叢集,儲存在MaxCompute的Distributed File System中。執行任務時,每個worker都會從MaxCompute叢集的Distributed File System中下載這些資源,並載入到記憶體中。
有部分資源是多個任務共用的,比如FG的共用庫,UDF的代碼檔案等。force_update_resource=True時,會先刪除原來的資源,再上傳新的資源。這裡有個時間間隔,在這個時間間隔內,會影響到其他正在執行的任務。