DataWorks為您提供PyODPS 3節點,您可以在該節點中直接使用Python代碼編寫MaxCompute作業,並進行作業的周期性調度。本文為您介紹如何通過DataWorks實現Python任務的配置與調度。
節點介紹
PyODPS是MaxCompute的Python SDK,提供了簡潔易用的編程介面,讓您能夠使用Python編寫作業、查詢表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS。在DataWorks中,您可以通過PyODPS節點來調度運行Python任務,並將其與其他作業進行整合操作。
注意事項
在DataWorks資源群組本地運行PyODPS節點代碼時,若代碼中需要調用第三方包,Serverless資源群組可通過自訂鏡像安裝第三方包。
說明如果代碼中存在UDF引用第三方包的情況,不支援使用上述方式,具體配置方法,請參見UDF樣本:Python UDF使用第三方包。
如需升級PyODPS版本,Serverless資源群組可通過自訂鏡像執行
/home/tops/bin/pip3 install pyodps==0.12.1進行升級(可以將0.12.1替換為您要升級的PyODPS版本),獨享調度資源群組則通過營運助手執行相同命令進行升級。如果您的PyODPS任務需要訪問特殊的網路環境(如VPC網路或IDC網路中的資料來源或服務等),請使用Serverless調度資源群組,並參考網路連通解決方案打通Serverless資源群組與目標環境的網路連通。
PyODPS文法及更多資訊請參見PyODPS文檔。
PyODPS節點分為PyODPS 2和PyODPS 3兩種,二者的區別在於底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據實際使用的Python語言版本建立PyODPS節點。
若通過PyODPS節點執行SQL無法正常產生資料血緣關係,即資料血緣在資料地圖無法正常展示,您可在任務代碼處通過手動設定DataWorks調度啟動並執行相關參數解決。查看資料血緣,詳情請參見查看血緣資訊;參數設定,詳情請參見設定運行參數hints。任務運行時所需參數可參考如下代碼擷取。
import os ... # get DataWorks sheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submiting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...PyODPS節點的輸出日誌最大支援4MB。建議您盡量避免在日誌中直接輸出大量的資料結果。相反,建議您多輸出警示日誌和正常進度的日誌,以提供更有價值的資訊。
使用限制
使用獨享調度資源群組執行PyODPS節點時,建議在節點內擷取到獨享資源群組本地處理的資料不超過50MB,該操作受限於獨享調度資源群組的規格,處理的本機資料過多並超出作業系統閾值時可能發生OOM(Got Killed)錯誤。請避免在PyODPS節點中寫入過多的資料處理代碼。詳情請參見高效使用PyODPS最佳實務。
使用Serverless資源群組執行PyODPS節點時,您可根據節點內需要處理的資料量合理配置PyODPS節點的CU。
說明運行該任務時,若使用Serverless資源群組,單任務支援最大配置
64CU,但建議不超過16CU,以避免CU過大導致資源不足,影響任務啟動。如果您發現有Got killed報錯,即表明記憶體使用量超限,進程被中止。因此,請盡量避免本地的資料操作。通過PyODPS發起的SQL和DataFrame任務(除to_pandas外)不受此限制。
非自訂函數代碼可以使用平台預裝的Numpy和Pandas。不支援其他帶有二進位代碼的第三方包。
由於相容性原因,在DataWorks中,
options.tunnel.use_instance_tunnel預設設定為False。如果需要全域開啟instance tunnel,需要手動將該值設定為True。當Python 3的子版本號碼不同(例如Python 3.8和Python 3.7)時,位元組碼的定義有所不同。
目前MaxCompute使用的Python 3版本為3.7,當使用其它版本Python 3中的部分文法(例如Python 3.8中的finally block)時,執行會報錯,建議您選擇Python 3.7。
PyODPS 3支援運行在Serverless資源群組上。如需購買使用,請參見使用Serverless資源群組。
不支援在PyODPS節點內配置多Python任務並發執行。
PyODPS節點列印日誌請使用
print,暫不支援使用logger.info。
準備工作
為DataWorks工作空間綁定MaxCompute計算資源。
操作步驟
在PyODPS 3節點編輯頁面,執行如下開發操作。
PyODPS 3程式碼範例
建立PyODPS節點後,您可以進行代碼編輯及運行,更多關於PyODPS文法說明,請參見基本操作概述。本文為您介紹以下五種程式碼範例,您可根據實際業務需要,選擇樣本內容。
ODPS入口
DataWorks的PyODPS節點中,將會包含一個全域的變數
odps或o,即ODPS入口,您無需手動定義ODPS入口。print(odps.exist_table('PyODPS_iris'))執行SQL
您可以在PyODPS節點中執行SQL,詳情請參見SQL。
DataWorks上預設未開啟
instance tunnel,即instance.open_reader預設使用Result介面(最多一萬條記錄)。您可以通過reader.count擷取記錄數。如果您需要迭代擷取全部資料,則需要關閉limit限制。您可以通過下列語句在全域範圍內開啟instance tunnel並關閉limit限制。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 關閉limit限制,讀取全部資料。 with instance.open_reader() as reader: # 通過Instance Tunnel可讀取全部資料。您也可以通過在
open_reader上添加tunnel=True,實現僅對本次open_reader開啟instance tunnel。同時,您還可以添加limit=False,實現僅對本次關閉limit限制。# 本次open_reader使用Instance Tunnel介面,且能讀取全部資料。 with instance.open_reader(tunnel=True, limit=False) as reader:
設定運行參數
您可以通過設定
hints參數,來設定運行時的參數,參數類型是dict。 Hints參數的詳情請參見SET操作。o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})對全域配置設定
sql.settings後,每次運行時,都需要添加相關的運行時的參數。from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # 根據全域配置添加hints。
讀取運行結果
運行SQL的執行個體能夠直接執行
open_reader的操作,有以下兩種情況:SQL返回了結構化的資料。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 處理每一個record。可能執行的是desc等SQL語句,通過
reader.raw屬性,擷取到原始的SQL執行結果。with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)說明如果使用了自訂調度參數,頁面上直接觸發運行PyODPS 3節點時,需要寫死時間,PyODPS節點無法直接替換。
DataFrame
您還可以通過DataFrame(不推薦使用)的方式處理資料。
執行
在DataWorks的環境裡,DataFrame的執行需要顯式調用立即執行的方法。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 調用立即執行的方法,處理每條Record。如果您需要在Print時調用立即執行,需要開啟
options.interactive。from odps import options from odps.df import DataFrame options.interactive = True # 在開始處開啟開關。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print時會立即執行。列印詳細資料
通過設定
options.verbose選項。在DataWorks上,預設已經處於開啟狀態,運行過程會列印Logview等詳細資料。
PyODPS 3代碼開發
以下以一個簡單樣本為您介紹PyODPS節點的使用:
準備資料集,建立pyodps_iris樣本表,具體操作請參見Dataframe資料處理。
建立DataFrame,詳情請參見從MaxCompute表建立DataFrame。
在PyODPS節點中輸入以下代碼並運行。
from odps.df import DataFrame # 從ODPS表建立DataFrame。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
執行PyODPS任務
在調試配置的計算資源中,選擇配置計算資源、計算配額和DataWorks資源群組。
說明訪問公用網路或VPC網路環境的資料來源需要使用與資料來源測試連通性成功的調度資源群組。詳情請參見網路連通方案。
您可以根據任務情況選擇配置鏡像資訊。
在工具列的參數對話方塊中選擇已建立的MaxCompute資料來源,單擊運行PyODPS任務。
如需定期執行節點任務,請根據業務需求配置調度資訊。配置詳情請參見節點調度配置。
與DataWorks中的SQL節點不同,為了避免影響代碼,PyODPS節點不會在代碼中替換類似 ${param_name}的字串,而是在執行代碼前,在全域變數中增加一個名為
args的dict,調度參數可以在此擷取。例如,在參數中設定ds=${yyyymmdd},則可以通過以下方式在代碼中擷取該參數資訊。print('ds=' + args['ds']) ds=20240930說明如果您需要擷取名為
ds的分區,則可以使用如下方法。o.get_table('table_name').get_partition('ds=' + args['ds'])節點任務配置完成後,需對節點進行發布。詳情請參見節點/工作流程發布。
任務發布後,您可以在營運中心查看周期任務的運行情況。詳情請參見營運中心入門。
通過關聯角色運行節點
支援配置節點關聯角色,使用特定RAM角色運行節點任務,實現許可權的細粒度控制和安全管控。
後續步驟
PyODPS常見問題:您可瞭解PyODPS執行過程中的常見問題,便於出現異常時快速排查解決。