全部产品
Search
文档中心

MaxCompute:API MapReduce

更新时间:Jul 02, 2025

Topik ini menjelaskan API MapReduce untuk membantu Anda memahami cara menggunakannya dalam memproses dan menganalisis kumpulan data besar secara efisien.

PyODPS DataFrame mendukung API MapReduce. Anda dapat menulis fungsi map dan reduce secara terpisah, karena proses map_reduce hanya dapat berisi mappers atau reducers.

Contoh berikut menunjukkan cara menjalankan program WordCount:

>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # Sebuah list digunakan daripada cnt=0. Jika Anda menggunakan cnt=0, cnt di fungsi h dianggap variabel lokal, yang nilainya tidak termasuk dalam output.
>>>     cnt = [0]
>>>     def h(row, done):  # done menunjukkan bahwa baris dengan key ini telah diiterasi.
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>> # Tabel zx_word_count hanya memiliki satu kolom, yang bertipe STRING.
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
                        mapper_output_names=['word', 'cnt'],
                        mapper_output_types=['string', 'int'],
                        reducer_output_names=['word', 'cnt'],
                        reducer_output_types=['string', 'int'])
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

Gunakan parameter group untuk menentukan bidang yang ingin digunakan untuk mengelompokkan data dengan fungsi reduce. Jika tidak ditentukan, data akan dikelompokkan berdasarkan semua bidang. Reducers menerima dan menginisialisasi keys yang teragregasi, serta memproses baris yang diagregasi berdasarkan keys. done menandakan bahwa semua baris terkait dengan keys tersebut telah diiterasi.

Untuk kemudahan pemahaman, fungsi ditulis sebagai closure dalam contoh ini. Anda juga dapat menulis fungsi sebagai kelas callable.

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done menunjukkan bahwa baris dengan key ini telah diiterasi.
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

Kode menjadi lebih sederhana jika Anda menggunakan output untuk komentar.

>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # Sebuah list digunakan daripada cnt=0. Jika Anda menggunakan cnt=0, cnt di fungsi h dianggap variabel lokal, yang nilainya tidak termasuk dalam output.
>>>     cnt = [0]
>>>     def h(row, done):  # done menunjukkan bahwa baris dengan key ini telah diiterasi.
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

Selama iterasi, Anda dapat menggunakan parameter sort untuk mengurutkan data berdasarkan kolom dan parameter ascending untuk menentukan urutan pengurutan. Parameter ascending bisa berupa nilai BOOL, yang menunjukkan bahwa semua bidang yang ditentukan oleh parameter sort disusun dalam urutan yang sama. Parameter ascending juga bisa berupa daftar. Jumlah string dalam daftar harus sama dengan jumlah bidang yang ditentukan oleh parameter sort.

Tentukan combiner

Dalam API MapReduce, sebuah combiner digunakan untuk mengagregasi data dalam mapper. Combiner digunakan dengan cara yang sama seperti reducer, tetapi combiner tidak dapat merujuk ke sumber daya. Nama dan tipe data dari bidang yang dihasilkan oleh combiner harus sama dengan yang dihasilkan oleh mapper yang sesuai dengan combiner tersebut.

Dalam contoh ini, Anda dapat menggunakan reducer sebagai combiner untuk mengagregasi data dalam mapper guna mengurangi data yang di-shuffle.

>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

Rujuk sumber daya

Dalam API MapReduce, Anda dapat secara terpisah menentukan sumber daya yang dirujuk oleh mappers dan reducers.

Dalam contoh berikut, penyaringan kata yang sudah usang dilakukan dalam mapper, dan jumlah kata dalam daftar putih dalam reducer ditambah 5.

>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>>     stop_words = set(r[0].strip() for r in resources[0])
>>>     def h(row):
>>>         for word in row[0].split():
>>>             if word not in stop_words:
>>>                 yield word, 1
>>>     return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>>     d = dict()
>>>     d['white_list'] = set(word.strip() for word in resources[0])
>>>     d['cnt'] = 0
>>>     def inner(keys):
>>>         d['cnt'] = 0
>>>         def h(row, done):
>>>             d['cnt'] += row.cnt
>>>             if done:
>>>                 if row.word in d['white_list']:
>>>                     d['cnt'] += 5
>>>                 yield keys.word, d['cnt']
>>>         return h
>>>     return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1

