PageRankは、webページをランク付けするために使用されるアルゴリズムです。 PageRankの入力は有向グラフです。 各頂点はウェブページを表す。 各エッジは、2つのウェブページ間のリンクを表す。 ウェブページが接続されていない場合、頂点間にエッジは存在しない。
制御ポリシー機能の動作
PageRankアルゴリズムの仕組み:
初期化: 頂点値はPageRankのランク値を示します。 ランク値はDOUBLEタイプです。 初期化中、各頂点の値は
1/TotalNumVertices
です。イテレーション式:
PageRank(i) = 0.15/TotalNumVertices + 0.85 × 合計
の値sumは、中心iを指す各頂点の
PageRank(j)/out_degree(j)
を合計するために使用されます。 式中のjは、中心iを指す各頂点を指す。
PageRankアルゴリズムは、MaxCompute Graphプログラムに適しています。 各頂点jは、そのPageRank値を維持し、各反復において、PageRank(j)/out_degree(j)
をその隣接する頂点 (投票のため) に送信する。 次の反復では、各頂点は反復式を使用してPageRank値を再計算します。
前提条件
Graphジョブを記述して、必要なテスト環境を設定しました。
テストの準備
このテストは、MaxComputeクライアントからクラスターにジョブを送信することによって実行されます。 最初にローカルテストを実行することもできます。 詳細については、「ローカルデバッグ」をご参照ください。
graph-examples. JARという名前のテストプログラム用のjarファイルを準備し、MaxComputeクライアントのbinディレクトリ内のdata\resourcesフォルダーに保存します。
PageRankのテストテーブルとリソースを準備します。
テストテーブルの作成
CREATE TABLE pagerank_in(vertex STRING, des_1 STRING, des_2 STRING); CREATE TABLE pagerank_out(vertex_id STRING, vertex_value DOUBLE);
テストソースの追加
-- Use the -f override option to ignore files on the first addition. add jar data\resources\graph-examples.jar -f;
Tunnelコマンドを実行して、data.txtファイルをMaxComputeクライアントbinディレクトリから
pagerank_in
テーブルにインポートします。tunnel upload data.txt pagerank_in;
data.txt
ファイルのデータを次に示します。1,2,4 2,1,3 4,2,3 3,1,2
テスト手順
MaxComputeクライアントでPageRankテストを実行します。
jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_out
想定される結果
ジョブが正常に完了した後、出力テーブルpagerank_out
の内容は次のとおりです。
+------------+--------------+
| vertex_id | vertex_value |
+------------+--------------+
| 1 | 0.2781238395149928 |
| 2 | 0.3245614688676814 |
| 3 | 0.24161225195637787 |
| 4 | 0.155702636559485 |
+------------+--------------+
サンプルコード
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends
Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(
ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
if (context.getSuperstep() == 0) {
setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
} else if (context.getSuperstep() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
DoubleWritable vertexValue = new DoubleWritable(
(0.15f / context.getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
}
if (hasEdges()) {
context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
.get() / getEdges().size()));
}
}
@Override
public void cleanup(
WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class PageRankVertexReader extends
GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
PageRankVertex vertex = new PageRankVertex();
vertex.setValue(new DoubleWritable(0));
vertex.setId((Text) record.get(0));
System.out.println(record.get(0));
for (int i = 1; i < record.size(); i++) {
Writable edge = record.get(i);
System.out.println(edge.toString());
if (!( edge.equals(NullWritable.get()))) {
vertex.addEdge(new Text(edge.toString()), NullWritable.get());
}
}
LOG.info("vertex edgs size: "
+ (vertex.hasEdges() ? vertex.getEdges().size() : 0));
context.addVertexRequest(vertex);
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
説明:
23行目: PageRankVertexクラスを定義します。
頂点の値は、現在の頂点 (Web ページ) の PageRank 値を示します。
compute() メソッドは、次の反復式を使用して頂点値を更新します。
PageRank(i) = 0.15/TotalNumVertices + 0.85 × 合計値
cleanup() メソッドは、頂点と PageRank 値を結果テーブルに書き込みます。
行55: PageRankVertexReaderクラスを定義し、グラフをロードし、テーブル内の各レコードを頂点に解決します。 テーブルの最初の列はソース頂点で、他の列は宛先頂点です。
行88: main関数を含め、GraphJobクラスを定義し、反復の最大数、入出力テーブル、およびVertexとGraphLoaderの実装を指定します。 デフォルトでは、最大30回の反復を実行できます。