このトピックでは、DataFrame でサポートされている集約操作について説明します。グループ化と集約の実装方法、およびカスタム集約の作成方法が含まれます。 DataFrame は、列に基づく HyperLogLog カウントに関する情報も提供します。
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))上記の DataFrame に対して、以下の一般的な集約操作を実行できます。
describe関数を呼び出して、DataFrame 内の数値列の数量、最大値、最小値、平均値、および標準偏差を表示します。print(iris.describe())次の結果が返されます。
type sepal_length sepal_width petal_length petal_width 0 count 150.000000 150.000000 150.000000 150.000000 1 mean 5.843333 3.054000 3.758667 1.198667 2 std 0.828066 0.433594 1.764420 0.763161 3 min 4.300000 2.000000 1.000000 0.100000 4 max 7.900000 4.400000 6.900000 2.500000単一の列で集約操作を実行します。
iris.sepallength.max()次の結果が返されます。
7.9個別のデータレコードのシーケンスに対して集約を実行するには、関連する集約関数を呼び出す前に
unique関数を呼び出します。iris.name.unique().cat(sep=',')次の結果が返されます。
u'Iris-setosa,Iris-versicolor,Iris-virginica'すべての列が同じ集約操作をサポートしている場合は、DataFrame 全体にこの集約操作を実行します。
iris.exclude('category').mean()次の結果が返されます。
sepal_length sepal_width petal_length petal_width 1 5.843333 3.054000 3.758667 1.198667count 関数を呼び出して、DataFrame 内の行の総数を計算します。
iris.count()次の結果が返されます。
150説明結果をログに表示する場合は、
print(iris.count().execute())コマンドを実行します。
次の表に、PyODPS がサポートする集約操作を示します。
集約 | 説明 |
count または size | 行数を計算します。 |
unique | 重複を除いた値の数を計算します。 |
min | 最小値を計算します。 |
max | 最大値を計算します。 |
sum | 指定された値の合計を計算します。 |
mean | 平均値を計算します。 |
median | 中央値を計算します。 |
quantile(p) | p 分位数を計算します。この関数は、整数が計算される場合にのみ正確な結果を返します。 |
var | 分散を計算します。 |
std | 標準偏差を計算します。 |
moment | n 次の中心モーメントまたは n 次モーメントを計算します。 |
skew | 標本歪度を計算します。この関数は、不偏推定結果を返します。 |
kurtosis | 標本尖度を計算します。この関数は、不偏推定結果を返します。 |
cat | 区切り文字を使用して文字列を連結します。 |
tolist | 列を集約してリストにします。 |
PyODPS DataFrame は、MaxCompute および pandas バックエンドの列に対する集約操作の null 値を無視します。これは pandas DataFrame とは異なりますが、SQL のロジックに似ています。
データのグループ化と集約
次の方法を使用して、データをグループ化および集約できます。
PyODPS DataFrame は、
groupby関数を提供してデータをグループ化します。データがグループ化された後、agg関数またはaggregate関数を呼び出してデータを集約します。結果の列には、グループ化された列と集約された列が含まれます。iris.groupby('name').agg(, smin=.min())次の結果が返されます。
name sepallength_max smin 0 Iris-setosa 5.8 4.3 1 Iris-versicolor 7.0 4.9 2 Iris-virginica 7.9 4.9PyODPS DataFrame は、
value_counts関数を提供します。指定された列に基づいてデータがグループ化された後、各グループ内の個別の値の数に基づいてグループを降順にソートできます。groupby関数を呼び出します。iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)次の結果が返されます。
name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50value_counts関数を呼び出します。iris['name'].value_counts().head(5)次の結果が返されます。
name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50
単一の集約列の列名を取得できます。ただし、この操作では、集約関数を集約列の値の管理にのみ使用できます。
iris.groupby('name').petallength.sum()次の結果が返されます。
petallength_sum 0 73.2 1 213.0 2 277.6iris.groupby('name').agg(iris.petallength.notnull().sum())次の結果が返されます。
name petallength_sum 0 Iris-setosa 50 1 Iris-versicolor 50 2 Iris-virginica 50定数値でデータをグループ化することもできます。この操作には、Scalar の初期化が必要です。
from odps.df import Scalar iris.groupby(Scalar(1)).petallength.sum()次の結果が返されます。
petallength_sum 0 563.8
カスタム集約を作成する
agg 関数または aggregate 関数を使用して、列に対してカスタム集約を呼び出します。カスタム集約では、クラスが次のメソッドを提供する必要があります。
buffer(): LIST や DICT などの変更可能なオブジェクトを返します。bufferサイズは、データ量とともに増加してはなりません。__call__(buffer, *val): 値をbufferに集約します。merge(buffer, pbuffer):pbufferをbufferに集約します。getvalue(buffer): 最終的な値を返します。
次のサンプルコードは、平均値の計算方法の例を示しています。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]iris.sepalwidth.agg(Agg)次の結果が返されます。
3.0540000000000007カスタム集約を作成する場合は、次の点に注意してください。
出力のデータ型が入力のデータ型と異なる場合は、出力のデータ型を指定する必要があります。
iris.sepalwidth.agg(Agg, 'float')カスタム集約を使用して、データをグループ化および集約できます。
iris.groupby('name').sepalwidth.agg(Agg)次の結果が返されます。
petallength_aggregation 0 3.418 1 2.770 2 2.974agg関数を使用して、複数の列に対してカスタム集約を呼び出すことができます。class Agg(object): def buffer(self): return [0.0, 0.0] def __call__(self, buffer, val1, val2): buffer[0] += val1 buffer[1] += val2 def merge(self, buffer, pbuffer): buffer[0] += pbuffer[0] buffer[1] += pbuffer[1] def getvalue(self, buffer): if buffer[1] == 0: return 0.0 return buffer[0] / buffer[1]from odps.df import agg to_agg = agg([iris.sepalwidth, ], Agg, rtype='float') # ユーザー定義集計関数 (UDAF) を呼び出して、2 つの列のデータを集計します。 iris.groupby('name').agg(val=to_agg)次の結果が返されます。
name val 0 Iris-setosa 0.682781 1 Iris-versicolor 0.466644 2 Iris-virginica 0.451427MaxCompute 内の既存の UDAF を呼び出すには、UDAF の名前を指定するだけで済みます。
iris.groupby('name').agg(iris.sepalwidth.agg('your_func')) # 単一の列の値を集計します。 to_agg = agg([iris.sepalwidth, ], 'your_func', rtype='float') iris.groupby('name').agg(to_agg.rename('val')) # 複数の列の値を集計します。説明Python ユーザー定義関数 (UDF) の制限により、カスタム集約の入力または出力データ型として LIST 型または DICT 型を指定することはできません。
HyperLogLog カウント
PyODPS DataFrame は、HyperLogLog 操作である hll_count 操作を提供します。この操作を呼び出して、列内の個別の値の数をカウントできます。この操作は、推定値を返します。大量のデータが計算される場合、この操作を呼び出して個別の値の数を推定できます。
たとえば、この操作を呼び出してユニークビジター (UV) の数を計算し、短時間で推定値を取得できます。
次の例では、Pandas パッケージを使用しています。この例のコードは、オンプレミス環境で実行できます。 DataWorks 環境でコードを実行する場合は、サードパーティパッケージを使用して Pandas パッケージをインポートする必要があります。
from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()次の結果が返されます。
63270df.a.nunique()次の結果が返されます。
63250splitter パラメーターは、列を分割して個別の値の数を計算するために使用されます。