PyODPS DataFrame's MapReduce API lets you write custom map and reduce logic in Python and run it at scale on MaxCompute. A map_reduce job can contain only mappers, only reducers, or both.
WordCount example
The following example counts word occurrences across a table that contains a single STRING column.
```python
# encoding: utf-8
from odps import ODPS
from odps import options
from odps.df import DataFrame

options.verbose = True

o = ODPS('your-access-id', 'your-secret-access-key',
         project='DMP_UC_dev',
         endpoint='http://service-corp.odps.aliyun-inc.com/api')

def mapper(row):
    for word in row[0].split():
        yield word.lower(), 1

def reducer(keys):
    # Use a list instead of cnt = 0. A plain integer would be treated as a local
    # variable inside h(), so its value would not appear in the output.
    cnt = [0]
    def h(row, done):  # done=True when all rows for this key have been processed
        cnt[0] += row[1]
        if done:
            yield keys[0], cnt[0]
    return h

word_count = DataFrame(o.get_table('zx_word_count'))
table = word_count.map_reduce(
    mapper, reducer,
    group=['word'],
    mapper_output_names=['word', 'cnt'],
    mapper_output_types=['string', 'int'],
    reducer_output_names=['word', 'cnt'],
    reducer_output_types=['string', 'int'],
)
```

Expected output:
```
     word  cnt
0     are    1
1     day    1
2  doing?    1
...
```

The group parameter specifies which fields the reducer uses to group incoming rows. If you omit it, all fields are used for grouping. The reducer receives the aggregated keys and processes every row that shares those keys. The done flag is True when the last row for a given key has been processed.
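The closure pattern can be sanity-checked outside MaxCompute with plain Python. The simulate_reduce helper below is a hypothetical local stand-in for the reduce phase, not part of PyODPS; it groups sorted rows by key and calls the closure with done=True on the last row of each group:

```python
from itertools import groupby
from operator import itemgetter

def reducer(keys):
    # A list is used because Python 2 closures cannot rebind an outer integer
    # (there is no `nonlocal`); mutating cnt[0] works in both Python 2 and 3.
    cnt = [0]
    def h(row, done):
        cnt[0] += row[1]
        if done:
            yield keys[0], cnt[0]
    return h

def simulate_reduce(rows):
    # Illustrative only: feed each key's rows to a fresh closure, setting
    # done=True for the final row, as the MapReduce runtime would.
    out = []
    for key, group in groupby(sorted(rows), key=itemgetter(0)):
        group = list(group)
        h = reducer((key,))
        for i, row in enumerate(group):
            out.extend(h(row, i == len(group) - 1))
    return out

print(simulate_reduce([('hello', 1), ('world', 1), ('hello', 1)]))
# [('hello', 2), ('world', 1)]
```

Replacing `cnt = [0]` with `cnt = 0` and `cnt[0] += row[1]` with `cnt += row[1]` raises an error inside h(), which is exactly the pitfall the comment in the example warns about.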
You can write the reducer as a callable class instead of a closure:
```python
class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done=True when all rows for this key are processed
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt
```

Simplify output schema with the @output decorator
Use the @output decorator to declare output field names and types directly on the function, removing the need to pass mapper_output_names, mapper_output_types, reducer_output_names, and reducer_output_types to map_reduce.
```python
from odps.df import output

@output(['word', 'cnt'], ['string', 'int'])
def mapper(row):
    for word in row[0].split():
        yield word.lower(), 1

@output(['word', 'cnt'], ['string', 'int'])
def reducer(keys):
    cnt = [0]
    def h(row, done):
        cnt[0] += row.cnt
        if done:
            yield keys.word, cnt[0]
    return h

word_count = DataFrame(o.get_table('zx_word_count'))
table = word_count.map_reduce(mapper, reducer, group='word')
```

To sort rows within each group during iteration, pass the sort parameter. The ascending parameter controls the sort direction: a single bool applies the same order to all sort fields; a list lets you set a different direction per field (the list length must match the number of sort fields).
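What per-field directions mean can be sketched in plain Python. The sort_group helper below is illustrative, not PyODPS internals; it accepts either a single bool or one bool per sort field, like map_reduce does:

```python
def sort_group(rows, sort_keys, ascending):
    # Broadcast a single bool to every sort field, as map_reduce does.
    if isinstance(ascending, bool):
        ascending = [ascending] * len(sort_keys)
    assert len(ascending) == len(sort_keys), \
        'ascending list length must match the number of sort fields'
    # Python's sort is stable, so applying keys from last to first
    # yields the same order as a single multi-key sort.
    for key, asc in reversed(list(zip(sort_keys, ascending))):
        rows = sorted(rows, key=lambda r: r[key], reverse=not asc)
    return rows

group = [{'word': 'a', 'cnt': 1}, {'word': 'b', 'cnt': 3}, {'word': 'b', 'cnt': 2}]
print(sort_group(group, ['word', 'cnt'], [True, False]))
# [{'word': 'a', 'cnt': 1}, {'word': 'b', 'cnt': 3}, {'word': 'b', 'cnt': 2}]
```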
Specify a combiner
During a MapReduce job, mapper output is transferred across the network to reducers — a step called the shuffle. The shuffle involves disk I/O, data serialization, and network transfer, and is one of the most expensive parts of the pipeline.
A combiner reduces shuffle cost by aggregating mapper output locally on each node before it is sent to reducers. This is most effective for commutative and associative operations such as counting and summing.
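The saving is easy to see with word counting. The sketch below simulates one mapper node locally (it is not PyODPS code): without a combiner, one (word, 1) pair per occurrence crosses the shuffle; with a combiner, only one pair per distinct word does.

```python
from collections import Counter

def run_mapper(lines):
    # Mapper output before the shuffle: one (word, 1) pair per occurrence.
    return [(w.lower(), 1) for line in lines for w in line.split()]

def combine(pairs):
    # Local, per-node pre-aggregation. Safe here because addition is
    # commutative and associative, so partial sums can be summed again
    # by the reducer.
    c = Counter()
    for word, cnt in pairs:
        c[word] += cnt
    return list(c.items())

node_input = ['hello world', 'hello hello world']
raw = run_mapper(node_input)   # 5 pairs would cross the shuffle
combined = combine(raw)        # only 2 pairs cross with a combiner
print(len(raw), len(combined))  # 5 2
```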
Constraints before you write a combiner:

- A combiner cannot reference resources.
- The output field names and types must exactly match those of the corresponding mapper.
The combiner interface is identical to the reducer interface. Pass the combiner function via the combiner parameter:
```python
words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
```

Reference resources
Mappers and reducers can each reference their own set of resources. Resources are passed as function parameters (for example, def mapper(resources):).
The following example filters stop words in the mapper and boosts whitelist word counts by 5 in the reducer:
```python
white_list_file = o.create_resource('pyodps_white_list_words', 'file',
                                    file_obj='Python\nWorld')
# Assumed setup (not shown in the original example): `stop_words` is a table
# resource holding one stop word per row.
stop_words = o.create_resource('pyodps_stop_words', 'table',
                               table_name='pyodps_stop_words_table')

@output(['word', 'cnt'], ['string', 'int'])
def mapper(resources):
    stop_words = set(r[0].strip() for r in resources[0])
    def h(row):
        for word in row[0].split():
            if word not in stop_words:
                yield word, 1
    return h

@output(['word', 'cnt'], ['string', 'int'])
def reducer(resources):
    d = dict()
    d['white_list'] = set(word.strip() for word in resources[0])
    d['cnt'] = 0
    def inner(keys):
        d['cnt'] = 0
        def h(row, done):
            d['cnt'] += row.cnt
            if done:
                if row.word in d['white_list']:
                    d['cnt'] += 5
                yield keys.word, d['cnt']
        return h
    return inner

words_df.map_reduce(mapper, reducer, group='word',
                    mapper_resources=[stop_words],
                    reducer_resources=[white_list_file])
```

Expected output:
```
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1
```

Use third-party Python libraries
Python 3 bytecode features such as yield from cause errors on MaxCompute workers that run Python 2.7. Test your code end-to-end before running Python 3-based MapReduce jobs in production.
Specify libraries globally to apply them to all map_reduce calls in a session:
```python
from odps import options
options.df.libraries = ['six.whl', 'python_dateutil.whl']
```

Or pass them to a single execution:
```python
df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(
    libraries=['six.whl', 'python_dateutil.whl']
)
```

Reshuffle data
When data is unevenly distributed across cluster partitions, call reshuffle to rebalance it. By default, rows are assigned to partitions by random hash:
```python
df1 = df.reshuffle()
```

To distribute by a specific column and sort the result:
```python
df1 = df.reshuffle('name', sort='id', ascending=False)
```

Bloom filter
bloom_filter quickly pre-filters one dataset against another before a join, reducing the rows that need to be shuffled and compared. It works best when one dataset is much larger than the other — for example, filtering browsing-event data against a smaller set of transaction records before joining them.
The filter is approximate: it eliminates rows that are definitely absent from the reference set, but may retain a small number of rows that are not actually present.
```python
import pandas as pd
from odps.df import DataFrame

df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'],
                              'b': [1, 2, 3, 4]}))
df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
df1.bloom_filter('a', df2.a)
# The first argument can also be a column expression, e.g., df1.a + '1'
```

Expected output:
```
       a  b
0  name1  1
1  name1  4
```

In this example, name2 and name3 are filtered out. On larger datasets, the filter may not eliminate every non-matching row, but the join result remains correct: the filter only affects performance, not accuracy.
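This one-sided behavior (no false negatives, occasional false positives) is intrinsic to Bloom filters. The TinyBloom class below is a minimal educational sketch, not how MaxCompute implements bloom_filter:

```python
import hashlib

class TinyBloom:
    # Minimal Bloom filter: a bit array plus several hash positions per item.
    def __init__(self, num_bits=64, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, item):
        # Derive num_hashes positions by salting a single hash function.
        for i in range(self.num_hashes):
            digest = hashlib.md5(('%d:%s' % (i, item)).encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        # True for every added item; may also be True for a few others
        # whose hash positions happen to collide (false positives).
        return all(self.bits & (1 << pos) for pos in self._positions(item))

bf = TinyBloom()
bf.add('name1')
print(bf.might_contain('name1'))  # True: members are never filtered out
```

Because members always pass, rows that should survive the pre-filter are never dropped, which is why the subsequent join still produces a correct result.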
Configure the filter with these parameters:
| Parameter | Default | Effect on accuracy | Effect on memory |
|---|---|---|---|
| capacity | 3000 | Higher values reduce false positives | Increases |
| error_rate | 0.01 | Lower values reduce false positives | Increases |
Set both parameters based on your dataset size and available memory.
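As a rough guide, the textbook Bloom-filter sizing formula (a general result, not something PyODPS documents about its internals) relates the two parameters to memory use:

```python
import math

def bloom_size(capacity, error_rate):
    # m = -n * ln(p) / (ln 2)^2 bits and k = (m / n) * ln 2 hash functions
    # is the classic optimum for n elements at false-positive rate p.
    m = -capacity * math.log(error_rate) / (math.log(2) ** 2)
    k = (m / capacity) * math.log(2)
    return int(math.ceil(m)), int(round(k))

# Defaults from the table above: capacity=3000, error_rate=0.01
bits, hashes = bloom_size(3000, 0.01)
print(bits, hashes)  # roughly 28756 bits (about 3.5 KB) and 7 hash functions
```

Doubling capacity roughly doubles the bits needed; tightening error_rate from 0.01 to 0.001 adds about 50% more.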
For information about executing DataFrame operations, see DataFrame execution.
Pivot table
pivot_table summarizes data by grouping rows and computing aggregate values across columns.
Sample data:
```python
>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1
```

The rows parameter is required. It specifies the fields to group by; the default aggregate function is mean:
```python
>>> df['A', 'D', 'E'].pivot_table(rows='A')
     A  D_mean  E_mean
0  bar     5.5    2.75
1  foo     2.2    4.40
```

Pass multiple fields to rows for finer grouping:
```python
>>> df.pivot_table(rows=['A', 'B', 'C'])
     A    B      C  D_mean  E_mean
0  bar  one  large     4.0     5.0
1  bar  one  small     5.0     3.0
...
```

Use values to restrict the columns being aggregated:
```python
>>> df.pivot_table(rows=['A', 'B'], values='D')
     A    B    D_mean
0  bar  one  4.500000
1  bar  two  6.500000
2  foo  one  1.666667
3  foo  two  3.000000
```

Use aggfunc to apply one or more aggregate functions:
```python
>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
     A    B    D_mean  D_count  D_sum
0  bar  one  4.500000        2      9
1  bar  two  6.500000        2     13
2  foo  one  1.666667        3      5
3  foo  two  3.000000        2      6
```

Use columns to pivot a column's values into new column headers:
```python
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
     A    B  large_D_mean  small_D_mean
0  bar  one           4.0           5.0
1  bar  two           7.0           6.0
2  foo  one           2.0           1.0
3  foo  two           NaN           3.0
```

Use fill_value to replace NaN with a default:
```python
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
     A    B  large_D_mean  small_D_mean
0  bar  one             4             5
1  bar  two             7             6
2  foo  one             2             1
3  foo  two             0             3
```

Key-value string conversion
DataFrame can parse key-value strings into separate columns and convert columnar data back into key-value strings. For information about creating DataFrame objects, see Create a DataFrame object.
Extract key-value pairs into columns
Use extract_kv to parse a column that contains delimited key-value strings:
```python
>>> df
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1
>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
    name  kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    NaN   10.0    NaN    NaN
1  name1    7.1    NaN    NaN    NaN    8.2    NaN
2  name2    NaN    1.2    1.5    NaN    NaN    NaN
3  name2    NaN    1.0    NaN    NaN    NaN    1.1
```

Parameters:
| Parameter | Description | Default |
|---|---|---|
| columns | Fields to extract key-value pairs from | — |
| kv_delim | Delimiter between each key and its value | : |
| item_delim | Delimiter between key-value pairs | , |
Output column names follow the pattern {original_field}_{key}, joined with an underscore. Missing values default to NaN. Use fill_value to replace missing values with a specific default:
```python
>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
    name  kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    0.0   10.0    0.0    0.0
1  name1    7.1    0.0    0.0    0.0    8.2    0.0
2  name2    0.0    1.2    1.5    0.0    0.0    0.0
3  name2    0.0    1.0    0.0    0.0    0.0    1.1
```

Convert columns to key-value strings
Use to_kv to serialize multiple columns into a single key-value string column:
```python
>>> df
    name   k1   k2   k3    k5   k7   k9
0  name1  1.0  3.0  NaN  10.0  NaN  NaN
1  name1  7.1  NaN  NaN   NaN  8.2  NaN
2  name2  NaN  1.2  1.5   NaN  NaN  NaN
3  name2  NaN  1.0  NaN   NaN  NaN  1.1
>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1
```
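Up to missing-value handling, the two operations invert each other. The helpers below are a plain-Python sketch of the same per-row transformation (not the PyODPS implementation; the compact float formatting is an assumption chosen to match the sample output):

```python
def extract_kv(s, kv_delim='=', item_delim=','):
    # 'k1=1,k2=3' -> {'k1': 1.0, 'k2': 3.0}; absent keys simply stay missing.
    out = {}
    for item in s.split(item_delim):
        key, value = item.split(kv_delim)
        out[key] = float(value)
    return out

def to_kv(d, kv_delim='=', item_delim=','):
    # Inverse direction: skip missing values and format floats compactly
    # (1.0 -> '1', 7.1 -> '7.1'), as in the sample output above.
    parts = []
    for key in sorted(d):
        value = d[key]
        if value is None:
            continue
        parts.append(key + kv_delim + ('%g' % value))
    return item_delim.join(parts)

row = extract_kv('k1=1,k2=3,k5=10')
print(row)         # {'k1': 1.0, 'k2': 3.0, 'k5': 10.0}
print(to_kv(row))  # k1=1,k2=3,k5=10
```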