Gunakan pustaka Python pihak ketiga

Penggunaannya mirip dengan pustaka Python pihak ketiga dalam tahap map.

  • Tentukan pustaka secara global:

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • Jika Anda menggunakan metode yang dipanggil langsung, tentukan pustaka secara lokal:

    >>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])
    Catatan

    Karena perbedaan dalam definisi bytecode, jika Anda menulis kode berdasarkan fitur baru yang didukung oleh Python 3, seperti yield from, kesalahan akan dilaporkan saat kode dieksekusi pada pekerja MaxCompute Python 2.7. Sebelum melaksanakan operasi produksi menggunakan API MapReduce Python 3, pastikan kode berjalan normal.

Reshuffle data

Jika data didistribusikan secara tidak merata dalam kluster, Anda dapat memanggil metode reshuffle untuk reshuffle data.

>>> df1 = df.reshuffle()

Secara default, data di-hash sebagai angka acak. Anda juga dapat mendistribusikan data berdasarkan kolom dan mengurutkan data yang direshuffle dalam urutan tertentu.

>>> df1.reshuffle('name', sort='id', ascending=False)

Filter Bloom

PyODPS DataFrame menyediakan antarmuka bloom_filter untuk menghitung data dengan filter Bloom.

Filter Bloom dapat dengan cepat menyaring data yang ada di sequence1 tetapi tidak ada di sequence2, meskipun data tersebut mungkin tidak sepenuhnya tersaring. Metode penyaringan perkiraan ini sangat berguna dalam skenario join di mana volume data sangat berbeda. Misalnya, ketika Anda join data penelusuran pengguna dengan data transaksi, volume data penelusuran jauh lebih besar daripada data transaksi. Dalam hal ini, Anda dapat terlebih dahulu menerapkan filter Bloom menggunakan data transaksi untuk pra-menyaring data penelusuran dan kemudian melakukan operasi join. Ini sangat meningkatkan performa.

>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
       a  b
0  name1  1
1  name2  2
2  name3  3
3  name1  4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
       a
0  name1
>>> df1.bloom_filter('a', df2.a) # Baris 0 dapat berupa ekspresi komputasi, misalnya, df1.a + '1'.
       a  b
0  name1  1
1  name1  4

Deskripsi:

  • Sejumlah kecil data diproses. Oleh karena itu, baris yang berisi name2 dan name3 di kolom a dari df1 disaring. Namun, jika sistem memproses sejumlah besar data, sistem mungkin tidak menyaring semua data yang memenuhi kondisi yang ditentukan.

  • Dalam operasi JOIN sebelumnya, data yang tidak disaring tidak mempengaruhi akurasi data. Namun, penyaringan data sangat meningkatkan performa JOIN.

  • Anda dapat menentukan parameter capacity dan error_rate untuk mengonfigurasi volume data dan tingkat kesalahan. Nilai default adalah 3000 dan 0.01.

Catatan

Jika Anda meningkatkan nilai parameter capacity atau menurunkan nilai parameter error_rate, penggunaan memori meningkat. Oleh karena itu, atur parameter ke nilai yang sesuai berdasarkan kebutuhan Anda.

Untuk informasi lebih lanjut tentang objek koleksi, lihat Eksekusi DataFrame.

Tabel pivot

PyODPS DataFrame menyediakan fitur tabel pivot. Kode berikut menunjukkan data tabel sampel:

