単一ソース最短パス (SSSP) は、グラフ内の特定のソース頂点から他のすべての頂点までの最短パスを指します。 ダイクストラのアルゴリズムは、有向グラフの頂点間のSSSPの値を計算するために使用される一般的なアルゴリズムです。
Dijkstraのアルゴリズムの仕組み
ダイクストラのアルゴリズムは、頂点を使用してSSSP値を更新します。 各頂点は、現在のSSSP値を維持する。 現在のSSSP値が変化した場合、頂点は新しいSSSP値とedge weightの合計を計算し、隣接する頂点に合計を通知するメッセージを送信します。 次の反復では、隣接する頂点は、受信したメッセージに基づいてSSSP値を更新する。 反復は、すべての頂点からソース頂点までのSSSPの値が変更されないままになるまで終了しません。
初期化: ソース頂点sからソース頂点sへのパスは0 (
d[s] = 0) であり、uからsへのパスは無限大 (d[u] = ∞) です。反復: uからvまでのエッジが存在する場合、sからvまでのSSSPの値は、以下の式を使用することによって計算される。
d[v] = min(d[v], d[u] + weight(u, v))。 反復は、すべての頂点からsまでのSSSPの値が変更されないままになるまで終了しません。
重み付き有向グラフG = (V,E) の場合、ソース頂点sからシンク頂点vまでの複数の経路が利用可能である。エッジ重み値の和が最小の経路は、sからvまでのSSSPである。
Dijkstraのアルゴリズムの実装原理は、このアルゴリズムがMaxCompute Graphに適していることを示しています。
シナリオ
グラフは、有向グラフと無向グラフの2種類に分類される。 MaxComputeは2つのタイプの両方をサポートしています。 有向グラフと無向グラフのパスは、ソースデータの分布によって異なります。 したがって、有向グラフと無向グラフのSSSP計算結果が異なる場合がある。 MaxCompute Graphは、有向グラフを基本データモデルとして使用して、フレームワーク内のSSSP値を計算します。
サンプルコード
サンプルコードは、グラフタイプに基づいて提供されます。
有向グラフ
SSSPクラスで参照されるBaseLoadingVertexResolverクラスを定義します。import com.aliyun.odps.graph.Edge; import com.aliyun.odps.graph.LoadingVertexResolver; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.VertexChanges; import com.aliyun.odps.io.Writable; import com.aliyun.odps.io.WritableComparable; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; @SuppressWarnings("rawtypes") public class BaseLoadingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends LoadingVertexResolver<I, V, E, M> { @Override public Vertex<I, V, E, M> resolve(I vertexId, VertexChanges<I, V, E, M> vertexChanges) throws IOException { Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges); if (vertex != null) { addEdges(vertex, vertexChanges); } else { System.err.println("Ignore all addEdgeRequests for vertex#" + vertexId); } return vertex; } protected Vertex<I, V, E, M> addVertexIfDesired( I vertexId, VertexChanges<I, V, E, M> vertexChanges) { Vertex<I, V, E, M> vertex = null; if (hasVertexAdditions(vertexChanges)) { vertex = vertexChanges.getAddedVertexList().get(0); } return vertex; } protected void addEdges(Vertex<I, V, E, M> vertex, VertexChanges<I, V, E, M> vertexChanges) throws IOException { Set<I> destVertexId = new HashSet<I>(); if (vertex.hasEdges()) { List<Edge<I, E>> edgeList = vertex.getEdges(); for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext(); ) { Edge<I, E> edge = edges.next(); if (destVertexId.contains(edge.getDestVertexId())) { edges.remove(); } else { destVertexId.add(edge.getDestVertexId()); } } } for (Vertex<I, V, E, M> vertex1 : vertexChanges.getAddedVertexList()) { if (vertex1.hasEdges()) { List<Edge<I, E>> edgeList = vertex1.getEdges(); for (Edge<I, E> edge : edgeList) { if (destVertexId.contains(edge.getDestVertexId())) continue; destVertexId.add(edge.getDestVertexId()); vertex.addEdge(edge.getDestVertexId(), edge.getValue()); } } } } protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) { return changes != null && changes.getAddedVertexList() != null && !changes.getAddedVertexList().isEmpty(); } }コードの説明:
15行目: BaseLoadingVertexResolverクラスを定義します。 このクラスは、有向グラフにデータをロードするときに発生する可能性のある競合を処理するために使用されます。
18行目: resolveは、競合を処理するために使用されるメソッドです。 たとえば、addVertexRequestメソッドを2回呼び出して頂点を追加すると、読み込みの競合が発生します。 この場合、SSSP値を計算する前に競合を処理する必要があります。
SSSPクラスを定義します。import java.io.IOException; import com.aliyun.odps.graph.Combiner; import com.aliyun.odps.graph.ComputeContext; import com.aliyun.odps.graph.Edge; import com.aliyun.odps.graph.GraphJob; import com.aliyun.odps.graph.GraphLoader; import com.aliyun.odps.graph.MutationContext; import com.aliyun.odps.graph.Vertex; import com.aliyun.odps.graph.WorkerContext; import com.aliyun.odps.io.WritableRecord; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.data.TableInfo; public class SSSP { public static final String START_VERTEX = "sssp.start.vertex.id"; public static class SSSPVertex extends Vertex<LongWritable, LongWritable, LongWritable, LongWritable> { private static long startVertexId = -1; public SSSPVertex() { this.setValue(new LongWritable(Long.MAX_VALUE)); } public boolean isStartVertex( ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) { if (startVertexId == -1) { String s = context.getConfiguration().get(START_VERTEX); startVertexId = Long.parseLong(s); } return getId().get() == startVertexId; } @Override public void compute( ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context, Iterable<LongWritable> messages) throws IOException { long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE; for (LongWritable msg : messages) { if (msg.get() < minDist) { minDist = msg.get(); } } if (minDist < this.getValue().get()) { this.setValue(new LongWritable(minDist)); if (hasEdges()) { for (Edge<LongWritable, LongWritable> e : this.getEdges()) { context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get())); } } } else { voteToHalt(); } } @Override public void cleanup( WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context) throws IOException { context.write(getId(), getValue()); } @Override public String toString() { return "Vertex(id=" + this.getId() + ",value=" + this.getValue() + ",#edges=" + this.getEdges() + ")"; } } public static class SSSPGraphLoader extends GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context) throws IOException { SSSPVertex vertex = new SSSPVertex(); vertex.setId((LongWritable) record.get(0)); String[] edges = record.get(1).toString().split(","); for (String edge : edges) { String[] ss = edge.split(":"); vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1]))); } context.addVertexRequest(vertex); } } public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> { @Override public void combine(LongWritable vertexId, LongWritable combinedMessage, LongWritable messageToCombine) throws IOException { if (combinedMessage.get() > messageToCombine.get()) { combinedMessage.set(messageToCombine.get()); } } } public static void main(String[] args) throws IOException { if (args.length < 3) { System.out.println("Usage: <startnode> <input> <output>"); System.exit(-1); } GraphJob job = new GraphJob(); job.setGraphLoaderClass(SSSPGraphLoader.class); job.setVertexClass(SSSPVertex.class); job.setCombinerClass(MinLongCombiner.class); job.setLoadingVertexResolver(BaseLoadingVertexResolver.class); job.set(START_VERTEX, args[0]); job.addInput(TableInfo.builder().tableName(args[1]).build()); job.addOutput(TableInfo.builder().tableName(args[2]).build()); long startTime = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } }コードの説明:
19行目: SSSPVertexクラスを定義します。
頂点値は、startVertexIdで指定された頂点からソース頂点までのSSSPの値を示します。
compute() メソッドは、以下の反復式を使用してSSSP値を計算し、SSSP値を頂点値に更新します。
d[v] = min(d[v], d[u] + weight(u, v))。cleanup() メソッドは、SSSP値を結果テーブルに書き込みます。
54行目: 現在の頂点からソース頂点へのSSSPの値が変化しない場合、voteToHalt() メソッドが呼び出され、フレームワークを使用して頂点を停止します。 すべての頂点が停止に投票すると、計算は終了します。
71行目: テーブルから有向グラフにデータをロードするために使用されるGraphLoaderクラスを定義します。 テーブルに格納されたレコードは、グラフの頂点またはエッジに分解され、フレームワークにロードされます。 上記のサンプルコードでは、addVertexRequestメソッドを使用して、グラフ頂点をグラフ計算のコンテキストに読み込みます。
90行目: MinLongCombinerクラスを定義します。 このクラスは、同じ頂点に送信されるメッセージをマージするために使用されます。 このクラスは、計算パフォーマンスの向上とメモリ使用量の削減に役立ちます。
行101: メインプログラムのmain関数でGraphJobクラスを定義します。 GraphJobクラスは、Vertex、GraphLoader、BaseLoadingVertexResolver、およびCombinerクラスの動作を指定するために使用されます。 GraphJobクラスは、入力テーブルと出力テーブルも指定します。
行110: 競合の処理に使用されるBaseLoadingVertexResolverクラスを追加します。
無向グラフ
import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.graph.*; import com.aliyun.odps.io.DoubleWritable; import com.aliyun.odps.io.LongWritable; import com.aliyun.odps.io.WritableRecord; import java.io.IOException; import java.util.HashSet; import java.util.Set; public class SSSPBenchmark4 { public static final String START_VERTEX = "sssp.start.vertex.id"; public static class SSSPVertex extends Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { private static long startVertexId = -1; public SSSPVertex() { this.setValue(new DoubleWritable(Double.MAX_VALUE)); } public boolean isStartVertex( ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) { if (startVertexId == -1) { String s = context.getConfiguration().get(START_VERTEX); startVertexId = Long.parseLong(s); } return getId().get() == startVertexId; } @Override public void compute( ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context, Iterable<DoubleWritable> messages) throws IOException { double minDist = isStartVertex(context) ? 0 : Double.MAX_VALUE; for (DoubleWritable msg : messages) { if (msg.get() < minDist) { minDist = msg.get(); } } if (minDist < this.getValue().get()) { this.setValue(new DoubleWritable(minDist)); if (hasEdges()) { for (Edge<LongWritable, DoubleWritable> e : this.getEdges()) { context.sendMessage(e.getDestVertexId(), new DoubleWritable(minDist + e.getValue().get())); } } } else { voteToHalt(); } } @Override public void cleanup( WorkerContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) throws IOException { context.write(getId(), getValue()); } } public static class MinLongCombiner extends Combiner<LongWritable, DoubleWritable> { @Override public void combine(LongWritable vertexId, DoubleWritable combinedMessage, DoubleWritable messageToCombine) { if (combinedMessage.get() > messageToCombine.get()) { combinedMessage.set(messageToCombine.get()); } } } public static class SSSPGraphLoader extends GraphLoader<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { @Override public void load( LongWritable recordNum, WritableRecord record, MutationContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) throws IOException { LongWritable sourceVertexID = (LongWritable) record.get(0); LongWritable destinationVertexID = (LongWritable) record.get(1); DoubleWritable edgeValue = (DoubleWritable) record.get(2); Edge<LongWritable, DoubleWritable> edge = new Edge<LongWritable, DoubleWritable>(destinationVertexID, edgeValue); context.addEdgeRequest(sourceVertexID, edge); Edge<LongWritable, DoubleWritable> edge2 = new Edge<LongWritable, DoubleWritable>(sourceVertexID, edgeValue); context.addEdgeRequest(destinationVertexID, edge2); } } public static class SSSPLoadingVertexResolver extends LoadingVertexResolver<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { @Override public Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> resolve( LongWritable vertexId, VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> vertexChanges) throws IOException { SSSPVertex computeVertex = new SSSPVertex(); computeVertex.setId(vertexId); Set<LongWritable> destinationVertexIDSet = new HashSet<>(); if (hasEdgeAdditions(vertexChanges)) { for (Edge<LongWritable, DoubleWritable> edge : vertexChanges.getAddedEdgeList()) { if (!destinationVertexIDSet.contains(edge.getDestVertexId())) { destinationVertexIDSet.add(edge.getDestVertexId()); computeVertex.addEdge(edge.getDestVertexId(), edge.getValue()); } } } return computeVertex; } protected boolean hasEdgeAdditions(VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> changes) { return changes != null && changes.getAddedEdgeList() != null && !changes.getAddedEdgeList().isEmpty(); } } public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage: <startnode> <input> <output>"); System.exit(-1); } GraphJob job = new GraphJob(); job.setGraphLoaderClass(SSSPGraphLoader.class); job.setLoadingVertexResolver(SSSPLoadingVertexResolver.class); job.setVertexClass(SSSPVertex.class); job.setCombinerClass(MinLongCombiner.class); job.set(START_VERTEX, args[0]); job.addInput(TableInfo.builder().tableName(args[1]).build()); job.addOutput(TableInfo.builder().tableName(args[2]).build()); long startTime = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } }コードの説明:
15行目: SSSPVertexクラスを定義します。
頂点値は、startVertexIdで指定された頂点からソース頂点までのSSSPの値を示します。
compute() メソッドは、以下の反復式を使用してSSSP値を計算し、その値を頂点値に更新します。
d[v] = min(d[v], d[u] + weight(u, v))。cleanup() メソッドは、SSSP値を結果テーブルに書き込みます。
54行目: 現在の頂点からソース頂点へのSSSPの値が変化しない場合、フレームワークを使用して頂点を停止するためにvoteToHalt() が呼び出される。 すべての頂点が停止に投票すると、計算は終了します。
61行目: MinLongCombinerクラスを定義します。 このクラスは、同じ頂点に送信されるメッセージをマージするために使用されます。 このクラスは、計算パフォーマンスの向上とメモリ使用量の削減に役立ちます。
72行目: テーブルから無向グラフにデータをロードするために使用されるGraphLoaderクラスを定義します。 addEdgeRequestメソッドは、2つの頂点間のエッジを無向エッジとして使用するために呼び出されます。 これにより、現在のテーブルに格納されているグラフデータが無向グラフにロードされます。
80行目: ソース頂点のIDを指定します。
81行目: 宛先頂点のIDを指定します。
行82: エッジの重みを指定します。
83行目: 宛先頂点のIDとエッジの重みを指定して、宛先頂点にエッジを追加します。
84行目: ソース頂点にエッジを追加する要求を送信する。
85行目〜87行目: 各レコードは双方向エッジを示す。 これらの説明は、ライン83及びライン84と同様である。
SSSPLoadingVertexResolverクラスを定義します。 このクラスは、データを無向グラフにロードするときに発生する可能性のある競合を処理するために使用されます。 たとえば、addEdgeRequestメソッドを2回呼び出してエッジを追加すると、読み込みの競合が発生します。 この場合、SSSP値を計算する前に競合を処理する必要があります。
行101: メインプログラムのmain関数でGraphJobクラスを定義します。 GraphJobクラスは、Vertex、GraphLoader、BaseLoadingVertexResolver、およびCombinerクラスの動作を指定するために使用されます。 GraphJobクラスは、入力テーブルと出力テーブルも指定します。
実行結果
次のコードは、有向グラフに基づくコード例の実行結果です。 詳細については、「Graphジョブの書き込み」をご参照ください。
vertex value
1 0
2 2
3 1
4 3
5 2vertex: 現在の頂点。
value: SSSP値。
ソース頂点ID、宛先頂点ID、およびエッジ重みを使用して、無向グラフのサンプルコードに基づいてデータを無向グラフにロードできます。
例チュートリアル
上記のコード例の詳細については、「グラフプログラムの開発」をご参照ください。