PyODPS DataFrame では、大規模データセットに対する多様な集約操作がサポートされています。具体的には、組み込み集計関数、グループ単位の集約、カスタム集約(ユーザー定義集計関数、UDAF)、および HyperLogLog を用いた相違値の推定です。
すべての例では、データソースとして pyodps_iris テーブルを使用します:
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
組み込み集計関数
DataFrame の各列に対して利用可能な集計関数を以下に示します。
| 関数 | 説明 |
|---|---|
count または size |
行数をカウントします |
unique |
相違値の数をカウントします |
min |
最小値を返します |
max |
最大値を返します |
sum |
合計値を返します |
mean |
平均値を返します |
median |
中央値を返します |
quantile(p) |
p 分位数を返します。整数のみで正確な結果が得られます |
var |
分散を返します |
std |
標準偏差を返します |
moment |
N 次中心モーメントまたは N 次モーメントを返します |
skew |
標本歪度(不偏推定量)を返します |
kurtosis |
標本尖度(不偏推定量)を返します |
cat |
区切り文字で文字列を連結します |
tolist |
列をリストに集約します |
PyODPS DataFrame では、MaxCompute バックエンドおよび pandas バックエンドの両方において、集約操作時に null 値は無視されます。これは pandas DataFrame の動作とは異なりますが、SQL のセマンティクスと一致します。
使用例
すべての数値列の概要表示 — describe() を呼び出すと、カウント数、最大値、最小値、平均値、標準偏差を一度に取得できます:
print(iris.describe())
出力例:
type sepal_length sepal_width petal_length petal_width
0 count 150.000000 150.000000 150.000000 150.000000
1 mean 5.843333 3.054000 3.758667 1.198667
2 std 0.828066 0.433594 1.764420 0.763161
3 min 4.300000 2.000000 1.000000 0.100000
4 max 7.900000 4.400000 6.900000 2.500000
単一列の集約:
iris.sepallength.max()
出力: 7.9
相違値に対する集約 — 集約関数を呼び出す前に unique() を呼び出します:
iris.name.unique().cat(sep=',')
出力: u'Iris-setosa,Iris-versicolor,Iris-virginica'
全列に対する集約 — 全ての列が同一の操作をサポートする場合、DataFrame 全体にその操作を適用できます:
iris.exclude('category').mean()
出力:
sepal_length sepal_width petal_length petal_width
1 5.843333 3.054000 3.758667 1.198667
全行数のカウント:
iris.count()
出力: 150
ログに出力を表示するには、print(iris.count().execute()) を実行します。
データのグループ化と集約
PyODPS DataFrame の groupby は、split-apply-combine モデルに従います:
-
Split(分割):
groupby()により、1 つ以上の列に基づいてデータがグループに分割されます。 -
Apply(適用):
agg()またはaggregate()を使用して、各グループに対して集約関数を独立して適用します。 -
Combine(統合):結果が単一の DataFrame に統合されます。
結果には、グループ化対象の列と集約された列の両方が含まれます。
名前付き集約
agg() にキーワード引数を渡すことで、出力列名を直接制御できます。これを「名前付き集約」といいます。集約列の名前を明示的に制御したい場合には、この機能をご利用ください:
iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
出力:
name sepallength_max smin
0 Iris-setosa 5.8 4.3
1 Iris-versicolor 7.0 4.9
2 Iris-virginica 7.9 4.9
この例では、smin=iris.sepallength.min() により、集約列の名前が smin に変更されます。
グループごとのユニークな値の数
等価な 2 つのアプローチが利用可能です。value_counts() を使用すると、記述が簡潔になります:
# groupby + agg を使用した場合
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
# value_counts を使用した場合(等価かつ簡潔)
iris['name'].value_counts().head(5)
いずれも以下の出力を生成します:
name count
0 Iris-virginica 50
1 Iris-versicolor 50
2 Iris-setosa 50
グループから単一列の集約
groupby() の後に列名を指定することで、該当列のみを集約した結果を取得できます:
iris.groupby('name').petallength.sum()
出力:
petallength_sum
0 73.2
1 213.0
2 277.6
この構文を使用する場合、対象列に対する集約関数のみが利用可能です。null 値チェックやその他の式を適用するには、代わりに agg() を使用してください:
iris.groupby('name').agg(iris.petallength.notnull().sum())
出力:
name petallength_sum
0 Iris-setosa 50
1 Iris-versicolor 50
2 Iris-virginica 50
定数によるグループ化
自然なグループ化列がない場合に、すべての行をまとめて集約するには、Scalar を用いて定数でグループ化します:
from odps.df import Scalar
iris.groupby(Scalar(1)).petallength.sum()
出力:
petallength_sum
0 563.8
カスタム集約の作成
agg() または aggregate() を使用して、ユーザー定義集計関数(UDAF)を列に適用できます。カスタム集約クラスでは、以下の 4 つのメソッドを実装する必要があります:
| メソッド | 説明 |
|---|---|
buffer() |
部分的な結果を蓄積する可変オブジェクト(リストまたは辞書)を返します。バッファーのサイズは、データ量に応じて増加してはなりません。 |
__call__(buffer, *val) |
値をバッファーに追加します。 |
merge(buffer, pbuffer) |
部分バッファー(pbuffer)をメインのバッファーにマージします。 |
getvalue(buffer) |
最終的な集約値を返します。 |
例:カスタム平均値計算
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]iris.sepalwidth.agg(Agg)
出力: 3.0540000000000007
使用上の注意点
出力データ型の指定 — 出力データ型が入力と異なる場合は、明示的に指定してください:
iris.sepalwidth.agg(Agg, 'float')
groupby との併用:
iris.groupby('name').sepalwidth.agg(Agg)
出力:
petallength_aggregation
0 3.418
1 2.770
2 2.974
複数列の集約 — agg() を odps.df からインポートし、列のリストを渡します:
class Agg(object):
def buffer(self):
return [0.0, 0.0]
def __call__(self, buffer, val1, val2):
buffer[0] += val1
buffer[1] += val2
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]from odps.df import agg
to_agg = agg([iris.sepalwidth, ], Agg, rtype='float') # ユーザー定義集計関数(UDAF)を用いて 2 列のデータを集約します。
iris.groupby('name').agg(val=to_agg)
出力:
name val
0 Iris-setosa 0.682781
1 Iris-versicolor 0.466644
2 Iris-virginica 0.451427
既存の MaxCompute UDAF の呼び出し — クラスではなく、UDAF の名前(文字列)を渡します:
# 単一列の場合
iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))
# 複数列の場合
to_agg = agg([iris.sepalwidth, ], 'your_func', rtype='float')
iris.groupby('name').agg(to_agg.rename('val'))
Python UDF の制限により、LIST 型および DICT 型は、カスタム集約の入力または出力データ型として使用できません。
HyperLogLog を用いたカウント
hll_count は、HyperLogLog アルゴリズムを実装しており、列内の相違値の数を推定します。これは正確なカウントではなく、近似値を返します。大規模データセットにおいて正確なカウントが遅すぎる場合にご利用ください。たとえば、ユニークビジター(UV)数を高速に推定する際に有効です。
以下の例では pandas DataFrame を使用しています。ローカル環境で実行してください。DataWorks 上で実行する場合は、あらかじめサードパーティ製パッケージ経由で pandas をインポートする必要があります。
from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()
出力: 63270
比較のために、正確な相違値カウントを実行した場合:
df.a.nunique()
出力: 63250
両者の結果は近似していますが、完全には一致しません。hll_count は、わずかな誤差を許容することで高速性を実現しています。
splitter パラメーターを用いることで、文字列値を分割してから相違要素をカウントできます。