本文为您介绍DataFrame操作支持的执行方法。
延迟执行
execute
方法,或者调用立即执行的方法时(内部调用的也是execute
),才会执行这些操作。立即执行的方法如下表所示。
方法 | 说明 | 返回值 |
---|---|---|
persist | 将执行结果保存到MaxCompute表。 | PyODPS DataFrame |
execute | 执行并返回全部结果。 | ResultFrame |
head | 查看开头N行数据,这个方法会执行所有结果,并取开头N行数据。 | ResultFrame |
tail | 查看结尾N行数据,这个方法会执行所有结果,并取结尾N行数据。 | ResultFrame |
to_pandas | 转化为Pandas DataFrame或者Series,wrap参数为True的时候,返回PyODPS DataFrame对象。 |
|
plot,hist,boxplot | 画图有关。 | 不涉及 |
repr
的时候,调用execute
方法,您无需手动调用execute
。
>>> iris[iris.sepallength < 5][:5]
sepallength sepalwidth petallength petalwidth name
0 4.9 3.0 1.4 0.2 Iris-setosa
1 4.7 3.2 1.3 0.2 Iris-setosa
2 4.6 3.1 1.5 0.2 Iris-setosa
3 4.6 3.4 1.4 0.3 Iris-setosa
4 4.4 2.9 1.4 0.2 Iris-setosa
>>> from odps import options
>>> options.interactive = False
>>>
>>> iris[iris.sepallength < 5][:5]
Collection: ref_0
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
name : string
Collection: ref_1
Filter[collection]
collection: ref_0
predicate:
Less[sequence(boolean)]
sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
Scalar[int8]
5
Slice[collection]
collection: ref_1
stop:
Scalar[int8]
5
此时打印repr
对象,会显示整个抽象语法树。
>>> result = iris.head(3)
>>> for r in result:
>>> print(list(r))
[5.0999999999999996, 3.5, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.9000000000000004, 3.0, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.7000000000000002, 3.2000000000000002, 1.3, 0.20000000000000001, u'Iris-setosa']
pd_df = iris.head(3).to_pandas() # 返回Pandas DataFrame。
wrapped_df = iris.head(3).to_pandas(wrap=True) # 返回使用Pandas后端的PyODPS DataFrame。
保存执行结果为MaxCompute表
persist
方法,用于返回一个新的DataFrame对象,参数为表名。
>>> iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris2')
>>> iris2.head(5)
sepallength sepalwidth petallength petalwidth name
0 4.5 2.3 1.3 0.3 Iris-setosa
1 5.5 2.3 4.0 1.3 Iris-versicolor
2 4.9 2.4 3.3 1.0 Iris-versicolor
3 5.0 2.0 3.5 1.0 Iris-versicolor
4 6.0 2.2 4.0 1.0 Iris-versicolor
persist
可以传入partitions
参数,用于创建一个分区表。它的分区是partitions
所指定的字段。
iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris3', partitions=['name'])
iris3.data
odps.Table
name: odps_test_sqltask_finance.`pyodps_iris3`
schema:
sepallength : double
sepalwidth : double
petallength : double
petalwidth : double
partitions:
name : string
如果您需要写入已经存在的表的某个分区,persist
可以传入partition
参数,指明写入表的哪个分区(例如ds=******
)。该DataFrame的每个字段的类型都必须相同,且都存在于该表中。drop_partition
和create_partition
参数只在此时有效,分别表示是否要删除(如果分区存在)或创建(如果分区不存在)该分区。
iris[iris.sepalwidth < 2.5].persist('pyodps_iris4', partition='ds=test', drop_partition=True, create_partition=True)
iris[iris.sepalwidth < 2.5].persist('pyodps_iris5', lifecycle=10)
persist
时需要手动指定ODPS入口对象,或者将需要的入口对象标明为全局对象。
# 假设入口对象为o。
# 指定入口对象。
df.persist('table_name', odps=o)
# 或者可将入口对象标记为全局。
o.to_global()
df.persist('table_name')
保存执行结果为Pandas DataFrame
您可以使用to_pandas
方法,如果wrap
参数为True,将返回PyODPS DataFrame对象。
type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame
将表中的数据读出,PyODPS可以执行open_reader
方法,通过reader.to_pandas()
转成Pandas DataFrame。
立即运行设置运行参数
execute
、persist
、to_pandas
等,您可以通过以下方法设置它们运行时的参数(仅对ODPS SQL后端有效):
- 设置全局参数。详情请参见SQL。
- 在这些立即执行的方法上,使用
hints
参数,可以确保这些参数只作用于当前的计算过程。iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})
运行时显示详细信息
from odps import options
options.verbose = True
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
my_logs = []
def my_logger(x):
my_logs.append(x)
options.verbose_log = my_logger
iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
print(my_logs)
['Sql compiled:', 'SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'logview:', u'http://logview']
缓存中间Collection计算结果
DataFrame的计算过程中,部分Collection被多处使用。如果您需要查看中间过程的执行结果, 可以使用cache
标记某个需要被优先计算的Collection。
cache
会延迟执行,调用cache
不会触发立即计算。
cached = iris[iris.sepalwidth < 3.5].cache()
df = cached['sepallength', 'name'].head(3)
df
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
cached.head(3) # 由于cached已经被计算,所以能立刻取到计算结果。
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
异步和并行执行
DataFrame支持异步操作,对于立即执行的方法,包括execute
、persist
、head
、tail
、to_pandas
(其他方法不支持), 传入async
参数,即可以将这个操作异步执行,timeout
参数指定超时时间, 异步返回的是 Future对象。
from odps.df import Delay
delay = Delay() # 创建Delay对象。
df = iris[iris.sepal_width < 5].cache() # 有一个共同的依赖。
future1 = df.sepal_width.sum().execute(delay=delay) # 立即返回future对象,此时并没有执行。
future2 = df.sepal_width.mean().execute(delay=delay)
future3 = df.sepal_length.max().execute(delay=delay)
delay.execute(n_parallel=3) # 并发度是3,此时才真正执行。
|==========================================| 1 / 1 (100.00%) 21s
future1.result()
458.10000000000014
future2.result()
3.0540000000000007
上述示例中,共同依赖的对象会先执行,然后再以并发度为3分别执行future1
到future3
。 当n_parallel
为1时,执行时间会达到37s。
delay.execute
也接受async
操作指定是否异步执行,当异步执行的时候,也可以用timeout
参数指定超时时间。