These examples show how to use InputUtils.addTable() with a partition spec to read from specific partitions in a MaxCompute MapReduce job.
Both examples show only the main function. The code is not complete and cannot be compiled or run directly; use it as a reference when building your own implementation.

Example 1: Read from a single partition
Use this pattern when the partition value is known at job submission time.
public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    ...
    // Read only the pt=123456 partition of the input table.
    LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
    input.put("pt", "123456");
    InputUtils.addTable(TableInfo.builder().tableName("input_table").partSpec(input).build(), job);
    // Write to the ds=654321 partition of the output table.
    LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
    output.put("ds", "654321");
    OutputUtils.addTable(TableInfo.builder().tableName("output_table").partSpec(output).build(), job);
    JobClient.runJob(job);
}

Example 2: Read from multiple partitions dynamically
Use this pattern when you need to filter partitions at runtime. This example combines the MaxCompute SDK and the MapReduce SDK: the MaxCompute SDK lists all partitions on the table, and a custom applicable function determines which partitions to include as input.
The applicable function is custom logic that you implement to filter partitions based on your requirements.

package com.aliyun.odps.mapred.open.example;
...
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: WordCount <in_table> <out_table>");
        System.exit(2);
    }
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    // Using an Alibaba Cloud account's AccessKey pair grants access to all API operations,
    // which is a high-risk approach. Use a RAM user instead for routine operations.
    // To create a RAM user, go to the Resource Access Management (RAM) console.
    // Store credentials in environment variables rather than hardcoding them in your code.
    Account account = new AliyunAccount(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    Odps odps = new Odps(account);
    odps.setEndpoint("odps_endpoint_url");
    odps.setDefaultProject("my_project");
    // The input table name comes from the first command-line argument.
    String tblname = args[0];
    Table table = odps.tables().get(tblname);
    TableInfoBuilder builder = TableInfo.builder().tableName(tblname);
    for (Partition p : table.getPartitions()) {
        // applicable() is your custom filter that decides whether this partition is used as input.
        if (applicable(p)) {
            // Copy the partition's key-value pairs into a partition spec.
            LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
            for (String key : p.getPartitionSpec().keys()) {
                partSpec.put(key, p.getPartitionSpec().get(key));
            }
            InputUtils.addTable(builder.partSpec(partSpec).build(), job);
        }
    }
    OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
    JobClient.runJob(job);
}

How the partition loop works:
- odps.tables().get(tblname) uses the MaxCompute SDK to retrieve table metadata, including all partitions.
- The for loop iterates over each partition and calls applicable(p), a custom function you implement, to decide whether to include it. A sketch of such a function is shown after this list.
- For each included partition, a LinkedHashMap<String, String> is built from the partition's key-value pairs, then passed to InputUtils.addTable() as the partition spec.
- The output table is added without a partition spec using OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job).
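The applicable function is not part of the MaxCompute SDK; you write it yourself. The following is a minimal sketch under the assumption that partitions use a pt column with date-style values and that only partitions on or after a cutoff date should be read. The key name "pt", the cutoff "20240101", and the rule itself are placeholders for illustration; the sketch relies only on the getPartitionSpec() accessors already used in the example above.

// Hypothetical filter: keep only partitions whose "pt" value is on or after a cutoff date.
// The key name "pt" and the cutoff "20240101" are illustrative assumptions; adapt them to your schema.
private static boolean applicable(Partition p) {
    String pt = p.getPartitionSpec().get("pt");
    if (pt == null) {
        // The partition has no "pt" column; skip it.
        return false;
    }
    // Date-style values such as "20240101" order correctly as strings.
    return pt.compareTo("20240101") >= 0;
}

Any other rule works the same way: inspect the partition's spec (or other metadata) and return true for the partitions that should be added as input.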