PageRank

Last Updated: Jun 22, 2016

PageRank is a classical algorithm for ranking web pages. Its input is a directed graph G in which each vertex represents a web page; if page A links to page B, the graph contains an edge from A to B. The basic principle of the algorithm is as follows:

  • Initialization: The value of each vertex is its PageRank value (double type). Initially, every vertex has the value ‘1/TotalNumVertices’.
  • Iterative formula: PageRank(i) = 0.15/TotalNumVertices + 0.85*sum, where ‘sum’ is the total of PageRank(j)/out_degree(j) over all vertices j that point to vertex i.
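
The two rules above can be sketched in plain Java on a small in-memory graph. This is only an illustration under stated assumptions, not part of the ODPS Graph program: the class name PageRankFormulaSketch, the method iterate, and the 3-page graph are hypothetical, and every page is assumed to have at least one outgoing link.

```java
import java.util.Arrays;

public class PageRankFormulaSketch {
    // outLinks[j] lists the vertices that vertex j points to (hypothetical layout).
    static double[] iterate(int[][] outLinks, int rounds) {
        int n = outLinks.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n); // initialization: 1/TotalNumVertices

        for (int r = 0; r < rounds; r++) {
            // sum[i] accumulates PageRank(j)/out_degree(j) over all j pointing to i
            double[] sum = new double[n];
            for (int j = 0; j < n; j++) {
                for (int i : outLinks[j]) {
                    sum[i] += rank[j] / outLinks[j].length;
                }
            }
            // iterative formula: PageRank(i) = 0.15/TotalNumVertices + 0.85*sum
            for (int i = 0; i < n; i++) {
                rank[i] = 0.15 / n + 0.85 * sum[i];
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        // Hypothetical graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
        int[][] outLinks = { {1, 2}, {2}, {0} };
        System.out.println(Arrays.toString(iterate(outLinks, 1)));
        System.out.println(Arrays.toString(iterate(outLinks, 30)));
    }
}
```

On this graph, one round yields approximately 0.3333, 0.1917 and 0.4750 for pages 0, 1 and 2. The values keep summing to 1 because every page has at least one outgoing link.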

This principle maps naturally onto ODPS Graph: each vertex j maintains its own PageRank value. In each iteration, it sends PageRank(j)/out_degree(j) to each of its neighbor vertices (a vote for them). In the next iteration, each vertex recalculates its PageRank value from the messages it received, according to the iterative formula.
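
To make the message-passing view concrete, the following plain-Java sketch simulates the rounds with one inbox per vertex. It does not use the ODPS Graph API; the class SuperstepSketch, its methods, and the sample graph are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SuperstepSketch {
    // Runs the given number of supersteps; outLinks[v] lists the neighbors of v.
    static double[] run(int[][] outLinks, int supersteps) {
        int n = outLinks.length;
        double[] value = new double[n];
        List<List<Double>> inbox = emptyInboxes(n);

        for (int step = 0; step < supersteps; step++) {
            List<List<Double>> nextInbox = emptyInboxes(n);
            for (int v = 0; v < n; v++) {
                if (step == 0) {
                    value[v] = 1.0 / n; // first superstep: initialize only
                } else {
                    double sum = 0;
                    for (double msg : inbox.get(v)) {
                        sum += msg; // votes sent in the previous superstep
                    }
                    value[v] = 0.15 / n + 0.85 * sum;
                }
                // Vote: send PageRank(v)/out_degree(v) to every neighbor.
                for (int target : outLinks[v]) {
                    nextInbox.get(target).add(value[v] / outLinks[v].length);
                }
            }
            inbox = nextInbox; // messages become visible in the next superstep
        }
        return value;
    }

    static List<List<Double>> emptyInboxes(int n) {
        List<List<Double>> boxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            boxes.add(new ArrayList<>());
        }
        return boxes;
    }

    public static void main(String[] args) {
        // Hypothetical graph: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
        int[][] outLinks = { {1, 2}, {2}, {0} };
        for (double v : run(outLinks, 30)) {
            System.out.println(v);
        }
    }
}
```

In this sketch the first superstep only initializes the values and sends votes, so running k supersteps applies the iterative formula k-1 times.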

Source Code

  1. import java.io.IOException;
  2. import org.apache.log4j.Logger;
  3. import com.aliyun.odps.io.WritableRecord;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.MutationContext;
  8. import com.aliyun.odps.graph.Vertex;
  9. import com.aliyun.odps.graph.WorkerContext;
  10. import com.aliyun.odps.io.DoubleWritable;
  11. import com.aliyun.odps.io.LongWritable;
  12. import com.aliyun.odps.io.NullWritable;
  13. import com.aliyun.odps.data.TableInfo;
  14. import com.aliyun.odps.io.Text;
  15. import com.aliyun.odps.io.Writable;
  16. public class PageRank {
  17.   private final static Logger LOG = Logger.getLogger(PageRank.class);
  18.   public static class PageRankVertex extends
  19.       Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
  20.     @Override
  21.     public void compute(
  22.         ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
  23.         Iterable<DoubleWritable> messages) throws IOException {
  24.       if (context.getSuperstep() == 0) {
  25.         setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
  26.       } else if (context.getSuperstep() >= 1) {
  27.         double sum = 0;
  28.         for (DoubleWritable msg : messages) {
  29.           sum += msg.get();
  30.         }
  31.         DoubleWritable vertexValue = new DoubleWritable(
  32.             (0.15 / context.getTotalNumVertices()) + 0.85 * sum);
  33.         setValue(vertexValue);
  34.       }
  35.       if (hasEdges()) {
  36.         context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
  37.             .get() / getEdges().size()));
  38.       }
  39.     }
  40.     @Override
  41.     public void cleanup(
  42.         WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  43.         throws IOException {
  44.       context.write(getId(), getValue());
  45.     }
  46.   }
  47.   public static class PageRankVertexReader extends
  48.       GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
  49.     @Override
  50.     public void load(
  51.         LongWritable recordNum,
  52.         WritableRecord record,
  53.         MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  54.         throws IOException {
  55.       PageRankVertex vertex = new PageRankVertex();
  56.       vertex.setValue(new DoubleWritable(0));
  57.       vertex.setId((Text) record.get(0));
  58.       System.out.println(record.get(0));
  59.       for (int i = 1; i < record.size(); i++) {
  60.         Writable edge = record.get(i);
  61.         System.out.println(edge.toString());
  62.         if (!(edge.equals(NullWritable.get()))) {
  63.           vertex.addEdge(new Text(edge.toString()), NullWritable.get());
  64.         }
  65.       }
  66.       LOG.info("vertex edges size: "
  67.           + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
  68.       context.addVertexRequest(vertex);
  69.     }
  70.   }
  71.   private static void printUsage() {
  72.     System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
  73.     System.exit(-1);
  74.   }
  75.   public static void main(String[] args) throws IOException {
  76.     if (args.length < 2)
  77.       printUsage();
  78.     GraphJob job = new GraphJob();
  79.     job.setGraphLoaderClass(PageRankVertexReader.class);
  80.     job.setVertexClass(PageRankVertex.class);
  81.     job.addInput(TableInfo.builder().tableName(args[0]).build());
  82.     job.addOutput(TableInfo.builder().tableName(args[1]).build());
  83.     // default max iteration is 30
  84.     job.setMaxIteration(30);
  85.     if (args.length >= 3)
  86.       job.setMaxIteration(Integer.parseInt(args[2]));
  87.     long startTime = System.currentTimeMillis();
  88.     job.run();
  89.     System.out.println("Job Finished in "
  90.         + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  91.   }
  92. }

Code Description

The PageRank source code consists of the following parts:

  • Line 47: defines the class PageRankVertexReader, which loads the graph and parses each record into a vertex. The first column of a record is the source vertex, and the remaining columns are its destination vertices.
  • Line 18: defines the class PageRankVertex, in which:
    • The vertex value is the current PageRank value of this vertex (web page);
    • The compute() method uses the iterative formula ‘PageRank(i) = 0.15/TotalNumVertices + 0.85*sum’ to update the vertex value;
    • The cleanup() method writes the vertex ID and its PageRank value to the result table.
  • Line 75: main program (main class). It defines the GraphJob and specifies the Vertex and GraphLoader implementations, the maximum number of iterations (default is 30), and the input and output tables.
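
The record layout that the loader expects can be illustrated with a small plain-Java sketch. The rows and page names below are hypothetical sample data, and a null array element stands in for an empty column (an assumption made for this sketch; the actual loader compares each column against NullWritable).

```java
import java.util.ArrayList;
import java.util.List;

public class RecordLayoutSketch {
    // Returns the destination vertices of one row: every non-empty column after the first.
    static List<String> targets(String[] row) {
        List<String> out = new ArrayList<>();
        for (int i = 1; i < row.length; i++) {
            if (row[i] != null) { // skip empty columns, as the loader skips null values
                out.add(row[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input table: column 0 is the source page, the rest are link targets.
        String[][] rows = {
            { "A", "B", "C" },
            { "B", "C", null },
            { "C", "A", null },
        };
        for (String[] row : rows) {
            System.out.println(row[0] + " -> " + targets(row));
        }
    }
}
```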