MaxCompute MapReduceフレームワークは、Join操作をサポートしていません。 ただし、カスタムマップまたはreduce関数を使用してデータを結合できます。
前提条件
テスト用の環境設定を完了します。「はじめに」をご参照ください。
準備
テストプログラムのJARパッケージを準備します。 このトピックでは、JARパッケージの名前はmapreduce-examples.jarで、MaxComputeのローカルインストールパスのbin\data\resourcesディレクトリに格納されます。
Join用のテストテーブルとリソースを準備します。
テストテーブルを作成します。 mr_Join_src1およびmr_Join_src2テーブルはテスト手順で結合され、mr_join_outテーブルはJoin操作の出力テーブルとして使用されます。
CREATE TABLE mr_Join_src1(key BIGINT, value STRING); CREATE TABLE mr_Join_src2(key BIGINT, value STRING); CREATE TABLE mr_Join_out(key BIGINT, value1 STRING, value2 STRING);テストリソースを追加します。
-- When adding the JAR package for the first time, you can ignore the -f flag. add jar data\resources\mapreduce-examples.jar -f;
Tunnelを使用して、MaxComputeクライアントのbinディレクトリにある
data1.txtおよびdata2.txtを、それぞれテーブルmr_Join_src1およびmr_Join_src2にインポートします。tunnel upload data1.txt mr_Join_src1; tunnel upload data2.txt mr_Join_src2;次のデータがmr_Join_src1テーブルにインポートされます。
1,hello 2,odps次のデータがmr_Join_src2テーブルにインポートされます。
1,odps 3,hello 4,odps
手順
MaxComputeクライアントでJoin操作を実行します。
jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Join mr_Join_src1 mr_Join_src2 mr_Join_out;期待される結果
ジョブは正常に実行されます。 次のデータがmr_Join_outテーブルで返されます。 value1はmr_Join_src1テーブルの値を示し、value2はmr_Join_src2テーブルの値を示します。
+------------+------------+------------+
| key | value1 | value2 |
+------------+------------+------------+
| 1 | hello | odps |
+------------+------------+------------+サンプルコード
プロジェクトオブジェクトモデル (POM) の依存関係については、「注意事項」をご参照ください。
package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
* Join, mr_Join_src1/mr_Join_src2(key bigint, value string), mr_Join_out(key
* bigint, value1 string, value2 string)
*
*/
public class Join {
public static final Log LOG = LogFactory.getLog(Join.class);
public static class JoinMapper extends MapperBase {
private Record mapkey;
private Record mapvalue;
private long tag;
@Override
public void setup(TaskContext context) throws IOException {
mapkey = context.createMapOutputKeyRecord();
mapvalue = context.createMapOutputValueRecord();
tag = context.getInputTableInfo().getLabel().equals("left") ? 0 : 1;
}
@Override
public void map(long key, Record record, TaskContext context)
throws IOException {
mapkey.set(0, record.get(0));
mapkey.set(1, tag);
for (int i = 1; i < record.getColumnCount(); i++) {
mapvalue.set(i - 1, record.get(i));
}
context.write(mapkey, mapvalue);
}
}
public static class JoinReducer extends ReducerBase {
private Record result = null;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
/** Each input of the reduce function is the records that have the same key. */
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long k = key.getBigint(0);
List<Object[]> leftValues = new ArrayList<Object[]>();
/** Records are sorted based on the combination of the key and tag. This ensures that records in the left table are passed to the reduce function first when the reduce function performs the Join operation. */
while (values.hasNext()) {
Record value = values.next();
long tag = (Long) key.get(1);
/** Data in the left table is first cached in memory. */
if (tag == 0) {
leftValues.add(value.toArray().clone());
} else {
/** Data in the right table is joined with all data in the left table. */
/** The sample code has poor performance and is only used as an example. We recommend that you do not use the code in your production environment. */
for (Object[] leftValue : leftValues) {
int index = 0;
result.set(index++, k);
for (int i = 0; i < leftValue.length; i++) {
result.set(index++, leftValue[i]);
}
for (int i = 0; i < value.getColumnCount(); i++) {
result.set(index++, value.get(i));
}
context.write(result);
}
}
}
}
}
public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.err.println("Usage: Join <input table1> <input table2> <out>");
System.exit(2);
}
JobConf job = new JobConf();
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint,tag:bigint"));
job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
job.setPartitionColumns(new String[]{"key"});
job.setOutputKeySortColumns(new String[]{"key", "tag"});
job.setOutputGroupingColumns(new String[]{"key"});
job.setNumReduceTasks(1);
InputUtils.addTable(TableInfo.builder().tableName(args[0]).label("left").build(), job);
InputUtils.addTable(TableInfo.builder().tableName(args[1]).label("right").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName(args[2]).build(), job);
JobClient.runJob(job);
}
}