PyODPS DataFrame mendukung berbagai operasi agregasi pada set data besar, termasuk fungsi agregat bawaan, agregasi berbasis kelompok, agregasi kustom (fungsi agregat yang didefinisikan pengguna atau UDAF), serta estimasi jumlah nilai unik berbasis algoritma HyperLogLog.
Semua contoh menggunakan tabel pyodps_iris sebagai sumber data:
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
Fungsi agregasi bawaan
Fungsi agregasi berikut tersedia untuk kolom DataFrame.
| Function | Description |
|---|---|
count atau size |
Menghitung jumlah baris |
unique |
Menghitung jumlah nilai unik |
min |
Mengembalikan nilai minimum |
max |
Mengembalikan nilai maksimum |
sum |
Mengembalikan jumlah total |
mean |
Mengembalikan nilai rata-rata |
median |
Mengembalikan nilai median |
quantile(p) |
Mengembalikan kuantil ke-p; hanya menghasilkan hasil akurat untuk bilangan bulat |
var |
Mengembalikan varians |
std |
Mengembalikan deviasi standar |
moment |
Mengembalikan momen pusat ke-N atau momen ke-N |
skew |
Mengembalikan kemencengan sampel (estimasi tidak bias) |
kurtosis |
Mengembalikan keruncingan sampel (estimasi tidak bias) |
cat |
Menggabungkan string dengan pemisah |
tolist |
Mengagregasi sebuah kolom menjadi daftar |
PyODPS DataFrame mengabaikan nilai null dalam operasi agregasi baik di backend MaxCompute maupun pandas. Perilaku ini berbeda dari pandas DataFrame, tetapi sesuai dengan semantik SQL.
Contoh
Jelaskan semua kolom numerik — panggil describe() untuk mendapatkan count, max, min, mean, dan deviasi standar sekaligus:
print(iris.describe())
Output:
type sepal_length sepal_width petal_length petal_width
0 count 150.000000 150.000000 150.000000 150.000000
1 mean 5.843333 3.054000 3.758667 1.198667
2 std 0.828066 0.433594 1.764420 0.763161
3 min 4.300000 2.000000 1.000000 0.100000
4 max 7.900000 4.400000 6.900000 2.500000
Agregasi satu kolom:
iris.sepallength.max()
Output: 7.9
Agregasi atas nilai unik — panggil unique() sebelum fungsi agregasi:
iris.name.unique().cat(sep=',')
Output: u'Iris-setosa,Iris-versicolor,Iris-virginica'
Agregasi semua kolom — jika semua kolom mendukung operasi yang sama, terapkan ke seluruh DataFrame:
iris.exclude('category').mean()
Output:
sepal_length sepal_width petal_length petal_width
1 5.843333 3.054000 3.758667 1.198667
Hitung semua baris:
iris.count()
Output: 150
Untuk menampilkan hasil dalam log, jalankan print(iris.count().execute()).
Mengelompokkan dan mengagregasi data
Groupby pada PyODPS DataFrame mengikuti model split-apply-combine:
-
Split:
groupby()membagi data menjadi kelompok berdasarkan satu atau beberapa kolom. -
Apply:
agg()atauaggregate()menerapkan fungsi agregasi ke setiap kelompok secara independen. -
Combine: hasilnya digabungkan menjadi satu DataFrame.
Hasilnya mencakup kolom yang dikelompokkan dan kolom yang diagregasi.
Agregasi bernama
Berikan argumen kata kunci ke agg() untuk mengontrol nama kolom output secara langsung. Pendekatan ini disebut agregasi bernama dan digunakan ketika Anda ingin menentukan eksplisit nama kolom hasil agregasi:
iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
Output:
name sepallength_max smin
0 Iris-setosa 5.8 4.3
1 Iris-versicolor 7.0 4.9
2 Iris-virginica 7.9 4.9
Pada contoh ini, smin=iris.sepallength.min() mengganti nama kolom agregasi menjadi smin.
Hitung jumlah nilai unik per kelompok
Tersedia dua pendekatan ekuivalen. Gunakan value_counts() untuk penulisan yang lebih ringkas:
# Menggunakan groupby + agg
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
# Menggunakan value_counts (ekuivalen, lebih ringkas)
iris['name'].value_counts().head(5)
Keduanya menghasilkan output yang sama:
name count
0 Iris-virginica 50
1 Iris-versicolor 50
2 Iris-setosa 50
Agregasi satu kolom dari suatu kelompok
Akses kolom berdasarkan namanya setelah groupby() untuk hanya mengambil kolom agregasi tersebut:
iris.groupby('name').petallength.sum()
Output:
petallength_sum
0 73.2
1 213.0
2 277.6
Saat menggunakan sintaks ini, Anda hanya dapat menggunakan fungsi agregasi pada kolom tersebut. Untuk menerapkan pemeriksaan non-null atau ekspresi lainnya, gunakan agg() sebagai gantinya:
iris.groupby('name').agg(iris.petallength.notnull().sum())
Output:
name petallength_sum
0 Iris-setosa 50
1 Iris-versicolor 50
2 Iris-virginica 50
Kelompokkan berdasarkan nilai konstan
Untuk mengagregasi semua baris bersama tanpa kolom pengelompokan alami, kelompokkan berdasarkan nilai konstan menggunakan Scalar:
from odps.df import Scalar
iris.groupby(Scalar(1)).petallength.sum()
Output:
petallength_sum
0 563.8
Menulis agregasi kustom
Gunakan agg() atau aggregate() untuk menerapkan fungsi agregat yang didefinisikan pengguna (UDAF) ke suatu kolom. Kelas agregasi kustom harus mengimplementasikan empat metode:
| Method | Description |
|---|---|
buffer() |
Mengembalikan objek yang dapat dimodifikasi (list atau dict) yang mengakumulasi hasil parsial. Ukuran buffer tidak boleh bertambah seiring volume data. |
__call__(buffer, *val) |
Menambahkan nilai ke buffer. |
merge(buffer, pbuffer) |
Menggabungkan buffer parsial (pbuffer) ke buffer utama. |
getvalue(buffer) |
Mengembalikan nilai agregasi akhir. |
Contoh: mean kustom
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]iris.sepalwidth.agg(Agg)
Output: 3.0540000000000007
Catatan penggunaan
Tentukan tipe data output saat berbeda dari input:
iris.sepalwidth.agg(Agg, 'float')
Gabungkan dengan groupby:
iris.groupby('name').sepalwidth.agg(Agg)
Output:
petallength_aggregation
0 3.418
1 2.770
2 2.974
Agregasi beberapa kolom — gunakan agg() dari odps.df dan berikan daftar kolom:
class Agg(object):
def buffer(self):
return [0.0, 0.0]
def __call__(self, buffer, val1, val2):
buffer[0] += val1
buffer[1] += val2
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]from odps.df import agg
to_agg = agg([iris.sepalwidth, ], Agg, rtype='float') # Panggil fungsi agregat yang didefinisikan pengguna (UDAF) untuk mengagregasi data dalam dua kolom.
iris.groupby('name').agg(val=to_agg)
Output:
name val
0 Iris-setosa 0.682781
1 Iris-versicolor 0.466644
2 Iris-virginica 0.451427
Panggil UDAF MaxCompute yang sudah ada berdasarkan nama — berikan nama UDAF sebagai string, bukan sebagai kelas:
# Satu kolom
iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))
# Beberapa kolom
to_agg = agg([iris.sepalwidth, ], 'your_func', rtype='float')
iris.groupby('name').agg(to_agg.rename('val'))
Karena keterbatasan Python UDF, tipe LIST dan DICT tidak dapat digunakan sebagai tipe data input atau output untuk agregasi kustom.
Perhitungan HyperLogLog
hll_count mengimplementasikan algoritma HyperLogLog untuk memperkirakan jumlah nilai unik dalam suatu kolom. Fungsi ini mengembalikan perkiraan jumlah, bukan jumlah pasti. Gunakan fungsi ini ketika perhitungan pasti terlalu lambat pada set data besar—misalnya, untuk memperkirakan jumlah pengunjung unik (UV) secara cepat.
Contoh di bawah ini menggunakan pandas DataFrame. Jalankan di lingkungan lokal. Jika dijalankan di DataWorks, impor pandas melalui paket pihak ketiga terlebih dahulu.
from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()
Output: 63270
Sebagai perbandingan, jumlah nilai unik yang tepat:
df.a.nunique()
Output: 63250
Kedua hasil tersebut mirip tetapi tidak identik—hll_count menukar sedikit margin kesalahan demi kecepatan.
Gunakan parameter splitter untuk membagi nilai string sebelum menghitung elemen unik.