
MaxCompute:Examples of using partitioned tables as input

Last Updated: Mar 26, 2026

These examples show how to use InputUtils.addTable() with a partition spec to read from specific partitions in a MaxCompute MapReduce job.

Both examples show only the main function. The code is not complete and cannot be compiled or run directly — use it as a reference when building your own implementation.

Example 1: Read from a single partition

Use this pattern when the partition value is known at job submission time.

public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    ...
    // Read from the partition pt=123456 of input_table.
    LinkedHashMap<String, String> input = new LinkedHashMap<String, String>();
    input.put("pt", "123456");
    InputUtils.addTable(TableInfo.builder().tableName("input_table").partSpec(input).build(), job);
    // Write to the partition ds=654321 of output_table.
    LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
    output.put("ds", "654321");
    OutputUtils.addTable(TableInfo.builder().tableName("output_table").partSpec(output).build(), job);
    JobClient.runJob(job);
}
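The partition spec is an ordered map: for a table with multiple partition levels, keys must be added in the same order as the table's partition columns, which is why these examples use LinkedHashMap rather than HashMap. The following sketch illustrates this for a hypothetical table partitioned by pt and then ds (the table and values are illustrative, not part of the SDK):

```java
import java.util.LinkedHashMap;
import java.util.stream.Collectors;

class PartSpecDemo {
    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so the spec matches
        // the table's partition-level order: pt first, then ds.
        LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>();
        spec.put("pt", "123456");
        spec.put("ds", "654321");
        // Render the spec the way MaxCompute writes partition specs.
        String rendered = spec.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
        System.out.println(rendered); // pt=123456,ds=654321
    }
}
```

This spec would then be passed to TableInfo.builder().partSpec(spec) exactly as in the example above.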

Example 2: Read from multiple partitions dynamically

Use this pattern when you need to filter partitions at runtime. This example combines the MaxCompute SDK and the MapReduce SDK: the MaxCompute SDK lists all partitions on the table, and a custom applicable function determines which partitions to include as input.

The applicable function is custom logic that you implement to filter partitions based on your requirements.

package com.aliyun.odps.mapred.open.example;
...
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: WordCount <in_table> <out_table>");
        System.exit(2);
    }
    String tblname = args[0];
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    // Using an Alibaba Cloud account's AccessKey pair grants access to all API operations,
    // which is a high-risk approach. Use a RAM user instead for routine operations.
    // To create a RAM user, go to the Resource Access Management (RAM) console.
    // Store credentials in environment variables rather than hardcoding them in your code.
    Account account = new AliyunAccount(
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    Odps odps = new Odps(account);
    odps.setEndpoint("odps_endpoint_url");
    odps.setDefaultProject("my_project");
    Table table = odps.tables().get(tblname);
    TableInfoBuilder builder = TableInfo.builder().tableName(tblname);
    for (Partition p : table.getPartitions()) {
        if (applicable(p)) {
            LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
            for (String key : p.getPartitionSpec().keys()) {
                partSpec.put(key, p.getPartitionSpec().get(key));
            }
            InputUtils.addTable(builder.partSpec(partSpec).build(), job);
        }
    }
    OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
    JobClient.runJob(job);
}

How the partition loop works:

  1. odps.tables().get(tblname) uses the MaxCompute SDK to retrieve a handle to the table; table.getPartitions() then lists all of its partitions.

  2. The for loop iterates over each partition and calls applicable(p) — a custom function you implement — to decide whether to include it.

  3. For each included partition, a LinkedHashMap<String, String> is built from the partition's key-value pairs, then passed to InputUtils.addTable() as the partition spec.

  4. The output table is added without a partition spec using OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job).
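The applicable function itself is left entirely to you. As one possible sketch, assume each partition carries a pt value formatted as yyyyMMdd and you want only partitions on or after a cutoff date; a real implementation would read p.getPartitionSpec() from the SDK's Partition object, while this self-contained version takes the spec as a plain Map (the class name, cutoff logic, and date format are all illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionFilter {
    // Hypothetical filter: include a partition only if its pt value
    // (assumed yyyyMMdd) is on or after the cutoff. Lexicographic
    // comparison is safe because the format is fixed-width.
    static boolean applicable(Map<String, String> partSpec, String cutoff) {
        String pt = partSpec.get("pt");
        return pt != null && pt.compareTo(cutoff) >= 0;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> keep = new LinkedHashMap<String, String>();
        keep.put("pt", "20240105");
        LinkedHashMap<String, String> drop = new LinkedHashMap<String, String>();
        drop.put("pt", "20231230");
        System.out.println(applicable(keep, "20240101")); // true
        System.out.println(applicable(drop, "20240101")); // false
    }
}
```

Any predicate works here, for example matching partition values against a whitelist or checking partition metadata, as long as it returns true for the partitions you want added as input.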