PageRank は Web ページのランキングを計算する典型的なアルゴリズムです。 入力有向グラフ G において、頂点は Web ページを示します。 Web ページ A と B の間にリンクが存在する場合、A と B を結ぶ辺が存在します。

アルゴリズムの基本概念は以下のとおりです。
  • 初期化: 頂点の値は PageRank のランク値 (double 型) を示します。 初期段階では、すべての頂点の値は 1/TotalNumVertices です。
  • 反復計算式: PageRank(i) = 0.15/TotalNumVertices + 0.85 x sum。 sum は、 PageRank(j)/out_degree(j) の合計を示します。j は頂点 i を指すすべての頂点を示します。

このアルゴリズムの基本概念は、MaxCompute Graph プログラムを使用するソリューションに適用可能なことを示しています。 各頂点 j は PageRank の値を保持します。 PageRank(j)/out_degree(j) は、反復の度に、隣接する頂点 (投票用) に送信されます。 次の反復では、各頂点は反復計算式を使って PageRank 値を再計算します。

サンプルコード

import java.io.IOException;

import org.apache.log4j.Logger;

import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;

public class PageRank {

  private final static Logger LOG = Logger.getLogger(PageRank.class);

  public static class PageRankVertex extends
      Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    public void compute(
        ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
        Iterable<DoubleWritable> messages) throws IOException {
      if (context.getSuperstep() == 0) {
        setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
      } else if (context.getSuperstep() >= 1) {
        double sum = 0;
        for (DoubleWritable msg : messages) {
          sum += msg.get();
        
        DoubleWritable vertexValue = new DoubleWritable(
            (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
        setValue(vertexValue);
      
      if (hasEdges()) {
        context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
            .get() / getEdges().size()));
      
    

    @Override
    public void cleanup(
        WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    
  

  public static class PageRankVertexReader extends
      GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
        throws IOException {
      PageRankVertex vertex = new PageRankVertex();
      vertex.setValue(new DoubleWritable(0));
      vertex.setId((Text) record.get(0));
      System.out.println(record.get(0));

      for (int i = 1; i < record.size(); i++) {
        Writable edge = record.get(i);
        System.out.println(edge.toString());
        if (!( edge.equals(NullWritable.get()))) {
          vertex.addEdge(new Text(edge.toString()), NullWritable.get());
        
      
      LOG.info("vertex edgs size: "
          + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
      context.addVertexRequest(vertex);
    

  

  private static void printUsage() {
    System.out.println("Usage: <in> <out> [Max iterations (default 30)]"); 
    System.exit(-1);
  

  public static void main(String[] args) throws IOException {
    if (args.length < 2)
      printUsage();

    GraphJob job = new GraphJob();

    job.setGraphLoaderClass(PageRankVertexReader.class);
    job.setVertexClass(PageRankVertex.class);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());

    // default max iteration is 30
    job.setMaxIteration(30);
    if (args.length >= 3)
      job.setMaxIteration(Integer.parseInt(args[2]));

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  

PageRank のソースコードの詳細を以下に示します。
  • 23 行目: PageRankVertex を定義します。
    • 頂点の値は、現在の頂点 (Web ページ) の PageRank 値を示します。
    • compute() メソッド は、反復計算式 PageRank(i) = 0.15/TotalNumVertices + 0.85 x sum を使用して頂点の値を更新します。
    • cleanup() メソッドは、頂点と PageRank 値を結果テーブルに書き込みます。
  • 55 行目: PageRankVertexReader クラスを定義し、グラフをロードし、テーブルの各レコードを頂点に分解します。 レコードの最初の列は開始頂点で、他の列は到達頂点です。
  • 88 行目: メインプログラム (main 関数) を実行し、GraphJob を定義し、Vertex/GraphLoader、反復最大数 (デフォルトは 30) および 入出力テーブルの実装を指定します。