>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1
  • Jika Anda menggunakan fitur tabel pivot, parameter rows diperlukan untuk mendapatkan nilai rata-rata berdasarkan satu atau lebih bidang.

    >>> df['A', 'D', 'E'].pivot_table(rows='A')
         A  D_mean  E_mean
    0  bar     5.5    2.75
    1  foo     2.2    4.40
  • Anda dapat menentukan beberapa bidang dalam rows untuk mengagregasi data berdasarkan bidang tersebut.

    >>> df.pivot_table(rows=['A', 'B', 'C'])
         A    B      C  D_mean  E_mean
    0  bar  one  large     4.0     5.0
    1  bar  one  small     5.0     3.0
    2  bar  two  large     7.0     1.0
    3  bar  two  small     6.0     2.0
    4  foo  one  large     2.0     4.5
    5  foo  one  small     1.0     3.0
    6  foo  two  small     3.0     5.0
  • Anda dapat menggunakan parameter values untuk menentukan kolom yang ingin Anda hitung.

    >>> df.pivot_table(rows=['A', 'B'], values='D')
         A    B    D_mean
    0  bar  one  4.500000
    1  bar  two  6.500000
    2  foo  one  1.666667
    3  foo  two  3.000000
  • Secara default, nilai rata-rata dihitung. Anda dapat menggunakan parameter aggfunc untuk menentukan satu atau lebih fungsi agregat.

    >>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
         A    B    D_mean  D_count  D_sum
    0  bar  one  4.500000        2      9
    1  bar  two  6.500000        2     13
    2  foo  one  1.666667        3      5
    3  foo  two  3.000000        2      6
  • Anda dapat menggunakan nilai kolom asli sebagai nilai kolom dalam objek koleksi baru.

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
         A    B  large_D_mean  small_D_mean
    0  bar  one           4.0           5.0
    1  bar  two           7.0           6.0
    2  foo  one           2.0           1.0
    3  foo  two           NaN           3.0
  • Anda dapat menggunakan fill_value untuk mengganti nilai kosong.

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
         A    B  large_D_mean  small_D_mean
    0  bar  one             4             5
    1  bar  two             7             6
    2  foo  one             2             1
    3  foo  two             0             3

Konversi string pasangan kunci-nilai

DataFrame dapat mengekstrak pasangan kunci-nilai ke dalam kolom, dan mengonversi kolom standar menjadi pasangan kunci-nilai. Untuk informasi lebih lanjut tentang cara membuat dan mengelola objek DataFrame, lihat Buat objek DataFrame.

  • Ekstrak pasangan kunci-nilai ke dalam kolom. Contoh:

    >>> df
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1

    Gunakan metode extract_kv untuk mengekstrak bidang pasangan kunci-nilai:

    >>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    NaN   10.0    NaN    NaN
    1  name1    7.0    NaN    NaN    NaN    8.2    NaN
    2  name2    NaN    1.2    1.5    NaN    NaN    NaN
    3  name2    NaN    1.0    NaN    NaN    NaN    1.1

    Parameter columns menentukan bidang yang ingin Anda ekstrak. Parameter kv_delim menentukan pemisah antara kunci dan nilai. Parameter item_delim menentukan pemisah antara pasangan kunci-nilai. Jika tidak ditentukan, kunci dan nilai dipisahkan dengan titik dua (:), dan pasangan kunci-nilai dipisahkan dengan koma (,). Nama bidang keluaran adalah kombinasi dari nama bidang asli dan nilai kunci. Nama dan nilai kunci dihubungkan menggunakan garis bawah (_). Nilai default untuk kolom yang hilang adalah NONE. Anda dapat menggunakan fill_value untuk mengisi nilai pada kolom yang hilang.

    >>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    0.0   10.0    0.0    0.0
    1  name1    7.0    0.0    0.0    0.0    8.2    0.0
    2  name2    0.0    1.2    1.5    0.0    0.0    0.0
    3  name2    0.0    1.0    0.0    0.0    0.0    1.1
  • Konversi beberapa kolom menjadi pasangan kunci-nilai. Contoh:

    >>> df
       name    k1   k2   k3    k5   k7   k9
    0  name1  1.0  3.0  NaN  10.0  NaN  NaN
    1  name1  7.0  NaN  NaN   NaN  8.2  NaN
    2  name2  NaN  1.2  1.5   NaN  NaN  NaN
    3  name2  NaN  1.0  NaN   NaN  NaN  1.1

    Gunakan metode to_kv untuk mengonversi data ke format pasangan kunci-nilai:

    >>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1