全部产品
Search
文档中心

MaxCompute:PageRank

更新时间:Jul 02, 2025

PageRank adalah algoritma yang digunakan untuk memberi peringkat halaman web. Input dari PageRank berupa graf berarah. Setiap simpul mewakili halaman web, dan setiap sisi mewakili tautan antara dua halaman web. Jika halaman web tidak terhubung, tidak ada sisi di antara simpul-simpul tersebut.

Cara kerjanya

Berikut cara kerja algoritma PageRank:

  • Inisialisasi: Nilai simpul menunjukkan nilai peringkat PageRank. Nilai peringkat ini bertipe DOUBLE. Selama inisialisasi, nilai setiap simpul adalah 1/TotalNumVertices.

  • Rumus iterasi:

    PageRank(i) = 0.15/TotalNumVertices + 0.85 × Nilai dari sum

    Nilai sum dihitung dengan menjumlahkan PageRank(j)/out_degree(j) dari setiap simpul yang menunjuk ke pusat i. Variabel j dalam rumus mengacu pada setiap simpul yang menunjuk ke pusat i.

Algoritma PageRank cocok untuk program MaxCompute Graph. Setiap simpul mempertahankan nilai PageRank-nya dan mengirimkan PageRank(j)/out_degree(j) ke simpul-simpul yang berdekatan (untuk pemungutan suara) dalam setiap iterasi. Pada iterasi berikutnya, setiap simpul menghitung ulang nilai PageRank menggunakan rumus iterasi.

Prasyarat

Anda telah menyiapkan lingkungan pengujian yang diperlukan dengan menulis pekerjaan Graph.

Persiapan pengujian

Pengujian ini dilakukan dengan mengirimkan pekerjaan ke kluster dari klien MaxCompute. Anda juga dapat melakukan pengujian lokal terlebih dahulu. Untuk lebih jelasnya, lihat Debugging Lokal.

  1. Siapkan file JAR untuk program pengujian, bernama graph-examples.jar, dan simpan di folder data\resources dalam direktori bin klien MaxCompute.

  2. Siapkan tabel dan sumber daya pengujian untuk PageRank.

    1. Buat tabel pengujian.

      CREATE TABLE pagerank_in(vertex STRING, des_1 STRING, des_2 STRING);
      CREATE TABLE pagerank_out(vertex_id STRING, vertex_value DOUBLE);
    2. Tambahkan sumber pengujian.

      -- Gunakan opsi -f override untuk mengabaikan file pada penambahan pertama.
      add jar data\resources\graph-examples.jar -f;
  3. Jalankan perintah Tunnel untuk mengimpor file data.txt dari direktori bin klien MaxCompute ke tabel pagerank_in.

    tunnel upload data.txt pagerank_in;

    Berikut adalah data dari file data.txt:

    1,2,4
    2,1,3
    4,2,3
    3,1,2

Prosedur pengujian

Jalankan pengujian PageRank di klien MaxCompute.

jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_out

Hasil yang diharapkan

Setelah pekerjaan berhasil diselesaikan, isi dari tabel keluaran pagerank_out adalah sebagai berikut.

+------------+--------------+
| vertex_id  | vertex_value |
+------------+--------------+
| 1          | 0.2781238395149928 |
| 2          | 0.3245614688676814 |
| 3          | 0.24161225195637787 |
| 4          | 0.155702636559485 |
+------------+--------------+

Kode contoh

import java.io.IOException;

import org.apache.log4j.Logger;

import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;

public class PageRank {

  private final static Logger LOG = Logger.getLogger(PageRank.class);

  public static class PageRankVertex extends
      Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    public void compute(
        ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
        Iterable<DoubleWritable> messages) throws IOException {
      if (context.getSuperstep() == 0) {
        setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
      } else if (context.getSuperstep() >= 1) {
        double sum = 0;
        for (DoubleWritable msg : messages) {
          sum += msg.get();
        }
        DoubleWritable vertexValue = new DoubleWritable(
            (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
        setValue(vertexValue);
      }
      if (hasEdges()) {
        context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
            .get() / getEdges().size()));
      }
    }

    @Override
    public void cleanup(
        WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }

  public static class PageRankVertexReader extends
      GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
        throws IOException {
      PageRankVertex vertex = new PageRankVertex();
      vertex.setValue(new DoubleWritable(0));
      vertex.setId((Text) record.get(0));
      System.out.println(record.get(0));

      for (int i = 1; i < record.size(); i++) {
        Writable edge = record.get(i);
        System.out.println(edge.toString());
        if (!( edge.equals(NullWritable.get()))) {
          vertex.addEdge(new Text(edge.toString()), NullWritable.get());
        }
      }
      LOG.info("vertex edgs size: "
          + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
      context.addVertexRequest(vertex);
    }

  }

  private static void printUsage() {
    System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
    System.exit(-1);
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2)
      printUsage();

    GraphJob job = new GraphJob();

    job.setGraphLoaderClass(PageRankVertexReader.class);
    job.setVertexClass(PageRankVertex.class);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());

    // default max iteration is 30
    job.setMaxIteration(30);
    if (args.length >= 3)
      job.setMaxIteration(Integer.parseInt(args[2]));

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
            

Deskripsi:

  • Baris 23: Tentukan kelas PageRankVertex.

    • Nilai simpul menunjukkan nilai PageRank saat ini dari simpul (halaman web).

    • Metode compute() menggunakan rumus iterasi berikut untuk memperbarui nilai simpul: PageRank(i) = 0.15/TotalNumVertices + 0.85 × Nilai dari sum.

    • Metode cleanup() menulis simpul dan nilai PageRank-nya ke tabel hasil.

  • Baris 55: Tentukan kelas PageRankVertexReader, muat graf, dan pecahkan setiap rekaman dalam tabel menjadi sebuah simpul. Kolom pertama tabel adalah simpul sumber, dan kolom lainnya adalah simpul tujuan.

  • Baris 88: Sertakan fungsi utama, tentukan kelas GraphJob, dan tentukan jumlah maksimum iterasi, tabel input dan output, serta implementasi Vertex dan GraphLoader. Secara default, maksimal 30 iterasi dapat dilakukan.