すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:PyODPS のシーケンス操作と実行操作

最終更新日:Mar 01, 2026

このトピックでは、PyODPS でシーケンス操作と実行操作を実行する方法について説明します。

操作手順

  1. MaxCompute プロジェクトを作成済みであること。

  2. DataWorks ワークスペースを作成済みであることを確認します。このトピックでは、DataStudio がパブリックプレビュー中のワークスペースを例として使用します。

  3. DataWorks に pyodps_iris という名前のテーブルを作成します。

    1. DataWorks コンソールにログインし、左上のコーナーでリージョンを選択します。

    2. ワークスペース ページで、対象のワークスペースを見つけ、アクション 列で ショートカット > [DataStudio] を選択します。

    3. [デバッグ設定] ページで、[コンピューティングリソース][リソースグループ] を選択します。

      リソースグループが表示されない場合は、[リソースグループの作成] をクリックし、リソースグループが作成されるまで数分待ちます。リソースグループ ページで、ワークスペースをリソースグループにアタッチします。

    4. MaxCompute SQL ノードで次の文を実行して、pyodps_iris テーブルを作成します。

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment 'がくの長さ (cm)',
      sepalwidth   DOUBLE comment 'がくの幅 (cm)',
      petallength  DOUBLE comment '花びらの長さ (cm)',
      petalwidth   DOUBLE comment '花びらの幅 (cm)',
      name         STRING comment '種'
      );
  4. テストデータセットをダウンロードし、MaxCompute にインポートします。

    1. 「アヤメ」データセットをダウンロードして解凍し、iris.data ファイルの名前を iris.csv に変更します。

    2. DataWorks コンソールにログインし、左上のコーナーでリージョンを選択します。

    3. 左側のナビゲーションウィンドウで、データ統合 > データのアップロードとダウンロード を選択します。

    4. 入力 データのアップロードとダウンロード をクリックします。

    5. 左側のナビゲーションウィンドウで、アップロードアイコン image をクリックし、[データのアップロード] をクリックします。

  5. DataStudio ページで、新しい MaxCompute の [PyODPS 2] ノードを作成します。次のサンプルコードを入力し、[実行] をクリックします。

    from odps import DataFrame
    iris = DataFrame(o.get_table('iristable_new'))
    
    # 列を取得します。
    print iris.sepallength.head(5)
    
    print iris['sepallength'].head(5)
    
    # 列のデータ型を表示します。
    print iris.sepallength.dtype
    
    # 列のデータ型を変更します。
    iris.sepallength.astype('int')
    
    # 計算を実行します。
    print iris.groupby('name').sepallength.max().head(5)
    
    print iris.sepallength.max()
    
    # 列の名前を変更します。
    print iris.sepalwidth.rename('speal_width').head(5)
    
    # 簡単な列操作を実行します。
    print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
  6. PyExecute という名前の PyODPS ノードを作成し、次のコードで実行します。

    from odps import options
    from odps import DataFrame
    
    # ランタイムインスタンスの Logview URL を表示します。
    options.verbose = True
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    my_logs = []
    def my_loggers(x):
      my_logs.append(x)
    
    options.verbose_log = my_loggers
    
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    print(my_logs)
    
    # 中間の Collection 結果をキャッシュします。
    cached = iris[iris.sepalwidth < 3.5].cache()
    print cached.head(3)
    
    # 非同期および並列実行を実行します。
    from odps.df import Delay
    delay = Delay() # Delay オブジェクトを作成します。
    df = iris[iris.sepalwidth < 5].cache()  # 共通の依存関係が存在します。
    future1 = df.sepalwidth.sum().execute(delay=delay) # システムはすぐに future オブジェクトを返しますが、実行は開始されません。
    future2 = df.sepalwidth.mean().execute(delay=delay)
    future3 = df.sepalwidth.max().execute(delay=delay)
    
    delay.execute(n_parallel=3)
    
    print future1.result()
    print future2.result()
    print future3.result()