Pipeline

Last Updated: Jun 10, 2016

Test Preparation

(1) Prepare the JAR package of the test program. Assume the package is named "mapreduce-examples.jar".

(2) Prepare the pipeline test table and resource.

  • Create the tables:
    create table wc_in (key string, value string);
    create table wc_out (key string, cnt bigint);
  • Add the resource:
    add jar mapreduce-examples.jar -f;
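
If you prefer to prepare the tables from Java rather than from the console, a minimal sketch using the ODPS SDK could look like the following. It assumes odps-sdk-core is on the classpath; the class name PrepareTables and the <accessId>, <accessKey>, endpoint, and project values are placeholders to substitute.

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.AliyunAccount;

public class PrepareTables {
  public static void main(String[] args) throws OdpsException {
    // Placeholder credentials/endpoint -- replace with your own values.
    Odps odps = new Odps(new AliyunAccount("<accessId>", "<accessKey>"));
    odps.setEndpoint("<odps endpoint>");
    odps.setDefaultProject("<project name>");

    // Equivalent of: create table wc_in (key string, value string);
    TableSchema inSchema = new TableSchema();
    inSchema.addColumn(new Column("key", OdpsType.STRING));
    inSchema.addColumn(new Column("value", OdpsType.STRING));
    odps.tables().create("wc_in", inSchema);

    // Equivalent of: create table wc_out (key string, cnt bigint);
    TableSchema outSchema = new TableSchema();
    outSchema.addColumn(new Column("key", OdpsType.STRING));
    outSchema.addColumn(new Column("cnt", OdpsType.BIGINT));
    odps.tables().create("wc_out", outSchema);
  }
}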

(3) Use the tunnel command to import the data:

    tunnel upload data wc_in;
  • The contents of the data file imported into the table "wc_in" (the tunnel default field delimiter is the comma, so "hello" and "odps" land in the key and value columns, respectively):
    hello,odps
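
The same row can also be uploaded through the tunnel SDK instead of the tunnel command. A minimal sketch, again with placeholder credentials and the hypothetical class name UploadTestData:

import com.aliyun.odps.Odps;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;

public class UploadTestData {
  public static void main(String[] args) throws Exception {
    // Placeholder credentials/endpoint -- replace with your own values.
    Odps odps = new Odps(new AliyunAccount("<accessId>", "<accessKey>"));
    odps.setEndpoint("<odps endpoint>");
    odps.setDefaultProject("<project name>");

    TableTunnel tunnel = new TableTunnel(odps);
    // One upload session; block 0 is the only writer in this example.
    TableTunnel.UploadSession session =
        tunnel.createUploadSession(odps.getDefaultProject(), "wc_in");
    RecordWriter writer = session.openRecordWriter(0);

    // "hello,odps" maps to the two columns of wc_in.
    Record record = session.newRecord();
    record.setString(0, "hello"); // key column
    record.setString(1, "odps");  // value column
    writer.write(record);
    writer.close();

    // Commit the written block so the row becomes visible in wc_in.
    session.commit(new Long[] { 0L });
  }
}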

Test Steps

Run WordCountPipeline from the odpscmd console (a single command, shown wrapped here):

  jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
      com.aliyun.odps.mapred.open.example.WordCountPipeline wc_in wc_out;

Test Result

The job ran successfully. The contents of the output table "wc_out":

  +------------+------------+
  | key        | cnt        |
  +------------+------------+
  | hello      | 1          |
  | odps       | 1          |
  +------------+------------+
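
To check the output programmatically rather than from the console, one option is to query the table with the SDK's SQLTask. A minimal sketch, assuming a recent odps-sdk-core in which SQLTask.getResult returns the result rows (credentials and the class name VerifyOutput are placeholders):

import java.util.List;

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;

public class VerifyOutput {
  public static void main(String[] args) throws OdpsException {
    // Placeholder credentials/endpoint -- replace with your own values.
    Odps odps = new Odps(new AliyunAccount("<accessId>", "<accessKey>"));
    odps.setEndpoint("<odps endpoint>");
    odps.setDefaultProject("<project name>");

    // Run the query and block until it finishes.
    Instance instance = SQLTask.run(odps, "select * from wc_out;");
    instance.waitForSuccess();

    // Print each (key, cnt) row of the output table.
    List<Record> records = SQLTask.getResult(instance);
    for (Record r : records) {
      System.out.println(r.get(0) + "\t" + r.get(1));
    }
  }
}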

Sample Code

The sample chains one map stage and two reduce stages in a single pipeline: TokenizerMapper splits the input into words and emits (word, 1) pairs, SumReducer totals the count for each word and swaps key and value so that records reach the final stage keyed by count, and IdentityReducer writes the records to the output table.

package com.aliyun.odps.mapred.open.example;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.pipeline.Pipeline;

public class WordCountPipeline {

  public static class TokenizerMapper extends MapperBase {
    Record word;
    Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
      word = context.createMapOutputKeyRecord();
      one = context.createMapOutputValueRecord();
      one.setBigint(0, 1L);
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context)
        throws IOException {
      // Emit (word, 1) for every whitespace-separated token in every column.
      for (int i = 0; i < record.getColumnCount(); i++) {
        String[] words = record.get(i).toString().split("\\s+");
        for (String w : words) {
          word.setString(0, w);
          context.write(word, one);
        }
      }
    }
  }

  // First reduce stage: totals the count for each word, then swaps key and
  // value so that the count becomes the key handed to the next stage.
  public static class SumReducer extends ReducerBase {
    private Record num;
    private Record result;

    @Override
    public void setup(TaskContext context) throws IOException {
      num = context.createOutputKeyRecord();
      result = context.createOutputValueRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      long count = 0;
      while (values.hasNext()) {
        Record val = values.next();
        count += (Long) val.get(0);
      }
      result.set(0, key.get(0));
      num.set(0, count);
      context.write(num, result);
    }
  }

  // Final reduce stage: passes each value record straight through to the
  // output table.
  public static class IdentityReducer extends ReducerBase {
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
        throws IOException {
      while (values.hasNext()) {
        context.write(values.next());
      }
    }
  }

  public static void main(String[] args) throws OdpsException {
    if (args.length != 2) {
      System.err.println("Usage: WordCountPipeline <in_table> <out_table>");
      System.exit(2);
    }
    Job job = new Job();
    // One map stage followed by two reduce stages; each stage declares the
    // key/value schema (and optionally the sort, partition, and grouping
    // columns) it hands to the next stage.
    Pipeline pipeline = Pipeline.builder()
        .addMapper(TokenizerMapper.class)
        .setOutputKeySchema(
            new Column[] { new Column("word", OdpsType.STRING) })
        .setOutputValueSchema(
            new Column[] { new Column("count", OdpsType.BIGINT) })
        .setOutputKeySortColumns(new String[] { "word" })
        .setPartitionColumns(new String[] { "word" })
        .setOutputGroupingColumns(new String[] { "word" })
        .addReducer(SumReducer.class)
        .setOutputKeySchema(
            new Column[] { new Column("count", OdpsType.BIGINT) })
        .setOutputValueSchema(
            new Column[] { new Column("word", OdpsType.STRING) })
        .addReducer(IdentityReducer.class)
        .createPipeline();
    job.setPipeline(pipeline);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    job.submit();
    job.waitForCompletion();
    System.exit(job.isSuccessful() ? 0 : 1);
  }
}