Multi-Input and Output

Last Updated: Sep 22, 2016

At present, ODPS supports multiple tables as both the input and the output of a MapReduce job. With multiple inputs, all input tables must have the same number of columns and the same column types. With multiple outputs, the output tables may differ in column count and column types.
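In the MapReduce API this shows up as one InputUtils.addTable call per input table and one OutputUtils.addTable call per output table, with a label distinguishing each non-default output. Below is a minimal configuration sketch using this example's tables (the class and method names MultiIoSketch and configureTables are illustrative only; the complete runnable program appears under Sample Code below):

    import java.util.LinkedHashMap;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;

    public class MultiIoSketch {
      static void configureTables(JobConf job) {
        // Multiple inputs: both tables must have the same column count and types.
        InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
        InputUtils.addTable(TableInfo.builder().tableName("wc_in2").build(), job);

        // Default (unlabeled) output.
        OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out1").build(), job);

        // Labeled output into a specific partition; the reducer selects it
        // with context.write(record, "out1").
        LinkedHashMap<String, String> part = new LinkedHashMap<String, String>();
        part.put("a", "1");
        part.put("b", "1");
        OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out2")
            .partSpec(part).label("out1").build(), job);
      }
    }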

Test Preparation

(1) Prepare the JAR package of the test program. Assume it is named "mapreduce-examples.jar".

(2) Prepare the test tables and the resource for the multiple input/output test.

  • Create the tables:

    create table wc_in1(key string, value string);
    create table wc_in2(key string, value string);
    create table mr_multiinout_out1 (key string, cnt bigint);
    create table mr_multiinout_out2 (key string, cnt bigint) partitioned by (a string, b string);
    alter table mr_multiinout_out2 add partition (a='1', b='1');
    alter table mr_multiinout_out2 add partition (a='2', b='2');

  • Add the resource:

    add jar mapreduce-examples.jar -f;

(3) Use the tunnel command to import the data:

    tunnel upload data1 wc_in1;
    tunnel upload data2 wc_in2;

  • The data file imported into the table "wc_in1" contains:

    hello,odps

  • The data file imported into the table "wc_in2" contains:

    hello,world

Test Steps

Execute MultipleInOut on the odpscmd client. The first argument is the comma-separated list of input tables; the second is the comma-separated list of outputs, where each output is written as table, table|partition_spec, or table|partition_spec|label:

    jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
        com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;
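The partition spec in each output descriptor is a list of key=value pairs joined by "/"; the sample program's convertPartSpecToMap method (shown under Sample Code below) turns it into an ordered map. A quick check of the expected parse (the PartSpecDemo class name is illustrative only, not part of the sample):

    import java.util.LinkedHashMap;
    import com.aliyun.odps.mapred.open.example.MultipleInOut;

    public class PartSpecDemo {
      public static void main(String[] args) {
        // "a=1/b=1" parses into the ordered map {a=1, b=1}; a malformed
        // spec such as "a1" raises a RuntimeException with code ODPS-0730001.
        LinkedHashMap<String, String> spec =
            MultipleInOut.convertPartSpecToMap("a=1/b=1");
        System.out.println(spec); // prints {a=1, b=1}
      }
    }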

Test Result

The job runs successfully. mr_multiinout_out1 is the default (unlabeled) output; no word count in this run is divisible by 3, so it contains only the "default" sentinel record written in the reducer's cleanup method:

    +------------+------------+
    | key        | cnt        |
    +------------+------------+
    | default    | 1          |
    +------------+------------+

mr_multiinout_out2 receives the labeled records. Each word's total count modulo 3 selects the destination: "odps" and "world" appear once (mod 1, label "out1", partition a=1/b=1), and "hello" appears twice (mod 2, label "out2", partition a=2/b=2). The "out1" and "out2" rows are likewise sentinel records from cleanup:

    +--------+------------+---+---+
    | key    | cnt        | a | b |
    +--------+------------+---+---+
    | odps   | 1          | 1 | 1 |
    | world  | 1          | 1 | 1 |
    | out1   | 1          | 1 | 1 |
    | hello  | 2          | 2 | 2 |
    | out2   | 1          | 2 | 2 |
    +--------+------------+---+---+

Sample Code

    package com.aliyun.odps.mapred.open.example;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.LinkedHashMap;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.MapperBase;
    import com.aliyun.odps.mapred.ReducerBase;
    import com.aliyun.odps.mapred.TaskContext;
    import com.aliyun.odps.mapred.conf.JobConf;
    import com.aliyun.odps.mapred.utils.InputUtils;
    import com.aliyun.odps.mapred.utils.OutputUtils;
    import com.aliyun.odps.mapred.utils.SchemaUtils;

    /**
     * Multi input & output example.
     */
    public class MultipleInOut {

      public static class TokenizerMapper extends MapperBase {
        Record word;
        Record one;

        @Override
        public void setup(TaskContext context) throws IOException {
          word = context.createMapOutputKeyRecord();
          one = context.createMapOutputValueRecord();
          one.set(new Object[] { 1L });
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
          // Emit every column value as a word with count 1.
          for (int i = 0; i < record.getColumnCount(); i++) {
            word.set(new Object[] { record.get(i).toString() });
            context.write(word, one);
          }
        }
      }

      public static class SumReducer extends ReducerBase {
        private Record result;
        private Record result1;
        private Record result2;

        @Override
        public void setup(TaskContext context) throws IOException {
          // One output record per destination: the default output and the
          // outputs labeled "out1" and "out2".
          result = context.createOutputRecord();
          result1 = context.createOutputRecord("out1");
          result2 = context.createOutputRecord("out2");
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
          long count = 0;
          while (values.hasNext()) {
            Record val = values.next();
            count += (Long) val.get(0);
          }
          long mod = count % 3;
          if (mod == 0) {
            result.set(0, key.get(0));
            result.set(1, count);
            // If no label is specified, the record goes to the default output.
            context.write(result);
          } else if (mod == 1) {
            result1.set(0, key.get(0));
            result1.set(1, count);
            context.write(result1, "out1");
          } else {
            result2.set(0, key.get(0));
            result2.set(1, count);
            context.write(result2, "out2");
          }
        }

        @Override
        public void cleanup(TaskContext context) throws IOException {
          // Write one sentinel record to each of the three outputs.
          Record result = context.createOutputRecord();
          result.set(0, "default");
          result.set(1, 1L);
          context.write(result);
          Record result1 = context.createOutputRecord("out1");
          result1.set(0, "out1");
          result1.set(1, 1L);
          context.write(result1, "out1");
          Record result2 = context.createOutputRecord("out2");
          result2.set(0, "out2");
          result2.set(1, 1L);
          context.write(result2, "out2");
        }
      }

      public static LinkedHashMap<String, String> convertPartSpecToMap(
          String partSpec) {
        LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
        if (partSpec != null && !partSpec.trim().isEmpty()) {
          String[] parts = partSpec.split("/");
          for (String part : parts) {
            String[] ss = part.split("=");
            if (ss.length != 2) {
              throw new RuntimeException("ODPS-0730001: error part spec format: "
                  + partSpec);
            }
            map.put(ss[0], ss[1]);
          }
        }
        return map;
      }

      public static void main(String[] args) throws Exception {
        String[] inputs = null;
        String[] outputs = null;
        if (args.length == 2) {
          inputs = args[0].split(",");
          outputs = args[1].split(",");
        } else {
          System.err.println("MultipleInOut in... out...");
          System.exit(1);
        }
        JobConf job = new JobConf();
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        // Parse the input table descriptors: table or table|partition_spec.
        for (String in : inputs) {
          String[] ss = in.split("\\|");
          if (ss.length == 1) {
            InputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
          } else if (ss.length == 2) {
            LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
            InputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
          } else {
            System.err.println("Style of input: " + in + " is not right");
            System.exit(1);
          }
        }
        // Parse the output table descriptors: table, table|partition_spec,
        // or table|partition_spec|label.
        for (String out : outputs) {
          String[] ss = out.split("\\|");
          if (ss.length == 1) {
            OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
          } else if (ss.length == 2) {
            LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
            OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
          } else if (ss.length == 3) {
            if (ss[1].isEmpty()) {
              LinkedHashMap<String, String> map = convertPartSpecToMap(ss[2]);
              OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
            } else {
              LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
              OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map)
                  .label(ss[2]).build(), job);
            }
          } else {
            System.err.println("Style of output: " + out + " is not right");
            System.exit(1);
          }
        }
        JobClient.runJob(job);
      }
    }
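Note how the labels tie the configuration to the reducer: OutputUtils.addTable registers mr_multiinout_out2 twice, once per partition, each under its own label, and the reducer's createOutputRecord(label) and context.write(record, label) calls select between them, while the unlabeled calls target the default output mr_multiinout_out1.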