すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:PyODPS のシーケンス操作と実行操作

最終更新日:Nov 09, 2025

このトピックでは、PyODPS のシーケンス操作と実行操作を実行する方法について説明します。

前提条件

開始する前に、次の操作を完了してください:

手順

  1. テストデータセットをダウンロードし、MaxCompute にインポートします。

    1. Iris データセットをダウンロードして解凍し、iris.data の名前を iris.csv に変更します。

    2. pyodps_iris という名前のテーブルを作成し、iris.csv データセットをアップロードします。詳細については、「テーブルの作成とデータのアップロード」をご参照ください。

      次の文を使用してテーブルを作成します。

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment 'がくの長さ (cm)',
      sepalwidth   DOUBLE comment 'がくの幅 (cm)',
      petallength  DOUBLE comment '花びらの長さ (cm)',
      petalwidth   DOUBLE comment '花びらの幅 (cm)',
      name         STRING comment '種類'
      );
  2. DataWorks コンソールにログインします。左側のナビゲーションウィンドウで [ワークスペース] をクリックします。対象のワークスペースを見つけ、[アクション] 列の [クイックアクセス] > [DataStudio] をクリックして DataStudio ページに移動します。

  3. DataStudio ページで、ワークフローを右クリックし、[ノードの作成] > [MaxCompute] > [PyODPS 2] を選択します。ノードの名前を入力し、[確認] をクリックします。

  4. PyODPS ノードの構成タブで、次のサンプルコードを入力します。

    from odps import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    
    # 列を取得します。
    print iris.sepallength.head(5)
    
    print iris['sepallength'].head(5)
    
    # 列のデータ型を表示します。
    print iris.sepallength.dtype
    
    # 列のデータ型を変更します。
    iris.sepallength.astype('int')
    
    # 計算を実行します。
    print iris.groupby('name').sepallength.max().head(5)
    
    print iris.sepallength.max()
    
    # 列の名前を変更します。
    print iris.sepalwidth.rename('speal_width').head(5)
    
    # 簡単な列操作を実行します。
    print (iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5)
  5. [実行] をクリックし、[実行ログ] タブで結果を表示します。

    次の結果が返されます。

    Executing user script with PyODPS 0.8.0
    Try to fetch data from tunnel
    
       sepallength
    0          4.9
    1          4.7
    2          4.6
    3          5.0
    4          5.4
    Try to fetch data from tunnel
       sepallength
    0          4.9
    1          4.7
    2          4.6
    3          5.0
    4          5.4
    FLOAT64
    Sql compiled:
    CREATE TABLE tmp_pyodps_ed78e3ba_f13c_4a49_812d_2790d57c25dd LIFECYCLE 1 AS
    SELECT MAX(t1.`sepallength`) AS `sepallength_max`
    FROM data_service_fr.`pyodps_iris` t1
    GROUP BY t1.`name`
    
       sepallength_max
    0              5.8
    1              7.0
    2              7.9
    Collection: ref_0
      odps.Table
        name: data_service_fr.`pyodps_iris`
        schema:
          sepallength           : double      # がくの長さ (cm)
          sepalwidth            : double      # がくの幅 (cm)
          petallength           : double      # 花びらの長さ (cm)
          petalwidth            : double      # 花びらの幅 (cm)
          name                  : string      # 種
    max = Max[float64]
      sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
    Try to fetch data from tunnel
       speal_width
    0          3.0
    1          3.2
    2          3.1
    3          3.6
    4          3.9
    Sql compiled:
    CREATE TABLE tmp_pyodps_28120275_8d0f_4683_8318_302fa21459ac LIFECYCLE 1 AS
    SELECT t1.`sepallength` + t1.`sepalwidth` AS `sum_sepal`
    FROM data_service_fr.`pyodps_iris` t1
    
       sum_sepal
    0        7.9
    1        7.9
    2        7.7
    3        8.6
    4        9.3
    2019-08-13 10:48:13 INFO =================================================================
    2019-08-13 10:48:13 INFO シェルコマンドの終了コード 0
    2019-08-13 10:48:13 INFO --- シェルコマンドの呼び出しが完了しました ---
    2019-08-13 10:48:13 INFO シェルは正常に実行されました!
  6. PyExecute という名前の別の PyODPS ノードを作成して実行します。

    次のサンプルコードを使用します。

    from odps import options
    from odps import DataFrame
    
    # ランタイムインスタンスの Logview URL を表示します。
    options.verbose = True
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    my_logs = []
    def my_loggers(x):
        my_logs.append(x)
    
    options.verbose_log = my_loggers
    
    iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
    
    print(my_logs)
    
    # 中間コレクションの結果をキャッシュします。
    cached = iris[iris.sepalwidth < 3.5].cache()
    print cached.head(3)
    
    # 非同期および並列実行を実行します。
    from odps.df import Delay
    delay = Delay() # Delay オブジェクトを作成します。
    df = iris[iris.sepalwidth < 5].cache()  # 共通の依存関係が存在します。
    future1 = df.sepalwidth.sum().execute(delay=delay) # システムはすぐに future オブジェクトを返しますが、実行は開始されません。
    future2 = df.sepalwidth.mean().execute(delay=delay)
    future3 = df.sepalwidth.max().execute(delay=delay)
    
    delay.execute(n_parallel=3)
    
    print future1.result()
    print future2.result()
    print future3.result()

    次の結果が返されます。

    Executing user script with PyODPS 0.8.0
    Sql compiled:
    CREATE TABLE tmp_pyodps_4a204590_0510_4e9c_823b_5b837a437840 LIFECYCLE 1 AS
    SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
    FROM data_service_fr.`pyodps_iris` t1
    WHERE t1.`sepallength` < 5
    LIMIT 5
    Instance ID: 20190813025233386g04djssa
      Log view: http://logview.odps.aliyun.com/logview/XXX
    ['Sql compiled:', 'CREATE TABLE tmp_pyodps_03b92c55_8442_4e61_8978_656495487b8a LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM data_service_fr.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20190813025236282gcsna5pr2', u'  
    Log view: http://logview.odps.aliyun.com/logview/?h=http://service.odps.aliyun.com/api&XXX
    
       sepallength  sepalwidth  petallength  petalwidth         name
    0          4.9         3.0          1.4         0.2  Iris-setosa
    1          4.7         3.2          1.3         0.2  Iris-setosa
    2          4.6         3.1          1.5         0.2  Iris-setosa
    
    454.6
    3.05100671141
    4.4
    2019-08-13 10:52:48 INFO =================================================================
    2019-08-13 10:52:48 INFO シェルコマンドの終了コード 0
    2019-08-13 10:52:48 INFO --- シェルコマンドの呼び出しが完了しました ---
    2019-08-13 10:52:48 INFO シェルは正常に実行されました!
    2019-08-13 10:52:48 INFO 現在のタスクステータス: FINISH