Topological sorting

Last Updated: Oct 31, 2017

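A topological sequence of a directed graph is an ordering of all its vertices in which, for every directed edge (u, v), vertex u appears before vertex v. Such an ordering exists only when the graph contains no cycle. Topological sorting is the algorithm used to compute a topological sequence of a directed graph.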

Algorithm:

  • Find a vertex in the graph that has no inbound edges and output it.
  • Delete that vertex and all of its outbound edges from the graph.
  • Repeat the preceding steps until all vertices have been output.

Source code

  1. import java.io.IOException;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import com.aliyun.odps.data.TableInfo;
  5. import com.aliyun.odps.graph.Aggregator;
  6. import com.aliyun.odps.graph.Combiner;
  7. import com.aliyun.odps.graph.ComputeContext;
  8. import com.aliyun.odps.graph.GraphJob;
  9. import com.aliyun.odps.graph.GraphLoader;
  10. import com.aliyun.odps.graph.MutationContext;
  11. import com.aliyun.odps.graph.Vertex;
  12. import com.aliyun.odps.graph.WorkerContext;
  13. import com.aliyun.odps.io.LongWritable;
  14. import com.aliyun.odps.io.NullWritable;
  15. import com.aliyun.odps.io.BooleanWritable;
  16. import com.aliyun.odps.io.WritableRecord;
  17. public class TopologySort {
  18. private final static Log LOG = LogFactory.getLog(TopologySort.class);
  19. public static class TopologySortVertex extends
  20. Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  21. @Override
  22. public void compute(
  23. ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  24. Iterable<LongWritable> messages) throws IOException {
  25. // in superstep 0, each vertex sends message whose value is 1 to its
  26. // neighbors
  27. if (context.getSuperstep() == 0) {
  28. if (hasEdges()) {
  29. context.sendMessageToNeighbors(this, new LongWritable(1L));
  30. }
  31. } else if (context.getSuperstep() >= 1) {
  32. // compute each vertex's indegree
  33. long indegree = getValue().get();
  34. for (LongWritable msg : messages) {
  35. indegree += msg.get();
  36. }
  37. setValue(new LongWritable(indegree));
  38. if (indegree == 0) {
  39. voteToHalt();
  40. if (hasEdges()) {
  41. context.sendMessageToNeighbors(this, new LongWritable(-1L));
  42. }
  43. context.write(new LongWritable(context.getSuperstep()), getId());
  44. LOG.info("vertex: " + getId());
  45. }
  46. context.aggregate(new LongWritable(indegree));
  47. }
  48. }
  49. }
  50. public static class TopologySortVertexReader extends
  51. GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  52. @Override
  53. public void load(
  54. LongWritable recordNum,
  55. WritableRecord record,
  56. MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  57. throws IOException {
  58. TopologySortVertex vertex = new TopologySortVertex();
  59. vertex.setId((LongWritable) record.get(0));
  60. vertex.setValue(new LongWritable(0));
  61. String[] edges = record.get(1).toString().split(",");
  62. for (int i = 0; i < edges.length; i++) {
  63. long edge = Long.parseLong(edges[i]);
  64. if (edge >= 0) {
  65. vertex.addEdge(new LongWritable(Long.parseLong(edges[i])),
  66. NullWritable.get());
  67. }
  68. }
  69. LOG.info(record.toString());
  70. context.addVertexRequest(vertex);
  71. }
  72. }
  73. public static class LongSumCombiner extends
  74. Combiner<LongWritable, LongWritable> {
  75. @Override
  76. public void combine(LongWritable vertexId, LongWritable combinedMessage,
  77. LongWritable messageToCombine) throws IOException {
  78. combinedMessage.set(combinedMessage.get() + messageToCombine.get());
  79. }
  80. }
  81. public static class TopologySortAggregator extends
  82. Aggregator<BooleanWritable> {
  83. @SuppressWarnings("rawtypes")
  84. @Override
  85. public BooleanWritable createInitialValue(WorkerContext context)
  86. throws IOException {
  87. return new BooleanWritable(true);
  88. }
  89. @Override
  90. public void aggregate(BooleanWritable value, Object item)
  91. throws IOException {
  92. boolean hasCycle = value.get();
  93. boolean inDegreeNotZero = ((LongWritable) item).get() == 0 ? false : true;
  94. value.set(hasCycle && inDegreeNotZero);
  95. }
  96. @Override
  97. public void merge(BooleanWritable value, BooleanWritable partial)
  98. throws IOException {
  99. value.set(value.get() && partial.get());
  100. }
  101. @SuppressWarnings("rawtypes")
  102. @Override
  103. public boolean terminate(WorkerContext context, BooleanWritable value)
  104. throws IOException {
  105. if (context.getSuperstep() == 0) {
  106. // since the initial aggregator value is true, and in superstep we don't
  107. // do aggregate
  108. return false;
  109. }
  110. return value.get();
  111. }
  112. }
  113. public static void main(String[] args) throws IOException {
  114. if (args.length != 2) {
  115. System.out.println("Usage : <inputTable> <outputTable>");
  116. System.exit(-1);
  117. }
  118. // The input table is in the format of
  119. // 0 1,2
  120. // 1 3
  121. // 2 3
  122. // 3 -1
  123. // The first column is vertexid, and the second column is the destination vertexid of the vertex edge. If the value is –1, the vertex does not have any outbound edge
  124. // The output table is in the format of
  125. // 0 0
  126. // 1 1
  127. // 1 2
  128. // 2 3
  129. // The first column is the supstep value, in which the topological sequence is hidden. The second column is vertexid
  130. // TopologySortAggregator is used to determine if the graph has loops
  131. // If the input graph has a loop, the iteration ends when the indegree of vertices in the active state is not 0
  132. // You can use records in the input and output tables to determine if the graph has loops
  133. GraphJob job = new GraphJob();
  134. job.setGraphLoaderClass(TopologySortVertexReader.class);
  135. job.setVertexClass(TopologySortVertex.class);
  136. job.addInput(TableInfo.builder().tableName(args[0]).build());
  137. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  138. job.setCombinerClass(LongSumCombiner.class);
  139. job.setAggregatorClass(TopologySortAggregator.class);
  140. long startTime = System.currentTimeMillis();
  141. job.run();
  142. System.out.println("Job Finished in "
  143. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  144. }
  145. }
Thank you! We've received your feedback.