全部产品
Search
文档中心

MaxCompute:SSSP

更新时间:Jul 02, 2025

Single Source Shortest Path (SSSP) adalah jalur terpendek dari simpul sumber tertentu ke setiap simpul lain dalam graf. Algoritma Dijkstra merupakan algoritma umum yang digunakan untuk menghitung nilai SSSP antara simpul-simpul dalam graf berarah.

Cara kerja algoritma Dijkstra

Algoritma Dijkstra memperbarui SSSP values menggunakan vertices. Setiap vertex mempertahankan SSSP value saat ini. Jika SSSP value berubah, simpul tersebut menghitung jumlah dari nilai SSSP baru dan edge weight, lalu mengirim pesan untuk memberi tahu adjacent vertices tentang jumlah tersebut. Pada iterasi berikutnya, adjacent vertices memperbarui SSSP mereka berdasarkan pesan yang diterima. Iterasi berakhir ketika nilai SSSPs dari semua vertices ke simpul sumber tidak lagi berubah.

  • Inisialisasi: Jalur dari Simpul Sumber s ke Simpul Sumber s adalah 0 (d[s] = 0), dan jalur dari u ke s adalah tak terbatas (d[u] = ∞).

  • Iterasi: Jika ada sisi dari u ke v, nilai SSSP dari s ke v dihitung menggunakan rumus berikut: d[v] = min(d[v], d[u] + weight(u, v)). Iterasi berakhir ketika nilai SSSP dari semua simpul ke s tidak lagi berubah.

Catatan

Untuk graf berarah dengan bobot G = (V,E), beberapa jalur tersedia dari Simpul Sumber s ke Simpul Tujuan v. Jalur dengan total bobot sisi terkecil adalah SSSP dari s ke v.

Prinsip implementasi algoritma Dijkstra menunjukkan bahwa algoritma ini cocok untuk MaxCompute Graph.

Skenario

Graf diklasifikasikan menjadi dua jenis: graf berarah dan graf tak berarah. MaxCompute mendukung kedua jenis tersebut. Jalur dalam graf berarah dan graf tak berarah bervariasi berdasarkan distribusi data sumber. Oleh karena itu, hasil perhitungan SSSP untuk graf berarah dan graf tak berarah mungkin berbeda. MaxCompute Graph menggunakan graf berarah sebagai model data dasar untuk menghitung nilai SSSP dalam kerangka kerja.

Kode contoh

