すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MapReduce API

最終更新日:Jun 11, 2025

このトピックでは、MapReduce API について説明し、MapReduce API を使用して大規模なデータセットを効率的に処理および分析する方法について説明します。

PyODPS DataFrame は MapReduce API をサポートしています。map 関数と reduce 関数は個別に記述できます。これは、map_reduce プロセスには mapper または reducer のみを含めることができるためです。

次の例は、WordCount プログラムを実行する方法を示しています。

>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # リストが cnt=0 の代わりに使用されます。 cnt=0 を使用すると、h 関数の cnt はローカル変数と見なされ、その値は出力に含まれません。
>>>     cnt = [0]
>>>     def h(row, done):  # done は、このキーを持つ行が反復処理されることを示します。
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>> # zx_word_count テーブルには 1 つの列のみがあり、STRING 型です。
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
                        mapper_output_names=['word', 'cnt'],
                        mapper_output_types=['string', 'int'],
                        reducer_output_names=['word', 'cnt'],
                        reducer_output_types=['string', 'int'])
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

group パラメーターを使用して、reduce 関数を使用してデータをグループ化するフィールドを指定します。このパラメーターを指定しない場合、データはすべてのフィールドによってグループ化されます。Reducer は集計された キー を受信して初期化し、キー に基づいて集計された行を処理します。done は、これらの キー に関連するすべての行が反復処理されることを示します。

理解を容易にするために、この例では関数はクロージャとして記述されています。関数を callable クラスとして記述することもできます。

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done は、このキーを持つ行が反復処理されることを示します。
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

output をコメントに使用すると、コードが簡略化されます。

>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # リストが cnt=0 の代わりに使用されます。 cnt=0 を使用すると、h 関数の cnt はローカル変数と見なされ、その値は出力に含まれません。
>>>     cnt = [0]
>>>     def h(row, done):  # done は、このキーを持つ行が反復処理されることを示します。
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

反復処理中に、sort パラメーターを使用して列でデータをソートし、ascending パラメーターを使用してソート順を指定できます。ascending パラメーターはブール値にすることができ、sort パラメーターで指定されたすべてのフィールドが同じ順序で配置されることを示します。 ascending パラメーターはリストにすることもできます。リスト内の文字列の数は、sort パラメーターで指定されたフィールドの数と同じである必要があります。

コンバイナーを指定する

MapReduceMapReduce API では、combiner を使用して mapper 内のデータを集計します。 combiner は reducer と同じ方法で使用されますが、combiner はリソースを参照できません。combiner から生成されたフィールドの名前とデータ型は、combiner に対応する mapper のフィールドの名前とデータ型と同じである必要があります。

この例では、reducercombiner として使用して、mapper 内のデータを集計し、シャッフルされたデータを削減できます。

>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

リソースを参照する

MapReduce API では、mapper および reducer によって参照されるリソースを個別に指定できます。

次の例では、非推奨の単語フィルタリングが mapper で実行され、reducer 内のホワイトリストにある単語の数が 5 つ増えます。

>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>>     stop_words = set(r[0].strip() for r in resources[0])
>>>     def h(row):
>>>         for word in row[0].split():
>>>             if word not in stop_words:
>>>                 yield word, 1
>>>     return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>>     d = dict()
>>>     d['white_list'] = set(word.strip() for word in resources[0])
>>>     d['cnt'] = 0
>>>     def inner(keys):
>>>         d['cnt'] = 0
>>>         def h(row, done):
>>>             d['cnt'] += row.cnt
>>>             if done:
>>>                 if row.word in d['white_list']:
>>>                     d['cnt'] += 5
>>>                 yield keys.word, d['cnt']
>>>         return h
>>>     return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1

サードパーティの Python ライブラリを使用する

使い方は、マップステージのサードパーティの Python ライブラリと似ています。

  • ライブラリをグローバルに指定します。

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • 即時呼び出しメソッドを使用する場合は、ライブラリをローカルに指定します。

    >>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])
    説明

    バイトコード定義の違いにより、yield from などの Python 3 でサポートされている新機能に基づいてコードを記述すると、Python 2.7 の MaxCompute ワーカーでコードが実行されたときにエラーが報告されます。 Python 3 の MapReduce API を使用して本番操作を実装する前に、コードが正常に実行されることを確認することをお勧めします。

データを再シャッフルする

データがクラスタ内で不均一に分散されている場合は、reshuffle メソッドを呼び出してデータを再シャッフルできます。

>>> df1 = df.reshuffle()

デフォルトでは、データは乱数としてハッシュされます。列ごとにデータを分散し、再シャッフルされたデータを特定の順序でソートすることもできます。

>>> df1.reshuffle('name', sort='id', ascending=False)

ブルームフィルター

PyODPS DataFrame は、ブルームフィルターでデータを計算するための bloom_filter インターフェースを提供します。

ブルームフィルターは、sequence1 に存在するが sequence2 に存在しないデータをすばやく除外できますが、データが完全に除外されない場合があります。この近似フィルタリング方法は、データ量が大幅に異なる join シナリオで特に役立ちます。たとえば、ユーザーの閲覧データをトランザクションデータと join する場合、閲覧データの量はトランザクションデータの量よりもはるかに大きくなります。この場合、最初にトランザクションデータを使用してブルームフィルターを適用して閲覧データを事前にフィルタリングしてから、join 操作を実行できます。これにより、パフォーマンスが大幅に向上します。

