PyODPS DataFrames use lazy evaluation: operations like column selection, filtering, and renaming build a query plan but do not run until you explicitly trigger execution. This topic shows how to work with DataFrame columns (sequences) and how to control when and how queries run — including synchronous execution, result caching, and parallel async execution.
Prerequisites
Before you begin, ensure that you have:
- A MaxCompute project (Create a MaxCompute project)
- A DataWorks workspace with DataStudio in public preview (Create a DataWorks workspace)
How lazy evaluation works
Working with a PyODPS DataFrame involves three stages:
1. Build — Wrap a MaxCompute table in a DataFrame object.
2. Transform — Select columns, filter rows, rename, and compute aggregations. These steps define the query but do not run it.
3. Execute — Call an action method such as .head(), .execute(), or .cache() to send the query to MaxCompute and return results.
This differs from pandas, where operations run immediately:
# pandas — result is available immediately
result = df.groupby("name").sepallength.max()
# PyODPS — only defines the query; no data is fetched yet
result = iris.groupby("name").sepallength.max()
result.head(5) # <-- this line triggers execution
Understanding this build-transform-execute pattern is key to using sequence operations and execution controls correctly.
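To make the pattern concrete, here is a toy sketch in plain Python. It is not PyODPS internals, only an illustration of the idea that transformations accumulate a plan and nothing runs until an execute step:

```python
# Toy illustration of build-transform-execute (not the PyODPS implementation):
# transformations record steps in a plan; execute() applies the plan.
class LazyQuery:
    def __init__(self, data, steps=None):
        self.data = data              # the "table" the query is built on
        self.steps = steps or []      # accumulated plan, not yet run

    def filter(self, predicate):
        # Transform: record the step and return a new plan; nothing runs here
        return LazyQuery(self.data, self.steps + [("filter", predicate)])

    def execute(self):
        # Execute: only now is the accumulated plan applied to the data
        rows = self.data
        for kind, fn in self.steps:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
        return rows

q = LazyQuery([1, 4, 5, 2]).filter(lambda x: x > 2)  # build + transform: plan only
print(q.execute())                                   # execute: prints [4, 5]
```

In the same way, a PyODPS expression like `iris[iris.sepallength < 5]` is only a plan until `.head()`, `.execute()`, or `.cache()` sends it to MaxCompute.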
Set up the dataset
Step 1: Create the pyodps_iris table
1. Log on to the DataWorks console and select a region in the upper-left corner.
2. On the Workspaces page, find the target workspace and, in the Actions column, choose Shortcuts > DataStudio.
3. On the Debugging Configurations page, select a Computing Resource and a Resource Group. If no resource group is displayed, click Create Resource Group and wait a few minutes. On the Resource Groups page, attach the workspace to the resource group.
4. Create a MaxCompute SQL node and run the following statement to create the pyodps_iris table.

   CREATE TABLE IF NOT EXISTS pyodps_iris (
       sepallength DOUBLE COMMENT 'Sepal length (cm)',
       sepalwidth  DOUBLE COMMENT 'Sepal width (cm)',
       petallength DOUBLE COMMENT 'Petal length (cm)',
       petalwidth  DOUBLE COMMENT 'Petal width (cm)',
       name        STRING COMMENT 'Species'
   );
Step 2: Import the Iris dataset
1. Download and decompress the Iris flower dataset. Rename iris.data to iris.csv.
2. Log on to the DataWorks console and select a region in the upper-left corner.
3. In the navigation pane on the left, choose Data Integration > Data Upload and Download.
4. Click Go to Data Upload and Download.
5. In the navigation pane on the left, click the upload icon, and then click Upload Data. Follow the prompts to upload iris.csv into the pyodps_iris table.
Sequence operations
Column-level operations in PyODPS are called sequence operations. They let you access, inspect, transform, and compute on individual columns without triggering a full query.
On the DataStudio page, create a PyODPS 2 node and run the following code.
Access a column
Both forms are equivalent — use whichever matches your coding style:
from odps import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))

# Attribute access
print(iris.sepallength.head(5))  # triggers execution, returns first 5 rows

# Index access — equivalent to the line above
print(iris['sepallength'].head(5))
Inspect and cast the data type
# View the data type of a column — does not trigger execution
print(iris.sepallength.dtype)
# Cast to a different type — returns a new typed sequence, does not modify the original DataFrame
iris.sepallength.astype('int')
Aggregate
# Max per group — execution is triggered when .head() is called
print(iris.groupby('name').sepallength.max().head(5))
# Global max — execution is triggered when the value is printed
print(iris.sepallength.max())
Rename a column
# rename() returns a new sequence with the new name — does not trigger execution
print(iris.sepalwidth.rename('sepal_width').head(5))
Arithmetic between columns
# Column arithmetic returns a new sequence — does not trigger execution
print((iris.sepallength + iris.sepalwidth).rename('sum_sepal').head(5))
Execution operations
Use execution operations to control when queries run, how logs are captured, and whether intermediate results are reused across multiple computations.
Create a PyODPS node named PyExecute and run the following code.
from odps import options
from odps import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
# --- Synchronous execution with Logview logging ---
# Print the Logview URL for each query submitted to MaxCompute
options.verbose = True
# Triggers execution; prints the Logview URL to stdout
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
# Capture log output in a list instead of printing to stdout
my_logs = []
def my_loggers(x):
    my_logs.append(x)
options.verbose_log = my_loggers
# Triggers execution; the Logview URL is appended to my_logs instead of stdout
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
print(my_logs)
# --- Caching intermediate results ---
# cache() triggers execution and stores the result
# Subsequent operations reuse the cached data without submitting a new MaxCompute job
cached = iris[iris.sepalwidth < 3.5].cache()
print(cached.head(3))  # reads from cache, no additional MaxCompute job
# --- Asynchronous parallel execution ---
from odps.df import Delay
delay = Delay() # create a Delay object to batch multiple executions
# Build a shared filtered DataFrame — computed once, shared across all three aggregations
df = iris[iris.sepalwidth < 5].cache()
# Queue three aggregations; none starts yet — this does not trigger execution
future1 = df.sepalwidth.sum().execute(delay=delay)
future2 = df.sepalwidth.mean().execute(delay=delay)
future3 = df.sepalwidth.max().execute(delay=delay)
# Submit all three jobs in parallel
delay.execute(n_parallel=3)
# Retrieve results after all jobs complete
print(future1.result())
print(future2.result())
print(future3.result())
Execution methods
| Method | Triggers execution | Returns |
|---|---|---|
| .head(n) | Yes | First n rows as a local result |
| .execute() | Yes | Full result set |
| .cache() | Yes | Cached collection for reuse |
| delay.execute(n_parallel=n) | Yes (batched) | Starts all queued futures in parallel |
When to use each pattern
- .execute() — Standard trigger for a single query. Set options.verbose = True to log the Logview URL for debugging.
- .cache() — Use when the same filtered or transformed dataset feeds multiple downstream operations. Avoids resubmitting the same job to MaxCompute.
- Delay + n_parallel — Use when you have independent aggregations that can run concurrently. Reduces total wall-clock time compared to running them sequentially.
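The shape of the Delay pattern, queuing several independent computations and then running them concurrently, can be mimicked locally with Python's standard library. This sketch uses concurrent.futures rather than PyODPS and only illustrates the structure, not MaxCompute job submission:

```python
# Local analogue of future1/future2/future3 + delay.execute(n_parallel=3),
# using the standard library instead of PyODPS.
from concurrent.futures import ThreadPoolExecutor

data = [3.5, 3.0, 3.2, 3.1]

# Submit three independent aggregations so they run concurrently;
# each submit() returns a future, and result() blocks until it completes.
with ThreadPoolExecutor(max_workers=3) as pool:
    f_sum = pool.submit(sum, data)
    f_mean = pool.submit(lambda d: sum(d) / len(d), data)
    f_max = pool.submit(max, data)
    print(f_sum.result(), f_mean.result(), f_max.result())
```

As in the PyODPS version, the win comes from the three computations being independent: they share their input and can overlap in time instead of running one after another.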