PyODPS DataFrame uses deferred execution — operations don't run until you explicitly trigger them. This page explains how to trigger execution, retrieve and save results, configure runtime parameters, and run operations asynchronously or in parallel.
Prerequisites
Before you begin, make sure you have:
- A sample table named pyodps_iris. See DataFrame data processing for setup instructions.
- A DataFrame object created from a MaxCompute table. See the "Create a DataFrame object from a MaxCompute table" section in Create a DataFrame object.
How it works
PyODPS DataFrame separates defining an operation from executing it. When you chain filters, projections, or aggregations, nothing runs — you're building a logical plan. Execution starts only when you call an action: a method that explicitly triggers the plan and returns results.
The following methods are actions:
| Method | What it does | Returns |
|---|---|---|
| execute | Runs the operation and returns all results | ResultFrame |
| head | Runs the operation and returns the first N rows | ResultFrame |
| tail | Runs the operation and returns the last N rows | ResultFrame |
| persist | Saves results to a MaxCompute table and returns a new DataFrame pointing to that table | PyODPS DataFrame |
| to_pandas | Converts a Collection to a pandas DataFrame, or a Sequence to a Series. Set wrap=True to get a PyODPS DataFrame instead. | pandas DataFrame or PyODPS DataFrame |
| plot, hist, boxplot | Plotting methods | N/A |
In interactive environments (such as Jupyter notebooks), PyODPS DataFrame automatically calls execute when displaying results or calling repr — no manual call needed.
# Non-interactive environment: call execute() explicitly
print(iris[iris.sepallength < 5][:5].execute())
# Interactive environment: execute is called automatically
iris[iris.sepallength < 5][:5]
Both produce:
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
To disable automatic execution in an interactive environment:
from odps import options
options.interactive = False
# Now repr() displays the abstract syntax tree (AST), not results
iris[iris.sepallength < 5][:5]
Output:
Collection: ref_0
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris`
schema:
sepallength : double # Sepal length (cm)
sepalwidth : double # Sepal width (cm)
petallength : double # Petal length (cm)
petalwidth : double # Petal width (cm)
name : string # Type
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5
After disabling automatic execution, call execute explicitly to get results.
Retrieve results from a ResultFrame
execute and head return a ResultFrame — a read-only result set. Use it to iterate over records or convert to pandas, but note that a ResultFrame cannot be used in further DataFrame calculations.
Iterate over records:
result = iris.head(3)
for r in result:
print(list(r))
Output:
[4.9, 3.0, 1.4, 0.2, 'Iris-setosa']
[4.7, 3.2, 1.3, 0.2, 'Iris-setosa']
[4.6, 3.1, 1.5, 0.2, 'Iris-setosa']
If pandas is installed, convert a ResultFrame to a pandas DataFrame or a PyODPS DataFrame:
# Returns a pandas DataFrame
pd_df = iris.head(3).to_pandas()
# Returns a PyODPS DataFrame backed by pandas
wrapped_df = iris.head(3).to_pandas(wrap=True)
Alternatively, use open_reader with reader.to_pandas() to convert results to a pandas DataFrame. See Tables.
Save results to MaxCompute tables
Use persist when you want to write results back to MaxCompute and continue working with the output as a DataFrame. Unlike execute — which returns a ResultFrame for local use — persist writes to a table and returns a new PyODPS DataFrame pointing to that table.
Save to a new table
Pass the table name to persist:
iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris')
print(iris2.head(5))
Output:
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
Save to a partitioned table
Use the partitions parameter to create a partitioned table. The table is partitioned on the specified columns:
iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris_test', partitions=['name'])
print(iris3.data)
Output:
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris_test`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
partitions:
name : string
Write to an existing partition
Use the partition parameter to target a specific partition of an existing table (for example, ds=test). The table must contain all columns of the DataFrame with matching types.
- drop_partition=True: drops the partition if it already exists
- create_partition=True: creates the partition if it doesn't exist
Both parameters are valid only when partition is specified.
print(iris[iris.sepalwidth < 2.5].persist(
'pyodps_iris_partition',
partition='ds=test',
drop_partition=True,
create_partition=True
).head(5))
Output:
sepallength sepalwidth petallength petalwidth name ds
0 4.5 2.3 1.3 0.3 Iris-setosa test
1 5.5 2.3 4.0 1.3 Iris-versicolor test
2 4.9 2.4 3.3 1.0 Iris-versicolor test
3 5.0 2.0 3.5 1.0 Iris-versicolor test
4 6.0 2.2 4.0 1.0 Iris-versicolor test
Set a lifecycle (time-to-live)
Use the lifecycle parameter to set how many days the table data is retained. For example, set a 10-day lifecycle:
print(iris[iris.sepalwidth < 2.5].persist('pyodps_iris', lifecycle=10).head(5))
Output:
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
Persist from a pandas-only data source
If your DataFrame has no MaxCompute objects (only pandas objects), specify the MaxCompute entrance object when calling persist:
# Option 1: pass the entrance object directly
df.persist('table_name', odps=o)
# Option 2: mark the entrance object as global
o.to_global()
df.persist('table_name')
Save results to a pandas DataFrame
Call to_pandas to convert results to a pandas DataFrame. Set wrap=True to get a PyODPS DataFrame instead.
# Returns a pandas DataFrame
print(type(iris[iris.sepalwidth < 2.5].to_pandas()))
# <class 'pandas.core.frame.DataFrame'>
# Returns a PyODPS DataFrame
print(type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True)))
# <class 'odps.df.core.DataFrame'>
Configure runtime parameters
Pass the hints parameter to execute, persist, or to_pandas to set runtime parameters for that specific call. This works only with the MaxCompute SQL backend.
print(iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16}))
Output:
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 4.9 2.4 3.3 1.0 Iris-versicolor
To set global runtime parameters instead, see SQL.
View runtime details
Set options.verbose = True to print the compiled SQL, instance ID, and LogView URL for each operation:
from odps import options
options.verbose = True
print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
Output:
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
Instance ID:
Log view: http://logview
sepalwidth petallength petalwidth name
0 2.3 1.3 0.3 Iris-setosa
1 2.4 3.3 1.0 Iris-versicolor
To capture log output in your code, assign a custom function to options.verbose_log:
my_logs = []
def my_logger(x):
    my_logs.append(x)
