Topik ini menjelaskan API MapReduce untuk membantu Anda memahami cara menggunakannya dalam memproses dan menganalisis kumpulan data besar secara efisien.
PyODPS DataFrame mendukung API MapReduce. Anda dapat menulis fungsi map dan reduce secara terpisah, karena proses map_reduce hanya dapat berisi mappers atau reducers.
Contoh berikut menunjukkan cara menjalankan program WordCount:
>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> def reducer(keys):
>>> # Sebuah list digunakan daripada cnt=0. Jika Anda menggunakan cnt=0, cnt di fungsi h dianggap variabel lokal, yang nilainya tidak termasuk dalam output.
>>> cnt = [0]
>>> def h(row, done): # done menunjukkan bahwa baris dengan key ini telah diiterasi.
>>> cnt[0] += row[1]
>>> if done:
>>> yield keys[0], cnt[0]
>>> return h
>>> # Tabel zx_word_count hanya memiliki satu kolom, yang bertipe STRING.
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
mapper_output_names=['word', 'cnt'],
mapper_output_types=['string', 'int'],
reducer_output_names=['word', 'cnt'],
reducer_output_types=['string', 'int'])
word cnt
0 are 1
1 day 1
2 doing? 1
3 everybody 1
4 first 1
5 hello 2
6 how 1
7 is 1
8 so 1
9 the 1
10 this 1
11 world 1
12 you 1Gunakan parameter group untuk menentukan bidang yang ingin digunakan untuk mengelompokkan data dengan fungsi reduce. Jika tidak ditentukan, data akan dikelompokkan berdasarkan semua bidang. Reducers menerima dan menginisialisasi keys yang teragregasi, serta memproses baris yang diagregasi berdasarkan keys. done menandakan bahwa semua baris terkait dengan keys tersebut telah diiterasi.
Untuk kemudahan pemahaman, fungsi ditulis sebagai closure dalam contoh ini. Anda juga dapat menulis fungsi sebagai kelas callable.
class reducer(object):
def __init__(self, keys):
self.cnt = 0
def __call__(self, row, done): # done menunjukkan bahwa baris dengan key ini telah diiterasi.
self.cnt += row.cnt
if done:
yield row.word, self.cntKode menjadi lebih sederhana jika Anda menggunakan output untuk komentar.
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> # Sebuah list digunakan daripada cnt=0. Jika Anda menggunakan cnt=0, cnt di fungsi h dianggap variabel lokal, yang nilainya tidak termasuk dalam output.
>>> cnt = [0]
>>> def h(row, done): # done menunjukkan bahwa baris dengan key ini telah diiterasi.
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
word cnt
0 are 1
1 day 1
2 doing? 1
3 everybody 1
4 first 1
5 hello 2
6 how 1
7 is 1
8 so 1
9 the 1
10 this 1
11 world 1
12 you 1Selama iterasi, Anda dapat menggunakan parameter sort untuk mengurutkan data berdasarkan kolom dan parameter ascending untuk menentukan urutan pengurutan. Parameter ascending bisa berupa nilai BOOL, yang menunjukkan bahwa semua bidang yang ditentukan oleh parameter sort disusun dalam urutan yang sama. Parameter ascending juga bisa berupa daftar. Jumlah string dalam daftar harus sama dengan jumlah bidang yang ditentukan oleh parameter sort.
Tentukan combiner
Dalam API MapReduce, sebuah combiner digunakan untuk mengagregasi data dalam mapper. Combiner digunakan dengan cara yang sama seperti reducer, tetapi combiner tidak dapat merujuk ke sumber daya. Nama dan tipe data dari bidang yang dihasilkan oleh combiner harus sama dengan yang dihasilkan oleh mapper yang sesuai dengan combiner tersebut.
Dalam contoh ini, Anda dapat menggunakan reducer sebagai combiner untuk mengagregasi data dalam mapper guna mengurangi data yang di-shuffle.
>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')Rujuk sumber daya
Dalam API MapReduce, Anda dapat secara terpisah menentukan sumber daya yang dirujuk oleh mappers dan reducers.
Dalam contoh berikut, penyaringan kata yang sudah usang dilakukan dalam mapper, dan jumlah kata dalam daftar putih dalam reducer ditambah 5.
>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>> stop_words = set(r[0].strip() for r in resources[0])
>>> def h(row):
>>> for word in row[0].split():
>>> if word not in stop_words:
>>> yield word, 1
>>> return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>> d = dict()
>>> d['white_list'] = set(word.strip() for word in resources[0])
>>> d['cnt'] = 0
>>> def inner(keys):
>>> d['cnt'] = 0
>>> def h(row, done):
>>> d['cnt'] += row.cnt
>>> if done:
>>> if row.word in d['white_list']:
>>> d['cnt'] += 5
>>> yield keys.word, d['cnt']
>>> return h
>>> return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>> mapper_resources=[stop_words], reducer_resources=[white_list_file])
word cnt
0 hello 2
1 life 1
2 python 7
3 world 6
4 short 1
5 use 1Gunakan pustaka Python pihak ketiga
Penggunaannya mirip dengan pustaka Python pihak ketiga dalam tahap map.
Tentukan pustaka secara global:
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']Jika Anda menggunakan metode yang dipanggil langsung, tentukan pustaka secara lokal:
>>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])CatatanKarena perbedaan dalam definisi bytecode, jika Anda menulis kode berdasarkan fitur baru yang didukung oleh Python 3, seperti
yield from, kesalahan akan dilaporkan saat kode dieksekusi pada pekerja MaxCompute Python 2.7. Sebelum melaksanakan operasi produksi menggunakan API MapReduce Python 3, pastikan kode berjalan normal.
Reshuffle data
Jika data didistribusikan secara tidak merata dalam kluster, Anda dapat memanggil metode reshuffle untuk reshuffle data.
>>> df1 = df.reshuffle()Secara default, data di-hash sebagai angka acak. Anda juga dapat mendistribusikan data berdasarkan kolom dan mengurutkan data yang direshuffle dalam urutan tertentu.
>>> df1.reshuffle('name', sort='id', ascending=False)Filter Bloom
PyODPS DataFrame menyediakan antarmuka bloom_filter untuk menghitung data dengan filter Bloom.
Filter Bloom dapat dengan cepat menyaring data yang ada di sequence1 tetapi tidak ada di sequence2, meskipun data tersebut mungkin tidak sepenuhnya tersaring. Metode penyaringan perkiraan ini sangat berguna dalam skenario join di mana volume data sangat berbeda. Misalnya, ketika Anda join data penelusuran pengguna dengan data transaksi, volume data penelusuran jauh lebih besar daripada data transaksi. Dalam hal ini, Anda dapat terlebih dahulu menerapkan filter Bloom menggunakan data transaksi untuk pra-menyaring data penelusuran dan kemudian melakukan operasi join. Ini sangat meningkatkan performa.
>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
a b
0 name1 1
1 name2 2
2 name3 3
3 name1 4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
a
0 name1
>>> df1.bloom_filter('a', df2.a) # Baris 0 dapat berupa ekspresi komputasi, misalnya, df1.a + '1'.
a b
0 name1 1
1 name1 4Deskripsi:
Sejumlah kecil data diproses. Oleh karena itu, baris yang berisi
name2danname3di kolomadaridf1disaring. Namun, jika sistem memproses sejumlah besar data, sistem mungkin tidak menyaring semua data yang memenuhi kondisi yang ditentukan.Dalam operasi
JOINsebelumnya, data yang tidak disaring tidak mempengaruhi akurasi data. Namun, penyaringan data sangat meningkatkan performaJOIN.Anda dapat menentukan parameter
capacitydanerror_rateuntuk mengonfigurasi volume data dan tingkat kesalahan. Nilai default adalah3000dan0.01.
Jika Anda meningkatkan nilai parameter capacity atau menurunkan nilai parameter error_rate, penggunaan memori meningkat. Oleh karena itu, atur parameter ke nilai yang sesuai berdasarkan kebutuhan Anda.
Untuk informasi lebih lanjut tentang objek koleksi, lihat Eksekusi DataFrame.
Tabel pivot
PyODPS DataFrame menyediakan fitur tabel pivot. Kode berikut menunjukkan data tabel sampel:
>>> df
A B C D E
0 foo one small 1 3
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 6
4 foo two small 3 4
5 bar one large 4 5
6 bar one small 5 3
7 bar two small 6 2
8 bar two large 7 1Jika Anda menggunakan fitur tabel pivot, parameter
rowsdiperlukan untuk mendapatkan nilai rata-rata berdasarkan satu atau lebih bidang.>>> df['A', 'D', 'E'].pivot_table(rows='A') A D_mean E_mean 0 bar 5.5 2.75 1 foo 2.2 4.40Anda dapat menentukan beberapa bidang dalam
rowsuntuk mengagregasi data berdasarkan bidang tersebut.>>> df.pivot_table(rows=['A', 'B', 'C']) A B C D_mean E_mean 0 bar one large 4.0 5.0 1 bar one small 5.0 3.0 2 bar two large 7.0 1.0 3 bar two small 6.0 2.0 4 foo one large 2.0 4.5 5 foo one small 1.0 3.0 6 foo two small 3.0 5.0Anda dapat menggunakan parameter
valuesuntuk menentukan kolom yang ingin Anda hitung.>>> df.pivot_table(rows=['A', 'B'], values='D') A B D_mean 0 bar one 4.500000 1 bar two 6.500000 2 foo one 1.666667 3 foo two 3.000000Secara default, nilai rata-rata dihitung. Anda dapat menggunakan parameter
aggfuncuntuk menentukan satu atau lebih fungsi agregat.>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum']) A B D_mean D_count D_sum 0 bar one 4.500000 2 9 1 bar two 6.500000 2 13 2 foo one 1.666667 3 5 3 foo two 3.000000 2 6Anda dapat menggunakan nilai kolom asli sebagai nilai kolom dalam objek koleksi baru.
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C') A B large_D_mean small_D_mean 0 bar one 4.0 5.0 1 bar two 7.0 6.0 2 foo one 2.0 1.0 3 foo two NaN 3.0Anda dapat menggunakan
fill_valueuntuk mengganti nilai kosong.>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0) A B large_D_mean small_D_mean 0 bar one 4 5 1 bar two 7 6 2 foo one 2 1 3 foo two 0 3
Konversi string pasangan kunci-nilai
DataFrame dapat mengekstrak pasangan kunci-nilai ke dalam kolom, dan mengonversi kolom standar menjadi pasangan kunci-nilai. Untuk informasi lebih lanjut tentang cara membuat dan mengelola objek DataFrame, lihat Buat objek DataFrame.
Ekstrak pasangan kunci-nilai ke dalam kolom. Contoh:
>>> df name kv 0 name1 k1=1,k2=3,k5=10 1 name1 k1=7.1,k7=8.2 2 name2 k2=1.2,k3=1.5 3 name2 k9=1.1,k2=1Gunakan metode
extract_kvuntuk mengekstrak bidang pasangan kunci-nilai:>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',') name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1Parameter
columnsmenentukan bidang yang ingin Anda ekstrak. Parameterkv_delimmenentukan pemisah antara kunci dan nilai. Parameteritem_delimmenentukan pemisah antara pasangan kunci-nilai. Jika tidak ditentukan, kunci dan nilai dipisahkan dengan titik dua (:), dan pasangan kunci-nilai dipisahkan dengan koma (,). Nama bidang keluaran adalah kombinasi dari nama bidang asli dan nilai kunci. Nama dan nilai kunci dihubungkan menggunakan garis bawah (_). Nilai default untuk kolom yang hilang adalah NONE. Anda dapat menggunakanfill_valueuntuk mengisi nilai pada kolom yang hilang.>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0) name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 0.0 10.0 0.0 0.0 1 name1 7.0 0.0 0.0 0.0 8.2 0.0 2 name2 0.0 1.2 1.5 0.0 0.0 0.0 3 name2 0.0 1.0 0.0 0.0 0.0 1.1Konversi beberapa kolom menjadi pasangan kunci-nilai. Contoh:
>>> df name k1 k2 k3 k5 k7 k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1Gunakan metode
to_kvuntuk mengonversi data ke format pasangan kunci-nilai:>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=') name kv 0 name1 k1=1,k2=3,k5=10 1 name1 k1=7.1,k7=8.2 2 name2 k2=1.2,k3=1.5 3 name2 k9=1.1,k2=1