Kode contoh disediakan berdasarkan jenis graf.

  • Graf Berarah

    • Definisikan kelas BaseLoadingVertexResolver, yang direferensikan dalam kelas SSSP.

      import com.aliyun.odps.graph.Edge;
      import com.aliyun.odps.graph.LoadingVertexResolver;
      import com.aliyun.odps.graph.Vertex;
      import com.aliyun.odps.graph.VertexChanges;
      import com.aliyun.odps.io.Writable;
      import com.aliyun.odps.io.WritableComparable;
      
      import java.io.IOException;
      import java.util.HashSet;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Set;
      
      @SuppressWarnings("rawtypes")
      public class BaseLoadingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
              extends LoadingVertexResolver<I, V, E, M> {
        @Override
        public Vertex<I, V, E, M> resolve(I vertexId, VertexChanges<I, V, E, M> vertexChanges) throws IOException {
      
          Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
      
          if (vertex != null) {
            addEdges(vertex, vertexChanges);
          } else {
            System.err.println("Ignore all addEdgeRequests for vertex#" + vertexId);
          }
          return vertex;
        }
      
        protected Vertex<I, V, E, M> addVertexIfDesired(
                I vertexId,
                VertexChanges<I, V, E, M> vertexChanges) {
          Vertex<I, V, E, M> vertex = null;
          if (hasVertexAdditions(vertexChanges)) {
            vertex = vertexChanges.getAddedVertexList().get(0);
          }
      
          return vertex;
        }
      
        protected void addEdges(Vertex<I, V, E, M> vertex,
                                VertexChanges<I, V, E, M> vertexChanges) throws IOException {
          Set<I> destVertexId = new HashSet<I>();
          if (vertex.hasEdges()) {
            List<Edge<I, E>> edgeList = vertex.getEdges();
            for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext(); ) {
              Edge<I, E> edge = edges.next();
              if (destVertexId.contains(edge.getDestVertexId())) {
                edges.remove();
              } else {
                destVertexId.add(edge.getDestVertexId());
              }
            }
          }
      
          for (Vertex<I, V, E, M> vertex1 : vertexChanges.getAddedVertexList()) {
            if (vertex1.hasEdges()) {
              List<Edge<I, E>> edgeList = vertex1.getEdges();
              for (Edge<I, E> edge : edgeList) {
                if (destVertexId.contains(edge.getDestVertexId())) continue;
                destVertexId.add(edge.getDestVertexId());
                vertex.addEdge(edge.getDestVertexId(), edge.getValue());
              }
            }
          }
        }
      
        protected boolean hasVertexAdditions(VertexChanges<I, V, E, M> changes) {
          return changes != null && changes.getAddedVertexList() != null
                  && !changes.getAddedVertexList().isEmpty();
        }
      }

      Deskripsi Kode:

      • Baris 15: Definisikan kelas BaseLoadingVertexResolver. Kelas ini digunakan untuk menangani konflik yang mungkin terjadi saat memuat data ke graf berarah.

      • Baris 18: resolve adalah metode yang digunakan untuk menangani konflik. Misalnya, jika Anda memanggil metode addVertexRequest dua kali untuk menambahkan simpul, konflik pemuatan terjadi. Dalam hal ini, Anda harus menangani konflik sebelum menghitung nilai SSSP.

    • Definisikan kelas SSSP.

      import java.io.IOException;
      
      import com.aliyun.odps.graph.Combiner;
      import com.aliyun.odps.graph.ComputeContext;
      import com.aliyun.odps.graph.Edge;
      import com.aliyun.odps.graph.GraphJob;
      import com.aliyun.odps.graph.GraphLoader;
      import com.aliyun.odps.graph.MutationContext;
      import com.aliyun.odps.graph.Vertex;
      import com.aliyun.odps.graph.WorkerContext;
      import com.aliyun.odps.io.WritableRecord;
      import com.aliyun.odps.io.LongWritable;
      import com.aliyun.odps.data.TableInfo;
      
      public class SSSP {
        public static final String START_VERTEX = "sssp.start.vertex.id";
      
        public static class SSSPVertex extends
                Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
          private static long startVertexId = -1;
      
          public SSSPVertex() {
            this.setValue(new LongWritable(Long.MAX_VALUE));
          }
      
          public boolean isStartVertex(
                  ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
            if (startVertexId == -1) {
              String s = context.getConfiguration().get(START_VERTEX);
              startVertexId = Long.parseLong(s);
            }
            return getId().get() == startVertexId;
          }
      
          @Override
          public void compute(
                  ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
                  Iterable<LongWritable> messages) throws IOException {
            long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
            for (LongWritable msg : messages) {
              if (msg.get() < minDist) {
                minDist = msg.get();
              }
            }
            if (minDist < this.getValue().get()) {
              this.setValue(new LongWritable(minDist));
              if (hasEdges()) {
                for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
                  context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
                }
              }
            } else {
              voteToHalt();
            }
          }
      
          @Override
          public void cleanup(
                  WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
                  throws IOException {
            context.write(getId(), getValue());
          }
      
          @Override
          public String toString() {
            return "Vertex(id=" + this.getId() + ",value=" + this.getValue() + ",#edges=" + this.getEdges() + ")";
          }
        }
      
        public static class SSSPGraphLoader extends
                GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
          @Override
          public void load(
                  LongWritable recordNum,
                  WritableRecord record,
                  MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
                  throws IOException {
            SSSPVertex vertex = new SSSPVertex();
            vertex.setId((LongWritable) record.get(0));
            String[] edges = record.get(1).toString().split(",");
            for (String edge : edges) {
              String[] ss = edge.split(":");
              vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1])));
            }
            context.addVertexRequest(vertex);
          }
        }
      
        public static class MinLongCombiner extends
                Combiner<LongWritable, LongWritable> {
          @Override
          public void combine(LongWritable vertexId, LongWritable combinedMessage,
                              LongWritable messageToCombine) throws IOException {
            if (combinedMessage.get() > messageToCombine.get()) {
              combinedMessage.set(messageToCombine.get());
            }
          }
        }
      
        public static void main(String[] args) throws IOException {
          if (args.length < 3) {
            System.out.println("Usage: <startnode> <input> <output>");
            System.exit(-1);
          }
          GraphJob job = new GraphJob();
          job.setGraphLoaderClass(SSSPGraphLoader.class);
          job.setVertexClass(SSSPVertex.class);
          job.setCombinerClass(MinLongCombiner.class);
          job.setLoadingVertexResolver(BaseLoadingVertexResolver.class);
          job.set(START_VERTEX, args[0]);
          job.addInput(TableInfo.builder().tableName(args[1]).build());
          job.addOutput(TableInfo.builder().tableName(args[2]).build());
          long startTime = System.currentTimeMillis();
          job.run();
          System.out.println("Job Finished in "
                  + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
        }
      }
      
                                  

      Deskripsi Kode:

      • Baris 19: Definisikan kelas SSSPVertex.

        • Nilai simpul menunjukkan nilai SSSP dari simpul ke simpul sumber yang ditentukan oleh startVertexId.

        • Metode compute() menggunakan rumus iterasi berikut untuk menghitung nilai SSSP dan memperbarui nilai SSSP ke nilai simpul: d[v] = min(d[v], d[u] + weight(u, v)).

        • Metode cleanup() menulis nilai SSSP ke tabel hasil.

      • Baris 54: Jika nilai SSSP dari simpul saat ini ke simpul sumber tidak berubah, metode voteToHalt() dipanggil untuk halt simpul menggunakan kerangka kerja. Saat semua simpul memilih untuk halt, perhitungan berakhir.

      • Baris 71: Definisikan kelas GraphLoader yang digunakan untuk memuat data dari tabel ke graf berarah. Catatan yang disimpan dalam tabel dipecah menjadi simpul atau sisi graf dan dimuat ke kerangka kerja. Dalam kode contoh sebelumnya, metode addVertexRequest digunakan untuk memuat simpul graf ke konteks untuk perhitungan graf.

      • Baris 90: Definisikan kelas MinLongCombiner. Kelas ini digunakan untuk menggabungkan pesan yang dikirim ke simpul yang sama. Kelas ini membantu meningkatkan kinerja komputasi dan mengurangi penggunaan memori.

      • Baris 101: Definisikan kelas GraphJob dalam fungsi main program utama. Kelas GraphJob digunakan untuk menentukan cara kerja kelas Vertex, GraphLoader, BaseLoadingVertexResolver, dan Combiner. Kelas GraphJob juga menentukan tabel input dan output.

      • Baris 110: Tambahkan kelas BaseLoadingVertexResolver yang digunakan untuk menangani konflik.

  • Graf Tak Berarah

    import com.aliyun.odps.data.TableInfo;
    import com.aliyun.odps.graph.*;
    import com.aliyun.odps.io.DoubleWritable;
    import com.aliyun.odps.io.LongWritable;
    import com.aliyun.odps.io.WritableRecord;
    
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    
    public class SSSPBenchmark4 {
        public static final String START_VERTEX = "sssp.start.vertex.id";
    
        public static class SSSPVertex extends
                Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
            private static long startVertexId = -1;
            public SSSPVertex() {
                this.setValue(new DoubleWritable(Double.MAX_VALUE));
            }
            public boolean isStartVertex(
                    ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context) {
                if (startVertexId == -1) {
                    String s = context.getConfiguration().get(START_VERTEX);
                    startVertexId = Long.parseLong(s);
                }
                return getId().get() == startVertexId;
            }
    
            @Override
            public void compute(
                    ComputeContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context,
                    Iterable<DoubleWritable> messages) throws IOException {
                double minDist = isStartVertex(context) ? 0 : Double.MAX_VALUE;
                for (DoubleWritable msg : messages) {
                    if (msg.get() < minDist) {
                        minDist = msg.get();
                    }
                }
                if (minDist < this.getValue().get()) {
                    this.setValue(new DoubleWritable(minDist));
                    if (hasEdges()) {
                        for (Edge<LongWritable, DoubleWritable> e : this.getEdges()) {
                            context.sendMessage(e.getDestVertexId(), new DoubleWritable(minDist
                                    + e.getValue().get()));
                        }
                    }
                } else {
                    voteToHalt();
                }
            }
    
            @Override
            public void cleanup(
                    WorkerContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
                    throws IOException {
                context.write(getId(), getValue());
            }
        }
    
        public static class MinLongCombiner extends
                Combiner<LongWritable, DoubleWritable> {
            @Override
            public void combine(LongWritable vertexId, DoubleWritable combinedMessage,
                                DoubleWritable messageToCombine) {
                if (combinedMessage.get() > messageToCombine.get()) {
                    combinedMessage.set(messageToCombine.get());
                }
            }
        }
    
        public static class SSSPGraphLoader extends
                GraphLoader<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
            @Override
            public void load(
                    LongWritable recordNum,
                    WritableRecord record,
                    MutationContext<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> context)
                    throws IOException {
                LongWritable sourceVertexID = (LongWritable) record.get(0);
                LongWritable destinationVertexID = (LongWritable) record.get(1);
                DoubleWritable edgeValue = (DoubleWritable) record.get(2);
                Edge<LongWritable, DoubleWritable> edge = new Edge<LongWritable, DoubleWritable>(destinationVertexID, edgeValue);
                context.addEdgeRequest(sourceVertexID, edge);
                Edge<LongWritable, DoubleWritable> edge2 = new
                        Edge<LongWritable, DoubleWritable>(sourceVertexID, edgeValue);
                context.addEdgeRequest(destinationVertexID, edge2);
            }
        }
    
        public static class SSSPLoadingVertexResolver extends
                LoadingVertexResolver<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
    
            @Override
            public Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> resolve(
                    LongWritable vertexId,
                    VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> vertexChanges) throws IOException {
    
                SSSPVertex computeVertex = new SSSPVertex();
                computeVertex.setId(vertexId);
                Set<LongWritable> destinationVertexIDSet = new HashSet<>();
    
                if (hasEdgeAdditions(vertexChanges)) {
                    for (Edge<LongWritable, DoubleWritable> edge : vertexChanges.getAddedEdgeList()) {
                        if (!destinationVertexIDSet.contains(edge.getDestVertexId())) {
                            destinationVertexIDSet.add(edge.getDestVertexId());
                            computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
                        }
    
                    }
                }
    
                return computeVertex;
            }
    
            protected boolean hasEdgeAdditions(VertexChanges<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> changes) {
                return changes != null && changes.getAddedEdgeList() != null
                        && !changes.getAddedEdgeList().isEmpty();
            }
        }
    
        public static void main(String[] args) throws IOException {
            if (args.length < 2) {
                System.out.println("Usage: <startnode> <input> <output>");
                System.exit(-1);
            }
            GraphJob job = new GraphJob();
            job.setGraphLoaderClass(SSSPGraphLoader.class);
            job.setLoadingVertexResolver(SSSPLoadingVertexResolver.class);
            job.setVertexClass(SSSPVertex.class);
            job.setCombinerClass(MinLongCombiner.class);
            job.set(START_VERTEX, args[0]);
            job.addInput(TableInfo.builder().tableName(args[1]).build());
            job.addOutput(TableInfo.builder().tableName(args[2]).build());
            long startTime = System.currentTimeMillis();
            job.run();
            System.out.println("Job Finished in "
                    + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
        }
    }
                        

    Deskripsi Kode:

    • Baris 15: Definisikan kelas SSSPVertex.

      • Nilai simpul menunjukkan nilai SSSP dari simpul ke simpul sumber yang ditentukan oleh startVertexId.

      • Metode compute() menggunakan rumus iterasi berikut untuk menghitung nilai SSSP dan memperbarui nilai ke nilai simpul: d[v] = min(d[v], d[u] + weight(u, v)).

      • Metode cleanup() menulis nilai SSSP ke tabel hasil.

    • Baris 54: Jika nilai SSSP dari simpul saat ini ke simpul sumber tidak berubah, voteToHalt() dipanggil untuk halt simpul menggunakan kerangka kerja. Saat semua simpul memilih untuk halt, perhitungan berakhir.

    • Baris 61: Definisikan kelas MinLongCombiner. Kelas ini digunakan untuk menggabungkan pesan yang dikirim ke simpul yang sama. Kelas ini membantu meningkatkan kinerja komputasi dan mengurangi penggunaan memori.

    • Baris 72: Definisikan kelas GraphLoader yang digunakan untuk memuat data dari tabel ke graf tak berarah. Metode addEdgeRequest dipanggil untuk menggunakan sisi antara dua simpul sebagai sisi tak berarah. Ini memastikan bahwa data graf yang disimpan dalam tabel saat ini dimuat ke graf tak berarah.

      • Baris 80: Tentukan ID simpul sumber.

      • Baris 81: Tentukan ID simpul tujuan.

      • Baris 82: Tentukan bobot sisi.

      • Baris 83: Tentukan ID simpul tujuan dan bobot sisi untuk menambahkan sisi ke simpul tujuan.

      • Baris 84: Kirim permintaan untuk menambahkan sisi ke simpul sumber.

      • Baris 85 hingga Baris 87: Setiap catatan menunjukkan sisi dua arah. Deskripsinya sama dengan Baris 83 dan Baris 84.

    • Definisikan kelas SSSPLoadingVertexResolver. Kelas ini digunakan untuk menangani konflik yang mungkin terjadi saat memuat data ke graf tak berarah. Misalnya, jika Anda memanggil metode addEdgeRequest dua kali untuk menambahkan sisi, konflik pemuatan terjadi. Dalam hal ini, Anda harus menangani konflik sebelum menghitung nilai SSSP.

    • Baris 101: Definisikan kelas GraphJob dalam fungsi main program utama. Kelas GraphJob digunakan untuk menentukan cara kerja kelas Vertex, GraphLoader, BaseLoadingVertexResolver, dan Combiner. Kelas GraphJob juga menentukan tabel input dan output.

Hasil eksekusi

Kode berikut adalah hasil eksekusi dari contoh kode berdasarkan graf berarah. Untuk informasi lebih lanjut, lihat Tulis Pekerjaan Graph.

vertex    value
1        0
2        2
3        1
4        3
5        2
  • vertex: simpul saat ini.

  • value: nilai SSSP.

Catatan

Anda dapat menggunakan ID simpul sumber, ID simpul tujuan, dan bobot sisi untuk memuat data ke graf tak berarah berdasarkan kode contoh untuk graf tak berarah.

Contoh tutorial

Untuk informasi lebih lanjut tentang kode contoh di atas, lihat Kembangkan Program Graph.