Multi-input and output

Last Updated: Oct 31, 2017

MaxCompute supports multiple tables as the input and output of a single MapReduce job. The example below reads from two input tables and writes its word counts to an ordinary table and to two partitions of a partitioned table.

Test preparation

(1) Prepare the JAR package of the test program. Assume that the package is named mapreduce-examples.jar.

(2) Prepare tables and resources for testing the multi-table input and output operations.

  • Create tables.

    create table wc_in1 (key string, value string);
    create table wc_in2 (key string, value string);
    create table mr_multiinout_out1 (key string, cnt bigint);
    create table mr_multiinout_out2 (key string, cnt bigint) partitioned by (a string, b string);
    alter table mr_multiinout_out2 add partition (a='1', b='1');
    alter table mr_multiinout_out2 add partition (a='2', b='2');
  • Add resources.

    add jar mapreduce-examples.jar -f;
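
Note that step (2) creates the two partitions of mr_multiinout_out2 up front; the job's labeled outputs out1 and out2 write into these existing partitions.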

(3) Run tunnel commands to import the data.

    tunnel upload data1 wc_in1;
    tunnel upload data2 wc_in2;

  • The data imported into the wc_in1 table is as follows:

    hello,odps

  • The data imported into the wc_in2 table is as follows:

    hello,world
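
The upload assumes the default comma field delimiter of the tunnel command, so a line such as hello,odps loads into wc_in1 as key='hello' and value='odps'. Because the mapper counts every column value as a word, the two files together yield the counts hello=2, odps=1, world=1.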

Test procedure

Run MultipleInOut in odpscmd.

    jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
    com.aliyun.odps.mapred.open.example.MultipleInOut wc_in1,wc_in2 mr_multiinout_out1,mr_multiinout_out2|a=1/b=1|out1,mr_multiinout_out2|a=2/b=2|out2;
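
Each output argument has the form table|partition_spec|label. For reference, the argument string above expands to the following configuration calls; this is a sketch using the same TableInfo builder API as the sample code below, where job is the JobConf created in main():

    InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
    InputUtils.addTable(TableInfo.builder().tableName("wc_in2").build(), job);
    // Default (unlabeled) output:
    OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out1").build(), job);
    // Output labeled "out1", bound to partition a=1/b=1:
    LinkedHashMap<String, String> part1 = new LinkedHashMap<String, String>();
    part1.put("a", "1");
    part1.put("b", "1");
    OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out2")
        .partSpec(part1).label("out1").build(), job);
    // Output labeled "out2", bound to partition a=2/b=2:
    LinkedHashMap<String, String> part2 = new LinkedHashMap<String, String>();
    part2.put("a", "2");
    part2.put("b", "2");
    OutputUtils.addTable(TableInfo.builder().tableName("mr_multiinout_out2")
        .partSpec(part2).label("out2").build(), job);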

Expected results

The job completes successfully. The content of mr_multiinout_out1 is as follows:

    +------------+------------+
    | key        | cnt        |
    +------------+------------+
    | default    | 1          |
    +------------+------------+

The content of mr_multiinout_out2 is as follows:

    +--------+------------+---+---+
    | key    | cnt        | a | b |
    +--------+------------+---+---+
    | odps   | 1          | 1 | 1 |
    | world  | 1          | 1 | 1 |
    | out1   | 1          | 1 | 1 |
    | hello  | 2          | 2 | 2 |
    | out2   | 1          | 2 | 2 |
    +--------+------------+---+---+
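
These rows follow from the sample code below: the word counts are hello=2, odps=1, and world=1, and the reducer routes each word by count % 3, sending a remainder of 0 to the default output (mr_multiinout_out1), 1 to the output labeled out1 (partition a=1/b=1), and 2 to the output labeled out2 (partition a=2/b=2). The cleanup method then writes one marker record to each of the three outputs, which is where the default, out1, and out2 rows come from.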

Sample code

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Multi-input and output example.
 */
public class MultipleInOut {

  public static class TokenizerMapper extends MapperBase {
    Record word;
    Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.set(new Object[] { 1L });
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      // Emit every column value of every input record as a word with count 1.
      for (int i = 0; i < record.getColumnCount(); i++) {
        word.set(new Object[] { record.get(i).toString() });
        context.write(word, one);
      }
    }
  }

  public static class SumReducer extends ReducerBase {
    private Record result;
    private Record result1;
    private Record result2;

    @Override
    public void setup(TaskContext context) throws IOException {
      // One output record per destination: the default output plus the
      // outputs labeled "out1" and "out2".
      result = context.createOutputRecord();
      result1 = context.createOutputRecord("out1");
      result2 = context.createOutputRecord("out2");
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long count = 0;
      while (values.hasNext()) {
        Record val = values.next();
        count += (Long) val.get(0);
      }
      // Route each word to one of the three outputs based on its count.
      long mod = count % 3;
      if (mod == 0) {
        result.set(0, key.get(0));
        result.set(1, count);
        // No label is specified: write to the default output.
        context.write(result);
      } else if (mod == 1) {
        result1.set(0, key.get(0));
        result1.set(1, count);
        context.write(result1, "out1");
      } else {
        result2.set(0, key.get(0));
        result2.set(1, count);
        context.write(result2, "out2");
      }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
      // Write one marker record to each of the three outputs.
      Record result = context.createOutputRecord();
      result.set(0, "default");
      result.set(1, 1L);
      context.write(result);
      Record result1 = context.createOutputRecord("out1");
      result1.set(0, "out1");
      result1.set(1, 1L);
      context.write(result1, "out1");
      Record result2 = context.createOutputRecord("out2");
      result2.set(0, "out2");
      result2.set(1, 1L);
      context.write(result2, "out2");
    }
  }

  public static LinkedHashMap<String, String> convertPartSpecToMap(
      String partSpec) {
    // Parse a partition spec such as "a=1/b=1" into an ordered map.
    LinkedHashMap<String, String> map = new LinkedHashMap<String, String>();
    if (partSpec != null && !partSpec.trim().isEmpty()) {
      String[] parts = partSpec.split("/");
      for (String part : parts) {
        String[] ss = part.split("=");
        if (ss.length != 2) {
          throw new RuntimeException("ODPS-0730001: error part spec format: "
              + partSpec);
        }
        map.put(ss[0], ss[1]);
      }
    }
    return map;
  }

  public static void main(String[] args) throws Exception {
    String[] inputs = null;
    String[] outputs = null;
    if (args.length == 2) {
      inputs = args[0].split(",");
      outputs = args[1].split(",");
    } else {
      System.err.println("MultipleInOut in... out...");
      System.exit(1);
    }
    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
    // Parse the input table strings: "table" or "table|partition_spec".
    for (String in : inputs) {
      String[] ss = in.split("\\|");
      if (ss.length == 1) {
        InputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
      } else if (ss.length == 2) {
        LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
        InputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
      } else {
        System.err.println("Style of input: " + in + " is not right");
        System.exit(1);
      }
    }
    // Parse the output table strings: "table", "table|partition_spec",
    // "table||label", or "table|partition_spec|label".
    for (String out : outputs) {
      String[] ss = out.split("\\|");
      if (ss.length == 1) {
        OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).build(), job);
      } else if (ss.length == 2) {
        LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
        OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map).build(), job);
      } else if (ss.length == 3) {
        if (ss[1].isEmpty()) {
          // "table||label": a labeled output without a partition spec.
          OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).label(ss[2]).build(), job);
        } else {
          LinkedHashMap<String, String> map = convertPartSpecToMap(ss[1]);
          OutputUtils.addTable(TableInfo.builder().tableName(ss[0]).partSpec(map)
              .label(ss[2]).build(), job);
        }
      } else {
        System.err.println("Style of output: " + out + " is not right");
        System.exit(1);
      }
    }
    JobClient.runJob(job);
  }
}
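
Note the pairing in the reducer: a record destined for a labeled output is created with createOutputRecord(label) and written with context.write(record, label), while context.write(record) with no label goes to the single unlabeled output (mr_multiinout_out1 in this example).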