MaxCompute:MapReduce API

Last Updated:Mar 25, 2026

PyODPS DataFrame's MapReduce API lets you write custom map and reduce logic in Python and run it at scale on MaxCompute. A map_reduce job can contain only mappers, only reducers, or both.

WordCount example

The following example counts word occurrences across a table that contains a single STRING column.

#encoding=utf-8
from odps import ODPS
from odps import options
from odps.df import DataFrame

options.verbose = True

o = ODPS('your-access-id', 'your-secret-access-key',
         project='DMP_UC_dev',
         endpoint='http://service-corp.odps.aliyun-inc.com/api')

def mapper(row):
    for word in row[0].split():
        yield word.lower(), 1

def reducer(keys):
    # Use a one-element list instead of cnt = 0. Augmented assignment to a plain
    # integer inside h() would make cnt local to h() and fail with UnboundLocalError.
    cnt = [0]
    def h(row, done):  # done=True when all rows for this key have been processed
        cnt[0] += row[1]
        if done:
            yield keys[0], cnt[0]
    return h

word_count = DataFrame(o.get_table('zx_word_count'))
table = word_count.map_reduce(
    mapper, reducer,
    group=['word'],
    mapper_output_names=['word', 'cnt'],
    mapper_output_types=['string', 'int'],
    reducer_output_names=['word', 'cnt'],
    reducer_output_types=['string', 'int'],
)

Expected output:

     word  cnt
0     are    1
1     day    1
2  doing?    1
...

The group parameter specifies which field the reducer uses to group incoming rows. If you omit it, all fields are used for grouping. The reducer receives the aggregated keys and processes every row that shares those keys. The done flag is True when the last row for a given key has been processed.
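
To make the calling convention concrete, the following pure-Python sketch emulates how a reducer might be driven: the outer function is called once per key, and the returned handler is called once per row, with done set to True on the last row of that key. The run_reducer helper and the namedtuple row types are hypothetical stand-ins for illustration and are not part of PyODPS; the real engine may differ in details.

```python
from collections import namedtuple
from itertools import groupby

Row = namedtuple('Row', ['word', 'cnt'])
Keys = namedtuple('Keys', ['word'])

def reducer(keys):
    cnt = [0]
    def h(row, done):
        cnt[0] += row.cnt
        if done:
            yield keys.word, cnt[0]
    return h

def run_reducer(reducer_factory, rows, group_field):
    """Hypothetical local driver: groups rows and feeds them to the reducer."""
    results = []
    get_key = lambda r: getattr(r, group_field)
    for key, group in groupby(sorted(rows, key=get_key), key=get_key):
        handler = reducer_factory(Keys(key))   # one handler per distinct key
        group_rows = list(group)
        for i, row in enumerate(group_rows):
            done = (i == len(group_rows) - 1)  # True only for the last row
            results.extend(handler(row, done)) # consume whatever the handler yields
    return results

mapped = [Row('hello', 1), Row('world', 1), Row('hello', 1)]
counts = run_reducer(reducer, mapped, 'word')
print(counts)
```

Because the handler is a generator function, the driver has to iterate over its result even for rows that yield nothing; otherwise the counter would never be updated.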

You can write the reducer as a callable class instead of a closure:

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done=True when all rows for this key are processed
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

Simplify output schema with the @output decorator

Use the @output decorator to declare output field names and types directly on the function, removing the need to pass mapper_output_names, mapper_output_types, reducer_output_names, and reducer_output_types to map_reduce.

from odps.df import output

@output(['word', 'cnt'], ['string', 'int'])
def mapper(row):
    for word in row[0].split():
        yield word.lower(), 1

@output(['word', 'cnt'], ['string', 'int'])
def reducer(keys):
    cnt = [0]
    def h(row, done):
        cnt[0] += row.cnt
        if done:
            yield keys.word, cnt[0]
    return h

word_count = DataFrame(o.get_table('zx_word_count'))
table = word_count.map_reduce(mapper, reducer, group='word')

To sort rows within each group during iteration, pass the sort parameter. The ascending parameter controls the sort direction: a single bool applies the same order to all sort fields; a list lets you set a different direction per field (the list length must match the number of sort fields).
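
The bool-versus-list behavior of ascending can be illustrated locally. The sketch below is a plain-Python emulation, not PyODPS code: sort_rows is a hypothetical helper, and rows are modeled as dictionaries purely for illustration.

```python
def sort_rows(rows, sort, ascending=True):
    """Hypothetical helper: order rows by several fields with per-field direction."""
    if isinstance(ascending, bool):
        # A single bool applies to every sort field.
        ascending = [ascending] * len(sort)
    if len(ascending) != len(sort):
        raise ValueError('ascending must have one entry per sort field')
    ordered = list(rows)
    # Stable sorts applied from the least significant field to the most significant.
    for field, asc in reversed(list(zip(sort, ascending))):
        ordered.sort(key=lambda r: r[field], reverse=not asc)
    return ordered

rows = [
    {'word': 'a', 'cnt': 1, 'ts': 3},
    {'word': 'a', 'cnt': 2, 'ts': 1},
    {'word': 'a', 'cnt': 2, 'ts': 2},
]
# cnt descending, then ts ascending within equal cnt values.
result = sort_rows(rows, sort=['cnt', 'ts'], ascending=[False, True])
print([(r['cnt'], r['ts']) for r in result])
```

In a real job the equivalent request would be expressed through the sort and ascending parameters of map_reduce rather than a helper like this one.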

Specify a combiner

During a MapReduce job, mapper output is transferred across the network to reducers — a step called the shuffle. The shuffle involves disk I/O, data serialization, and network transfer, and is one of the most expensive parts of the pipeline.

A combiner reduces shuffle cost by aggregating mapper output locally on each node before it is sent to reducers. This is most effective for commutative and associative operations such as counting and summing.

Constraints before you write a combiner:

  • A combiner cannot reference resources.

  • The output field names and types must exactly match those of the corresponding mapper.

The combiner interface is identical to the reducer interface. Pass the combiner function via the combiner parameter:

words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
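
The effect of a combiner on shuffle volume can be seen in a small local simulation. This is a sketch in plain Python under simplifying assumptions: the two "nodes" are just lists, and combine is a hypothetical function that plays both the combiner and the reducer role, which works here because summing is commutative and associative.

```python
from collections import Counter

def mapper(line):
    for word in line.split():
        yield word.lower(), 1

def combine(pairs):
    """Sum counts per word; used here as both combiner and reducer."""
    totals = Counter()
    for word, cnt in pairs:
        totals[word] += cnt
    return sorted(totals.items())

# Two hypothetical mapper nodes, each holding part of the input.
node_inputs = [
    ['Hello World', 'hello python'],
    ['Hello hello', 'world'],
]
mapped_per_node = [[pair for line in lines for pair in mapper(line)]
                   for lines in node_inputs]

# Without a combiner, every (word, 1) pair is shuffled.
shuffled_plain = [pair for node in mapped_per_node for pair in node]
# With a combiner, each node pre-aggregates its own output first.
shuffled_combined = [pair for node in mapped_per_node for pair in combine(node)]

result_plain = combine(shuffled_plain)
result_combined = combine(shuffled_combined)
print(len(shuffled_plain), len(shuffled_combined), result_combined)
```

Both paths produce the same final counts; only the number of records that cross the simulated shuffle changes.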

Reference resources

Mappers and reducers can each reference their own set of resources. Resources are passed as function parameters (for example, def mapper(resources):).

The following example filters stop words in the mapper and boosts whitelist word counts by 5 in the reducer:

white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')

@output(['word', 'cnt'], ['string', 'int'])
def mapper(resources):
    stop_words = set(r[0].strip() for r in resources[0])
    def h(row):
        for word in row[0].split():
            if word not in stop_words:
                yield word, 1
    return h

@output(['word', 'cnt'], ['string', 'int'])
def reducer(resources):
    d = dict()
    d['white_list'] = set(word.strip() for word in resources[0])
    d['cnt'] = 0
    def inner(keys):
        d['cnt'] = 0
        def h(row, done):
            d['cnt'] += row.cnt
            if done:
                if row.word in d['white_list']:
                    d['cnt'] += 5
                yield keys.word, d['cnt']
        return h
    return inner

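# words_df and stop_words are not defined in this snippet; they are assumed to be
# an existing DataFrame and a stop-word resource created beforehand.
words_df.map_reduce(mapper, reducer, group='word',
                    mapper_resources=[stop_words],
                    reducer_resources=[white_list_file])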

Expected output:

    word  cnt
0  hello    2
1   life    1
2  python    7
3   world    6
4   short    1
5     use    1

Use third-party Python libraries

Important

Because bytecode definitions differ between Python versions, code that uses Python 3-only features such as yield from can fail on MaxCompute workers that run Python 2.7. Test your code end-to-end before you run MapReduce jobs written in Python 3 in production.
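
One way to reduce this risk is to avoid Python 3-only constructs in mappers and reducers. The sketch below contrasts a mapper that uses yield from with an equivalent explicit loop; the function names are illustrative only, and whether a particular construct fails depends on the worker runtime, so testing remains necessary.

```python
def mapper_py3_only(row):
    # 'yield from' is Python 3 syntax and does not exist in Python 2.7.
    yield from ((word.lower(), 1) for word in row[0].split())

def mapper_portable(row):
    # An explicit loop yields the same pairs and is valid in both versions.
    for word in row[0].split():
        yield word.lower(), 1

row = ('Hello World',)
print(list(mapper_portable(row)))
```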

Specify libraries globally to apply them to all map_reduce calls in a session:

from odps import options
options.df.libraries = ['six.whl', 'python_dateutil.whl']

Or pass them to a single execution:

df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(
    libraries=['six.whl', 'python_dateutil.whl']
)

Reshuffle data

When data is unevenly distributed across cluster partitions, call reshuffle to rebalance it. By default, rows are assigned to partitions by random hash:

df1 = df.reshuffle()

To distribute by a specific column and sort the result:

df1 = df.reshuffle('name', sort='id', ascending=False)
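
Why random assignment helps with skew can be shown with a small simulation. This is an illustrative sketch in plain Python, not a description of MaxCompute internals: the partition count, the dataset, and the use of zlib.crc32 as the key hash are all assumptions made for the example.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 4  # hypothetical partition count for the simulation

# A skewed dataset: one hot key dominates.
keys = ['hot'] * 900 + ['k%d' % i for i in range(100)]

# Key-based placement: identical keys always land in the same partition.
by_key = Counter(zlib.crc32(k.encode('utf-8')) % NUM_PARTITIONS for k in keys)

# Random placement: each row is assigned independently of its key.
rng = random.Random(0)
by_random = Counter(rng.randrange(NUM_PARTITIONS) for _ in keys)

print('largest partition by key:   ', max(by_key.values()))
print('largest partition by random:', max(by_random.values()))
```

With key-based placement the hot key forces at least 900 rows into one partition, while random placement spreads the rows roughly evenly.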

Bloom filter

bloom_filter quickly pre-filters one dataset against another before a join, reducing the rows that need to be shuffled and compared. It works best when one dataset is much larger than the other — for example, filtering browsing-event data against a smaller set of transaction records before joining them.

The filter is approximate: it removes only rows whose keys are definitely absent from the reference set, and it may keep a small number of rows whose keys are not actually in the reference set (false positives).

import pandas as pd

df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
df2 = DataFrame(pd.DataFrame({'a': ['name1']}))

df1.bloom_filter('a', df2.a)
# The first argument can also be a column expression, e.g., df1.a + '1'

Expected output:

       a  b
0  name1  1
1  name1  4

In this example, name2 and name3 are filtered out. On larger datasets, the filter may not eliminate every non-matching row, but the join result remains correct — the filter only affects performance, not accuracy.

Configure the filter with these parameters:

Parameter   Default  Effect on accuracy                    Effect on memory
capacity    3000     Higher values reduce false positives  Increases
error_rate  0.01     Lower values reduce false positives   Increases

Set both parameters based on your dataset size and available memory.
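
To build intuition for how capacity and error_rate trade accuracy against memory, here is a minimal Bloom filter in plain Python. It is an illustrative sketch, not the PyODPS implementation: the class, the SHA-256 based hashing, and the textbook sizing formulas are assumptions, and the way PyODPS sizes its filter internally may differ.

```python
import hashlib
import math

class BloomFilter(object):
    """Minimal illustrative Bloom filter; not the PyODPS implementation."""

    def __init__(self, capacity=3000, error_rate=0.01):
        # Textbook sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes.
        self.num_bits = int(math.ceil(
            -capacity * math.log(error_rate) / math.log(2) ** 2))
        self.num_hashes = max(1, int(round(
            self.num_bits / float(capacity) * math.log(2))))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, value):
        data = value.encode('utf-8')
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(str(seed).encode('utf-8') + data).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, value):
        # True means "possibly present"; False means "definitely absent".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(value))

reference = BloomFilter(capacity=3000, error_rate=0.01)
reference.add('name1')

rows = [('name1', 1), ('name2', 2), ('name3', 3), ('name1', 4)]
kept = [row for row in rows if row[0] in reference]
print(reference.num_bits, reference.num_hashes, kept)
```

Under these formulas, raising capacity or lowering error_rate both increase the number of bits allocated, which matches the memory effects listed in the table.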

For information about executing DataFrame operations, see DataFrame execution.

Pivot table

pivot_table summarizes data by grouping rows and computing aggregate values across columns.

Sample data:

>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1

The rows parameter is required. It specifies the fields to group by; the default aggregate function is mean:

>>> df['A', 'D', 'E'].pivot_table(rows='A')
     A  D_mean  E_mean
0  bar     5.5    2.75
1  foo     2.2    4.40

Pass multiple fields to rows for finer grouping:

>>> df.pivot_table(rows=['A', 'B', 'C'])
     A    B      C  D_mean  E_mean
0  bar  one  large     4.0     5.0
1  bar  one  small     5.0     3.0
...

Use values to restrict the columns being aggregated:

>>> df.pivot_table(rows=['A', 'B'], values='D')
     A    B    D_mean
0  bar  one  4.500000
1  bar  two  6.500000
2  foo  one  1.666667
3  foo  two  3.000000

Use aggfunc to apply one or more aggregate functions:

>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
     A    B    D_mean  D_count  D_sum
0  bar  one  4.500000        2      9
1  bar  two  6.500000        2     13
2  foo  one  1.666667        3      5
3  foo  two  3.000000        2      6

Use columns to pivot a column's values into new column headers:

>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
     A    B  large_D_mean  small_D_mean
0  bar  one           4.0           5.0
1  bar  two           7.0           6.0
2  foo  one           2.0           1.0
3  foo  two           NaN           3.0

Use fill_value to replace NaN with a default:

>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
     A    B  large_D_mean  small_D_mean
0  bar  one             4             5
1  bar  two             7             6
2  foo  one             2             1
3  foo  two             0             3
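
The pivoted values above can be cross-checked with a few lines of plain Python. The sketch below is only an emulation of the calculation for rows=['A', 'B'], values='D', columns='C'; pivot_mean_d is a hypothetical helper, and it returns a dictionary rather than a DataFrame.

```python
from collections import defaultdict

# The sample data from this section as (A, B, C, D, E) tuples.
data = [
    ('foo', 'one', 'small', 1, 3), ('foo', 'one', 'large', 2, 4),
    ('foo', 'one', 'large', 2, 5), ('foo', 'two', 'small', 3, 6),
    ('foo', 'two', 'small', 3, 4), ('bar', 'one', 'large', 4, 5),
    ('bar', 'one', 'small', 5, 3), ('bar', 'two', 'small', 6, 2),
    ('bar', 'two', 'large', 7, 1),
]

def pivot_mean_d(records, fill_value=None):
    """Hypothetical helper: mean of D per (A, B), spread over the values of C."""
    groups = defaultdict(list)
    for a, b, c, d, _e in records:
        groups[(a, b, c)].append(d)
    c_values = sorted({key[2] for key in groups})
    table = {}
    for a, b in sorted({key[:2] for key in groups}):
        row = {}
        for c in c_values:
            values = groups.get((a, b, c))
            # Combinations with no rows get fill_value instead of a mean.
            row[c + '_D_mean'] = (sum(values) / float(len(values))
                                  if values else fill_value)
        table[(a, b)] = row
    return table

pivoted = pivot_mean_d(data, fill_value=0)
for key in sorted(pivoted):
    print(key, pivoted[key])
```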

Key-value string conversion

DataFrame can parse key-value strings into separate columns and convert columnar data back into key-value strings. For information about creating DataFrame objects, see Create a DataFrame object.

Extract key-value pairs into columns

Use extract_kv to parse a column that contains delimited key-value strings:

>>> df
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1

>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    NaN   10.0    NaN    NaN
1  name1    7.1    NaN    NaN    NaN    8.2    NaN
2  name2    NaN    1.2    1.5    NaN    NaN    NaN
3  name2    NaN    1.0    NaN    NaN    NaN    1.1

Parameters:

Parameter   Description                               Default
columns     Fields to extract key-value pairs from
kv_delim    Delimiter between each key and its value  :
item_delim  Delimiter between key-value pairs         ,

Output column names follow the pattern {original_field}_{key}, joined with an underscore. Missing values default to NaN. Use fill_value to replace missing values with a specific default:

>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    0.0   10.0    0.0    0.0
1  name1    7.1    0.0    0.0    0.0    8.2    0.0
2  name2    0.0    1.2    1.5    0.0    0.0    0.0
3  name2    0.0    1.0    0.0    0.0    0.0    1.1
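
The parsing and column-naming rule can be sketched in plain Python. This emulation is illustrative only: the extract_kv function below is a hypothetical stand-in operating on dictionaries, it converts every value to float, and it uses None where the DataFrame output shows NaN.

```python
def extract_kv(records, column, kv_delim=':', item_delim=',', fill_value=None):
    """Hypothetical emulation: expand a key-value string column into columns."""
    parsed = []
    for record in records:
        pairs = {}
        for item in record[column].split(item_delim):
            key, value = item.split(kv_delim, 1)
            # Output columns are named '{original_field}_{key}'.
            pairs['%s_%s' % (column, key)] = float(value)
        parsed.append(pairs)
    # The output schema is the union of keys seen across all rows.
    all_columns = sorted(set().union(*parsed))
    result = []
    for record, pairs in zip(records, parsed):
        row = {k: v for k, v in record.items() if k != column}
        for name in all_columns:
            row[name] = pairs.get(name, fill_value)
        result.append(row)
    return result

records = [
    {'name': 'name1', 'kv': 'k1=1,k2=3,k5=10'},
    {'name': 'name2', 'kv': 'k9=1.1,k2=1'},
]
expanded = extract_kv(records, 'kv', kv_delim='=', fill_value=0)
for row in expanded:
    print(row)
```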

Convert columns to key-value strings

Use to_kv to serialize multiple columns into a single key-value string column:

>>> df
   name    k1   k2   k3    k5   k7   k9
0  name1  1.0  3.0  NaN  10.0  NaN  NaN
1  name1  7.1  NaN  NaN   NaN  8.2  NaN
2  name2  NaN  1.2  1.5   NaN  NaN  NaN
3  name2  NaN  1.0  NaN   NaN  NaN  1.1

>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1
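
The reverse direction can be sketched the same way. The to_kv function below is a hypothetical plain-Python emulation, not the PyODPS method: it skips missing values (modeled as None), formats numbers with %g, emits items in the order of the columns argument, and its default delimiters are assumptions. The real output may order or format items differently.

```python
def to_kv(record, columns, kv_delim=':', item_delim=','):
    """Hypothetical emulation: join non-missing columns into one key-value string."""
    items = []
    for name in columns:
        value = record.get(name)
        if value is None:
            continue  # missing values are left out of the string
        items.append('%s%s%g' % (name, kv_delim, value))
    return item_delim.join(items)

columns = ['k1', 'k2', 'k3', 'k5', 'k7', 'k9']
row0 = {'name': 'name1', 'k1': 1.0, 'k2': 3.0, 'k5': 10.0}
row2 = {'name': 'name2', 'k2': 1.2, 'k3': 1.5}

print(to_kv(row0, columns, kv_delim='='))
print(to_kv(row2, columns, kv_delim='='))
```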