SSSP

Last Updated: Jun 22, 2016

Dijkstra algorithm is the classic algorithm to solve single source shortest path (Single Source Shortest Path, abbreviation: SSSP) in directed graph. The shortest path: for a directed weighted graph ‘G=(V,E)’, there are multiple paths from the souce vertex ‘s’ to meeting vertex ‘v’, in which the distance which edge weight sum is minimum is called the shortest path from s to v. The basic principles of this algorithm include:

  • Initialization: the distance from source vertex s to s itself (d[s]=0) and the distance from other vertex u to s is infinite (d[u]= ∞).
  • Iteration: if there is an edge from u to v, then the shortest distance from s to v will be updated to‘d[v] =min (d[v], d[u] +weight (u, v))’. Until the distances from all vertices to s do not change, the iteration is terminated.

According to the fundamentals of this algorithm, it is very suitable to implement it by MaxCompute Graph: each vertex maintains the shortest distance value from the source vertex. Once this value changes, send the new value added edge weights to neighbor nodes. In the following iteration, the neighbor nodes will update the current shortest distance according to their received message respectively. When the current shortest distances of all vertices do not change, the iteration will be terminated.

Source Code

  1. import java.io.IOException;
  2. import com.aliyun.odps.io.WritableRecord;
  3. import com.aliyun.odps.graph.Combiner;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.Edge;
  6. import com.aliyun.odps.graph.GraphJob;
  7. import com.aliyun.odps.graph.GraphLoader;
  8. import com.aliyun.odps.graph.MutationContext;
  9. import com.aliyun.odps.graph.Vertex;
  10. import com.aliyun.odps.graph.WorkerContext;
  11. import com.aliyun.odps.io.LongWritable;
  12. import com.aliyun.odps.data.TableInfo;
  13. public class SSSP {
  14. public static final String START_VERTEX = "sssp.start.vertex.id";
  15. public static class SSSPVertex extends
  16. Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
  17. private static long startVertexId = -1;
  18. public SSSPVertex() {
  19. this.setValue(new LongWritable(Long.MAX_VALUE));
  20. }
  21. public boolean isStartVertex(
  22. ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
  23. if (startVertexId == -1) {
  24. String s = context.getConfiguration().get(START_VERTEX);
  25. startVertexId = Long.parseLong(s);
  26. }
  27. return getId().get() == startVertexId;
  28. }
  29. @Override
  30. public void compute(
  31. ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
  32. Iterable<LongWritable> messages) throws IOException {
  33. long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
  34. for (LongWritable msg : messages) {
  35. if (msg.get() < minDist) {
  36. minDist = msg.get();
  37. }
  38. }
  39. if (minDist < this.getValue().get()) {
  40. this.setValue(new LongWritable(minDist));
  41. if (hasEdges()) {
  42. for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
  43. context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
  44. + e.getValue().get()));
  45. }
  46. }
  47. } else {
  48. voteToHalt();
  49. }
  50. }
  51. @Override
  52. public void cleanup(
  53. WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
  54. throws IOException {
  55. context.write(getId(), getValue());
  56. }
  57. }
  58. public static class MinLongCombiner extends
  59. Combiner<LongWritable, LongWritable> {
  60. @Override
  61. public void combine(LongWritable vertexId, LongWritable combinedMessage,
  62. LongWritable messageToCombine) throws IOException {
  63. if (combinedMessage.get() > messageToCombine.get()) {
  64. combinedMessage.set(messageToCombine.get());
  65. }
  66. }
  67. }
  68. public static class SSSPVertexReader extends
  69. GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
  70. @Override
  71. public void load(
  72. LongWritable recordNum,
  73. WritableRecord record,
  74. MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
  75. throws IOException {
  76. SSSPVertex vertex = new SSSPVertex();
  77. vertex.setId((LongWritable) record.get(0));
  78. String[] edges = record.get(1).toString().split(",");
  79. for (int i = 0; i < edges.length; i++) {
  80. String[] ss = edges[i].split(":");
  81. vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
  82. new LongWritable(Long.parseLong(ss[1])));
  83. }
  84. context.addVertexRequest(vertex);
  85. }
  86. }
  87. public static void main(String[] args) throws IOException {
  88. if (args.length < 2) {
  89. System.out.println("Usage: <startnode> <input> <output>");
  90. System.exit(-1);
  91. }
  92. GraphJob job = new GraphJob();
  93. job.setGraphLoaderClass(SSSPVertexReader.class);
  94. job.setVertexClass(SSSPVertex.class);
  95. job.setCombinerClass(MinLongCombiner.class);
  96. job.set(START_VERTEX, args[0]);
  97. job.addInput(TableInfo.builder().tableName(args[1]).build());
  98. job.addOutput(TableInfo.builder().tableName(args[2]).build());
  99. long startTime = System.currentTimeMillis();
  100. job.run();
  101. System.out.println("Job Finished in "
  102. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  103. }
  104. }

Code Description

The source codes of SSSP include the following parts:

  • Line 85: define the class SSSPVertexReader, to load graph and parse each record in table to a vertex. The first column of record is vertex ID and the second column of record is to save all edges which take this vertex as the starting point, the content is shown as: 2:2, 3:1, 4:4.
  • Line 21: define SSSPVertex, in which:
    • The vertex value indicates the current shortest path from this vertex to source vertex startVertexId.
    • The method ‘compute()’ uses iterative formula ‘d[v]=min(d[v], d[u]+weight(u, v))’ to update vertex value;
    • The method ‘cleanup()’ writes the vertex and its shortest path to source vertex into the result table;
  • Line 60: when the vertex value does not change, call ‘voteToHalt ()’ to tell the framework that this vertex has entered halt status. If all vertices enter halt status, the computation is terminated.
  • Line 72: define MinLongCombiner. Combine messages sent to the same vertex to optimize the performance and reduce memeory occupy.
  • Line 108: main program (mainClass), define GraphJob, to achieve Vertex/GraphLoader/Combiner and to specify input and output tables.
Thank you! We've received your feedback.