このトピックでは、一般的な MapReduce クラスとメソッドについて説明します。
Maven を使用する場合、Maven リポジトリ で odps-sdk-mapred を検索して、Java 用 SDK の最新バージョンを見つけることができます。次の Maven 依存関係を使用して、プロジェクトで SDK を宣言できます。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.40.10-public</version>
</dependency>データ型
MaxCompute MapReduce がサポートするデータ型には、BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL が含まれます。次の表は、MaxCompute と Java のデータ型間のマッピングを示しています。
MaxCompute データ型 | Java データ型 |
BIGINT | LONG |
STRING | STRING |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | DATE |
DECIMAL | BIGDECIMAL |
MapReduce クラス
クラス | 説明 |
MapperBase | ユーザー定義マッパークラスが継承する必要がある基底クラス。マッパーは、入力テーブルのレコードをキーと値のペアに変換し、キーと値のペアをレデューサーに渡します。または、マッパーは Reduce ステージをスキップして、キーと値のペアを結果テーブルに書き込むこともできます。Reduce ステージをスキップして計算結果を直接返すジョブは、MapOnly ジョブと呼ばれます。 |
ReducerBase | ユーザー定義レデューサークラスが継承する必要がある基底クラス。レデューサーは、キーに関連付けられた一連の値を縮小します。 |
TaskContext | タスクのコンテキストについて説明します。タスクコンテキストは、MapperBase および ReducerBase の複数のメンバーメソッドの入力パラメーターです。 |
JobClient | ジョブクライアントを定義します。ジョブクライアントは、ジョブを送信および管理します。ジョブクライアントは、ブロッキングモードまたは非ブロッキングモードでジョブを送信できます。ブロッキングモードは同期モードですが、非ブロッキングモードは非同期モードです。 |
RunningJob | 実行中のジョブを定義します。このクラスのオブジェクトは、実行中の MapReduce ジョブのインスタンスを追跡するために使用されます。 |
JobConf | MapReduce ジョブの構成について説明します。JobConf オブジェクトは、main 関数で定義されます。次に、ジョブクライアントは JobConf オブジェクトに基づいて MaxCompute にジョブを送信します。 |
MapperBase
次の表に、MapperBase クラスのメソッドを示します。
メソッド | 説明 |
void cleanup(TaskContext context) | Map ステージの最後に map メソッドの後に呼び出されるメソッド。 |
void map(long key, Record record, TaskContext context) | 入力テーブルのレコードを処理します。 |
void setup(TaskContext context) | Map ステージの最初に map メソッドの前に呼び出されるメソッド。 |
ReducerBase
次の表に、ReducerBase クラスのメソッドを示します。
メソッド | 説明 |
void cleanup( TaskContext context) | Reduce ステージの最後に reduce メソッドの後に呼び出されるメソッド。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | 入力テーブルのレコードを処理します。 |
void setup( TaskContext context) | Reduce ステージの最初に reduce メソッドの前に呼び出されるメソッド。 |
TaskContext
次の表に、TaskContext クラスのメソッドを示します。
メソッド | 説明 |
TableInfo[] getOutputTableInfo() | 出力テーブルに関する情報を取得します。 |
Record createOutputRecord() | デフォルトの出力テーブルのレコードを作成します。 |
Record createOutputRecord(String label) | 指定されたラベルの出力テーブルのレコードを作成します。 |
Record createMapOutputKeyRecord() | Map ステージで生成されるキーと値のペアのキーのレコードを作成します。 |
Record createMapOutputValueRecord() | Map ステージで生成されるキーと値のペアの値のレコードを作成します。 |
void write(Record record) | デフォルトの出力テーブルにレコードを書き込みます。このメソッドは、Reduce ステージで複数回呼び出すことができます。 |
void write(Record record, String label) | 指定されたラベルの出力テーブルにレコードを書き込みます。このメソッドは、Reduce ステージで複数回呼び出すことができます。 |
void write(Record key, Record value) | レコードをキーと値のペアに変換します。このメソッドは、Map ステージで複数回呼び出すことができます。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | ファイルリソースを読み取ります。 |
Iterator<Record > readResourceTable(String resourceName) | テーブルリソースを読み取ります。 |
Counter getCounter(Enum<? > name) | 指定された名前のカウンターを取得します。 |
Counter getCounter(String group, String name) | 指定されたグループ内の指定された名前のカウンターを取得します。 |
void progress() | ハートビート情報を MapReduce フレームワークに送信します。タスクがデータの処理に長時間かかり、この期間中にフレームワークを呼び出す必要がない場合は、このメソッドを呼び出してタスクのタイムアウトを回避できます。タスクのデフォルトのタイムアウト期間は 600 秒です。 |
ワーカーが長時間実行され、フレームワークがワーカーがタイムアウトしたと判断した場合、フレームワークはワーカーを停止します。この場合、TaskContext クラスの progress メソッドを呼び出して、ワーカーが MapReduce によって停止されないようにすることができます。progress メソッドは、ハートビート情報をフレームワークに送信します。progress メソッドは、ワーカーの進捗状況を報告するためには使用されません。
MaxCompute MapReduce では、ワーカーのデフォルトのタイムアウト期間は 10 分です。タイムアウト期間を変更することはできません。ワーカーが 10 分以内に progress メソッドを呼び出してハートビート情報を送信しない場合、フレームワークはワーカーを終了し、map または reduce タスクは失敗します。そのため、map または reduce タスクで progress メソッドを定期的に呼び出して、フレームワークが予期せずワーカーを終了しないようにすることをお勧めします。
JobConf
次の表に、JobConf クラスのメソッドを示します。
メソッド | 説明 |
void setResources(String resourceNames) | 現在のジョブで使用されるリソースを宣言します。マッパーまたはレデューサーは、TaskContext オブジェクトで宣言されているリソースのみを読み取ることができます。 |
void setMapOutputKeySchema(Column[] schema) | マッパーからレデューサーに渡されるキーの属性を設定します。 |
void setMapOutputValueSchema(Column[] schema) | マッパーからレデューサーに渡される値の属性を設定します。 |
void setOutputKeySortColumns(String[] cols) | マッパーからレデューサーに渡されるキーをソートするための列を設定します。 |
void setOutputGroupingColumns(String[] cols) | キーをグループ化するための列を設定します。 |
void setMapperClass(Class<? extends Mapper > theClass) | ジョブのマッパーを設定します。 |
void setPartitionColumns(String[] cols) | ジョブのパーティションキー列を設定します。デフォルトでは、パーティションキー列は、マッパーによって生成されるキーのすべての列です。 |
void setReducerClass(Class<? extends Reducer > theClass) | ジョブのレデューサーを設定します。 |
void setCombinerClass(Class<? extends Reducer > theClass) | ジョブのコンバイナーを設定します。コンバイナーは、同じキーを持つレコードを結合します。レデューサーに似ていますが、Map ステージで動作します。 |
void setSplitSize(long size) | 分割サイズを MB 単位で設定します。デフォルトの分割サイズは 256 MB です。 |
void setNumReduceTasks(int n) | Reduce タスクの数を設定します。デフォルトでは、Reduce タスクの数は Map タスクの数の 4 分の 1 です。 |
void setMemoryForMapTask(int mem) | Map タスクのワーカーが使用できるメモリを MB 単位で設定します。デフォルトのメモリサイズは 2048 MB です。 |
void setMemoryForReduceTask(int mem) | Reduce タスクのワーカーが使用できるメモリを MB 単位で設定します。デフォルトのメモリサイズは 2048 MB です。 |
グループ化列は、ソート列から選択されます。ソート列とパーティションキー列は、キーに存在する必要があります。
Map ステージでは、マッパーからのレコードのハッシュ値は、指定されたパーティションキー列に基づいて計算されます。ハッシュ値は、レコードが渡されるレデューサーを決定するのに役立ちます。レコードは、レデューサーに渡される前に、ソート列に基づいてソートされます。
Reduce ステージでは、入力レコードはグループ化列に基づいてグループ化されます。次に、同じキーを共有するレコードのグループが、1 つの入力として reduce メソッドに渡されます。
JobClient
次の表に、JobClient クラスのメソッドを示します。
メソッド | 説明 |
static RunningJob runJob(JobConf job) | ブロッキング(同期)モードで MapReduce ジョブを送信し、ジョブが完了するまで待ってから戻ります。 |
static RunningJob submitJob(JobConf job) | 非ブロッキングモードで MapReduce ジョブを送信し、RunningJob オブジェクトを返します。 |
RunningJob
次の表に、RunningJob クラスのメソッドを示します。
メソッド | 説明 |
String getInstanceID() | ジョブインスタンスの ID を取得します。ジョブインスタンス ID を使用して、操作ログを表示したり、ジョブを管理したりできます。 |
boolean isComplete() | ジョブが完了したかどうかを確認します。 |
boolean isSuccessful() | ジョブインスタンスが成功したかどうかを確認します。 |
void waitForCompletion() | ジョブインスタンスが終了するのを待ちます。このメソッドは、同期モードで送信されるジョブに使用されます。 |
JobStatus getJobStatus() | ジョブインスタンスの実行ステータスを確認します。 |
void killJob() | 現在のジョブを終了します。 |
Counters getCounters() | カウンター情報を取得します。 |
InputUtils
次の表に、InputUtils クラスのメソッドを示します。
メソッド | 説明 |
static void addTable(TableInfo table, JobConf conf) | タスクに入力テーブルを追加します。このメソッドは複数回呼び出すことができます。新しいテーブルは入力キューに追加されます。 |
static void setTables(TableInfo [] tables, JobConf conf) | タスクに複数の入力テーブルを追加します。 |
OutputUtils
次の表に、OutputUtils クラスのメソッドを示します。
メソッド | 説明 |
static void addTable(TableInfo table, JobConf conf) | タスクに出力テーブルを追加します。このメソッドは複数回呼び出すことができます。新しいテーブルは出力キューに追加されます。 |
static void setTables(TableInfo[] tables, JobConf conf) | タスクに複数の出力テーブルを追加します。 |
Pipeline
Pipeline は、拡張 MapReduce モデルのメインクラスです。Pipeline.builder メソッドを呼び出して、パイプラインを構築できます。次のコードは、Pipeline クラスのメソッドを示しています。
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)次の例は、Pipeline.builder メソッドを呼び出してパイプラインを構築する方法を示しています。
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();前の例に示すように、main 関数でマッパーの後に 2 つのレデューサーが続く MapReduce ジョブを作成できます。MapReduce の基本的な機能に精通していれば、拡張 MapReduce モデルは簡単に使用できます。
拡張 MapReduce モデルを使用する前に、MapReduce の使用方法を学習することをお勧めします。
JobConf を使用して、マッパーの後に 1 つのレデューサーのみが続く MapReduce ジョブを作成できます。