Single Source Shortest Path (SSSP) refers to the shortest path from a given source vertex to every other vertex in a graph. MaxCompute Graph implements SSSP using Dijkstra's algorithm.
How it works
Dijkstra's algorithm works by having each vertex hold its current SSSP value and propagate updates to neighbors through message passing.
Initialization: The source vertex s starts with distance 0 (d[s] = 0). All other vertices start with infinite distance (d[u] = ∞).
Iteration: At each iteration, if a vertex receives a message indicating a shorter path, it updates its SSSP value and notifies its adjacent vertices using this formula:
d[v] = min(d[v], d[u] + weight(u, v))
Termination: When no vertex updates its SSSP value in an iteration, all vertices vote to halt and the job finishes.
For a weighted directed graph G = (V, E), multiple paths may exist from source vertex s to sink vertex v. The SSSP is the path with the smallest total edge weight.
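The initialization, iteration, and termination steps can be sketched in plain Java without the MaxCompute Graph SDK. This is a minimal single-process simulation, not the framework's implementation: the class name `SsspSuperstepSketch`, the `{u, v, weight}` edge encoding, and the 0-based vertex numbering are assumptions made for illustration, and edge weights are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of the message-passing relaxation (hypothetical helper, not an SDK class). */
final class SsspSuperstepSketch {

    /** edges[i] = {u, v, weight(u, v)}; vertices are numbered 0..n-1; weights are assumed non-negative. */
    static long[] shortestDistances(int n, long[][] edges, int source) {
        // Build the outgoing edge list of every vertex.
        List<List<long[]>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new ArrayList<>());
        }
        for (long[] e : edges) {
            out.get((int) e[0]).add(e);
        }

        // Initialization: every vertex starts at "infinity".
        long[] d = new long[n];
        Arrays.fill(d, Long.MAX_VALUE);

        // The source behaves as if it had received a message carrying distance 0.
        Map<Integer, Long> inbox = new HashMap<>();
        inbox.put(source, 0L);

        // Iteration: each pass of this loop plays the role of one superstep.
        while (!inbox.isEmpty()) {
            Map<Integer, Long> next = new HashMap<>();
            for (Map.Entry<Integer, Long> msg : inbox.entrySet()) {
                int v = msg.getKey();
                long candidate = msg.getValue();
                if (candidate < d[v]) {
                    // d[v] = min(d[v], d[u] + weight(u, v))
                    d[v] = candidate;
                    for (long[] e : out.get(v)) {
                        // Keep only the smallest message per target vertex.
                        next.merge((int) e[1], candidate + e[2], Long::min);
                    }
                }
            }
            // Termination: an empty inbox means no vertex improved in this pass.
            inbox = next;
        }
        return d;
    }
}
```

For a hypothetical graph with edges `{0,1,4}`, `{0,2,1}`, `{2,1,2}`, and `{1,3,5}`, vertex 1 is first reached with distance 4 and later improved to 3 through vertex 2. A vertex with no path from the source keeps `Long.MAX_VALUE`.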
Graph types
MaxCompute supports both directed and undirected graphs. SSSP results differ between the two because path availability depends on edge direction. MaxCompute Graph uses a directed graph as its base data model, but you can model undirected graphs by representing each undirected edge as two directed edges.
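To illustrate how an undirected graph maps onto a directed data model, the sketch below expands each undirected edge into two directed edges with the same weight. It is plain Java rather than MaxCompute Graph code, and the class name `UndirectedEdgeSketch` and the `{a, b, weight}` encoding are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: models an undirected graph on top of directed edges. */
final class UndirectedEdgeSketch {

    /** Expands each undirected edge {a, b, w} into a -> b and b -> a, both with weight w. */
    static List<long[]> toDirected(long[][] undirectedEdges) {
        List<long[]> directed = new ArrayList<>();
        for (long[] e : undirectedEdges) {
            directed.add(new long[] {e[0], e[1], e[2]}); // forward direction
            directed.add(new long[] {e[1], e[0], e[2]}); // reverse direction
        }
        return directed;
    }
}
```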
Framework roles
Each SSSP implementation requires three components that plug into the MaxCompute Graph framework:
| Component | Class | Role |
|---|---|---|
| Vertex program | SSSPVertex (extends Vertex) | Receives messages, updates SSSP value, sends messages to neighbors, votes to halt |
| Message combiner | MinLongCombiner (extends Combiner) | Merges multiple messages to the same vertex into one, reducing network traffic |
| Graph loader | SSSPGraphLoader (extends GraphLoader) | Reads table records and converts them into graph vertices and edges |
A fourth component, LoadingVertexResolver, handles conflicts that arise during data loading (for example, when the same vertex is added more than once).
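The combiner's role in the table can be sketched in plain Java: all messages bound for the same vertex collapse into one message that carries the minimum distance. The class name `MinCombinerSketch` and the `{targetVertexId, distance}` message encoding are assumptions for illustration; the framework's own `Combiner` interface appears in the sample code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: shows the effect of a min-combiner on a batch of messages. */
final class MinCombinerSketch {

    /** messages[i] = {targetVertexId, distance}; returns one minimum distance per target vertex. */
    static Map<Long, Long> combine(long[][] messages) {
        Map<Long, Long> combined = new HashMap<>();
        for (long[] m : messages) {
            // Keep the smaller of the stored distance and the incoming one.
            combined.merge(m[0], m[1], Long::min);
        }
        return combined;
    }
}
```

With the hypothetical messages `{7,5}`, `{7,3}`, `{8,10}`, and `{7,9}`, four messages shrink to two: distance 3 for vertex 7 and distance 10 for vertex 8.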
Sample code
Directed graph
The directed graph implementation uses two classes: BaseLoadingVertexResolver (conflict resolution) and SSSP (the main job).
`BaseLoadingVertexResolver` — handles vertex loading conflicts:
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.LoadingVertexResolver;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableComparable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@SuppressWarnings("rawtypes")
public class BaseLoadingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
extends LoadingVertexResolver<I, V, E, M> {
@Override
public Vertex<I, V, E, M> resolve(I vertexId, VertexChanges<I, V, E, M> vertexChanges) throws IOException {
Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
if (vertex != null) {
addEdges(vertex, vertexChanges);
} else {
System.err.println("Ignore all addEdgeRequests for vertex#" + vertexId);
}
return vertex;
}
protected Vertex<I, V, E, M> addVertexIfDesired(
I vertexId,
VertexChanges<I, V, E, M> vertexChanges) {
Vertex<I, V, E, M> vertex = null;
if (hasVertexAdditions(vertexChanges)) {
vertex = vertexChanges.getAddedVertexList().get(0);
}
return vertex;
}
protected void addEdges(Vertex<I, V, E, M> vertex,
VertexChanges<I, V, E, M> vertexChanges) throws IOException {
Set<I> destVertexId = new HashSet<I>();
if (vertex.hasEdges()) {
List<Edge<I, E>> edgeList = vertex.getEdges();
for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext(); ) {
Edge<I, E> edge = edges.next();
if (destVertexId.contains(edge.getDestVertexId())) {
edges.remove();
} else {
destVertexId.add(edge.getDestVertexId());
}
}
}
for (Vertex<I, V, E, M> vertex1 : vertexChanges.getAddedVertexList()) {
if (vertex1.hasEdges()) {
List<Edge<I, E>> edgeList = vertex1.getEdges();
for (Edge<I, E> edge : edgeList) {
if (destVertexId.contains(edge.getDestVertexId())) continue;
destVertexId.add(edge.getDestVertexId());
vertex.addEdge(edge.getDestVertexId(), edge.getValue());
}
}
}
}
protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
return changes != null && changes.getAddedVertexList() != null
&& !changes.getAddedVertexList().isEmpty();
}
}
Key points:
- `resolve()` is called when a loading conflict occurs, for example, if `addVertexRequest` is called twice for the same vertex. The method selects the first vertex and deduplicates its edges.
- Without this resolver, duplicate vertex additions during data loading would cause undefined behavior before computation starts.
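The deduplication rule can be shown in isolation. The sketch below is plain Java, not the SDK: `EdgeDedupSketch` and the `{destId, weight}` encoding are hypothetical, and each inner list stands for the edge list of one duplicate addition of the same vertex. The first edge seen for a destination wins, and later edges to that destination are dropped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: merges edge lists from duplicate vertex additions. */
final class EdgeDedupSketch {

    /** Each inner list holds {destId, weight} pairs; the first edge per destination is kept. */
    static List<long[]> mergeEdges(List<List<long[]>> additions) {
        List<long[]> merged = new ArrayList<>();
        Set<Long> seenDestinations = new HashSet<>();
        for (List<long[]> edges : additions) {
            for (long[] edge : edges) {
                // Set.add returns false when the destination was already recorded.
                if (seenDestinations.add(edge[0])) {
                    merged.add(edge);
                }
            }
        }
        return merged;
    }
}
```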
`SSSP` — the main job class:
import java.io.IOException;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
public class SSSP {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
private static long startVertexId = -1;
public SSSPVertex() {
this.setValue(new LongWritable(Long.MAX_VALUE));
}
public boolean isStartVertex(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
if (startVertexId == -1) {
String s = context.getConfiguration().get(START_VERTEX);
startVertexId = Long.parseLong(s);
}
return getId().get() == startVertexId;
}
@Override
public void compute(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
Iterable<LongWritable> messages) throws IOException {
long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
for (LongWritable msg : messages) {
if (msg.get() < minDist) {
minDist = msg.get();
}
}
if (minDist < this.getValue().get()) {
this.setValue(new LongWritable(minDist));
if (hasEdges()) {
for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
}
}
} else {
voteToHalt(); // No update — this vertex is done
}
}
@Override
public void cleanup(
WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
context.write(getId(), getValue()); // Write final SSSP value to output table
}
@Override
public String toString() {
return "Vertex(id=" + this.getId() + ",value=" + this.getValue() + ",#edges=" + this.getEdges() + ")";
}
}
public static class SSSPGraphLoader extends
GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
SSSPVertex vertex = new SSSPVertex();
vertex.setId((LongWritable) record.get(0));
String[] edges = record.get(1).toString().split(",");
for (String edge : edges) {
String[] ss = edge.split(":");
vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1])));
}
context.addVertexRequest(vertex);
}
}
public static class MinLongCombiner extends
Combiner<LongWritable, LongWritable> {
@Override
public void combine(LongWritable vertexId, LongWritable combinedMessage,
LongWritable messageToCombine) throws IOException {
if (combinedMessage.get() > messageToCombine.get()) {
combinedMessage.set(messageToCombine.get());
}
}
}
public static void main(String[] args) throws IOException {
if (args.length < 3) {
System.out.println("Usage: <startnode> <input> <output>");
System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SSSPGraphLoader.class);
job.setVertexClass(SSSPVertex.class);
job.setCombinerClass(MinLongCombiner.class);
job.setLoadingVertexResolver(BaseLoadingVertexResolver.class);
job.set(START_VERTEX, args[0]);
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.addOutput(TableInfo.builder().tableName(args[2]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Key points:
- `SSSPVertex.compute()` implements `d[v] = min(d[v], d[u] + weight(u, v))`. The vertex finds the minimum distance across all received messages, updates its value if it improved, then sends the new distance plus each outgoing edge weight to neighbors. If the value has not changed, it calls `voteToHalt()` to signal that this vertex needs no further computation.
- `SSSPGraphLoader.load()` reads each table record as a vertex with a comma-separated edge list in `"destId:weight"` format. It uses `addVertexRequest` to register the vertex, which may trigger `BaseLoadingVertexResolver` if the same vertex appears multiple times.
- `MinLongCombiner.combine()` keeps only the minimum distance among all messages bound for the same vertex. This reduces message volume, improving computation performance and reducing memory usage.
- `GraphJob` (main) wires the components together: loader, vertex class, combiner, and conflict resolver. It sets the start vertex ID (passed as `args[0]`) and binds input and output tables.
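The `"destId:weight"` edge-list format can be parsed on its own, as in the following plain-Java sketch. The class `EdgeListParseSketch`, the sample string `"2:2,3:1,4:4"`, and the handling of empty or malformed input are assumptions added for illustration; the loader in the sample code does not perform these checks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: parses a comma-separated "destId:weight" edge list. */
final class EdgeListParseSketch {

    /** Returns destination ID -> weight in input order; the first edge per destination is kept. */
    static Map<Long, Long> parseEdges(String edgeList) {
        Map<Long, Long> edges = new LinkedHashMap<>();
        if (edgeList == null || edgeList.trim().isEmpty()) {
            return edges; // A vertex without outgoing edges.
        }
        for (String edge : edgeList.split(",")) {
            String[] parts = edge.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected destId:weight but got: " + edge);
            }
            long destId = Long.parseLong(parts[0].trim());
            long weight = Long.parseLong(parts[1].trim());
            edges.putIfAbsent(destId, weight);
        }
        return edges;
    }
}
```

Calling `EdgeListParseSketch.parseEdges("2:2,3:1,4:4")` yields three edges: to vertex 2 with weight 2, to vertex 3 with weight 1, and to vertex 4 with weight 4.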
Undirected graph
The undirected graph implementation (SSSPBenchmark4) differs from the directed graph in two ways:
- Edge loading: `SSSPGraphLoader.load()` calls `addEdgeRequest` twice per record, once for each direction, making every edge bidirectional.
- Data types: Edge weights and distances use `DoubleWritable` instead of `LongWritable`, supporting fractional weights.
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.*;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableRecord;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class SSSPBenchmark4 {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends
Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
private static long startVertexId = -1;
public SSSPVertex() {
this.setValue(new DoubleWritable(Double.MAX_VALUE));
}
public boolean isStartVertex(
ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) {
if (startVertexId == -1) {
String s = context.getConfiguration().get(START_VERTEX);
startVertexId = Long.parseLong(s);
}
return getId().get() == startVertexId;
}
@Override
public void compute(
ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
double minDist = isStartVertex(context) ? 0 : Double.MAX_VALUE;
for (DoubleWritable msg : messages) {
if (msg.get() < minDist) {
minDist = msg.get();
}
}
if (minDist < this.getValue().get()) {
this.setValue(new DoubleWritable(minDist));
if (hasEdges()) {
for (Edge<LongWritable, DoubleWritable> e : this.getEdges()) {
context.sendMessage(e.getDestVertexId(), new DoubleWritable(minDist
+ e.getValue().get()));
}
}
} else {
voteToHalt();
}
}
@Override
public void cleanup(
WorkerContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class MinLongCombiner extends
Combiner<LongWritable, DoubleWritable> {
@Override
public void combine(LongWritable vertexId, DoubleWritable combinedMessage,
DoubleWritable messageToCombine) {
if (combinedMessage.get() > messageToCombine.get()) {
combinedMessage.set(messageToCombine.get());
}
}
}
public static class SSSPGraphLoader extends
GraphLoader<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
throws IOException {
LongWritable sourceVertexID = (LongWritable) record.get(0); // Source vertex ID
LongWritable destinationVertexID = (LongWritable) record.get(1); // Destination vertex ID
DoubleWritable edgeValue = (DoubleWritable) record.get(2); // Edge weight
// Add a directed edge from source to destination
Edge<LongWritable, DoubleWritable> edge = new Edge<LongWritable, DoubleWritable>(destinationVertexID, edgeValue);
context.addEdgeRequest(sourceVertexID, edge);
// Add the reverse edge to make the graph undirected
Edge<LongWritable, DoubleWritable> edge2 = new
Edge<LongWritable, DoubleWritable>(sourceVertexID, edgeValue);
context.addEdgeRequest(destinationVertexID, edge2);
}
}
public static class SSSPLoadingVertexResolver extends
LoadingVertexResolver<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
@Override
public Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> resolve(
LongWritable vertexId,
VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> vertexChanges) throws IOException {
SSSPVertex computeVertex = new SSSPVertex();
computeVertex.setId(vertexId);
Set<LongWritable> destinationVertexIDSet = new HashSet<>();
if (hasEdgeAdditions(vertexChanges)) {
for (Edge<LongWritable, DoubleWritable> edge : vertexChanges.getAddedEdgeList()) {
if (!destinationVertexIDSet.contains(edge.getDestVertexId())) {
destinationVertexIDSet.add(edge.getDestVertexId());
computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
}
}
}
return computeVertex;
}
protected boolean hasEdgeAdditions(VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> changes) {
return changes != null && changes.getAddedEdgeList() != null
&& !changes.getAddedEdgeList().isEmpty();
}
}
public static void main(String[] args) throws IOException {
if (args.length < 3) {
System.out.println("Usage: <startnode> <input> <output>");
System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SSSPGraphLoader.class);
job.setLoadingVertexResolver(SSSPLoadingVertexResolver.class);
job.setVertexClass(SSSPVertex.class);
job.setCombinerClass(MinLongCombiner.class);
job.set(START_VERTEX, args[0]);
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.addOutput(TableInfo.builder().tableName(args[2]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Key points:
- `SSSPGraphLoader.load()` reads each record as a triple: source vertex ID, destination vertex ID, and edge weight. It calls `addEdgeRequest` twice, once in each direction, so that every record becomes a bidirectional edge. Both calls share the same edge weight, making the graph symmetrically weighted.
- `SSSPLoadingVertexResolver.resolve()` handles conflicts from `addEdgeRequest`: when the same edge is requested more than once for a vertex, it deduplicates using a `HashSet` of destination vertex IDs, keeping only the first occurrence.
Execution results
The following output is from running the directed graph example. Each row shows a vertex and its shortest distance from the source vertex.
vertex value
1 0
2 2
3 1
4 3
5 2
- vertex: the current vertex
- value: the SSSP value
For the undirected graph, load table records with source vertex ID, destination vertex ID, and edge weight columns — the loader handles bidirectional edge creation automatically.
What's next
- Write a Graph job: end-to-end guide for running graph jobs on MaxCompute
- Develop Graph programs: full tutorial covering the SSSP example above