このトピックでは、PyODPS のシーケンス操作と実行操作を実行する方法について説明します。
前提条件
開始する前に、次の操作を完了してください:
MaxCompute プロジェクトを作成済みであること。
DataWorks ワークスペースを作成済みであること。このトピックでは、DataStudio のパブリックプレビューバージョンのワークスペースを例として使用します。
DataWorks でワークフローを作成済みであること。詳細については、「ワークフローの作成」をご参照ください。
手順
テストデータセットをダウンロードし、MaxCompute にインポートします。
Iris データセットをダウンロードして解凍し、iris.data の名前を iris.csv に変更します。
pyodps_iris という名前のテーブルを作成し、iris.csv データセットをアップロードします。詳細については、「テーブルの作成とデータのアップロード」をご参照ください。
次の文を使用してテーブルを作成します。
CREATE TABLE if not exists pyodps_iris ( sepallength DOUBLE comment 'がくの長さ (cm)', sepalwidth DOUBLE comment 'がくの幅 (cm)', petallength DOUBLE comment '花びらの長さ (cm)', petalwidth DOUBLE comment '花びらの幅 (cm)', name STRING comment '種類' );
DataWorks コンソールにログインします。左側のナビゲーションウィンドウで [ワークスペース] をクリックします。対象のワークスペースを見つけ、[アクション] 列の をクリックして DataStudio ページに移動します。
DataStudio ページで、ワークフローを右クリックし、 を選択します。ノードの名前を入力し、[確認] をクリックします。
PyODPS ノードの構成タブで、次のサンプルコードを入力します。
from odps import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) # 列を取得します。 print iris.sepallength.head(5) print iris['sepallength'].head(5) # 列のデータ型を表示します。 print iris.sepallength.dtype # 列のデータ型を変更します。 iris.sepallength.astype('int') # 計算を実行します。 print iris.groupby('name').sepallength.max().head(5) print iris.sepallength.max() # 列の名前を変更します。 print iris.sepalwidth.rename('speal_width').head(5) # 簡単な列操作を実行します。 print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)[実行] をクリックし、[実行ログ] タブで結果を表示します。
次の結果が返されます。
Executing user script with PyODPS 0.8.0 Try to fetch data from tunnel sepallength 0 4.9 1 4.7 2 4.6 3 5.0 4 5.4 Try to fetch data from tunnel sepallength 0 4.9 1 4.7 2 4.6 3 5.0 4 5.4 FLOAT64 Sql compiled: CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS SELECT MAX(t1.`sepallength`) AS `sepallength_max` FROM data_service_fr.`pyodps_iris` t1 GROUP BY t1.`name` sepallength_max 0 5.8 1 7.0 2 7.9 Collection: ref_0 odps.Table name: data_service_fr.`pyodps_iris` schema: sepallength : double # がくの長さ (cm) sepalwidth : double # がくの幅 (cm) petallength : double # 花びらの長さ (cm) petalwidth : double # 花びらの幅 (cm) name : string # 種 max = Max[float64] sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0 Try to fetch data from tunnel speal_width 0 3.0 1 3.2 2 3.1 3 3.6 4 3.9 Sql compiled: CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal` FROM data_service_fr.`pyodps_iris` t1 sum_sepal 0 7.9 1 7.9 2 7.7 3 8.6 4 9.3 2019-08-13 10:48:13 INFO ================================================================= 2019-08-13 10:48:13 INFO シェルコマンドの終了コード 0 2019-08-13 10:48:13 INFO --- シェルコマンドの呼び出しが完了しました --- 2019-08-13 10:48:13 INFO シェルは正常に実行されました!PyExecute という名前の別の PyODPS ノードを作成して実行します。
次のサンプルコードを使用します。
from odps import options from odps import DataFrame # ランタイムインスタンスの Logview URL を表示します。 options.verbose = True iris = DataFrame(o.get_table('pyodps_iris')) iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() my_logs = [] def my_loggers(x): my_logs.append(x) options.verbose_log = my_loggers iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() print(my_logs) # 中間コレクションの結果をキャッシュします。 cached = iris[iris.sepalwidth < 3.5].cache() print cached.head(3) # 非同期および並列実行を実行します。 from odps.df import Delay delay = Delay() # Delay オブジェクトを作成します。 df = iris[iris.sepalwidth < 5].cache() # 共通の依存関係が存在します。 future1 = df.sepalwidth.sum().execute(delay=delay) # システムはすぐに future オブジェクトを返しますが、実行は開始されません。 future2 = df.sepalwidth.mean().execute(delay=delay) future3 = df.sepalwidth.max().execute(delay=delay) delay.execute(n_parallel=3) print future1.result() print future2.result() print future3.result()次の結果が返されます。
Executing user script with PyODPS 0.8.0 Sql compiled: CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` FROM data_service_fr.`pyodps_iris` t1 WHERE t1.`sepallength` < 5 LIMIT 5 Instance ID: 20190813025233386g04djssa Log view: http://logview.odps.aliyun.com/logview/XXX ['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u' Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX sepallength sepalwidth petallength petalwidth name 0 4.9 3.0 1.4 0.2 Iris-setosa 1 4.7 3.2 1.3 0.2 Iris-setosa 2 4.6 3.1 1.5 0.2 Iris-setosa 454.6 3.05100671141 4.4 2019-08-13 10:52:48 INFO ================================================================= 2019-08-13 10:52:48 INFO シェルコマンドの終了コード 0 2019-08-13 10:52:48 INFO --- シェルコマンドの呼び出しが完了しました --- 2019-08-13 10:52:48 INFO シェルは正常に実行されました! 2019-08-13 10:52:48 INFO 現在のタスクステータス: FINISH