このトピックでは、PyODPS でシーケンス操作と実行操作を実行する方法について説明します。
操作手順
MaxCompute プロジェクトを作成済みであること。
DataWorks ワークスペースを作成済みであることを確認します。このトピックでは、DataStudio がパブリックプレビュー中のワークスペースを例として使用します。
DataWorks に
pyodps_irisという名前のテーブルを作成します。DataWorks コンソールにログインし、左上のコーナーでリージョンを選択します。
ワークスペース ページで、対象のワークスペースを見つけ、アクション 列で を選択します。
[デバッグ設定] ページで、[コンピューティングリソース] と [リソースグループ] を選択します。
リソースグループが表示されない場合は、[リソースグループの作成] をクリックし、リソースグループが作成されるまで数分待ちます。リソースグループ ページで、ワークスペースをリソースグループにアタッチします。
MaxCompute SQL ノードで次の文を実行して、
pyodps_irisテーブルを作成します。CREATE TABLE if not exists pyodps_iris ( sepallength DOUBLE comment 'がくの長さ (cm)', sepalwidth DOUBLE comment 'がくの幅 (cm)', petallength DOUBLE comment '花びらの長さ (cm)', petalwidth DOUBLE comment '花びらの幅 (cm)', name STRING comment '種' );
テストデータセットをダウンロードし、MaxCompute にインポートします。
「アヤメ」データセットをダウンロードして解凍し、
iris.dataファイルの名前をiris.csvに変更します。DataWorks コンソールにログインし、左上のコーナーでリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
入力 データのアップロードとダウンロード をクリックします。
左側のナビゲーションウィンドウで、アップロードアイコン
をクリックし、[データのアップロード] をクリックします。
DataStudio ページで、新しい MaxCompute の [PyODPS 2] ノードを作成します。次のサンプルコードを入力し、[実行] をクリックします。
from odps import DataFrame iris = DataFrame(o.get_table('iristable_new')) # 列を取得します。 print iris.sepallength.head(5) print iris['sepallength'].head(5) # 列のデータ型を表示します。 print iris.sepallength.dtype # 列のデータ型を変更します。 iris.sepallength.astype('int') # 計算を実行します。 print iris.groupby('name').sepallength.max().head(5) print iris.sepallength.max() # 列の名前を変更します。 print iris.sepalwidth.rename('speal_width').head(5) # 簡単な列操作を実行します。 print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)PyExecute という名前の PyODPS ノードを作成し、次のコードで実行します。
from odps import options from odps import DataFrame # ランタイムインスタンスの Logview URL を表示します。 options.verbose = True iris = DataFrame(o.get_table('pyodps_iris')) iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() my_logs = [] def my_loggers(x): my_logs.append(x) options.verbose_log = my_loggers iris[iris.sepallength < 5].exclude('sepallength')[:5].execute() print(my_logs) # 中間の Collection 結果をキャッシュします。 cached = iris[iris.sepalwidth < 3.5].cache() print cached.head(3) # 非同期および並列実行を実行します。 from odps.df import Delay delay = Delay() # Delay オブジェクトを作成します。 df = iris[iris.sepalwidth < 5].cache() # 共通の依存関係が存在します。 future1 = df.sepalwidth.sum().execute(delay=delay) # システムはすぐに future オブジェクトを返しますが、実行は開始されません。 future2 = df.sepalwidth.mean().execute(delay=delay) future3 = df.sepalwidth.max().execute(delay=delay) delay.execute(n_parallel=3) print future1.result() print future2.result() print future3.result()