Connected components

Last Updated: Oct 31, 2017

Two vertices are connected if a path exists between them. If any two vertices in undirected graph G are connected, G is called a connected graph. Otherwise, G is called an unconnected graph.

A connected sub-graph with a large number of vertices is called a connected component. This algorithm calculates connected component members of each vertex, and outputs the connected component of the vertex value that includes the smallest vertex ID.

The smallest vertex ID is transmitted along edges to all vertices of the connected component.

Source code

  1. import java.io.IOException;
  2. import com.aliyun.odps.data.TableInfo;
  3. import com.aliyun.odps.graph.ComputeContext;
  4. import com.aliyun.odps.graph.GraphJob;
  5. import com.aliyun.odps.graph.GraphLoader;
  6. import com.aliyun.odps.graph.MutationContext;
  7. import com.aliyun.odps.graph.Vertex;
  8. import com.aliyun.odps.graph.WorkerContext;
  9. import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner;
  10. import com.aliyun.odps.io.LongWritable;
  11. import com.aliyun.odps.io.NullWritable;
  12. import com.aliyun.odps.io.WritableRecord;
  13. /**
  14. * Compute the connected component membership of each vertex and output
  15. * each vertex whose value containing the smallest ID in the connected
  16. * component containing that vertex.
  17. *
  18. * Algorithm: Propagate the smallest vertex ID along the edges to all
  19. * vertices of a connected component.
  20. *
  21. */
  22. public class ConnectedComponents {
  23. public static class CCVertex extends
  24. Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  25. @Override
  26. public void compute(
  27. ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  28. Iterable<LongWritable> msgs) throws IOException {
  29. if (context.getSuperstep() == 0L) {
  30. this.setValue(getId());
  31. context.sendMessageToNeighbors(this, getValue());
  32. return;
  33. }
  34. long minID = Long.MAX_VALUE;
  35. for (LongWritable id : msgs) {
  36. if (id.get() < minID) {
  37. minID = id.get();
  38. }
  39. }
  40. if (minID < this.getValue().get()) {
  41. this.setValue(new LongWritable(minID));
  42. context.sendMessageToNeighbors(this, getValue());
  43. } else {
  44. this.voteToHalt();
  45. }
  46. }
  47. /**
  48. * Output table description:
  49. * +-----------------+----------------------------------------+
  50. * | Field | Type | Comment |
  51. * +-----------------+----------------------------------------+
  52. * | v | bigint | vertex ID |
  53. * | minID | bigint | smallest ID in the connected component |
  54. * +-----------------+----------------------------------------+
  55. */
  56. @Override
  57. public void cleanup(
  58. WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  59. throws IOException {
  60. context.write(getId(), getValue());
  61. }
  62. }
  63. /**
  64. * Input table description:
  65. * +-----------------+----------------------------------------------------+
  66. * | Field | Type | Comment |
  67. * +-----------------+----------------------------------------------------+
  68. * | v | bigint | vertex ID |
  69. * | es | string | comma separated target vertex ID of outgoing edges |
  70. * +-----------------+----------------------------------------------------+
  71. *
  72. * Example:
  73. * For graph:
  74. * 1 ----- 2
  75. * | |
  76. * 3 ----- 4
  77. * Input table:
  78. * +-----------+
  79. * | v | es |
  80. * +-----------+
  81. * | 1 | 2,3 |
  82. * | 2 | 1,4 |
  83. * | 3 | 1,4 |
  84. * | 4 | 2,3 |
  85. * +-----------+
  86. */
  87. public static class CCVertexReader extends
  88. GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  89. @Override
  90. public void load(
  91. LongWritable recordNum,
  92. WritableRecord record,
  93. MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  94. throws IOException {
  95. CCVertex vertex = new CCVertex();
  96. vertex.setId((LongWritable) record.get(0));
  97. String[] edges = record.get(1).toString().split(",");
  98. for (int i = 0; i < edges.length; i++) {
  99. long destID = Long.parseLong(edges[i]);
  100. vertex.addEdge(new LongWritable(destID), NullWritable.get());
  101. }
  102. context.addVertexRequest(vertex);
  103. }
  104. }
  105. public static void main(String[] args) throws IOException {
  106. if (args.length < 2) {
  107. System.out.println("Usage: <input> <output>");
  108. System.exit(-1);
  109. }
  110. GraphJob job = new GraphJob();
  111. job.setGraphLoaderClass(CCVertexReader.class);
  112. job.setVertexClass(CCVertex.class);
  113. job.setCombinerClass(MinLongCombiner.class);
  114. job.addInput(TableInfo.builder().tableName(args[0]).build());
  115. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  116. long startTime = System.currentTimeMillis();
  117. job.run();
  118. System.out.println("Job Finished in "
  119. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  120. }
  121. }
Thank you! We've received your feedback.