SSSP

Last Updated: Oct 31, 2017

Dijkstra's algorithm is a classic solution to the Single Source Shortest Path (SSSP) problem in a weighted directed graph. For a weighted directed graph G = (V, E), there may be many paths from a source vertex s to a target vertex v. Among these paths, the one with the smallest sum of edge weights is the shortest path from s to v, and that sum is the shortest distance. The algorithm relies on the following relaxation principle:

  • Initialization: The distance from source vertex s to itself is zero (d[s] = 0), and the distance from s to every other vertex u is set to infinity (d[u] = ∞).

  • Iteration: If an edge exists from u to v, the shortest distance from s to v is updated as d[v] = min(d[v], d[u] + weight(u, v)). The iteration ends when no distance changes any more.
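
The two steps above can be sketched in plain Java, independent of MaxCompute. This is a minimal illustration that assumes vertices are numbered 0 to n-1, edge weights are non-negative, and each edge is given as a {u, v, weight} triple. The class and method names are hypothetical and are not part of any SDK.

```java
import java.util.Arrays;

class RelaxationSketch {
    // Stand-in for "infinity"; unreached vertices are skipped to avoid overflow.
    static final long INF = Long.MAX_VALUE;

    /** Each entry of edges is {u, v, weight}; vertices are numbered 0..n-1. */
    static long[] shortestDistances(int n, long[][] edges, int source) {
        long[] d = new long[n];
        Arrays.fill(d, INF);   // Initialization: d[u] = infinity
        d[source] = 0;         // Initialization: d[s] = 0

        boolean changed = true;
        while (changed) {      // Iteration: repeat until no distance changes
            changed = false;
            for (long[] e : edges) {
                int u = (int) e[0];
                int v = (int) e[1];
                long w = e[2];
                // d[v] = min(d[v], d[u] + weight(u, v))
                if (d[u] != INF && d[u] + w < d[v]) {
                    d[v] = d[u] + w;
                    changed = true;
                }
            }
        }
        return d;
    }

    public static void main(String[] args) {
        long[][] edges = { {0, 1, 2}, {0, 2, 5}, {1, 2, 1}, {2, 3, 3} };
        // Vertex 4 has no incoming edge, so its distance stays at INF.
        System.out.println(Arrays.toString(shortestDistances(5, edges, 0)));
    }
}
```

In this sample graph the direct edge 0→2 costs 5, but the path 0→1→2 costs 3, so the relaxation step replaces the first estimate with the second.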

This principle maps naturally onto a MaxCompute Graph program. Each vertex maintains its current shortest distance from the source vertex. If that value changes, the vertex sends each adjacent vertex a message containing the sum of the new value and the weight of the connecting edge. In the next iteration, the adjacent vertex updates its own current shortest distance based on the messages it receives. The iteration ends when the current shortest distance of every vertex stops changing.
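
The message flow described above can be simulated on a single machine without the MaxCompute SDK. The following sketch is only an approximation of the model: the class name, the Edge helper, and the inbox/outbox maps are hypothetical, and only the source vertex is activated in the first round, whereas a real graph job runs every vertex in the first superstep.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SuperstepSketch {
    /** Outgoing edge: destination vertex ID and edge weight. */
    static final class Edge {
        final long dest;
        final long weight;
        Edge(long dest, long weight) {
            this.dest = dest;
            this.weight = weight;
        }
    }

    /** Returns the shortest distance for every vertex reachable from source. */
    static Map<Long, Long> run(Map<Long, List<Edge>> graph, long source) {
        Map<Long, Long> dist = new HashMap<>();   // vertex value; absent means infinity
        Map<Long, Long> inbox = new HashMap<>();  // minimum message per vertex
        inbox.put(source, 0L);                    // the source starts at distance 0

        while (!inbox.isEmpty()) {                // no messages left: all vertices halted
            Map<Long, Long> outbox = new HashMap<>();
            for (Map.Entry<Long, Long> msg : inbox.entrySet()) {
                long v = msg.getKey();
                long candidate = msg.getValue();
                Long current = dist.get(v);
                if (current != null && current <= candidate) {
                    continue;                     // value unchanged: vertex stays halted
                }
                dist.put(v, candidate);           // value improved: notify neighbors
                List<Edge> edges = graph.getOrDefault(v, Collections.<Edge>emptyList());
                for (Edge e : edges) {
                    // Message = new distance + edge weight. merge() keeps only the
                    // minimum per destination, the role a min-combiner plays.
                    outbox.merge(e.dest, candidate + e.weight, Math::min);
                }
            }
            inbox = outbox;                       // delivered in the next superstep
        }
        return dist;
    }

    public static void main(String[] args) {
        Map<Long, List<Edge>> graph = new HashMap<>();
        graph.put(1L, Arrays.asList(new Edge(2, 2), new Edge(3, 5)));
        graph.put(2L, Arrays.asList(new Edge(3, 1)));
        graph.put(3L, Arrays.asList(new Edge(4, 3)));
        System.out.println(run(graph, 1L));
    }
}
```

Keeping only the smallest message per destination does not change the result, because a vertex only ever uses the minimum of the messages it receives.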

Source code

    import java.io.IOException;

    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.graph.Combiner;
    import com.aliyun.odps.graph.ComputeContext;
    import com.aliyun.odps.graph.Edge;
    import com.aliyun.odps.graph.GraphJob;
    import com.aliyun.odps.graph.GraphLoader;
    import com.aliyun.odps.graph.MutationContext;
    import com.aliyun.odps.graph.Vertex;
    import com.aliyun.odps.graph.WorkerContext;
    import com.aliyun.odps.io.LongWritable;
    import com.aliyun.odps.io.WritableRecord;

    public class SSSP {

      // Configuration key that carries the ID of the source vertex.
      public static final String START_VERTEX = "sssp.start.vertex.id";

      public static class SSSPVertex extends
          Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

        // Cached source vertex ID; -1 means it has not been read yet.
        private static long startVertexId = -1;

        public SSSPVertex() {
          // Every vertex starts at "infinity".
          this.setValue(new LongWritable(Long.MAX_VALUE));
        }

        public boolean isStartVertex(
            ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
          if (startVertexId == -1) {
            String s = context.getConfiguration().get(START_VERTEX);
            startVertexId = Long.parseLong(s);
          }
          return getId().get() == startVertexId;
        }

        @Override
        public void compute(
            ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
            Iterable<LongWritable> messages) throws IOException {
          // Candidate distance: 0 for the source, otherwise "infinity".
          // Long.MAX_VALUE matches the initial vertex value, so a vertex that
          // receives no shorter distance neither updates nor sends messages.
          long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
          for (LongWritable msg : messages) {
            if (msg.get() < minDist) {
              minDist = msg.get();
            }
          }

          if (minDist < this.getValue().get()) {
            // Shorter distance found: store it and propagate it to neighbors.
            this.setValue(new LongWritable(minDist));
            if (hasEdges()) {
              for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
                context.sendMessage(e.getDestVertexId(),
                    new LongWritable(minDist + e.getValue().get()));
              }
            }
          } else {
            // Value unchanged: this vertex has nothing more to do for now.
            voteToHalt();
          }
        }

        @Override
        public void cleanup(
            WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
            throws IOException {
          // Write (vertex ID, shortest distance) to the output table.
          context.write(getId(), getValue());
        }
      }

      // Keeps only the smallest message addressed to each vertex.
      public static class MinLongCombiner extends
          Combiner<LongWritable, LongWritable> {

        @Override
        public void combine(LongWritable vertexId, LongWritable combinedMessage,
            LongWritable messageToCombine) throws IOException {
          if (combinedMessage.get() > messageToCombine.get()) {
            combinedMessage.set(messageToCombine.get());
          }
        }
      }

      // Builds one vertex from each input record.
      public static class SSSPVertexReader extends
          GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

        @Override
        public void load(
            LongWritable recordNum,
            WritableRecord record,
            MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
            throws IOException {
          SSSPVertex vertex = new SSSPVertex();
          // Column 0: vertex ID. Column 1: edges as "dest:weight,dest:weight,...".
          vertex.setId((LongWritable) record.get(0));
          String[] edges = record.get(1).toString().split(",");
          for (int i = 0; i < edges.length; i++) {
            String[] ss = edges[i].split(":");
            vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
                new LongWritable(Long.parseLong(ss[1])));
          }
          context.addVertexRequest(vertex);
        }
      }

      public static void main(String[] args) throws IOException {
        // Three arguments are required because args[2] is read below.
        if (args.length < 3) {
          System.out.println("Usage: <startnode> <input> <output>");
          System.exit(-1);
        }

        GraphJob job = new GraphJob();
        job.setGraphLoaderClass(SSSPVertexReader.class);
        job.setVertexClass(SSSPVertex.class);
        job.setCombinerClass(MinLongCombiner.class);
        job.set(START_VERTEX, args[0]);
        job.addInput(TableInfo.builder().tableName(args[1]).build());
        job.addOutput(TableInfo.builder().tableName(args[2]).build());

        long startTime = System.currentTimeMillis();
        job.run();
        System.out.println("Job Finished in "
            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
      }
    }

Code description

The source code of SSSP includes the following parts:

  • SSSPVertexReader: Loads the graph by parsing each record in the input table into a vertex. The first column of the record is the vertex ID, and the second column stores all edges that start from the vertex as comma-separated destination:weight pairs, such as 2:2,3:1,4:4.
  • SSSPVertex: Defines the vertex, where:
    • The vertex value is the current shortest distance from the source vertex startVertexId to this vertex.
    • The compute() method uses the iteration formula d[v] = min(d[v], d[u] + weight(u, v)) to update the vertex value.
    • The cleanup() method writes the vertex ID and its shortest distance from the source vertex to the result table.
  • voteToHalt() in compute(): If the vertex value does not change, voteToHalt() is called to notify the framework that this vertex enters the halt state. The calculation ends when all vertices are in the halt state.
  • MinLongCombiner: Combines messages sent to the same vertex by keeping only the minimum, which improves performance and reduces memory usage.
  • main(): Runs the main program. It creates a GraphJob and specifies the Vertex, GraphLoader, and Combiner implementations, the source vertex ID, and the input and output tables.
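
To make the edge column format concrete, the following standalone sketch parses a string such as 2:2,3:1,4:4 into destination and weight pairs. It is not the loader from the source code: the class and method names are hypothetical, it tolerates spaces around entries, and it stores the pairs in a map, so a repeated destination would overwrite the earlier entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class EdgeColumnSketch {
    /** Parses "dest:weight,dest:weight,..." into a map of destination to weight. */
    static Map<Long, Long> parseEdges(String column) {
        Map<Long, Long> edges = new LinkedHashMap<>();  // keeps input order
        if (column == null || column.trim().isEmpty()) {
            return edges;                               // vertex without outgoing edges
        }
        for (String item : column.split(",")) {
            String[] parts = item.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException(
                    "Expected dest:weight but got: " + item);
            }
            edges.put(Long.parseLong(parts[0].trim()),
                      Long.parseLong(parts[1].trim()));
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println(parseEdges("2:2,3:1,4:4"));  // prints {2=2, 3=1, 4=4}
    }
}
```

Validating the number of parts in each entry gives a clear error for a malformed record instead of an ArrayIndexOutOfBoundsException during loading.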