本文介紹如何使用PyODPS的Sequence及執行操作。
操作步驟
-
已建立DataWorks工作空間。本文以參加資料開發(Data Studio)公測的新版工作空間為例。
-
在DataWorks中建立表
pyodps_iris。-
登入DataWorks控制台,在左上方選擇地區。
-
在工作空間列表頁面,單擊目標工作空間對應的操作列。
-
在運行配置頁面,選擇计算资源和資源組。
如果顯示沒有資源群組,點擊建立資源群組後需要等待幾分鐘建立完成。並在資源群組列表中,為該資源群組綁定使用工作空間。
-
在MaxCompute SQL節點檔案中,按照如下語句建立表
pyodps_iris。CREATE TABLE if not exists pyodps_iris ( sepallength DOUBLE comment '片長度(cm)', sepalwidth DOUBLE comment '片寬度(cm)', petallength DOUBLE comment '瓣長度(cm)', petalwidth DOUBLE comment '瓣寬度(cm)', name STRING comment '種類' );
-
-
下載測試資料集並匯入MaxCompute。
-
下載並解壓鳶尾花資料集,將
iris.data重新命名為iris.csv。 -
登入DataWorks控制台,在左上方選擇地區。
-
在左側導覽列選擇。
-
單擊進入數據上傳與下載。
-
在左側導覽列單擊上傳表徵圖
,單擊数据上传。
-
-
在Data Studio頁面中建立MaxCompute PyODPS 2節點。輸入如下範例程式碼,單擊運行。
from odps import DataFrame iris = DataFrame(o.get_table('iristable_new')) #擷取列。 print iris.sepallength.head(5) print iris['sepallength'].head(5) #查看列的類型。 print iris.sepallength.dtype #修改列的類型。 iris.sepallength.astype('int') #計算。 print iris.groupby('name').sepallength.max().head(5) print iris.sepallength.max() #重新命名列。 print iris.sepalwidth.rename('speal_width').head(5) #簡單的列變化。 print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5) -
建立並運行PyODPS節點PyExecute。代碼如下:
from odps import options from odps import DataFrame #查看運行時的instance的logview。 options.verbose = True iris = DataFrame(o.get_table('pyodps_iris')) iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() my_logs = [] def my_loggers(x): my_logs.append(x) options.verbose_log = my_loggers iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() print(my_logs) #緩衝中間Collection結果。 cached = iris[iris.sepalwidth < 3.5].cache() print cached.head(3) #非同步和並存執行。 from odps.df import Delay delay = Delay() #建立Delay對象。 df = iris[iris.sepalwidth < 5].cache() #有一個共同的依賴。 future1 = df.sepalwidth.sum().execute(delay=delay) #立即返回future對象,此時並沒有執行。 future2 = df.sepalwidth.mean().execute(delay=delay) future3 = df.sepalwidth.max().execute(delay=delay) delay.execute(n_parallel=3) print future1.result() print future2.result() print future3.result()