すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:実行して結果を取得する

最終更新日:Jan 07, 2025

このトピックでは、DataFrame操作に使用できる実行方法について説明します。

前提条件

次の要件が満たされていることを確認してください。

  • pyodps_irisという名前のサンプルテーブルが用意されています。 詳細については、「DataFrameデータ処理」をご参照ください。

  • DataFrameオブジェクトが作成されます。 詳細については、「DataFrameオブジェクトの作成」の「MaxComputeテーブルからのDataFrameオブジェクトの作成」セクションをご参照ください。

延期された実行

DataFrame操作は、executeメソッドを明示的に呼び出す場合、またはexecuteメソッドを内部的に呼び出すメソッドを呼び出す場合にのみ実行されます。 executeメソッドを内部で呼び出すメソッドを次の表に示します。

Method

説明

戻り値

persist

実行結果をMaxComputeテーブルに保存します。

PyODPS DataFrame

execute

操作を実行し、すべての結果を返します。

結果フレーム

head

操作を実行し、結果データの最初のN行を返します。

結果フレーム

tail

操作を実行し、結果データの最後のN行を返します。

結果フレーム

to_pandas

Collectionオブジェクトをpandas DataFrameオブジェクトに変換するか、SequenceオブジェクトをSeriesオブジェクトに変換します。 wrapパラメーターがTrueに設定されている場合、PyODPS DataFrameオブジェクトが返されます。

  • wrapパラメーターがTrueに設定されている場合、PyODPS DataFrameオブジェクトが返されます。

  • wrapパラメーターがFalseに設定されている場合、pandas DataFrameオブジェクトが返されます。 wrapパラメーターのデフォルト値はFalseです。

plot、hist、およびboxplot

プロット方法。

非該当

説明

インタラクティブ環境では、PyODPS DataFrameが結果データを表示するとき、またはreprメソッドを呼び出すときに、PyODPS DataFrameは自動的にexecuteメソッドを呼び出します。 executeメソッドを手動で呼び出す必要はありません。

# In a non-interactive environment, you need to manually call the execute method.
print(iris[iris.sepallength < 5][:5].execute())

# In an interactive environment, the system automatically calls the execute method.
print(iris[iris.sepallength < 5][:5])

次の応答が返されます。

   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

対話型環境でシステムがexecuteメソッドを自動的に呼び出すことを無効にする必要がある場合は、手動操作が必要です。 次のコードは例を示しています。

from odps import options
options.interactive = False

print(iris[iris.sepallength < 5][:5])

次の応答が返されます。

Collection: ref_0
  odps.Table
    name: hudi_mc_0612.`iris3`
    schema:
      sepallength           : double      # Sepal length (cm)
      sepalwidth            : double      # Sepal width (cm)
      petallength           : double      # Petal length (cm)
      petalwidth            : double      # Petal width (cm)
      name                  : string      # Type
Collection: ref_1
  Filter[collection]
    collection: ref_0
    predicate:
      Less[sequence(boolean)]
        sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
        Scalar[int8]
          5
Slice[collection]
  collection: ref_1
  stop:
    Scalar[int8]
      5

自動呼び出しを無効にすると、reprオブジェクトが表示されるときに抽象構文ツリー (AST) 全体が表示されます。 この場合、メソッドを使用する必要がある場合は、executeメソッドを手動で呼び出す必要があります。

実行結果の取得

executeメソッドまたはheadメソッドを呼び出した後にResultFrameが返された場合、ResultFrameから結果を取得できます。

説明

