このトピックでは、MapReduce API について説明し、MapReduce API を使用して大規模なデータセットを効率的に処理および分析する方法について説明します。
PyODPS DataFrame は MapReduce API をサポートしています。map
関数と reduce
関数は個別に記述できます。これは、map_reduce
プロセスには mapper
または reducer
のみを含めることができるためです。
次の例は、WordCount
プログラムを実行する方法を示しています。
>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> def reducer(keys):
>>> # リストが cnt=0 の代わりに使用されます。 cnt=0 を使用すると、h 関数の cnt はローカル変数と見なされ、その値は出力に含まれません。
>>> cnt = [0]
>>> def h(row, done): # done は、このキーを持つ行が反復処理されることを示します。
>>> cnt[0] += row[1]
>>> if done:
>>> yield keys[0], cnt[0]
>>> return h
>>> # zx_word_count テーブルには 1 つの列のみがあり、STRING 型です。
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
mapper_output_names=['word', 'cnt'],
mapper_output_types=['string', 'int'],
reducer_output_names=['word', 'cnt'],
reducer_output_types=['string', 'int'])
word cnt
0 are 1
1 day 1
2 doing? 1
3 everybody 1
4 first 1
5 hello 2
6 how 1
7 is 1
8 so 1
9 the 1
10 this 1
11 world 1
12 you 1
group
パラメーターを使用して、reduce
関数を使用してデータをグループ化するフィールドを指定します。このパラメーターを指定しない場合、データはすべてのフィールドによってグループ化されます。Reducer
は集計された キー
を受信して初期化し、キー
に基づいて集計された行を処理します。done
は、これらの キー
に関連するすべての行が反復処理されることを示します。
理解を容易にするために、この例では関数はクロージャとして記述されています。関数を callable
クラスとして記述することもできます。
class reducer(object):
def __init__(self, keys):
self.cnt = 0
def __call__(self, row, done): # done は、このキーを持つ行が反復処理されることを示します。
self.cnt += row.cnt
if done:
yield row.word, self.cnt
output
をコメントに使用すると、コードが簡略化されます。
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> # リストが cnt=0 の代わりに使用されます。 cnt=0 を使用すると、h 関数の cnt はローカル変数と見なされ、その値は出力に含まれません。
>>> cnt = [0]
>>> def h(row, done): # done は、このキーを持つ行が反復処理されることを示します。
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
word cnt
0 are 1
1 day 1
2 doing? 1
3 everybody 1
4 first 1
5 hello 2
6 how 1
7 is 1
8 so 1
9 the 1
10 this 1
11 world 1
12 you 1
反復処理中に、sort
パラメーターを使用して列でデータをソートし、ascending
パラメーターを使用してソート順を指定できます。ascending
パラメーターはブール値にすることができ、sort
パラメーターで指定されたすべてのフィールドが同じ順序で配置されることを示します。 ascending パラメーターはリストにすることもできます。リスト内の文字列の数は、sort
パラメーターで指定されたフィールドの数と同じである必要があります。
コンバイナーを指定する
MapReduce
MapReduce API では、combiner
を使用して mapper
内のデータを集計します。 combiner は reducer
と同じ方法で使用されますが、combiner はリソースを参照できません。combiner
から生成されたフィールドの名前とデータ型は、combiner に対応する mapper
のフィールドの名前とデータ型と同じである必要があります。
この例では、reducer
を combiner
として使用して、mapper
内のデータを集計し、シャッフルされたデータを削減できます。
>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
リソースを参照する
MapReduce API では、mapper
および reducer
によって参照されるリソースを個別に指定できます。
次の例では、非推奨の単語フィルタリングが mapper
で実行され、reducer
内のホワイトリストにある単語の数が 5 つ増えます。
>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>> stop_words = set(r[0].strip() for r in resources[0])
>>> def h(row):
>>> for word in row[0].split():
>>> if word not in stop_words:
>>> yield word, 1
>>> return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>> d = dict()
>>> d['white_list'] = set(word.strip() for word in resources[0])
>>> d['cnt'] = 0
>>> def inner(keys):
>>> d['cnt'] = 0
>>> def h(row, done):
>>> d['cnt'] += row.cnt
>>> if done:
>>> if row.word in d['white_list']:
>>> d['cnt'] += 5
>>> yield keys.word, d['cnt']
>>> return h
>>> return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>> mapper_resources=[stop_words], reducer_resources=[white_list_file])
word cnt
0 hello 2
1 life 1
2 python 7
3 world 6
4 short 1
5 use 1
サードパーティの Python ライブラリを使用する
使い方は、マップステージのサードパーティの Python ライブラリと似ています。
ライブラリをグローバルに指定します。
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
即時呼び出しメソッドを使用する場合は、ライブラリをローカルに指定します。
>>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])
説明バイトコード定義の違いにより、
yield from
などの Python 3 でサポートされている新機能に基づいてコードを記述すると、Python 2.7 の MaxCompute ワーカーでコードが実行されたときにエラーが報告されます。 Python 3 の MapReduce API を使用して本番操作を実装する前に、コードが正常に実行されることを確認することをお勧めします。
データを再シャッフルする
データがクラスタ内で不均一に分散されている場合は、reshuffle
メソッドを呼び出してデータを再シャッフルできます。
>>> df1 = df.reshuffle()
デフォルトでは、データは乱数としてハッシュされます。列ごとにデータを分散し、再シャッフルされたデータを特定の順序でソートすることもできます。
>>> df1.reshuffle('name', sort='id', ascending=False)
ブルームフィルター
PyODPS DataFrame は、ブルームフィルターでデータを計算するための bloom_filter
インターフェースを提供します。
ブルームフィルターは、sequence1
に存在するが sequence2
に存在しないデータをすばやく除外できますが、データが完全に除外されない場合があります。この近似フィルタリング方法は、データ量が大幅に異なる join
シナリオで特に役立ちます。たとえば、ユーザーの閲覧データをトランザクションデータと join
する場合、閲覧データの量はトランザクションデータの量よりもはるかに大きくなります。この場合、最初にトランザクションデータを使用してブルームフィルターを適用して閲覧データを事前にフィルタリングしてから、join
操作を実行できます。これにより、パフォーマンスが大幅に向上します。
>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
a b
0 name1 1
1 name2 2
2 name3 3
3 name1 4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
a
0 name1
>>> df1.bloom_filter('a', df2.a) # 行 0 は計算式にすることができます。たとえば、df1.a + '1' です。
a b
0 name1 1
1 name1 4
説明:
少量のデータが処理されます。したがって、
df1
の列a
にname2
とname3
が含まれる行は除外されます。ただし、システムが大量のデータを処理する場合、システムは指定された条件を満たすすべてのデータをフィルタリングできない場合があります。上記の
JOIN
操作では、フィルタリングされていないデータはデータの精度に影響しません。ただし、データフィルタリングにより、JOIN
パフォーマンスが大幅に向上します。capacity
パラメーターとerror_rate
パラメーターを指定して、データ量とエラー率を設定できます。デフォルト値は3000
と0.01
です。
capacity
パラメーターの値を増やすか、error_rate
パラメーターの値を減らすと、メモリの使用量が増加します。したがって、要件に基づいてパラメーターを適切な値に設定してください。
コレクションオブジェクトの詳細については、「DataFrame の実行」をご参照ください。
ピボットテーブル
PyODPS DataFrame は、ピボットテーブル機能を提供します。次のコードは、サンプルテーブルデータを示しています。
>>> df
A B C D E
0 foo one small 1 3
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 6
4 foo two small 3 4
5 bar one large 4 5
6 bar one small 5 3
7 bar two small 6 2
8 bar two large 7 1
ピボットテーブル機能を使用する場合、1 つ以上のフィールドに基づいて平均値を取得するには、
rows
パラメーターが必要です。>>> df['A', 'D', 'E'].pivot_table(rows='A') A D_mean E_mean 0 bar 5.5 2.75 1 foo 2.2 4.40
rows
に複数のフィールドを指定して、フィールドに基づいてデータを集計できます。>>> df.pivot_table(rows=['A', 'B', 'C']) A B C D_mean E_mean 0 bar one large 4.0 5.0 1 bar one small 5.0 3.0 2 bar two large 7.0 1.0 3 bar two small 6.0 2.0 4 foo one large 2.0 4.5 5 foo one small 1.0 3.0 6 foo two small 3.0 5.0
values
パラメーターを使用して、計算する列を指定できます。>>> df.pivot_table(rows=['A', 'B'], values='D') A B D_mean 0 bar one 4.500000 1 bar two 6.500000 2 foo one 1.666667 3 foo two 3.000000
デフォルトでは、平均値が計算されます。
aggfunc
パラメーターを使用して、1 つ以上の集計関数を指定できます。>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum']) A B D_mean D_count D_sum 0 bar one 4.500000 2 9 1 bar two 6.500000 2 13 2 foo one 1.666667 3 5 3 foo two 3.000000 2 6
元の列の値を新しいコレクションオブジェクトの列の値として使用できます。
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C') A B large_D_mean small_D_mean 0 bar one 4.0 5.0 1 bar two 7.0 6.0 2 foo one 2.0 1.0 3 foo two NaN 3.0
fill_value
を使用して空の値を置き換えることができます。>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0) A B large_D_mean small_D_mean 0 bar one 4 5 1 bar two 7 6 2 foo one 2 1 3 foo two 0 3
キーと値の文字列変換
DataFrame は、キーと値のペアを列に抽出し、標準列をキーと値のペアに変換できます。 DataFrame オブジェクトの作成と管理方法の詳細については、「DataFrame オブジェクトを作成する」をご参照ください。
キーと値のペアを列に抽出します。例:
>>> df name kv 0 name1 k1=1,k2=3,k5=10 1 name1 k1=7.1,k7=8.2 2 name2 k2=1.2,k3=1.5 3 name2 k9=1.1,k2=1
extract_kv
メソッドを使用して、キーと値のフィールドを抽出します。>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',') name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1
columns
パラメーターは、抽出するフィールドを指定します。kv_delim
パラメーターは、キーと値の間のデリミタを指定します。item_delim
パラメーターは、キーと値のペア間のデリミタを指定します。パラメーターを指定しない場合、キーと値はコロン(:)で区切られ、キーと値のペアはカンマ(,)で区切られます。出力フィールドの名前は、元のフィールド名とキー値の組み合わせです。名前とキー値は、アンダースコア(_
)を使用して連結されます。欠損している列のデフォルト値は NONE です。fill_value
を使用して、欠損している列の値を埋めることができます。>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0) name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 0.0 10.0 0.0 0.0 1 name1 7.0 0.0 0.0 0.0 8.2 0.0 2 name2 0.0 1.2 1.5 0.0 0.0 0.0 3 name2 0.0 1.0 0.0 0.0 0.0 1.1
複数の列をキーと値のペアに変換します。例:
>>> df name k1 k2 k3 k5 k7 k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1
to_kv
メソッドを使用して、データをキーと値の形式に変換します。>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=') name kv 0 name1 k1=1,k2=3,k5=10 1 name1 k1=7.1,k7=8.2 2 name2 k2=1.2,k3=1.5 3 name2 k9=1.1,k2=1