options.verbose_log = my_logger
print(iris[iris.sepallength < 5].exclude('sepallength')[:5].execute())
print(my_logs)
Output:
sepalwidth petallength petalwidth name
0 2.3 1.3 0.3 Iris-setosa
1 2.4 3.3 1.0 Iris-versicolor
['Sql compiled:', 'CREATE TABLE tmp_pyodps_24332bdb_4fd0_4d0d_aed4_38a443618268 LIFECYCLE 1 AS \nSELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'Instance ID: 20230815034706122gbymevg*****', ' Log view:']
Cache intermediate results
When multiple downstream operations share the same expensive intermediate Collection, use cache to avoid recomputing it each time. Mark the Collection with cache before branching — execution is still deferred and doesn't start at the point of the cache call.
cached = iris[iris.sepalwidth < 3.5]['sepallength', 'name'].cache()
df = cached.head(3)
print(df)
# The following result is returned:
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
# cached is already computed, so this returns immediately without re-executing
print(cached.head(3))
# The following result is returned:
sepallength name
0 4.5 Iris-setosa
1 5.5 Iris-versicolor
2 4.9 Iris-versicolor
Run operations asynchronously and in parallel
Asynchronous execution
Pass async_=True to execute, persist, head, tail, or to_pandas to run the operation asynchronously. The method returns a Future object immediately. Use the timeout parameter to set a timeout.
future = iris[iris.sepalwidth < 10].head(10, async_=True)
print(future.result())
Output:
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
5 6.2 2.2 4.5 1.5 Iris-versicolor
6 5.5 2.4 3.8 1.1 Iris-versicolor
7 5.5 2.4 3.7 1.0 Iris-versicolor
8 6.3 2.3 4.4 1.3 Iris-versicolor
9 5.0 2.3 3.3 1.0 Iris-versicolor
Parallel execution with the Delay API
Use the Delay API to defer execute, persist, head, tail, and to_pandas calls. When you call delay.execute, the system identifies dependencies among the deferred operations and executes them based on the specified concurrency. The deferred methods return Future objects; execution does not start until delay.execute is called.
from odps.df import Delay
delay = Delay() # Create a Delay object
df = iris[iris.sepalwidth < 5].cache()  # Shared base for all branches
# Register each branch — returns Future objects; execution has not started yet
future1 = df.sepalwidth.sum().execute(delay=delay)
future2 = df.sepalwidth.mean().execute(delay=delay)
future3 = df.sepallength.max().execute(delay=delay)
# Start execution with 3 concurrent threads
delay.execute(n_parallel=3)
# |==========================================| 1 / 1 (100.00%) 21s
print(future1.result())
# 25.0
print(future2.result())
# 2.272727272727273
PyODPS DataFrame first executes the shared df object, then runs future1 through future3 with the specified concurrency.
Pass async_=True to delay.execute to run the entire batch asynchronously. Use the timeout parameter to set a timeout for the batch.