ResultFrameは結果セットであり、後続の計算では使用できません。

  • ResultFrameからすべてのレコードを繰り返し取得できます。 次のコードは例を示しています。

    result = iris.head(3)
    for r in result:
        print(list(r))

    次の応答が返されます。

    [4.9, 3.0, 1.4, 0.2, 'Iris-setosa']
    [4.7, 3.2, 1.3, 0.2, 'Iris-setosa']
    [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']
  • pandasがインストールされている場合、ResultFrameはpandas DataFrameまたはpandasバックエンドを使用するPyODPS DataFrameに変換できます。

    # Return a pandas DataFrame. 
    pd_df = iris.head(3).to_pandas()
    
    # Return a PyODPS DataFrame that uses the pandas backend. 
    wrapped_df = iris.head(3).to_pandas(wrap=True)  

MaxComputeテーブルへの結果の保存

  • persistメソッドを呼び出して、Collectionオブジェクトの新しいDataFrameオブジェクトを返すことができます。 persistメソッドは、テーブル名をパラメーターとして使用します。

    iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris')
    print(iris2.head(5))

    次の応答が返されます。

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          5.5         2.3          4.0         1.3  Iris-versicolor
    2          4.9         2.4          3.3         1.0  Iris-versicolor
    3          5.0         2.0          3.5         1.0  Iris-versicolor
    4          6.0         2.2          4.0         1.0  Iris-versicolor
  • persistメソッドでpartitionsパラメーターを指定して、パーティションテーブルを作成できます。 テーブルは、パーティションで指定された列に基づいてパーティション分割されます。

    iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name'])
    print(iris3.data)

    次の応答が返されます。

    odps.Table
      name: odps_test_sqltask_finance.`pyodps_iris`
      schema:
        sepallength           : double
        sepalwidth            : double
        petallength           : double
        petalwidth            : double
      partitions:
        name                  : string
  • 既存のテーブルのパーティションにデータを書き込むには、persistメソッドでpartitionパラメーターを指定します。 partitionパラメーターは、データが書き込まれるパーティションを指定します。 たとえば、パーティションパラメーターをds=****** に設定します。 テーブルには、DataFrameオブジェクトのすべての列が含まれている必要があります。 drop_partitionおよびcreate_partitionパラメーターは、partitionパラメーターが指定されている場合にのみ有効です。 drop_partitionパラメーターは、パーティションが存在する場合に指定されたパーティションを削除するかどうかを指定します。 create_partitionパラメーターは、パーティションが存在しない場合に指定されたパーティションを作成するかどうかを指定します。

    print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))

    次の応答が返されます。

       sepallength  sepalwidth  petallength  petalwidth             name    ds
    0          4.5         2.3          1.3         0.3      Iris-setosa  test
    1          5.5         2.3          4.0         1.3  Iris-versicolor  test
    2          4.9         2.4          3.3         1.0  Iris-versicolor  test
    3          5.0         2.0          3.5         1.0  Iris-versicolor  test
    4          6.0         2.2          4.0         1.0  Iris-versicolor  test
  • テーブルにデータを書き込むときに、テーブルの有効期間 (TTL) を指定できます。 たとえば、次のステートメントでは、テーブルのTTLを10日に設定します。

    print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))

    次の応答が返されます。

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          5.5         2.3          4.0         1.3  Iris-versicolor
    2          4.9         2.4          3.3         1.0  Iris-versicolor
    3          5.0         2.0          3.5         1.0  Iris-versicolor
    4          6.0         2.2          4.0         1.0  Iris-versicolor
  • データソースにMaxComputeオブジェクトが含まれておらず、pandasオブジェクトのみが含まれている場合、persistメソッドを呼び出すときに、MaxComputeエントランスオブジェクトを手動で指定するか、エントランスオブジェクトをグローバルオブジェクトとしてマークする必要があります。

    # The entrance object is o. 
    # Specify the entrance object. 
    df.persist('table_name', odps=o)
    # Alternative operation: Mark the entrance object as a global object. 
    o.to_global()
    df.persist('table_name')

結果をpandas DataFrameに保存する

to_pandasメソッドを呼び出して、結果をpandas DataFrameオブジェクトに保存できます。 wrapパラメーターがTrueに設定されている場合、PyODPS DataFrameオブジェクトが返されます。

  • 例1: to_pandasメソッドを呼び出して、pandas DataFrameオブジェクトを返します。

    print(type(iris[iris.sepalwidth < 2.5].to_pandas()))

    次の応答が返されます。

    <class 'pandas.core.frame.DataFrame'>
  • 例2: PyODPS DataFrameオブジェクトを返すには、wrapパラメーターをTrueに設定します。

    print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))

    次の応答が返されます。

    <class 'odps.df.core.DataFrame'>
説明

PyODPSでopen_readerメソッドを呼び出し、reader.to_pandas() を使用して結果をpandas DataFrameオブジェクトに変換できます。 詳細については、「テーブル」をご参照ください。

ランタイムパラメーターの設定

executepersistto_pandasなど、すぐに実行されるメソッドのランタイムパラメーターを設定できます。 この設定は、MaxCompute SQLバックエンドに対してのみ有効です。

  • グローバルパラメーターを設定します。 詳細は、「SQL」をご参照ください。

  • これらのメソッドでhintsパラメーターを指定します。 これにより、指定されたランタイムパラメータが現在の計算に対してのみ有効になります。

    print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))

    次の応答が返されます。

       sepallength  sepalwidth  petallength  petalwidth             name
    0          4.5         2.3          1.3         0.3      Iris-setosa
    1          4.9         2.4          3.3         1.0  Iris-versicolor

