本文为您介绍DataFrame支持的聚合操作,以及如何实现分组聚合和编写自定义聚合。DataFrame提供对列进行HyperLogLog计数的接口。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
常用聚合操作如下:
  • 使用describe函数,查看DataFrame里数字列的数量、最大值、最小值、平均值以及标准差。
    >>> print(iris.describe())
    返回结果如下。
        type  sepal_length  sepal_width  petal_length  petal_width
    0  count    150.000000   150.000000    150.000000   150.000000
    1   mean      5.843333     3.054000      3.758667     1.198667
    2    std      0.828066     0.433594      1.764420     0.763161
    3    min      4.300000     2.000000      1.000000     0.100000
    4    max      7.900000     4.400000      6.900000     2.500000
  • 使用单列执行聚合操作。
    >>> iris.sepallength.max()
    返回结果如下。
    7.9
  • 如果在消除重复后的列上进行聚合,可以先调用unique函数,再调用相应的聚合函数。
    >>> iris.name.unique().cat(sep=',')
    返回结果如下。
    u'Iris-setosa,Iris-versicolor,Iris-virginica'
  • 如果所有列支持同一种聚合操作,可以直接在整个DataFrame上执行聚合操作。
    >>> iris.exclude('category').mean()
    返回结果如下。
       sepal_length  sepal_width  petal_length  petal_width
    1      5.843333     3.054000      3.758667     1.198667
  • 使用count函数获取DataFrame的总行数。
    >>> iris.count()
    返回结果如下。
    150
    说明 如果需要打印对应数据到日志中,请执行print(iris.count().execute())
PyODPS支持的聚合操作,如下表所示。
聚合操作 说明
count(或size) 数量。
unique 不重复值数量。
min 最小值。
max 最大值。
sum 求和。
mean 均值。
median 中位数。
quantile(p) p分位数,仅在整数值下可取得准确值。
var 方差。
std 标准差。
moment n阶中心矩(或n阶矩)。
skew 样本偏度(无偏估计)。
kurtosis 样本峰度(无偏估计)。
cat 按sep做字符串连接操作。
tolist 组合为LIST。
说明 不同于Pandas,对于列上的聚合操作,无论是在MaxCompute还是Pandas后端,PyODPS DataFrame都会忽略空值。这一逻辑与SQL类似。

分组聚合

分组聚合操作如下:
  • DataFrame提供了groupby函数执行分组操作,分组后通过调用agg或者aggregate方法,执行聚合操作。最终的结果列中会包含分组的列和聚合的列。
    >>> iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
    返回结果如下。
                  name  sepallength_max  smin
    0      Iris-setosa              5.8   4.3
    1  Iris-versicolor              7.0   4.9
    2   Iris-virginica              7.9   4.9
  • DataFrame提供了value_counts函数,按某列分组后,将每个组的个数从大到小进行排列。
    • 使用groupby函数实现。
      >>> iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
      返回结果如下。
                    name  count
      0   Iris-virginica     50
      1  Iris-versicolor     50
      2      Iris-setosa     50
    • 使用value_counts函数实现。
      >>> iris['name'].value_counts().head(5)
      返回结果如下。
                    name  count
      0   Iris-virginica     50
      1  Iris-versicolor     50
      2      Iris-setosa     50
  • 对于聚合后的单列操作,您也可以直接取出列名。但此时只能使用聚合函数。
    >>> iris.groupby('name').petallength.sum()
    返回结果如下。
       petallength_sum
    0             73.2
    1            213.0
    2            277.6
    >>> iris.groupby('name').agg(iris.petallength.notnull().sum())
    返回结果如下。
                  name  petallength_sum
    0      Iris-setosa               50
    1  Iris-versicolor               50
    2   Iris-virginica               50
  • 分组时也支持对常量进行分组,但是需要使用Scalar初始化。
    >>> from odps.df import Scalar
    >>> iris.groupby(Scalar(1)).petallength.sum()
    返回结果如下。
       petallength_sum
    0            563.8

编写自定义聚合

对字段使用agg或者aggregate方法调用自定义聚合。自定义聚合需要提供一个类,这个类需要提供以下方法:

  • buffer():返回一个Mutable的Object(例如LIST或DICT),buffer大小不应随数据量增大而递增。
  • __call__(buffer, *val):将值聚合到中间buffer
  • merge(buffer, pbuffer):将pbuffer聚合到buffer中。
  • getvalue(buffer):返回最终值。
计算平均值的示例如下。
class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.sepalwidth.agg(Agg)
返回结果如下。
3.0540000000000007
编写自定义聚合还需要关注如下内容:
  • 如果最终类型和输入类型发生了变化,则需要指定类型。
    >>> iris.sepalwidth.agg(Agg, 'float')
  • 自定义聚合也可以用在分组聚合中。
    >>> iris.groupby('name').sepalwidth.agg(Agg)
    返回结果如下。
       petallength_aggregation
    0                    3.418
    1                    2.770
    2                    2.974
  • 对多列可以使用agg方法调用自定义聚合。
    class Agg(object):
    
        def buffer(self):
            return [0.0, 0.0]
    
        def __call__(self, buffer, val1, val2):
            buffer[0] += val1
            buffer[1] += val2
    
        def merge(self, buffer, pbuffer):
            buffer[0] += pbuffer[0]
            buffer[1] += pbuffer[1]
    
        def getvalue(self, buffer):
            if buffer[1] == 0:
                return 0.0
            return buffer[0] / buffer[1]
    >>> from odps.df import agg
    >>> to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # 对两列调用自定义聚合。
    >>> iris.groupby('name').agg(val=to_agg)
    返回结果如下。
                  name       val
    0      Iris-setosa  0.682781
    1  Iris-versicolor  0.466644
    2   Iris-virginica  0.451427
  • 如果您需要调用MaxCompute上已经存在的UDAF,指定函数名即可。
    >>> iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # 对单列聚合。
    >>> to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
    >>> iris.groupby('name').agg(to_agg.rename('val'))  # 对多列聚合。
    说明 目前,因受限于Python UDF,自定义聚合无法支持将LIST或DICT类型作为初始输入或最终输出结果。

HyperLogLog计数

DataFrame提供了对列进行HyperLogLog计数的接口hll_count,这个接口是近似个数的估计接口。当数据量很大时,它可以较快地估计去重后的数据量。

使用该接口计算海量用户UV时,可以快速得出估计值。
>>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
>>> df.a.hll_count()
返回结果如下。
63270
>>> df.a.nunique()
返回结果如下。
63250
说明 splitter参数会对每个字段进行分隔,再计算去重后的数据量。