>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
       a  b
0  name1  1
1  name2  2
2  name3  3
3  name1  4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
       a
0  name1
>>> df1.bloom_filter('a', df2.a) # 行 0 は計算式にすることができます。たとえば、df1.a + '1' です。
       a  b
0  name1  1
1  name1  4

説明:

  • 少量のデータが処理されます。したがって、df1 の列 aname2name3 が含まれる行は除外されます。ただし、システムが大量のデータを処理する場合、システムは指定された条件を満たすすべてのデータをフィルタリングできない場合があります。

  • 上記の JOIN 操作では、フィルタリングされていないデータはデータの精度に影響しません。ただし、データフィルタリングにより、JOIN パフォーマンスが大幅に向上します。

  • capacity パラメーターと error_rate パラメーターを指定して、データ量とエラー率を設定できます。デフォルト値は 30000.01 です。

説明

capacity パラメーターの値を増やすか、error_rate パラメーターの値を減らすと、メモリの使用量が増加します。したがって、要件に基づいてパラメーターを適切な値に設定してください。

コレクションオブジェクトの詳細については、「DataFrame の実行」をご参照ください。

ピボットテーブル

PyODPS DataFrame は、ピボットテーブル機能を提供します。次のコードは、サンプルテーブルデータを示しています。

>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1
  • ピボットテーブル機能を使用する場合、1 つ以上のフィールドに基づいて平均値を取得するには、rows パラメーターが必要です。

    >>> df['A', 'D', 'E'].pivot_table(rows='A')
         A  D_mean  E_mean
    0  bar     5.5    2.75
    1  foo     2.2    4.40
  • rows に複数のフィールドを指定して、フィールドに基づいてデータを集計できます。

    >>> df.pivot_table(rows=['A', 'B', 'C'])
         A    B      C  D_mean  E_mean
    0  bar  one  large     4.0     5.0
    1  bar  one  small     5.0     3.0
    2  bar  two  large     7.0     1.0
    3  bar  two  small     6.0     2.0
    4  foo  one  large     2.0     4.5
    5  foo  one  small     1.0     3.0
    6  foo  two  small     3.0     5.0
  • values パラメーターを使用して、計算する列を指定できます。

    >>> df.pivot_table(rows=['A', 'B'], values='D')
         A    B    D_mean
    0  bar  one  4.500000
    1  bar  two  6.500000
    2  foo  one  1.666667
    3  foo  two  3.000000
  • デフォルトでは、平均値が計算されます。aggfunc パラメーターを使用して、1 つ以上の集計関数を指定できます。

    >>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
         A    B    D_mean  D_count  D_sum
    0  bar  one  4.500000        2      9
    1  bar  two  6.500000        2     13
    2  foo  one  1.666667        3      5
    3  foo  two  3.000000        2      6
  • 元の列の値を新しいコレクションオブジェクトの列の値として使用できます。

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
         A    B  large_D_mean  small_D_mean
    0  bar  one           4.0           5.0
    1  bar  two           7.0           6.0
    2  foo  one           2.0           1.0
    3  foo  two           NaN           3.0
  • fill_value を使用して空の値を置き換えることができます。

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
         A    B  large_D_mean  small_D_mean
    0  bar  one             4             5
    1  bar  two             7             6
    2  foo  one             2             1
    3  foo  two             0             3

キーと値の文字列変換

DataFrame は、キーと値のペアを列に抽出し、標準列をキーと値のペアに変換できます。 DataFrame オブジェクトの作成と管理方法の詳細については、「DataFrame オブジェクトを作成する」をご参照ください。

  • キーと値のペアを列に抽出します。例:

    >>> df
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1

    extract_kv メソッドを使用して、キーと値のフィールドを抽出します。

    >>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    NaN   10.0    NaN    NaN
    1  name1    7.0    NaN    NaN    NaN    8.2    NaN
    2  name2    NaN    1.2    1.5    NaN    NaN    NaN
    3  name2    NaN    1.0    NaN    NaN    NaN    1.1

    columns パラメーターは、抽出するフィールドを指定します。kv_delim パラメーターは、キーと値の間のデリミタを指定します。item_delim パラメーターは、キーと値のペア間のデリミタを指定します。パラメーターを指定しない場合、キーと値はコロン(:)で区切られ、キーと値のペアはカンマ(,)で区切られます。出力フィールドの名前は、元のフィールド名とキー値の組み合わせです。名前とキー値は、アンダースコア(_)を使用して連結されます。欠損している列のデフォルト値は NONE です。fill_value を使用して、欠損している列の値を埋めることができます。

    >>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    0.0   10.0    0.0    0.0
    1  name1    7.0    0.0    0.0    0.0    8.2    0.0
    2  name2    0.0    1.2    1.5    0.0    0.0    0.0
    3  name2    0.0    1.0    0.0    0.0    0.0    1.1
  • 複数の列をキーと値のペアに変換します。例:

    >>> df
       name    k1   k2   k3    k5   k7   k9
    0  name1  1.0  3.0  NaN  10.0  NaN  NaN
    1  name1  7.0  NaN  NaN   NaN  8.2  NaN
    2  name2  NaN  1.2  1.5   NaN  NaN  NaN
    3  name2  NaN  1.0  NaN   NaN  NaN  1.1

    to_kv メソッドを使用して、データをキーと値の形式に変換します。

    >>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1