実行時に詳細を表示する

  • 実行時にインスタンスのLogView情報を表示するには、グローバル設定を変更する必要があります。 次のコードは例を示しています。

    from odps import options
    options.verbose = True
    
    print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())

    次の応答が返されます。

    Sql compiled:
    SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
    FROM odps_test_sqltask_finance.`pyodps_iris` t1
    WHERE t1.`sepallength` < 5
    LIMIT 5
    Instance ID:
      Log view:http://logview
      
       sepalwidth  petallength  petalwidth             name
    0         2.3          1.3         0.3      Iris-setosa
    1         2.4          3.3         1.0  Iris-versicolor
  • ログ機能を指定できます。 次のコードは例を示しています。

    my_logs = []
    def my_logger(x):
        my_logs.append(x)
    options.verbose_log = my_logger
    print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
    
    print(my_logs)

    次の応答が返されます。

       sepalwidth  petallength  petalwidth             name
    0         2.3          1.3         0.3      Iris-setosa
    1         2.4          3.3         1.0  Iris-versicolor
    
    ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', '  Log view:]

Collectionオブジェクトの中間計算結果のキャッシュ

DataFrameの計算プロセス中に、いくつかのCollectionオブジェクトが複数回使用されます。 中間プロセスの実行結果を表示するには、cacheメソッドを呼び出して、最初に計算するコレクションオブジェクトをマークします。 次のコードは例を示しています。

説明

cacheメソッドの実行は延期されます。 cacheメソッドが呼び出された直後に自動計算はトリガーされません。

cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)

# The following result is returned:
   sepallength             name
0          4.5      Iris-setosa
1          5.5  Iris-versicolor
2          4.9  Iris-versicolor

# You can immediately retrieve the calculation result because cached is calculated. 
print(cached.head(3))

# The following result is returned:
   sepallength             name
0          4.5      Iris-setosa
1          5.5  Iris-versicolor
2          4.9  Iris-versicolor

非同期および並列実行

非同期実行

PyODPS DataFrameは非同期実行をサポートします。 asyncパラメーターを指定して、executepersistheadtail、およびto_pandasのすぐに実行されるメソッドの非同期実行を有効にできます。 timeoutパラメーターは、タイムアウト期間を指定します。 非同期操作では、Futureオブジェクトが返されます。

future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())

# The following result is returned:
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.5         2.3          1.3         0.3      Iris-setosa
1          5.5         2.3          4.0         1.3  Iris-versicolor
2          4.9         2.4          3.3         1.0  Iris-versicolor
3          5.0         2.0          3.5         1.0  Iris-versicolor
4          6.0         2.2          4.0         1.0  Iris-versicolor
5          6.2         2.2          4.5         1.5  Iris-versicolor
6          5.5         2.4          3.8         1.1  Iris-versicolor
7          5.5         2.4          3.7         1.0  Iris-versicolor
8          6.3         2.3          4.4         1.3  Iris-versicolor
9          5.0         2.3          3.3         1.0  Iris-versicolor

並列実行

新しく導入されたDelay API操作を呼び出して、すぐに実行される次のメソッドを延期できます。executepersistheadtailto_pandas。 次に、Futureオブジェクトが返されます。 Delay API操作が呼び出されると、システムは依存関係を見つけ、指定された同時実行性に基づいてメソッドを実行します。 この場合、非同期実行がサポートされます。

from odps.df import Delay
delay = Delay()  # Create a Delay object. 

df = iris[iris.sepal_width < 5].cache()  # Common dependency of subsequent expressions. 
future1 = df.sepal_width.sum().execute(delay=delay)  # Return a Future object. The execution is not started. 
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3)  # The execution starts with three concurrent threads. 
|==========================================|   1 /  1  (100.00%)        21s
print(future1.result())

# The following result is returned:
25.0
 
print(future2.result())

# The following result is returned:
2.272727272727273

上記の例では、PyODPS DataFrameは最初に共有依存関係のオブジェクトを実行します。 次に、PyODPS DataFrameは同時実行性を3に設定し、future1からfuture3までのオブジェクトを実行します。

delay.exe cuteasyncパラメーターを指定して、非同期実行を有効にするかどうかを指定できます。 非同期実行が有効になっている場合は、timeoutパラメーターを使用してタイムアウト期間を指定することもできます。