Connected Components

Last Updated: Jun 22, 2016

Two vertices are connected if a path exists between them. An undirected graph G is called a connected graph if every pair of its vertices is connected; otherwise it is called a disconnected graph. A maximal connected subgraph of G is called a connected component. This algorithm computes the connected component membership of each vertex: it propagates the smallest vertex id along the edges to all vertices of a connected component, and finally outputs each vertex with its value set to the smallest vertex id in the connected component that contains it.

Source Code

  1. import java.io.IOException;
  2. import com.aliyun.odps.data.TableInfo;
  3. import com.aliyun.odps.graph.ComputeContext;
  4. import com.aliyun.odps.graph.GraphJob;
  5. import com.aliyun.odps.graph.GraphLoader;
  6. import com.aliyun.odps.graph.MutationContext;
  7. import com.aliyun.odps.graph.Vertex;
  8. import com.aliyun.odps.graph.WorkerContext;
  9. import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner;
  10. import com.aliyun.odps.io.LongWritable;
  11. import com.aliyun.odps.io.NullWritable;
  12. import com.aliyun.odps.io.WritableRecord;
  13. /**
  14. * Compute the connected component membership of each vertex and output
  15. * each vertex which's value containing the smallest id in the connected
  16. * component containing that vertex.
  17. *
  18. * Algorithm: propagate the smallest vertex id along the edges to all
  19. * vertices of a connected component.
  20. *
  21. */
  22. public class ConnectedComponents {
  23. public static class CCVertex extends
  24. Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  25. @Override
  26. public void compute(
  27. ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  28. Iterable<LongWritable> msgs) throws IOException {
  29. if (context.getSuperstep() == 0L) {
  30. this.setValue(getId());
  31. context.sendMessageToNeighbors(this, getValue());
  32. return;
  33. }
  34. long minID = Long.MAX_VALUE;
  35. for (LongWritable id : msgs) {
  36. if (id.get() < minID) {
  37. minID = id.get();
  38. }
  39. }
  40. if (minID < this.getValue().get()) {
  41. this.setValue(new LongWritable(minID));
  42. context.sendMessageToNeighbors(this, getValue());
  43. } else {
  44. this.voteToHalt();
  45. }
  46. }
  47. /**
  48. * Output Table Description:
  49. * +-----------------+----------------------------------------+
  50. * | Field | Type | Comment |
  51. * +-----------------+----------------------------------------+
  52. * | v | bigint | vertex id |
  53. * | minID | bigint | smallest id in the connected component |
  54. * +-----------------+----------------------------------------+
  55. */
  56. @Override
  57. public void cleanup(
  58. WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  59. throws IOException {
  60. context.write(getId(), getValue());
  61. }
  62. }
  63. /**
  64. * Input Table Description:
  65. * +-----------------+----------------------------------------------------+
  66. * | Field | Type | Comment |
  67. * +-----------------+----------------------------------------------------+
  68. * | v | bigint | vertex id |
  69. * | es | string | comma separated target vertex id of outgoing edges |
  70. * +-----------------+----------------------------------------------------+
  71. *
  72. * Example:
  73. * For graph:
  74. * 1 ----- 2
  75. * | |
  76. * 3 ----- 4
  77. * Input table:
  78. * +-----------+
  79. * | v | es |
  80. * +-----------+
  81. * | 1 | 2,3 |
  82. * | 2 | 1,4 |
  83. * | 3 | 1,4 |
  84. * | 4 | 2,3 |
  85. * +-----------+
  86. */
  87. public static class CCVertexReader extends
  88. GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  89. @Override
  90. public void load(
  91. LongWritable recordNum,
  92. WritableRecord record,
  93. MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  94. throws IOException {
  95. CCVertex vertex = new CCVertex();
  96. vertex.setId((LongWritable) record.get(0));
  97. String[] edges = record.get(1).toString().split(",");
  98. for (int i = 0; i < edges.length; i++) {
  99. long destID = Long.parseLong(edges[i]);
  100. vertex.addEdge(new LongWritable(destID), NullWritable.get());
  101. }
  102. context.addVertexRequest(vertex);
  103. }
  104. }
  105. public static void main(String[] args) throws IOException {
  106. if (args.length < 2) {
  107. System.out.println("Usage: <input> <output>");
  108. System.exit(-1);
  109. }
  110. GraphJob job = new GraphJob();
  111. job.setGraphLoaderClass(CCVertexReader.class);
  112. job.setVertexClass(CCVertex.class);
  113. job.setCombinerClass(MinLongCombiner.class);
  114. job.addInput(TableInfo.builder().tableName(args[0]).build());
  115. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  116. long startTime = System.currentTimeMillis();
  117. job.run();
  118. System.out.println("Job Finished in "
  119. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  120. }
  121. }
Thank you! We've received your feedback.