MaxCompute:SSSP

Last Updated:Mar 26, 2026

The single-source shortest path (SSSP) problem is to find the shortest path from a given source vertex to every other vertex in a graph. MaxCompute Graph implements SSSP using Dijkstra's algorithm.

How it works

Dijkstra's algorithm works by having each vertex hold its current SSSP value and propagate updates to neighbors through message passing.

Initialization: The source vertex s starts with distance 0 (d[s] = 0). All other vertices start with infinite distance (d[u] = ∞).

Iteration: At each iteration, if a vertex receives a message indicating a shorter path, it updates its SSSP value and notifies its adjacent vertices using this formula:

d[v] = min(d[v], d[u] + weight(u, v))

Termination: When no vertex updates its SSSP value in an iteration, all vertices vote to halt and the job finishes.
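
The three steps above can be sketched outside the framework as a plain Java loop. This is a minimal illustration, not MaxCompute Graph code: the class name SsspSuperstepSketch, the edge-array layout, and the sample graph in main are all assumptions made for this example. Each pass of the while loop stands in for one iteration, and Map.merge with Long::min plays the part of a minimum combiner.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class SsspSuperstepSketch {
    static final long INF = Long.MAX_VALUE;

    /** Each edge is {u, v, weight}; vertices are numbered 0..n-1 (hypothetical layout). */
    static long[] shortestPaths(int n, long[][] edges, int source) {
        long[] dist = new long[n];
        Arrays.fill(dist, INF);               // d[u] = infinity for every vertex

        // Pending messages: target vertex -> smallest candidate distance.
        Map<Integer, Long> inbox = new HashMap<>();
        inbox.put(source, 0L);                // d[s] = 0 arrives as the first "message"

        while (!inbox.isEmpty()) {            // no messages left: every vertex has halted
            Map<Integer, Long> next = new HashMap<>();
            for (Map.Entry<Integer, Long> message : inbox.entrySet()) {
                int u = message.getKey();
                long candidate = message.getValue();
                if (candidate >= dist[u]) {
                    continue;                 // no improvement, so u sends nothing
                }
                dist[u] = candidate;
                for (long[] e : edges) {
                    if (e[0] == u) {
                        // Candidate for d[v] is d[u] + weight(u, v); keep the minimum per target.
                        next.merge((int) e[1], candidate + e[2], Long::min);
                    }
                }
            }
            inbox = next;
        }
        return dist;
    }

    public static void main(String[] args) {
        long[][] edges = { {0, 1, 4}, {0, 2, 1}, {2, 1, 2}, {1, 3, 5} };
        System.out.println(Arrays.toString(shortestPaths(4, edges, 0)));
    }
}
```

For the hypothetical four-vertex graph in main, the program prints [0, 3, 1, 8]: vertex 1 is first reached at distance 4 over the direct edge and is later improved to 3 through vertex 2, which in turn improves vertex 3 from 9 to 8. A vertex that is never reached keeps Long.MAX_VALUE.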

For a weighted directed graph G = (V, E), multiple paths may exist from the source vertex s to a destination vertex v. The shortest path is the one with the smallest total edge weight, and that total is the SSSP value of v.

Graph types

MaxCompute supports both directed and undirected graphs. SSSP results differ between the two because path availability depends on edge direction. MaxCompute Graph uses a directed graph as its base data model, but you can model undirected graphs by representing each undirected edge as two directed edges.
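
As a rough illustration of modeling an undirected edge as two directed edges, the following plain Java sketch stores each edge once per direction in an adjacency map. The class name UndirectedEdgeSketch, the method addUndirectedEdge, and the {destination, weight} array layout are hypothetical and are not part of the MaxCompute Graph SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class UndirectedEdgeSketch {
    /** Stores the undirected edge {u, v} as two directed edges that share one weight. */
    static void addUndirectedEdge(Map<Long, List<long[]>> adjacency, long u, long v, long weight) {
        adjacency.computeIfAbsent(u, k -> new ArrayList<>()).add(new long[] {v, weight}); // u -> v
        adjacency.computeIfAbsent(v, k -> new ArrayList<>()).add(new long[] {u, weight}); // v -> u
    }

    public static void main(String[] args) {
        Map<Long, List<long[]>> adjacency = new TreeMap<>();
        addUndirectedEdge(adjacency, 1L, 2L, 5L);
        for (Map.Entry<Long, List<long[]>> entry : adjacency.entrySet()) {
            for (long[] edge : entry.getValue()) {
                System.out.println(entry.getKey() + " -> " + edge[0] + " (weight " + edge[1] + ")");
            }
        }
    }
}
```

Because both directions carry the same weight, the distance between two vertices is the same whichever one is chosen as the source. With a single directed edge, a path may exist in one direction only.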

Framework roles

Each SSSP implementation requires three components that plug into the MaxCompute Graph framework:

| Component | Class | Role |
| --- | --- | --- |
| Vertex program | SSSPVertex (extends Vertex) | Receives messages, updates SSSP value, sends messages to neighbors, votes to halt |
| Message combiner | MinLongCombiner (extends Combiner) | Merges multiple messages to the same vertex into one, reducing network traffic |
| Graph loader | SSSPGraphLoader (extends GraphLoader) | Reads table records and converts them into graph vertices and edges |

A fourth component, LoadingVertexResolver, handles conflicts that arise during data loading (for example, when the same vertex is added more than once).

Sample code

Directed graph

The directed graph implementation uses two classes: BaseLoadingVertexResolver (conflict resolution) and SSSP (the main job).

`BaseLoadingVertexResolver` — handles vertex loading conflicts:

import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.LoadingVertexResolver;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableComparable;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

@SuppressWarnings("rawtypes")
public class BaseLoadingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
        extends LoadingVertexResolver<I, V, E, M> {
  @Override
  public Vertex<I, V, E, M> resolve(I vertexId, VertexChanges<I, V, E, M> vertexChanges) throws IOException {

    Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);

    if (vertex != null) {
      addEdges(vertex, vertexChanges);
    } else {
      System.err.println("Ignore all addEdgeRequests for vertex#" + vertexId);
    }
    return vertex;
  }

  protected Vertex<I, V, E, M> addVertexIfDesired(
          I vertexId,
          VertexChanges<I, V, E, M> vertexChanges) {
    Vertex<I, V, E, M> vertex = null;
    if (hasVertexAdditions(vertexChanges)) {
      vertex = vertexChanges.getAddedVertexList().get(0);
    }

    return vertex;
  }

  protected void addEdges(Vertex<I, V, E, M> vertex,
                          VertexChanges<I, V, E, M> vertexChanges) throws IOException {
    Set<I> destVertexId = new HashSet<I>();
    if (vertex.hasEdges()) {
      List<Edge<I, E>> edgeList = vertex.getEdges();
      for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext(); ) {
        Edge<I, E> edge = edges.next();
        if (destVertexId.contains(edge.getDestVertexId())) {
          edges.remove();
        } else {
          destVertexId.add(edge.getDestVertexId());
        }
      }
    }

    for (Vertex<I, V, E, M> vertex1 : vertexChanges.getAddedVertexList()) {
      if (vertex1.hasEdges()) {
        List<Edge<I, E>> edgeList = vertex1.getEdges();
        for (Edge<I, E> edge : edgeList) {
          if (destVertexId.contains(edge.getDestVertexId())) continue;
          destVertexId.add(edge.getDestVertexId());
          vertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }
    }
  }

  protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
    return changes != null && changes.getAddedVertexList() != null
            && !changes.getAddedVertexList().isEmpty();
  }
}

Key points:

  • resolve() is called when a loading conflict occurs — for example, if addVertexRequest is called twice for the same vertex. The method selects the first vertex and deduplicates its edges.

  • Without this resolver, duplicate vertex additions during data loading would cause undefined behavior before computation starts.
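
The edge deduplication rule can be shown in isolation. The sketch below is plain Java rather than SDK code, and the class name EdgeDedupSketch and the {destination, weight} array layout are assumptions made for illustration. It keeps the first edge seen for each destination vertex ID, which appears to be the same rule that addEdges() applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class EdgeDedupSketch {
    /** Each edge is {destinationId, weight}; the first edge per destination is kept. */
    static List<long[]> keepFirstPerDestination(List<long[]> edges) {
        Set<Long> seenDestinations = new HashSet<>();
        List<long[]> kept = new ArrayList<>();
        for (long[] edge : edges) {
            // Set.add returns false when the destination was already recorded.
            if (seenDestinations.add(edge[0])) {
                kept.add(edge);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[] {2, 5}, new long[] {3, 1}, new long[] {2, 4});
        for (long[] edge : keepFirstPerDestination(edges)) {
            System.out.println("dest=" + edge[0] + " weight=" + edge[1]);
        }
    }
}
```

Note that "first wins" is not the same as "smallest wins": in the sample input the edge to vertex 2 with weight 5 is kept and the later one with weight 4 is dropped. If duplicate edges in the input can carry different weights, the shortest distances depend on which copy is encountered first.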

`SSSP` — the main job class:

import java.io.IOException;

import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;

public class SSSP {
  public static final String START_VERTEX = "sssp.start.vertex.id";

  public static class SSSPVertex extends
          Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
    private static long startVertexId = -1;

    public SSSPVertex() {
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    public boolean isStartVertex(
            ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    @Override
    public void compute(
            ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
            Iterable<LongWritable> messages) throws IOException {
      long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }
      if (minDist < this.getValue().get()) {
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
          }
        }
      } else {
        voteToHalt();  // No update — this vertex is done
      }
    }

    @Override
    public void cleanup(
            WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
            throws IOException {
      context.write(getId(), getValue());  // Write final SSSP value to output table
    }

    @Override
    public String toString() {
      return "Vertex(id=" + this.getId() + ",value=" + this.getValue() + ",#edges=" + this.getEdges() + ")";
    }
  }

  public static class SSSPGraphLoader extends
          GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
    @Override
    public void load(
            LongWritable recordNum,
            WritableRecord record,
            MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
            throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));
      String[] edges = record.get(1).toString().split(",");
      for (String edge : edges) {
        String[] ss = edge.split(":");
        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1])));
      }
      context.addVertexRequest(vertex);
    }
  }

  public static class MinLongCombiner extends
          Combiner<LongWritable, LongWritable> {
    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
                        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 3) {
      System.out.println("Usage: <startnode> <input> <output>");
      System.exit(-1);
    }
    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPGraphLoader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);
    job.setLoadingVertexResolver(BaseLoadingVertexResolver.class);
    job.set(START_VERTEX, args[0]);
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.addOutput(TableInfo.builder().tableName(args[2]).build());
    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}

Key points:

  • `SSSPVertex.compute()` implements d[v] = min(d[v], d[u] + weight(u, v)). The vertex finds the minimum distance across all received messages, updates its value if it improved, then sends the new distance plus each outgoing edge weight to neighbors. If the value has not changed, it calls voteToHalt() to signal that this vertex needs no further computation.

  • `SSSPGraphLoader.load()` reads each table record as a vertex with a comma-separated edge list in "destId:weight" format. It uses addVertexRequest to register the vertex, which may trigger BaseLoadingVertexResolver if the same vertex appears multiple times.

  • `MinLongCombiner.combine()` keeps only the minimum distance among all messages bound for the same vertex. This reduces message volume, improving computation performance and reducing memory usage.

  • `GraphJob` (main) wires the components together: loader, vertex class, combiner, and conflict resolver. It sets the start vertex ID (passed as args[0]) and binds input and output tables.
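
To make the "destId:weight" edge-list format concrete, here is a small standalone parser in plain Java. It is a sketch under stated assumptions, not the loader itself: the class name EdgeListParserSketch and the method parseEdges are hypothetical, and the handling of empty columns, malformed entries, and repeated destinations is a choice made for this example. The sample loader above splits the column without such checks, so an empty edge column would appear to fail inside Long.parseLong.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class EdgeListParserSketch {
    /** Parses "destId:weight,destId:weight" into destination -> weight, in input order. */
    static Map<Long, Long> parseEdges(String column) {
        Map<Long, Long> edges = new LinkedHashMap<>();
        if (column == null || column.trim().isEmpty()) {
            return edges;                        // a vertex with no outgoing edges
        }
        for (String edge : column.split(",")) {
            String[] parts = edge.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected destId:weight but got: " + edge);
            }
            long destination = Long.parseLong(parts[0].trim());
            long weight = Long.parseLong(parts[1].trim());
            edges.putIfAbsent(destination, weight); // sketch choice: first occurrence wins
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println(parseEdges("2:2,3:1"));  // prints {2=2, 3=1}
    }
}
```

In a real loader, each parsed entry would be passed to vertex.addEdge() as in SSSPGraphLoader.load(). Validating the column first makes a bad record fail with a message that names the offending entry.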

Undirected graph

The undirected graph implementation (SSSPBenchmark4) differs from the directed graph in two ways:

  • Edge loading: SSSPGraphLoader.load() calls addEdgeRequest twice per record — once for each direction — making every edge bidirectional.

  • Data types: Edge weights and distances use DoubleWritable instead of LongWritable, supporting fractional weights.

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.*;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableRecord;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class SSSPBenchmark4 {
    public static final String START_VERTEX = "sssp.start.vertex.id";

    public static class SSSPVertex extends
            Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
        private static long startVertexId = -1;
        public SSSPVertex() {
            this.setValue(new DoubleWritable(Double.MAX_VALUE));
        }
        public boolean isStartVertex(
                ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) {
            if (startVertexId == -1) {
                String s = context.getConfiguration().get(START_VERTEX);
                startVertexId = Long.parseLong(s);
            }
            return getId().get() == startVertexId;
        }

        @Override
        public void compute(
                ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context,
                Iterable<DoubleWritable> messages) throws IOException {
            double minDist = isStartVertex(context) ? 0 : Double.MAX_VALUE;
            for (DoubleWritable msg : messages) {
                if (msg.get() < minDist) {
                    minDist = msg.get();
                }
            }
            if (minDist < this.getValue().get()) {
                this.setValue(new DoubleWritable(minDist));
                if (hasEdges()) {
                    for (Edge<LongWritable, DoubleWritable> e : this.getEdges()) {
                        context.sendMessage(e.getDestVertexId(), new DoubleWritable(minDist
                                + e.getValue().get()));
                    }
                }
            } else {
                voteToHalt();
            }
        }

        @Override
        public void cleanup(
                WorkerContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
                throws IOException {
            context.write(getId(), getValue());
        }
    }

    public static class MinLongCombiner extends
            Combiner<LongWritable, DoubleWritable> {
        @Override
        public void combine(LongWritable vertexId, DoubleWritable combinedMessage,
                            DoubleWritable messageToCombine) {
            if (combinedMessage.get() > messageToCombine.get()) {
                combinedMessage.set(messageToCombine.get());
            }
        }
    }

    public static class SSSPGraphLoader extends
            GraphLoader<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
        @Override
        public void load(
                LongWritable recordNum,
                WritableRecord record,
                MutationContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
                throws IOException {
            LongWritable sourceVertexID = (LongWritable) record.get(0);      // Source vertex ID
            LongWritable destinationVertexID = (LongWritable) record.get(1); // Destination vertex ID
            DoubleWritable edgeValue = (DoubleWritable) record.get(2);       // Edge weight

            // Add a directed edge from source to destination
            Edge<LongWritable, DoubleWritable> edge = new Edge<LongWritable, DoubleWritable>(destinationVertexID, edgeValue);
            context.addEdgeRequest(sourceVertexID, edge);

            // Add the reverse edge to make the graph undirected
            Edge<LongWritable, DoubleWritable> edge2 = new
                    Edge<LongWritable, DoubleWritable>(sourceVertexID, edgeValue);
            context.addEdgeRequest(destinationVertexID, edge2);
        }
    }

    public static class SSSPLoadingVertexResolver extends
            LoadingVertexResolver<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {

        @Override
        public Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> resolve(
                LongWritable vertexId,
                VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> vertexChanges) throws IOException {

            SSSPVertex computeVertex = new SSSPVertex();
            computeVertex.setId(vertexId);
            Set<LongWritable> destinationVertexIDSet = new HashSet<>();

            if (hasEdgeAdditions(vertexChanges)) {
                for (Edge<LongWritable, DoubleWritable> edge : vertexChanges.getAddedEdgeList()) {
                    if (!destinationVertexIDSet.contains(edge.getDestVertexId())) {
                        destinationVertexIDSet.add(edge.getDestVertexId());
                        computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
                    }

                }
            }

            return computeVertex;
        }

        protected boolean hasEdgeAdditions(VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> changes) {
            return changes != null && changes.getAddedEdgeList() != null
                    && !changes.getAddedEdgeList().isEmpty();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.out.println("Usage: <startnode> <input> <output>");
            System.exit(-1);
        }
        GraphJob job = new GraphJob();
        job.setGraphLoaderClass(SSSPGraphLoader.class);
        job.setLoadingVertexResolver(SSSPLoadingVertexResolver.class);
        job.setVertexClass(SSSPVertex.class);
        job.setCombinerClass(MinLongCombiner.class);
        job.set(START_VERTEX, args[0]);
        job.addInput(TableInfo.builder().tableName(args[1]).build());
        job.addOutput(TableInfo.builder().tableName(args[2]).build());
        long startTime = System.currentTimeMillis();
        job.run();
        System.out.println("Job Finished in "
                + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
}

Key points:

  • `SSSPGraphLoader.load()` reads each record as a triple: source vertex ID, destination vertex ID, and edge weight. It calls addEdgeRequest twice — once in each direction — so that every record becomes a bidirectional edge. Both calls share the same edge weight, making the graph symmetrically weighted.

  • `SSSPLoadingVertexResolver.resolve()` handles conflicts from addEdgeRequest: when the same edge is requested more than once for a vertex, it deduplicates using a HashSet of destination vertex IDs, keeping only the first occurrence.

Execution results

The following output is from running the directed graph example. Each row shows a vertex and its shortest distance from the source vertex.

vertex    value
1        0
2        2
3        1
4        3
5        2

  • vertex: the vertex ID

  • value: the SSSP value, that is, the shortest distance from the source vertex to this vertex

For the undirected graph, load table records with source vertex ID, destination vertex ID, and edge weight columns — the loader handles bidirectional edge creation automatically.
