This example shows how to register a JAR package and a text file as MaxCompute resources, then use them in a MapReduce job to upload data into a table.
How it works
The mapper reads the text file at setup time using context.readResourceFileAsStream(), parses each line, and writes records to the output table. The job has no reduce phase.
The -resources flag and -classpath flag serve different purposes:
| Flag | Purpose |
|---|---|
| `-resources mapreduce-examples.jar,import.txt` | Registers files so MaxCompute distributes them to worker nodes. Use this for any file the job needs to read at runtime. |
| `-classpath data\resources\mapreduce-examples.jar` | Adds the JAR to the Java classpath so the JVM can load the Upload class. Use this for JARs that contain the job code. |
Prerequisites
Before you begin, ensure that you have:
- Completed the environment setup described in Getting started
- The `mapreduce-examples.jar` file in the `bin\data\resources` directory of your local MaxCompute installation
Prepare tables and resources
Run the following commands on the MaxCompute client.
- Create the output table:

  ```sql
  CREATE TABLE mr_upload_src (key BIGINT, value STRING);
  ```

- Add the text file and the JAR package as resources:

  ```
  add file data\resources\import.txt -f;
  add jar data\resources\mapreduce-examples.jar -f;
  ```

  If you are adding a resource for the first time, you can omit the `-f` flag; the flag forces an overwrite when a resource with the same name already exists.

  The `import.txt` file contains the following data:

  ```
  1000,odps
  ```
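Optionally, confirm that both resources are registered. On the MaxCompute client, `list resources;` prints the resources in the current project; the output should include import.txt and mapreduce-examples.jar:

```
list resources;
```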
Run the job
Run the following command on the MaxCompute client to start the upload job:
```
jar -resources mapreduce-examples.jar,import.txt -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.Upload import.txt mr_upload_src;
```
Parameter reference:
| Parameter | Description |
|---|---|
| `-resources mapreduce-examples.jar,import.txt` | Registers the JAR and the text file so MaxCompute distributes them to worker nodes. |
| `-classpath data\resources\mapreduce-examples.jar` | Adds the JAR to the Java classpath so the `Upload` class can be loaded. |
| `import.txt` | First argument to `Upload`: the name of the resource file, passed to the job as `import.filename` in JobConf. |
| `mr_upload_src` | Second argument to `Upload`: the name of the output table. |
Verify the result
After the job completes, query the output table and confirm that it contains the following row, which maps directly to the 1000,odps line in import.txt (field 0 → key, field 1 → value).
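A plain SELECT from the MaxCompute client is the simplest check:

```sql
SELECT * FROM mr_upload_src;
```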
```
+------------+------------+
| key        | value      |
+------------+------------+
| 1000       | odps       |
+------------+------------+
```
Sample code
For Project Object Model (POM) dependency configuration, see Getting started — Precautions.
```java
package com.aliyun.odps.mapred.open.example;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Upload
 * Import data from a text file into a table.
 */
public class Upload {

    public static class UploadMapper extends MapperBase {

        @Override
        public void setup(TaskContext context) throws IOException {
            Record record = context.createOutputRecord();
            StringBuilder importdata = new StringBuilder();
            BufferedInputStream bufferedInput = null;
            try {
                byte[] buffer = new byte[1024];
                int bytesRead = 0;
                // Get the resource file name from JobConf
                String filename = context.getJobConf().get("import.filename");
                // Read the resource file distributed to this worker node
                bufferedInput = context.readResourceFileAsStream(filename);
                while ((bytesRead = bufferedInput.read(buffer)) != -1) {
                    String chunk = new String(buffer, 0, bytesRead);
                    importdata.append(chunk);
                }
                // Parse each line: split by comma, write key (BIGINT) and value (STRING)
                String[] lines = importdata.toString().split("\n");
                for (int i = 0; i < lines.length; i++) {
                    String[] ss = lines[i].split(",");
                    record.set(0, Long.parseLong(ss[0].trim()));
                    record.set(1, ss[1].trim());
                    context.write(record);
                }
            } catch (FileNotFoundException ex) {
                throw new IOException(ex);
            } finally {
                // Release the resource stream
                if (bufferedInput != null) {
                    bufferedInput.close();
                }
            }
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException {
            // All work is done in setup(); records from the input table are ignored.
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Upload <import_txt> <out_table>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(UploadMapper.class);
        // Pass the resource file name to the mapper via JobConf
        job.set("import.filename", args[0]);
        // Set reducers to 0: this is a map-only job
        job.setNumReduceTasks(0);
        job.setMapOutputKeySchema(SchemaUtils.fromString("key:bigint"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("value:string"));
        // The input table serves only as a placeholder; the mapper never uses its records
        InputUtils.addTable(TableInfo.builder().tableName("mr_empty").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        JobClient.runJob(job);
    }
}
```
Set JobConf
The example above uses the JobConf interface in the SDK to set job properties. Alternatively, use the -conf parameter in the jar command to specify a JobConf configuration file:
```
jar -conf <your-jobconf-file> ...
```
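The exact schema of this file depends on your client version. As a minimal, hypothetical sketch, assuming the file uses the Hadoop-style XML property format, a configuration that sets the import.filename property used by this example could look like the following:

```xml
<!-- Hypothetical JobConf file: sets the same property that main() sets with
     job.set("import.filename", args[0]) in the sample code above. -->
<configuration>
    <property>
        <name>import.filename</name>
        <value>import.txt</value>
    </property>
</configuration>
```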