全部产品
Search
文档中心

MaxCompute:Contoh MultipleInOut

更新时间:Jun 19, 2025

Topik ini menjelaskan cara menjalankan MultipleInOut di MapReduce.

Prasyarat

Selesaikan konfigurasi lingkungan untuk pengujian. Lihat Memulai.

Persiapan

  1. Siapkan paket JAR program uji. Dalam topik ini, paket JAR diberi nama mapreduce-examples.jar dan disimpan di direktori bin\data\resources dalam jalur instalasi lokal MaxCompute.

  2. Siapkan tabel uji dan sumber daya untuk MultipleInOut.

    1. Buat tabel uji.

      CREATE TABLE wc_in1(key STRING, value STRING);
      CREATE TABLE wc_in2(key STRING, value STRING);
      CREATE TABLE mr_multiinout_out1 (key STRING, cnt BIGINT);
      CREATE TABLE mr_multiinout_out2 (key STRING, cnt BIGINT)  PARTITIONED BY (a string, b string);
      ALTER TABLE mr_multiinout_out2 ADD PARTITION (a='1', b='1');
      ALTER TABLE mr_multiinout_out2 ADD PARTITION (a='2', b='2');
    2. Tambahkan sumber daya uji.

      -- Saat menambahkan paket JAR untuk pertama kali, Anda dapat mengabaikan flag -f.
      add jar data\resources\mapreduce-examples.jar -f;
  3. Gunakan Tunnel untuk mengimpor data1.txt dan data2.txt, yang terletak di direktori bin klien MaxCompute, masing-masing ke tabel wc_in1 dan wc_in2.

    tunnel upload data1.txt wc_in1;
    tunnel upload data2.txt wc_in2;

    Data berikut diimpor ke tabel wc_in1:

    hello,odps

    Data berikut diimpor ke tabel wc_in2:

    hello,world

Prosedur

Jalankan MultipleInOut pada klien MaxCompute.

jar -resources mapreduce-examples.jar -classpath data\resources\mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;

Hasil yang Diharapkan

Pekerjaan berjalan normal. Data berikut dikembalikan di tabel mr_multiinout_out1:

+------------+------------+
| key        | cnt        |
+------------+------------+
| default    | 1          |
+------------+------------+

Data berikut dikembalikan di tabel mr_multiinout_out2:

+--------+------------+---+---+
| key    | cnt        | a | b |
+--------+------------+---+---+
| odps   | 1          | 1 | 1 |
| world  | 1          | 1 | 1 |
| out1   | 1          | 1 | 1 |
| hello  | 2          | 2 | 2 |
| out2   | 1          | 2 | 2 |
+--------+------------+---+---+

Kode Contoh

Untuk informasi tentang dependensi Project Object Model (POM), lihat Peringatan.

package com.aliyun.odps.mapred.open.example;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
 * Word-count style MapReduce example demonstrating multiple input tables
 * and multiple labeled output tables. Counts with {@code count % 3 == 0}
 * go to the default (unlabeled) output, {@code == 1} to label "out1",
 * and {@code == 2} to label "out2".
 **/
