This topic describes the execution methods that are supported in DataFrame API operations.
Deferred execution
execute
method or when you call the methods that internally call execute
. The following table lists the methods that internally call the execute method.
Method | Description | Return value |
---|---|---|
persist | Saves the execution results to MaxCompute tables. | PyODPS DataFrame |
execute | Executes the operations and returns all the results. | ResultFrame |
head | Executes the operations and returns the first N rows of result data. | ResultFrame |
tail | Executes the operations and returns the last N rows of result data. | ResultFrame |
to_pandas | Converts a Collection object to a pandas DataFrame object or a Sequence object to a Series object. If you set the wrap parameter to True, an Alibaba Cloud MaxCompute SDK for python (PyODPS) DataFrame object is returned. |
|
plot, hist, and boxplot | Plotting methods. | N/A |
execute
method when it displays result data or when it calls the repr
method. You do not need to manually call the execute
method.
>>> iris[iris.sepallength < 5][:5]
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
>>> from odps import options
>>> options.interactive = False
>>>
>>> iris[iris.sepallength < 5][:5]
Collection: ref_0
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
name : string
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5
If you call repr
to return a printable representation of the object, the entire abstract syntax tree
is displayed.
>>> result = iris.head(3)
>>> for r in result:
>>> print(list(r))
[5.0999999999999996, 3.5, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.9000000000000004, 3.0, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.7000000000000002, 3.2000000000000002, 1.3, 0.20000000000000001, u'Iris-setosa']
pd_df = iris.head(3).to_pandas() # Return a pandas DataFrame.
wrapped_df = iris.head(3).to_pandas(wrap=True) # Return a PyODPS DataFrame that uses the pandas backend.
Save results to MaxCompute tables
persist
method to return a new DataFrame object for a Collection object. The persist method
uses the table name as the parameter.
>>> iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris2')
>>> iris2.head(5)
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
partitions
parameter to the persist
method to create a partitioned table. The table is partitioned based on the columns
that are specified by partitions
.
iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris3', partitions=['name'])
iris3.data
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris3`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
partitions:
name : string
To write data to a partition of an existing table, you can pass the partition
parameter to the persist
method. The partition parameter specifies the destination partition. For example,
set the partition parameter to ds=******
. The table must contain all columns of the DataFrame object and the columns must
be of the same type. The drop_partition
and create_partition
parameters are valid only if the partition parameter is specified. The drop_partition
parameter specifies whether to delete the specified partition if the partition exists.
The create_partition parameter specifies whether to create the specified partition
if the partition does not exist.
iris[iris.sepalwidth < 2.5].persist('pyodps_iris4', partition='ds=test', drop_partition=True, create_partition=True)
iris[iris.sepalwidth < 2.5].persist('pyodps_iris5', lifecycle=10)
persist
method.
# Assume that the entrance object of PyODPS is o.
# Specify the entrance object.
df.persist('table_name', odps=o)
# Alternatively, mark the entrance object as global.
o.to_global()
df.persist('table_name')
Save results to pandas DataFrame
You can call the to_pandas
method to save results to pandas DataFrame objects. If the wrap
parameter is set to True, a PyODPS DataFrame object is returned.
type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame
After data is read from Tables, PyODPS can execute the open_reader
method and use reader.to_pandas()
to convert Collection objects to pandas DataFrame objects.
Set runtime parameters
execute
, persist
, and to_pandas
. This setting is valid only for the MaxCompute SQL backend.
- Set global parameters. For more information, see SQL.
- You can specify the
hints
parameter in these methods. This ensures that the specified runtime parameters are valid only for the current calculation.iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})
Display details at runtime
from odps import options
options.verbose = True
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
my_logs = []
def my_logger(x):
my_logs.append(x)
options.verbose_log = my_logger
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
print(my_logs)
['Sql compiled:', 'SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'logview:', u'http://logview']
Cache intermediate results
When PyODPS DataFrame calculates data, some Collection objects are used multiple times.
To view the execution results of an intermediate process, you can call the cache
method to mark a Collection object that you want to calculate first.
cache
method is deferred. Automatic calculation is not immediately triggered after the
cache
method is called.
cached = iris[iris.sepalwidth < 3.5].cache()
df = cached['sepallength', 'name'].head(3)
df
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
cached.head(3) # The result can be immediately retrieved because the cached object is calculated.
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
Asynchronous and parallel execution
PyODPS DataFrame supports asynchronous operations. For the execute
, persist
, head
, tail
, and to_pandas
methods, you can pass the async
parameter to enable asynchronous execution. The timeout
parameter specifies the time-out period. Asynchronous operations return Future objects.
from odps.df import Delay
delay = Delay() # Create the Delay object.
df = iris[iris.sepal_width < 5].cache() # Common dependency of subsequent expressions.
future1 = df.sepal_width.sum().execute(delay=delay) # Return the Future object. The execution is not started.
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3) # The execution starts with three concurrent threads.
|==========================================| 1 / 1 (100.00%) 21s
future1.result()
458.10000000000014
future2.result()
3.0540000000000007
In the preceding example, PyODPS DataFrame first executes the object of the shared
dependency. Then, PyODPS DataFrame sets the degree of parallelism to 3 and executes
objects from future1
to future3
. If n_parallel
is set to 1, the execution time reaches 37 seconds.
You can pass the async
parameter to delay.execute
to specify whether to enable asynchronous execution. If asynchronous execution is
enabled, you can also use the timeout
parameter to specify the time-out period.