このトピックでは、DataFrame操作に使用できる実行方法について説明します。
前提条件
次の要件が満たされていることを確認してください。
pyodps_irisという名前のサンプルテーブルが用意されています。 詳細については、「DataFrameデータ処理」をご参照ください。
DataFrameオブジェクトが作成されます。 詳細については、「DataFrameオブジェクトの作成」の「MaxComputeテーブルからのDataFrameオブジェクトの作成」セクションをご参照ください。
延期された実行
DataFrame操作は、executeメソッドを明示的に呼び出す場合、またはexecuteメソッドを内部的に呼び出すメソッドを呼び出す場合にのみ実行されます。 executeメソッドを内部で呼び出すメソッドを次の表に示します。
Method | 説明 | 戻り値 |
persist | 実行結果をMaxComputeテーブルに保存します。 | PyODPS DataFrame |
execute | 操作を実行し、すべての結果を返します。 | 結果フレーム |
head | 操作を実行し、結果データの最初のN行を返します。 | 結果フレーム |
tail | 操作を実行し、結果データの最後のN行を返します。 | 結果フレーム |
to_pandas | Collectionオブジェクトをpandas DataFrameオブジェクトに変換するか、SequenceオブジェクトをSeriesオブジェクトに変換します。 wrapパラメーターがTrueに設定されている場合、PyODPS DataFrameオブジェクトが返されます。 |
|
plot、hist、およびboxplot | プロット方法。 | 非該当 |
インタラクティブ環境では、PyODPS DataFrameが結果データを表示するとき、またはreprメソッドを呼び出すときに、PyODPS DataFrameは自動的にexecuteメソッドを呼び出します。 executeメソッドを手動で呼び出す必要はありません。
例
# In a non-interactive environment, you need to manually call the execute method.
print(iris[iris.sepallength < 5][:5].execute())
# In an interactive environment, the system automatically calls the execute method.
print(iris[iris.sepallength < 5][:5])次の応答が返されます。
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa対話型環境でシステムがexecuteメソッドを自動的に呼び出すことを無効にする必要がある場合は、手動操作が必要です。 次のコードは例を示しています。
from odps import options
options.interactive = False
print(iris[iris.sepallength < 5][:5])次の応答が返されます。
Collection: ref_0
odps.Table
name: hudi_mc_0612.`iris3`
schema:
sepallength : double # Sepal length (cm)
sepalwidth : double # Sepal width (cm)
petallength : double # Petal length (cm)
petalwidth : double # Petal width (cm)
name : string # Type
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5自動呼び出しを無効にすると、reprオブジェクトが表示されるときに抽象構文ツリー (AST) 全体が表示されます。 この場合、メソッドを使用する必要がある場合は、executeメソッドを手動で呼び出す必要があります。
実行結果の取得
executeメソッドまたはheadメソッドを呼び出した後にResultFrameが返された場合、ResultFrameから結果を取得できます。
ResultFrameは結果セットであり、後続の計算では使用できません。
ResultFrameからすべてのレコードを繰り返し取得できます。 次のコードは例を示しています。
result = iris.head(3) for r in result: print(list(r))次の応答が返されます。
[4.9, 3.0, 1.4, 0.2, 'Iris-setosa'] [4.7, 3.2, 1.3, 0.2, 'Iris-setosa'] [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']pandasがインストールされている場合、ResultFrameはpandas DataFrameまたはpandasバックエンドを使用するPyODPS DataFrameに変換できます。
# Return a pandas DataFrame. pd_df = iris.head(3).to_pandas() # Return a PyODPS DataFrame that uses the pandas backend. wrapped_df = iris.head(3).to_pandas(wrap=True)
MaxComputeテーブルへの結果の保存
persistメソッドを呼び出して、Collectionオブジェクトの新しいDataFrameオブジェクトを返すことができます。 persistメソッドは、テーブル名をパラメーターとして使用します。iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris') print(iris2.head(5))次の応答が返されます。
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolorpersistメソッドでpartitionsパラメーターを指定して、パーティションテーブルを作成できます。 テーブルは、パーティションで指定された列に基づいてパーティション分割されます。iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name']) print(iris3.data)次の応答が返されます。
odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double partitions: name : string既存のテーブルのパーティションにデータを書き込むには、
persistメソッドでpartitionパラメーターを指定します。 partitionパラメーターは、データが書き込まれるパーティションを指定します。 たとえば、パーティションパラメーターをds=******に設定します。 テーブルには、DataFrameオブジェクトのすべての列が含まれている必要があります。drop_partitionおよびcreate_partitionパラメーターは、partitionパラメーターが指定されている場合にのみ有効です。 drop_partitionパラメーターは、パーティションが存在する場合に指定されたパーティションを削除するかどうかを指定します。 create_partitionパラメーターは、パーティションが存在しない場合に指定されたパーティションを作成するかどうかを指定します。print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris_partition', partition='ds=test', drop_partition=True, create_partition=True).head(5))次の応答が返されます。
sepallength sepalwidth petallength petalwidth name ds 0 4.5 2.3 1.3 0.3 Iris-setosa test 1 5.5 2.3 4.0 1.3 Iris-versicolor test 2 4.9 2.4 3.3 1.0 Iris-versicolor test 3 5.0 2.0 3.5 1.0 Iris-versicolor test 4 6.0 2.2 4.0 1.0 Iris-versicolor testテーブルにデータを書き込むときに、テーブルの有効期間 (TTL) を指定できます。 たとえば、次のステートメントでは、テーブルのTTLを10日に設定します。
print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))次の応答が返されます。
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 5.5 2.3 4.0 1.3 Iris-versicolor 2 4.9 2.4 3.3 1.0 Iris-versicolor 3 5.0 2.0 3.5 1.0 Iris-versicolor 4 6.0 2.2 4.0 1.0 Iris-versicolorデータソースにMaxComputeオブジェクトが含まれておらず、pandasオブジェクトのみが含まれている場合、
persistメソッドを呼び出すときに、MaxComputeエントランスオブジェクトを手動で指定するか、エントランスオブジェクトをグローバルオブジェクトとしてマークする必要があります。# The entrance object is o. # Specify the entrance object. df.persist('table_name', odps=o) # Alternative operation: Mark the entrance object as a global object. o.to_global() df.persist('table_name')
結果をpandas DataFrameに保存する
to_pandasメソッドを呼び出して、結果をpandas DataFrameオブジェクトに保存できます。 wrapパラメーターがTrueに設定されている場合、PyODPS DataFrameオブジェクトが返されます。
例1:
to_pandasメソッドを呼び出して、pandas DataFrameオブジェクトを返します。print(type(iris[iris.sepalwidth < 2.5].to_pandas()))次の応答が返されます。
<class 'pandas.core.frame.DataFrame'>例2: PyODPS DataFrameオブジェクトを返すには、
wrapパラメーターをTrueに設定します。print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))次の応答が返されます。
<class 'odps.df.core.DataFrame'>
PyODPSでopen_readerメソッドを呼び出し、reader.to_pandas() を使用して結果をpandas DataFrameオブジェクトに変換できます。 詳細については、「テーブル」をご参照ください。
ランタイムパラメーターの設定
execute、persist、to_pandasなど、すぐに実行されるメソッドのランタイムパラメーターを設定できます。 この設定は、MaxCompute SQLバックエンドに対してのみ有効です。
グローバルパラメーターを設定します。 詳細は、「SQL」をご参照ください。
これらのメソッドで
hintsパラメーターを指定します。 これにより、指定されたランタイムパラメータが現在の計算に対してのみ有効になります。print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))次の応答が返されます。
sepallength sepalwidth petallength petalwidth name 0 4.5 2.3 1.3 0.3 Iris-setosa 1 4.9 2.4 3.3 1.0 Iris-versicolor
実行時に詳細を表示する
実行時にインスタンスのLogView情報を表示するには、グローバル設定を変更する必要があります。 次のコードは例を示しています。
from odps import options options.verbose = True print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())次の応答が返されます。
Sql compiled: SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` FROM odps_test_sqltask_finance.`pyodps_iris` t1 WHERE t1.`sepallength` < 5 LIMIT 5 Instance ID: Log view:http://logview sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolorログ機能を指定できます。 次のコードは例を示しています。
my_logs = [] def my_logger(x): my_logs.append(x) options.verbose_log = my_logger print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()) print(my_logs)次の応答が返されます。
sepalwidth petallength petalwidth name 0 2.3 1.3 0.3 Iris-setosa 1 2.4 3.3 1.0 Iris-versicolor ['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', ' Log view:]
Collectionオブジェクトの中間計算結果のキャッシュ
DataFrameの計算プロセス中に、いくつかのCollectionオブジェクトが複数回使用されます。 中間プロセスの実行結果を表示するには、cacheメソッドを呼び出して、最初に計算するコレクションオブジェクトをマークします。 次のコードは例を示しています。
cacheメソッドの実行は延期されます。 cacheメソッドが呼び出された直後に自動計算はトリガーされません。
cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)
# The following result is returned:
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
# You can immediately retrieve the calculation result because cached is calculated.
print(cached.head(3))
# The following result is returned:
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor非同期および並列実行
非同期実行
PyODPS DataFrameは非同期実行をサポートします。 asyncパラメーターを指定して、execute、persist、head、tail、およびto_pandasのすぐに実行されるメソッドの非同期実行を有効にできます。 timeoutパラメーターは、タイムアウト期間を指定します。 非同期操作では、Futureオブジェクトが返されます。
future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())
# The following result is returned:
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
5 6.2 2.2 4.5 1.5 Iris-versicolor
6 5.5 2.4 3.8 1.1 Iris-versicolor
7 5.5 2.4 3.7 1.0 Iris-versicolor
8 6.3 2.3 4.4 1.3 Iris-versicolor
9 5.0 2.3 3.3 1.0 Iris-versicolor並列実行
新しく導入されたDelay API操作を呼び出して、すぐに実行される次のメソッドを延期できます。execute、persist、head、tail、to_pandas。 次に、Futureオブジェクトが返されます。 Delay API操作が呼び出されると、システムは依存関係を見つけ、指定された同時実行性に基づいてメソッドを実行します。 この場合、非同期実行がサポートされます。
from odps.df import Delay
delay = Delay() # Create a Delay object.
df = iris[iris.sepal_width < 5].cache() # Common dependency of subsequent expressions.
future1 = df.sepal_width.sum().execute(delay=delay) # Return a Future object. The execution is not started.
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3) # The execution starts with three concurrent threads.
|==========================================| 1 / 1 (100.00%) 21s
print(future1.result())
# The following result is returned:
25.0
print(future2.result())
# The following result is returned:
2.272727272727273上記の例では、PyODPS DataFrameは最初に共有依存関係のオブジェクトを実行します。 次に、PyODPS DataFrameは同時実行性を3に設定し、future1からfuture3までのオブジェクトを実行します。
delay.exe cuteでasyncパラメーターを指定して、非同期実行を有効にするかどうかを指定できます。 非同期実行が有効になっている場合は、timeoutパラメーターを使用してタイムアウト期間を指定することもできます。