public class MultipleInOut {
    /**
     * Mapper: emits every column value of each input record as a key,
     * paired with a constant count of 1.
     */
    public static class TokenizerMapper extends MapperBase {
        Record keyRecord;
        Record oneRecord;
        @Override
            public void setup(TaskContext context) throws IOException {
            keyRecord = context.createMapOutputKeyRecord();
            oneRecord = context.createMapOutputValueRecord();
            // The value record is constant; initialize it once.
            oneRecord.set(new Object[] { 1L });
        }
        @Override
            public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
            int columnCount = record.getColumnCount();
            for (int col = 0; col < columnCount; col++) {
                keyRecord.set(new Object[] { record.get(col).toString() });
                context.write(keyRecord, oneRecord);
            }
        }
    }
    /**
     * Reducer: sums the counts per key and routes each result to one of
     * three outputs based on {@code count % 3}. In cleanup, writes one
     * marker row to each of the three outputs.
     */
    public static class SumReducer extends ReducerBase {
        private Record defaultOut;
        private Record labeledOut1;
        private Record labeledOut2;
        @Override
            public void setup(TaskContext context) throws IOException {
            /** Create one record per output; labels distinguish the outputs. */
            defaultOut = context.createOutputRecord();
            labeledOut1 = context.createOutputRecord("out1");
            labeledOut2 = context.createOutputRecord("out2");
        }
        @Override
            public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
            long total = 0;
            while (values.hasNext()) {
                total += (Long) values.next().get(0);
            }
            // total is non-negative, so the cast to int is safe here.
            switch ((int) (total % 3)) {
            case 0:
                defaultOut.set(0, key.get(0));
                defaultOut.set(1, total);
                /** No label given: the default output is used. */
                context.write(defaultOut);
                break;
            case 1:
                labeledOut1.set(0, key.get(0));
                labeledOut1.set(1, total);
                context.write(labeledOut1, "out1");
                break;
            default:
                labeledOut2.set(0, key.get(0));
                labeledOut2.set(1, total);
                context.write(labeledOut2, "out2");
                break;
            }
        }
        @Override
            public void cleanup(TaskContext context) throws IOException {
            // Emit one fixed marker row per output so each table/partition
            // receives at least one record.
            Record marker = context.createOutputRecord();
            marker.set(0, "default");
            marker.set(1, 1L);
            context.write(marker);
            Record marker1 = context.createOutputRecord("out1");
            marker1.set(0, "out1");
            marker1.set(1, 1L);
            context.write(marker1, "out1");
            Record marker2 = context.createOutputRecord("out2");
            marker2.set(0, "out2");
            marker2.set(1, 1L);
            context.write(marker2, "out2");
        }
    }
    /**
     * Parses a partition spec string such as "ds=1/pt=2" into an ordered map.
     *
     * @param partSpec partition spec, may be null or blank (yields empty map)
     * @return insertion-ordered map of partition column to value
     * @throws RuntimeException if any segment is not exactly "key=value"
     */
    public static LinkedHashMap<String, String> convertPartSpecToMap(
        String partSpec) {
        LinkedHashMap<String, String> result = new LinkedHashMap<String, String>();
        if (partSpec == null || partSpec.trim().isEmpty()) {
            return result;
        }
        for (String segment : partSpec.split("/")) {
            String[] kv = segment.split("=");
            if (kv.length != 2) {
                throw new RuntimeException("ODPS-0730001: format spesifikasi part salah: "
                                           + partSpec);
            }
            result.put(kv[0], kv[1]);
        }
        return result;
    }
    /**
     * Entry point. args[0] is a comma-separated list of input tables
     * (optionally "table|partSpec"); args[1] is a comma-separated list of
     * output tables (optionally "table|partSpec", "table|partSpec|label",
     * or "table||partSpec").
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("MultipleInOut in... out...");
            System.exit(1);
        }
        String[] inputs = args[0].split(",");
        String[] outputs = args[1].split(",");
        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        /** Parse the input table specs. */
        for (String spec : inputs) {
            String[] parts = spec.split("\\|");
            switch (parts.length) {
            case 1:
                InputUtils.addTable(TableInfo.builder().tableName(parts[0]).build(), job);
                break;
            case 2:
                InputUtils.addTable(TableInfo.builder().tableName(parts[0])
                                    .partSpec(convertPartSpecToMap(parts[1])).build(), job);
                break;
            default:
                System.err.println("Gaya input: " + spec + " tidak benar");
                System.exit(1);
            }
        }
        /** Parse the output table specs. */
        for (String spec : outputs) {
            String[] parts = spec.split("\\|");
            switch (parts.length) {
            case 1:
                OutputUtils.addTable(TableInfo.builder().tableName(parts[0]).build(), job);
                break;
            case 2:
                OutputUtils.addTable(TableInfo.builder().tableName(parts[0])
                                     .partSpec(convertPartSpecToMap(parts[1])).build(), job);
                break;
            case 3:
                if (parts[1].isEmpty()) {
                    // Form "table||partSpec": partition spec without a label.
                    OutputUtils.addTable(TableInfo.builder().tableName(parts[0])
                                         .partSpec(convertPartSpecToMap(parts[2])).build(), job);
                } else {
                    // Form "table|partSpec|label".
                    OutputUtils.addTable(TableInfo.builder().tableName(parts[0])
                                         .partSpec(convertPartSpecToMap(parts[1]))
                                         .label(parts[2]).build(), job);
                }
                break;
            default:
                System.err.println("Gaya output: " + spec + " tidak benar");
                System.exit(1);
            }
        }
        JobClient.runJob(job);
    }
}