PyODPS DataFrame supports a range of aggregation operations on large datasets: built-in aggregate functions, grouped aggregations, custom aggregations (user-defined aggregate functions, or UDAFs), and HyperLogLog-based distinct value estimation.
All examples use the pyodps_iris table as the data source:
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
Built-in aggregation functions
The following aggregation functions are available on DataFrame columns.
| Function | Description |
|---|---|
| count (or size) | Counts the number of rows |
| unique | Counts the number of distinct values |
| min | Returns the minimum value |
| max | Returns the maximum value |
| sum | Returns the sum |
| mean | Returns the mean value |
| median | Returns the median value |
| quantile(p) | Returns the p-quantile; results are exact only when p is an integer |
| var | Returns the variance |
| std | Returns the standard deviation |
| moment | Returns the Nth central moment or the Nth moment |
| skew | Returns the sample skewness (unbiased estimate) |
| kurtosis | Returns the sample kurtosis (unbiased estimate) |
| cat | Concatenates strings with a separator |
| tolist | Aggregates a column into a list |
PyODPS DataFrames ignore null values in aggregation operations on both the MaxCompute and pandas backends. This differs from pandas DataFrame behavior but matches SQL semantics.
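As a plain-Python sketch of that null-handling rule (the values here are made up), a mean over [4.0, None, 6.0] skips the null and divides by two:

```python
# Null-ignoring mean, mimicking the SQL-style semantics described above.
values = [4.0, None, 6.0]
non_null = [v for v in values if v is not None]
mean = sum(non_null) / len(non_null)
print(mean)  # 5.0 -- the null is skipped, not counted or propagated
```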
Examples
Describe all numerical columns — call describe() to get the count, max, min, mean, and standard deviation at once:
print(iris.describe())
Output:
type sepallength sepalwidth petallength petalwidth
0 count 150.000000 150.000000 150.000000 150.000000
1 mean 5.843333 3.054000 3.758667 1.198667
2 std 0.828066 0.433594 1.764420 0.763161
3 min 4.300000 2.000000 1.000000 0.100000
4 max 7.900000 4.400000 6.900000 2.500000
Aggregate a single column:
iris.sepallength.max()
Output: 7.9
Aggregate over distinct values — call unique() before the aggregate function:
iris.name.unique().cat(sep=',')
Output: u'Iris-setosa,Iris-versicolor,Iris-virginica'
Aggregate all columns — if all columns support the same operation, apply it to the entire DataFrame:
iris.exclude('name').mean()
Output:
sepallength sepalwidth petallength petalwidth
1 5.843333 3.054000 3.758667 1.198667
Count all rows:
iris.count()
Output: 150
To display the result in logs, run print(iris.count().execute()).
Group and aggregate data
Groupby in PyODPS DataFrame follows a split-apply-combine model:

- Split: groupby() splits the data into groups based on one or more columns.
- Apply: agg() or aggregate() applies an aggregation function to each group independently.
- Combine: the results are combined into a single DataFrame.

The result includes both the grouped column and the aggregated column.
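The model can be sketched in plain Python, with a toy list of rows standing in for the DataFrame (the values are made up):

```python
from collections import defaultdict

# Split-apply-combine by hand: group rows by a key column,
# aggregate each group independently, then combine the results.
rows = [
    ('Iris-setosa', 1.4),
    ('Iris-setosa', 1.3),
    ('Iris-virginica', 6.0),
]

# Split: bucket the rows by the grouping key.
groups = defaultdict(list)
for name, petallength in rows:
    groups[name].append(petallength)

# Apply + combine: aggregate each group, collect into one result.
result = {name: max(vals) for name, vals in groups.items()}
print(result)  # {'Iris-setosa': 1.4, 'Iris-virginica': 6.0}
```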
Named aggregation
Pass keyword arguments to agg() to control output column names directly. This is named aggregation — use it whenever you want explicit control over the names of aggregated columns:
iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
Output:
name sepallength_max smin
0 Iris-setosa 5.8 4.3
1 Iris-versicolor 7.0 4.9
2 Iris-virginica 7.9 4.9
In this example, smin=iris.sepallength.min() renames the aggregated column to smin.
Count distinct values per group
Two equivalent approaches are available. Use value_counts() for conciseness:
# Using groupby + agg
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
# Using value_counts (equivalent, more concise)
iris['name'].value_counts().head(5)
Both produce the same output:
name count
0 Iris-virginica 50
1 Iris-versicolor 50
2 Iris-setosa 50
Aggregate a single column from a group
Access the column by name after groupby() to retrieve only that aggregated column:
iris.groupby('name').petallength.sum()
Output:
petallength_sum
0 73.2
1 213.0
2 277.6
When using this syntax, you are limited to aggregate functions on that column. To apply non-null checks or other expressions, use agg() instead:
iris.groupby('name').agg(iris.petallength.notnull().sum())
Output:
name petallength_sum
0 Iris-setosa 50
1 Iris-versicolor 50
2 Iris-virginica 50
Group by a constant value
To aggregate all rows together without a natural grouping column, group by a constant using Scalar:
from odps.df import Scalar
iris.groupby(Scalar(1)).petallength.sum()
Output:
petallength_sum
0 563.8
Write custom aggregations
Use agg() or aggregate() to apply a user-defined aggregate function (UDAF) to a column. A custom aggregation class must implement four methods:
| Method | Description |
|---|---|
| buffer() | Returns a mutable object (such as a list or dict) that accumulates partial results. The buffer size must not grow with the amount of data. |
| __call__(buffer, *val) | Adds a value to the buffer. |
| merge(buffer, pbuffer) | Merges a partial buffer (pbuffer) into the main buffer. |
| getvalue(buffer) | Returns the final aggregated value. |
Example: custom mean
class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

iris.sepalwidth.agg(Agg)
Output: 3.0540000000000007
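On MaxCompute the engine calls these four methods itself across partitions of the data; a minimal plain-Python sketch of that lifecycle, driving the custom-mean class by hand with two partial buffers (the input values are made up):

```python
# Sketch: exercising the UDAF protocol the way the engine would
# across two partitions of the data.
class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

aggregator = Agg()

# Each "worker" accumulates values into its own partial buffer ...
buf1, buf2 = aggregator.buffer(), aggregator.buffer()
for v in [3.5, 3.0]:
    aggregator(buf1, v)
for v in [2.5, 3.2]:
    aggregator(buf2, v)

# ... then the partial buffers are merged and the final value extracted.
aggregator.merge(buf1, buf2)
print(aggregator.getvalue(buf1))  # ~3.05, the mean of all four values
```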
Usage notes
Specify the output data type when it differs from the input:
iris.sepalwidth.agg(Agg, 'float')
Combine with groupby:
iris.groupby('name').sepalwidth.agg(Agg)
Output:
sepalwidth_aggregation
0 3.418
1 2.770
2 2.974
Aggregate multiple columns — use agg() from odps.df and pass a list of columns:
class Agg(object):

    def buffer(self):
        return [0.0, 0.0]

    def __call__(self, buffer, val1, val2):
        buffer[0] += val1
        buffer[1] += val2

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

from odps.df import agg
to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # Apply the UDAF to two columns.
iris.groupby('name').agg(val=to_agg)
Output:
name val
0 Iris-setosa 0.682781
1 Iris-versicolor 0.466644
2 Iris-virginica 0.451427
Call an existing MaxCompute UDAF by name — pass the UDAF name as a string instead of a class:
# Single column
iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))
# Multiple columns
to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
iris.groupby('name').agg(to_agg.rename('val'))
Due to Python UDF limitations, LIST and DICT types cannot be used as the input or output data type for custom aggregations.
HyperLogLog counting
hll_count implements the HyperLogLog algorithm to estimate the number of distinct values in a column. It returns an approximate count, not an exact one. Use it when exact counts are too slow on large datasets — for example, to estimate unique visitors (UVs) quickly.
The example below uses a pandas DataFrame. Run it in a local environment. If running in DataWorks, import pandas via a third-party package first.
from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()
Output: 63270
For comparison, the exact distinct count:
df.a.nunique()
Output: 63250
The two results are close but not identical — hll_count trades a small margin of error for speed.
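For intuition into why the estimate lands close to the true count, here is a toy HyperLogLog in plain Python. This is for illustration only: hll_count uses PyODPS's own implementation, and the 4096-register size and md5 hash below are arbitrary choices, not PyODPS internals.

```python
import hashlib
import math

P = 12
M = 1 << P                          # number of registers (4096)
registers = [0] * M

def add(value):
    # 64-bit hash of the value
    h = int(hashlib.md5(str(value).encode()).hexdigest()[:16], 16)
    idx = h & (M - 1)               # low P bits pick a register
    w = h >> P                      # remaining bits
    # rank = number of trailing zero bits in w, plus one
    rank = (w & -w).bit_length() if w else (64 - P + 1)
    registers[idx] = max(registers[idx], rank)

def estimate():
    alpha = 0.7213 / (1 + 1.079 / M)
    e = alpha * M * M / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if e <= 2.5 * M and zeros:      # small-range (linear counting) correction
        e = M * math.log(M / zeros)
    return int(e)

for v in range(50000):              # 50,000 distinct values
    add(v)
print(estimate())                   # close to 50,000 but not exact
```

Each register only keeps a small maximum, so memory stays constant regardless of how many rows are scanned; that is the same trade-off hll_count makes.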
Use the splitter parameter to split string values before counting distinct elements.