このトピックでは、Pythonを使用してHadoopストリーミングジョブを送信する方法について説明します。
前提条件
E-MapReduce(EMR)Hadoopクラスターが作成されていること。
クラスターの作成方法の詳細については、クラスターの作成をご参照ください。
手順
- SSHモードでHadoopクラスターにログオンします。詳細については、クラスターへのログオンをご参照ください。
- mapper.pyという名前のファイルを作成します。
- 次のコマンドを実行して、mapper.pyという名前のファイルを作成し、ファイルを開きます。
vim /home/hadoop/mapper.py Iキーを押して、編集モードに切り替えます。- mapper.py ファイルに次の情報を追加します。
#!/usr/bin/env python import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print '%s\t%s' % (word, 1) Escを押して編集モードを終了します。次に、:wqと入力してファイルを保存して閉じます。
- 次のコマンドを実行して、mapper.pyという名前のファイルを作成し、ファイルを開きます。
- reducer.pyという名前のファイルを作成します。
- 次のコマンドを実行して、reducer.pyという名前のファイルを作成し、ファイルを開きます。
vim /home/hadoop/reducer.py Iキーを押して、編集モードに切り替えます。- reducer.py ファイルに次の情報を追加します。
#!/usr/bin/env python from operator import itemgetter import sys current_word = None current_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: current_count += count else: if current_word: print '%s\t%s' % (current_word, current_count) current_count = count current_word = word if current_word == word: print '%s\t%s' % (current_word, current_count) Escを押して編集モードを終了します。次に、:wqと入力してファイルを保存して閉じます。
- 次のコマンドを実行して、reducer.pyという名前のファイルを作成し、ファイルを開きます。
- 次のコマンドを実行して、hosts ファイルを HDFS にアップロードします。
hdfs dfs -put /etc/hosts /tmp/ - 次のコマンドを実行して、Hadoopストリーミングジョブを送信します。
hadoop jar /usr/lib/hadoop-current/share/hadoop/tools/lib/hadoop-streaming-X.X.X.jar -file /home/hadoop/mapper.py -mapper mapper.py -file /home/hadoop/reducer.py -reducer reducer.py -input /tmp/hosts -output /tmp/outputパラメーター 説明 input 入力パス。この例では、入力パスは /tmp/hosts です。 output 出力パス。この例では、出力パスは /tmp/output です。 説明 hadoop-streaming-X.X.X.jar のX.X.Xは、JARパッケージのバージョンを示します。JARパッケージのバージョンは、クラスターのHadoopバージョンと同じである必要があります。/usr/lib/hadoop-current/share/hadoop/tools/lib/ ディレクトリでJARパッケージのバージョンを確認できます。