PageRank scores vertices in a directed graph by importance. Each vertex represents a web page; each directed edge represents a link from one page to another. A page with more high-scoring pages pointing to it receives a higher score — the same logic behind why a Twitter account followed by influential users ranks higher than one followed by bots.
How it works
At the start of each run, every vertex gets the same initial score: 1 / TotalNumVertices.
Each superstep, every vertex sends a vote to its neighbors equal to its current score divided by its out-degree:
vote sent to neighbor = PageRank(j) / out_degree(j)At the end of each superstep, every vertex recalculates its score by summing all incoming votes and applying a damping factor:
PageRank(i) = 0.15 / TotalNumVertices + 0.85 × sum of incoming votesThe damping factor (0.85) models the probability that a random web surfer follows a link rather than jumping to a random page. The complement (0.15) ensures every vertex has a nonzero baseline score even with no incoming edges.
The algorithm runs for up to 30 iterations by default (configurable).
Prerequisites
Before you begin, ensure that you have:
Set up the MaxCompute Graph testing environment by writing a Graph job
The MaxCompute client installed
Run the PageRank example
Step 1: Prepare the JAR file
Place graph-examples.jar in the data\resources folder inside the MaxCompute client's bin directory.
Step 2: Create the input and output tables
Run the following SQL statements in the MaxCompute client:
CREATE TABLE pagerank_in(vertex STRING, des_1 STRING, des_2 STRING);
CREATE TABLE pagerank_out(vertex_id STRING, vertex_value DOUBLE);The pagerank_in table uses the first column as source vertices and the remaining columns as destination vertices.
Step 3: Register the JAR file
-- Use -f to overwrite if the resource already exists.
add jar data\resources\graph-examples.jar -f;Step 4: Upload the test data
From the MaxCompute client's bin directory, upload data.txt into the pagerank_in table:
tunnel upload data.txt pagerank_in;The data.txt file contains the following graph edges:
1,2,4
2,1,3
4,2,3
3,1,2Each row is a source vertex followed by its destination vertices. For example, vertex 1 has edges pointing to vertices 2 and 4.
Step 5: Run the job
jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_outJob parameters
| Parameter | Description | Default |
|---|---|---|
pagerank_in | Input table name | — |
pagerank_out | Output table name | — |
| Max iterations (optional third argument) | Maximum number of supersteps before the job stops | 30 |
To override the maximum number of iterations, pass it as a third argument:
jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_out 50Expected results
After the job completes, query pagerank_out:
+------------+--------------------+
| vertex_id | vertex_value |
+------------+--------------------+
| 1 | 0.2781238395149928 |
| 2 | 0.3245614688676814 |
| 3 | 0.24161225195637787|
| 4 | 0.155702636559485 |
+------------+--------------------+Vertex 2 scores highest because both vertex 1 and vertex 4 point to it, and vertex 1 itself has incoming links from vertices 2 and 3. Vertex 4 scores lowest — only vertex 1 points to it, and no high-scoring vertex gives it a strong vote.
For local debugging before submitting to the cluster, see Local debugging.
Sample code
The full Java implementation is shown below. The key classes are:
`PageRankVertex` — defines the per-vertex compute logic
`PageRankVertexReader` — loads the input table and builds the graph
`main` — configures and runs the
GraphJob
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends
Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(
ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
if (context.getSuperstep() == 0) {
// Superstep 0: initialize every vertex to 1 / TotalNumVertices
setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
} else if (context.getSuperstep() >= 1) {
// Superstep >= 1: sum incoming votes and apply the damping formula
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
DoubleWritable vertexValue = new DoubleWritable(
(0.15f / context.getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
}
// Send this vertex's share of its score to each neighbor
if (hasEdges()) {
context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
.get() / getEdges().size()));
}
}
@Override
public void cleanup(
WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
// Write the final vertex ID and PageRank score to the output table
context.write(getId(), getValue());
}
}
public static class PageRankVertexReader extends
GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
// Each table row becomes one vertex.
// Column 0 is the source vertex; columns 1+ are destination vertices (edges).
PageRankVertex vertex = new PageRankVertex();
vertex.setValue(new DoubleWritable(0));
vertex.setId((Text) record.get(0));
System.out.println(record.get(0));
for (int i = 1; i < record.size(); i++) {
Writable edge = record.get(i);
System.out.println(edge.toString());
if (!( edge.equals(NullWritable.get()))) {
vertex.addEdge(new Text(edge.toString()), NullWritable.get());
}
}
LOG.info("vertex edgs size: "
+ (vertex.hasEdges() ? vertex.getEdges().size() : 0));
context.addVertexRequest(vertex);
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// Default max iteration is 30; override with a third argument.
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}What's next
Local debugging — test Graph jobs locally before submitting to the cluster
Write a Graph job — set up the full MaxCompute Graph development environment