Single source shortest path (SSSP) is the problem of finding the shortest path from a given source vertex to every other vertex in a graph. Dijkstra's algorithm is commonly used to calculate the SSSP values in a directed graph.

How Dijkstra's algorithm works

Dijkstra's algorithm updates the SSSP values vertex by vertex. Each vertex maintains its current SSSP value. When that value changes, the vertex adds the weight of each outgoing edge to the new value and sends the sum as a message to the adjacent vertex at the other end of that edge. In the next iteration, the adjacent vertices update their SSSP values based on the messages that they receive. The iteration ends when the SSSP values from the source vertex to all vertices no longer change.

  • Initialization: The distance from Source Vertex s to itself is 0 (d[s] = 0), and the distance from s to every other vertex u is initialized to infinity (d[u] = ∞).
  • Iteration: If an edge from u to v exists, the value of the SSSP from s to v is calculated by using the following formula: d[v] = min(d[v], d[u] + weight(u, v)). The iteration ends when the SSSP values from s to all vertices no longer change.
Note: For a weighted directed graph G = (V,E), multiple paths may be available from Source Vertex s to Sink Vertex v. The path with the smallest sum of edge weights is the SSSP from s to v.
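The initialization and iteration rules above can be sketched in plain Java. This is a minimal illustration, not the MaxCompute Graph implementation: the class name RelaxationSketch, the edge-array layout, and the sample graph are assumptions made for this example, and edge weights are assumed to be non-negative.

```java
import java.util.Arrays;

// Illustrative sketch only: plain Java, no MaxCompute Graph classes.
final class RelaxationSketch {
    // Stands in for "infinity" (d[u] = ∞).
    static final long INF = Long.MAX_VALUE;

    /**
     * edges[i] = {u, v, weight} describes a directed edge u -> v.
     * Vertices are numbered 0..vertexCount-1 (an assumption of this sketch).
     */
    static long[] shortestDistances(int vertexCount, long[][] edges, int source) {
        long[] d = new long[vertexCount];
        Arrays.fill(d, INF);   // Initialization: d[u] = ∞ for every vertex
        d[source] = 0;         // Initialization: d[s] = 0

        boolean changed = true;
        while (changed) {      // Repeat until no value changes in a full pass
            changed = false;
            for (long[] e : edges) {
                int u = (int) e[0];
                int v = (int) e[1];
                long w = e[2];
                if (d[u] == INF) {
                    continue;  // u is not reached yet; this also avoids overflow of INF + w
                }
                long candidate = d[u] + w;
                if (candidate < d[v]) {   // d[v] = min(d[v], d[u] + weight(u, v))
                    d[v] = candidate;
                    changed = true;
                }
            }
        }
        return d;
    }

    public static void main(String[] args) {
        // Hypothetical graph: 0 -> 1 (4), 0 -> 2 (1), 2 -> 1 (2), 1 -> 3 (5)
        long[][] edges = { {0, 1, 4}, {0, 2, 1}, {2, 1, 2}, {1, 3, 5} };
        // prints [0, 3, 1, 8]
        System.out.println(Arrays.toString(shortestDistances(4, edges, 0)));
    }
}
```

In this hypothetical graph, vertex 1 is reached more cheaply through vertex 2 (1 + 2 = 3) than over the direct edge (4), which is exactly the case that the min() in the formula handles.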

Because each vertex needs only its own value and the messages that it receives from adjacent vertices, this iterative update process is suitable for MaxCompute Graph.

Scenarios

Graphs are classified into directed graphs and undirected graphs, and MaxCompute supports both types. The available paths depend on how the source data is distributed and on whether the edges are directed. Therefore, the SSSP results of a directed graph and an undirected graph may be different. MaxCompute Graph uses a directed graph as the basic data model to calculate the SSSP values in the framework.
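To see why the results can differ, the following sketch loads the same records once as directed edges and once with a reverse edge added for each record. It uses the classic priority-queue form of Dijkstra's algorithm in plain Java, not the message-passing form of the MaxCompute Graph sample code. The class name, the record layout, and the three sample records are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative sketch only: plain Java, no MaxCompute Graph classes.
final class DirectedVsUndirectedSketch {
    /** records[i] = {source, destination, weight}; vertices are numbered 0..n-1. */
    static long[] distances(int n, long[][] records, int start, boolean undirected) {
        List<List<long[]>> adjacency = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            adjacency.add(new ArrayList<>());
        }
        for (long[] r : records) {
            adjacency.get((int) r[0]).add(new long[] {r[1], r[2]});
            if (undirected) {
                // Also add the reverse edge so that the record acts as an undirected edge.
                adjacency.get((int) r[1]).add(new long[] {r[0], r[2]});
            }
        }

        long[] dist = new long[n];
        Arrays.fill(dist, Long.MAX_VALUE);
        dist[start] = 0;

        // Queue entries are {vertex, distance}, ordered by distance.
        PriorityQueue<long[]> queue = new PriorityQueue<>((a, b) -> Long.compare(a[1], b[1]));
        queue.add(new long[] {start, 0});
        while (!queue.isEmpty()) {
            long[] top = queue.poll();
            int u = (int) top[0];
            if (top[1] > dist[u]) {
                continue; // Stale entry: a shorter distance to u was already settled.
            }
            for (long[] edge : adjacency.get(u)) {
                int v = (int) edge[0];
                long candidate = dist[u] + edge[1];
                if (candidate < dist[v]) {
                    dist[v] = candidate;
                    queue.add(new long[] {v, candidate});
                }
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        // Hypothetical records: 0 -> 1 (5), 1 -> 2 (1), 2 -> 0 (1)
        long[][] records = { {0, 1, 5}, {1, 2, 1}, {2, 0, 1} };
        // prints [0, 5, 6]
        System.out.println(Arrays.toString(distances(3, records, 0, false)));
        // prints [0, 2, 1]
        System.out.println(Arrays.toString(distances(3, records, 0, true)));
    }
}
```

With directed edges, vertex 2 can be reached from vertex 0 only through vertex 1. With undirected edges, the record 2 -> 0 also provides a direct edge from 0 to 2, so both distances shrink.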

Sample code

Sample code is provided based on the graph type.

  • Directed graph
    • Define the BaseLoadingVertexResolver class, which is referenced in the SSSP class.
      import com.aliyun.odps.graph.Edge;
      import com.aliyun.odps.graph.LoadingVertexResolver;
      import com.aliyun.odps.graph.Vertex;
      import com.aliyun.odps.graph.VertexChanges;
      import com.aliyun.odps.io.Writable;
      import com.aliyun.odps.io.WritableComparable;
      
      import java.io.IOException;
      import java.util.HashSet;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Set;
      
      @SuppressWarnings("rawtypes")
      public class BaseLoadingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
              extends LoadingVertexResolver<I, V, E, M> {
        @Override
        public Vertex<I, V, E, M> resolve(I vertexId, VertexChanges<I, V, E, M> vertexChanges) throws IOException {
      
          Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
      
          if (vertex != null) {
            addEdges(vertex, vertexChanges);
          } else {
            System.err.println("Ignore all addEdgeRequests for vertex#" + vertexId);
          }
          return vertex;
        }
      
        protected Vertex<I, V, E, M> addVertexIfDesired(
                I vertexId,
                VertexChanges<I, V, E, M> vertexChanges) {
          Vertex<I, V, E, M> vertex = null;
          if (hasVertexAdditions(vertexChanges)) {
            vertex = vertexChanges.getAddedVertexList().get(0);
          }
      
          return vertex;
        }
      
        protected void addEdges(Vertex<I, V, E, M> vertex,
                                VertexChanges<I, V, E, M> vertexChanges) throws IOException {
          Set<I> destVertexId = new HashSet<I>();
          if (vertex.hasEdges()) {
            List<Edge<I, E>> edgeList = vertex.getEdges();
            for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext(); ) {
              Edge<I, E> edge = edges.next();
              if (destVertexId.contains(edge.getDestVertexId())) {
                edges.remove();
              } else {
                destVertexId.add(edge.getDestVertexId());
              }
            }
          }
      
          for (Vertex<I, V, E, M> vertex1 : vertexChanges.getAddedVertexList()) {
            if (vertex1.hasEdges()) {
              List<Edge<I, E>> edgeList = vertex1.getEdges();
              for (Edge<I, E> edge : edgeList) {
                if (destVertexId.contains(edge.getDestVertexId())) continue;
                destVertexId.add(edge.getDestVertexId());
                vertex.addEdge(edge.getDestVertexId(), edge.getValue());
              }
            }
          }
        }
      
        protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
          return changes != null && changes.getAddedVertexList() != null
                  && !changes.getAddedVertexList().isEmpty();
        }
      }
      Code description:
      • Line 15: Define the BaseLoadingVertexResolver class. This class is used to handle conflicts that may occur when you load data to a directed graph.
      • Line 18: resolve is the method that is used to handle conflicts. For example, if you call the addVertexRequest method twice to add a vertex, a loading conflict occurs. In this case, you must handle the conflict before you calculate the SSSP values.
    • Define the SSSP class.
      import java.io.IOException;
      
      import com.aliyun.odps.graph.Combiner;
      import com.aliyun.odps.graph.ComputeContext;
      import com.aliyun.odps.graph.Edge;
      import com.aliyun.odps.graph.GraphJob;
      import com.aliyun.odps.graph.GraphLoader;
      import com.aliyun.odps.graph.MutationContext;
      import com.aliyun.odps.graph.Vertex;
      import com.aliyun.odps.graph.WorkerContext;
      import com.aliyun.odps.graph.example.resolver.BaseLoadingVertexResolver;
      import com.aliyun.odps.io.WritableRecord;
      import com.aliyun.odps.io.LongWritable;
      import com.aliyun.odps.data.TableInfo;
      
      public class SSSP {
        public static final String START_VERTEX = "sssp.start.vertex.id";
      
        public static class SSSPVertex extends
                Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
          private static long startVertexId = -1;
      
          public SSSPVertex() {
            this.setValue(new LongWritable(Long.MAX_VALUE));
          }
      
          public boolean isStartVertex(
                  ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
            if (startVertexId == -1) {
              String s = context.getConfiguration().get(START_VERTEX);
              startVertexId = Long.parseLong(s);
            }
            return getId().get() == startVertexId;
          }
      
          @Override
          public void compute(
                  ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
                  Iterable<LongWritable> messages) throws IOException {
            long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
            for (LongWritable msg : messages) {
              if (msg.get() < minDist) {
                minDist = msg.get();
              }
            }
            if (minDist < this.getValue().get()) {
              this.setValue(new LongWritable(minDist));
              if (hasEdges()) {
                for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
                  context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
                }
              }
            } else {
              voteToHalt();
            }
          }
      
          @Override
          public void cleanup(
                  WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
                  throws IOException {
            context.write(getId(), getValue());
          }
      
          @Override
          public String toString() {
            return "Vertex(id=" + this.getId() + ",value=" + this.getValue() + ",#edges=" + this.getEdges() + ")";
          }
        }
      
        public static class SSSPGraphLoader extends
                GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
          @Override
          public void load(
                  LongWritable recordNum,
                  WritableRecord record,
                  MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
                  throws IOException {
            SSSPVertex vertex = new SSSPVertex();
            vertex.setId((LongWritable) record.get(0));
            String[] edges = record.get(1).toString().split(",");
            for (String edge : edges) {
              String[] ss = edge.split(":");
              vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1])));
            }
            context.addVertexRequest(vertex);
          }
        }
      
        public static class MinLongCombiner extends
                Combiner<LongWritable, LongWritable> {
          @Override
          public void combine(LongWritable vertexId, LongWritable combinedMessage,
                              LongWritable messageToCombine) throws IOException {
            if (combinedMessage.get() > messageToCombine.get()) {
              combinedMessage.set(messageToCombine.get());
            }
          }
        }
      
        public static void main(String[] args) throws IOException {
          if (args.length < 3) {
            System.out.println("Usage: <startnode> <input> <output>");
            System.exit(-1);
          }
          GraphJob job = new GraphJob();
          job.setGraphLoaderClass(SSSPGraphLoader.class);
          job.setVertexClass(SSSPVertex.class);
          job.setCombinerClass(MinLongCombiner.class);
          job.setLoadingVertexResolver(BaseLoadingVertexResolver.class);
          job.set(START_VERTEX, args[0]);
          job.addInput(TableInfo.builder().tableName(args[1]).build());
          job.addOutput(TableInfo.builder().tableName(args[2]).build());
          long startTime = System.currentTimeMillis();
          job.run();
          System.out.println("Job Finished in "
                  + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
        }
      }
      
                                  
      Code description:
      • Line 19: Define the SSSPVertex class.
        • The vertex value indicates the value of the SSSP from the source vertex that is specified by startVertexId to the current vertex.
        • The compute() method uses the following iteration formula to calculate the SSSP value and update the SSSP value to the vertex value: d[v] = min(d[v], d[u] + weight(u, v)).
        • The cleanup() method writes the SSSP value to the result table.
      • Line 54: If the value of the SSSP from the source vertex to the current vertex does not change, the voteToHalt() method is called to notify the framework that the vertex can be halted. When all vertices vote to halt, the calculation ends.
      • Line 71: Define the GraphLoader class that is used to load data from tables to a directed graph. Records that are stored in tables are resolved into vertices or edges of graphs and loaded to the framework. In the preceding sample code, the addVertexRequest method is used to load graph vertices to the context for graph calculation.
      • Line 90: Define the MinLongCombiner class. This class is used to merge messages that are sent to the same vertex. This class helps improve computation performance and reduce memory usage.
      • Line 101: Create a GraphJob object in the main function of the main program. The GraphJob object is used to specify the Vertex, GraphLoader, BaseLoadingVertexResolver, and Combiner classes that the job uses. The GraphJob object also specifies the input and output tables.
      • Line 110: Add the BaseLoadingVertexResolver class that is used to handle conflicts.
  • Undirected graph
    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.graph.*;
    import com.aliyun.odps.io.DoubleWritable;
    import com.aliyun.odps.io.LongWritable;
    import com.aliyun.odps.io.WritableRecord;
    
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    
    public class SSSPBenchmark4 {
        public static final String START_VERTEX = "sssp.start.vertex.id";
    
        public static class SSSPVertex extends
                Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
            private static long startVertexId = -1;
            public SSSPVertex() {
                this.setValue(new DoubleWritable(Double.MAX_VALUE));
            }
            public boolean isStartVertex(
                    ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) {
                if (startVertexId == -1) {
                    String s = context.getConfiguration().get(START_VERTEX);
                    startVertexId = Long.parseLong(s);
                }
                return getId().get() == startVertexId;
            }
    
            @Override
            public void compute(
                    ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context,
                    Iterable<DoubleWritable> messages) throws IOException {
                double minDist = isStartVertex(context) ? 0 : Double.MAX_VALUE;
                for (DoubleWritable msg : messages) {
                    if (msg.get() < minDist) {
                        minDist = msg.get();
                    }
                }
                if (minDist < this.getValue().get()) {
                    this.setValue(new DoubleWritable(minDist));
                    if (hasEdges()) {
                        for (Edge<LongWritable, DoubleWritable> e : this.getEdges()) {
                            context.sendMessage(e.getDestVertexId(), new DoubleWritable(minDist
                                    + e.getValue().get()));
                        }
                    }
                } else {
                    voteToHalt();
                }
            }
    
            @Override
            public void cleanup(
                    WorkerContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
                    throws IOException {
                context.write(getId(), getValue());
            }
        }
    
        public static class MinLongCombiner extends
                Combiner<LongWritable, DoubleWritable> {
            @Override
            public void combine(LongWritable vertexId, DoubleWritable combinedMessage,
                                DoubleWritable messageToCombine) {
                if (combinedMessage.get() > messageToCombine.get()) {
                    combinedMessage.set(messageToCombine.get());
                }
            }
        }
    
        public static class SSSPGraphLoader extends
                GraphLoader<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
            @Override
            public void load(
                    LongWritable recordNum,
                    WritableRecord record,
                    MutationContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
                    throws IOException {
                LongWritable sourceVertexID = (LongWritable) record.get(0);
                LongWritable destinationVertexID = (LongWritable) record.get(1);
                DoubleWritable edgeValue = (DoubleWritable) record.get(2);
                Edge<LongWritable, DoubleWritable> edge = new Edge<LongWritable, DoubleWritable>(destinationVertexID, edgeValue);
                context.addEdgeRequest(sourceVertexID, edge);
                Edge<LongWritable, DoubleWritable> edge2 = new
                        Edge<LongWritable, DoubleWritable>(sourceVertexID, edgeValue);
                context.addEdgeRequest(destinationVertexID, edge2);
            }
        }
    
        public static class SSSPLoadingVertexResolver extends
                LoadingVertexResolver<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
    
            @Override
            public Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> resolve(
                    LongWritable vertexId,
                    VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> vertexChanges) throws IOException {
    
                SSSPVertex computeVertex = new SSSPVertex();
                computeVertex.setId(vertexId);
                Set<LongWritable> destinationVertexIDSet = new HashSet<>();
    
                if (hasEdgeAdditions(vertexChanges)) {
                    for (Edge<LongWritable, DoubleWritable> edge : vertexChanges.getAddedEdgeList()) {
                        if (!destinationVertexIDSet.contains(edge.getDestVertexId())) {
                            destinationVertexIDSet.add(edge.getDestVertexId());
                            computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
                        }
    
                    }
                }
    
                return computeVertex;
            }
    
            protected boolean hasEdgeAdditions(VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> changes) {
                return changes != null && changes.getAddedEdgeList() != null
                        && !changes.getAddedEdgeList().isEmpty();
            }
        }
    
        public static void main(String[] args) throws IOException {
            if (args.length < 3) {
                System.out.println("Usage: <startnode> <input> <output>");
                System.exit(-1);
            }
            GraphJob job = new GraphJob();
            job.setGraphLoaderClass(SSSPGraphLoader.class);
            job.setLoadingVertexResolver(SSSPLoadingVertexResolver.class);
            job.setVertexClass(SSSPVertex.class);
            job.setCombinerClass(MinLongCombiner.class);
            job.set(START_VERTEX, args[0]);
            job.addInput(TableInfo.builder().tableName(args[1]).build());
            job.addOutput(TableInfo.builder().tableName(args[2]).build());
            long startTime = System.currentTimeMillis();
            job.run();
            System.out.println("Job Finished in "
                    + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
        }
    }
                        
    Code description:
    • Line 14: Define the SSSPVertex class.
      • The vertex value indicates the value of the SSSP from the source vertex that is specified by startVertexId to the current vertex.
      • The compute() method uses the following iteration formula to calculate the SSSP value and update the value to the vertex value: d[v] = min(d[v], d[u] + weight(u, v)).
      • The cleanup() method writes the SSSP value to the result table.
    • Line 48: If the value of the SSSP from the source vertex to the current vertex does not change, voteToHalt() is called to notify the framework that the vertex can be halted. When all vertices vote to halt, the calculation ends.
    • Line 60: Define the MinLongCombiner class. This class is used to merge messages that are sent to the same vertex. This class helps improve computation performance and reduce memory usage.
    • Line 71: Define the GraphLoader class that is used to load data from tables to an undirected graph. The addEdgeRequest method is called twice for each record, once in each direction, so that the edge between the two vertices is treated as an undirected edge. This ensures that graph data stored in the current table is loaded to an undirected graph.
      • Line 79: Specify the ID of the source vertex.
      • Line 80: Specify the ID of the destination vertex.
      • Line 81: Specify the edge weight.
      • Line 82: Create an edge that points to the destination vertex and carries the edge weight.
      • Line 83: Send a request to add the edge to the source vertex.
      • Line 84 to Line 86: Create the reverse edge, which points to the source vertex, and send a request to add it to the destination vertex. This way, each record is loaded as a bidirectional edge.
    • Line 90: Define the SSSPLoadingVertexResolver class. This class is used to handle conflicts that may occur when you load the data to an undirected graph. For example, if you call the addEdgeRequest method twice to add the same edge, a loading conflict occurs. In this case, you must handle the conflict before you calculate the SSSP values.
    • Line 121: Create a GraphJob object in the main function of the main program. The GraphJob object is used to specify the Vertex, GraphLoader, SSSPLoadingVertexResolver, and Combiner classes that the job uses. The GraphJob object also specifies the input and output tables.
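The effect of the MinLongCombiner class in both samples can be illustrated outside the framework. The following plain Java sketch merges all messages that are addressed to the same vertex and keeps only the smallest candidate distance. Because compute() uses only the minimum of the received messages, dropping the larger candidates does not change the result. The class name and the message layout are assumptions made for this example, not part of the MaxCompute Graph SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: plain Java, no MaxCompute Graph classes.
final class MinCombinerSketch {
    /**
     * messages[i] = {destinationVertexId, candidateDistance}.
     * Returns one combined message per destination vertex: the minimum candidate.
     */
    static Map<Long, Long> combine(long[][] messages) {
        Map<Long, Long> combined = new LinkedHashMap<>();
        for (long[] message : messages) {
            // Keep the smaller of the stored value and the new candidate.
            combined.merge(message[0], message[1], Long::min);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Hypothetical messages: three candidates for vertex 2, two for vertex 3.
        long[][] messages = { {2, 7}, {3, 4}, {2, 5}, {2, 9}, {3, 6} };
        // prints {2=5, 3=4}
        System.out.println(combine(messages));
    }
}
```

In this example, five messages are reduced to two, which is the kind of saving in transferred and buffered messages that the combiner is meant to provide.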

Calculate SSSP values

This section provides an example to demonstrate how to calculate the SSSP values of a directed graph. For more information about sample data, see Write a Graph job.

Sample command:
jar -libjars odps-graph-example-sssp.jar -classpath $LOCAL_JAR_PATH/odps-graph-example-sssp.jar com.aliyun.odps.graph.example.SSSP 1 sssp_in sssp_out;
After you run the command, the following output is returned. The vertex column indicates the current vertex, and the value column indicates the value of the SSSP from Source Vertex 1 to the current vertex.
vertex    value
1        0
2        2
3        1
4        3
5        2
Note: To calculate the SSSP values of an undirected graph, load records that consist of a source vertex ID, a destination vertex ID, and an edge weight by using the sample code for undirected graphs.
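The way the directed-graph sample arrives at the output above can be traced with a small simulation in plain Java. The sample input is not reproduced in this topic, so the five rows used below are an assumption: they follow the "destination:weight,destination:weight" format that SSSPGraphLoader.load splits, and they were chosen to be consistent with the output above. The class name SuperstepSketch and the inbox and outbox maps are also hypothetical. As a simplification, the start vertex's own candidate of 0 is modeled as a message to itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: plain Java, no MaxCompute Graph classes.
final class SuperstepSketch {
    /** Parses "2:2,3:1" into {2 -> 2, 3 -> 1} (destination ID -> edge weight). */
    static Map<Long, Long> parseEdges(String column) {
        Map<Long, Long> edges = new HashMap<>();
        for (String edge : column.split(",")) {
            String[] ss = edge.split(":");
            edges.put(Long.parseLong(ss[0]), Long.parseLong(ss[1]));
        }
        return edges;
    }

    /** rows maps a vertex ID to its edge column; returns vertex ID -> SSSP value. */
    static Map<Long, Long> run(Map<Long, String> rows, long startVertexId) {
        Map<Long, Map<Long, Long>> graph = new TreeMap<>();
        Map<Long, Long> value = new TreeMap<>();
        for (Map.Entry<Long, String> row : rows.entrySet()) {
            graph.put(row.getKey(), parseEdges(row.getValue()));
            value.put(row.getKey(), Long.MAX_VALUE); // Same initial value as SSSPVertex
        }

        // First superstep: only the start vertex has a finite candidate (0).
        Map<Long, Long> inbox = new HashMap<>();
        inbox.put(startVertexId, 0L);

        while (!inbox.isEmpty()) { // Stop when no messages are in flight
            Map<Long, Long> outbox = new HashMap<>();
            for (Map.Entry<Long, Long> message : inbox.entrySet()) {
                long vertexId = message.getKey();
                long minDist = message.getValue();
                if (!value.containsKey(vertexId) || minDist >= value.get(vertexId)) {
                    continue; // No improvement: the vertex stays halted
                }
                value.put(vertexId, minDist);
                for (Map.Entry<Long, Long> edge : graph.get(vertexId).entrySet()) {
                    // Combine messages for the same destination by keeping the minimum.
                    outbox.merge(edge.getKey(), minDist + edge.getValue(), Long::min);
                }
            }
            inbox = outbox;
        }
        return value;
    }

    public static void main(String[] args) {
        // Assumed input rows (vertex ID, edge column); not taken from this topic.
        Map<Long, String> rows = new TreeMap<>();
        rows.put(1L, "2:2,3:1,4:4");
        rows.put(2L, "1:2,3:2,4:1");
        rows.put(3L, "1:1,2:2,5:1");
        rows.put(4L, "1:4,2:1,5:1");
        rows.put(5L, "3:1,4:1");
        // prints {1=0, 2=2, 3=1, 4=3, 5=2}
        System.out.println(run(rows, 1L));
    }
}
```

With these assumed rows, vertex 4 first receives the candidate 4 over the direct edge from vertex 1 and is later improved to 3 through vertex 2, which matches the value 3 in the output above.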