Consume LogHub logs by using Storm

Last Updated: Jul 05, 2017

LogHub of Log Service provides an efficient and reliable channel for collecting log data through Logtail and SDKs. After the data is written to LogHub, you can consume it in real-time systems such as Spark Streaming and Storm.

The LogHub Storm spout reads data from LogHub in real time, reducing the cost for Storm users to consume LogHub data.

Basic architecture and process

(Figure: basic architecture of the LogHub Storm spout)

  • In the preceding figure, the LogHub Storm spouts are enclosed in the red dashed boxes. Each Storm topology has a group of spouts that read data from one Logstore. The spouts in different topologies are independent of each other.
  • Each topology is identified by a unique LogHub consumer group name. The LogHub client library handles load balancing and automatic failover among the spouts within the same topology.
  • Spouts read data from LogHub in real time, send it to the bolt nodes of the topology, and periodically save consumption checkpoints to the LogHub server.

Note:

  • To prevent misuse, each Logstore supports up to five consumer groups. You can delete unused consumer groups through the DeleteConsumerGroup interface of the Java SDK (see the sketch after this list).
  • We recommend that the number of spouts equal the number of shards. Otherwise, a single spout may be unable to keep up with the data volume.
  • If the data volume of a single shard exceeds the processing capability of one spout, you can split the shard through the shard split interface to reduce the per-shard data volume (also shown in the sketch after this list).
  • LogHub spouts depend on the Storm ACK mechanism to confirm that messages are correctly sent to bolts. Therefore, bolts must call ack() to confirm each tuple.
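
The following is a minimal sketch of deleting an unused consumer group and splitting a busy shard with the Log Service Java SDK. The client class and method names (Client, DeleteConsumerGroup, SplitShard) follow the aliyun-log Java SDK as best understood here; the endpoint, project, Logstore, shard ID, and split key are placeholders, so verify the calls against your SDK version.

  import com.aliyun.openservices.log.Client;
  import com.aliyun.openservices.log.exception.LogException;

  public class ConsumerGroupMaintenance {
      public static void main(String[] args) {
          // Placeholder connection settings: fill in your own values.
          String endpoint = "";
          String accessId = "";
          String accessKey = "";
          String project = "";
          String logstore = "";
          Client client = new Client(endpoint, accessId, accessKey);
          try {
              // Frees one of the Logstore's five consumer-group slots.
              client.DeleteConsumerGroup(project, logstore, "unused-consumer-group");
              // Splits shard 0 at the given MD5 hash key so that each of the two
              // resulting shards carries less data (both values are placeholders).
              client.SplitShard(project, logstore, 0, "7fffffffffffffffffffffffffffffff");
          } catch (LogException e) {
              e.printStackTrace();
          }
      }
  }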

Example

Spout (used for topology creation)

  // Imports omitted: the Storm classes come from backtype.storm.* on Storm 0.9.x
  // and org.apache.storm.* on Storm 1.0 and later; the LogHub classes come from
  // the loghub-storm-spout artifact. The class name below is illustrative.
  public class LogHubStormSample {
      private static final Logger logger = Logger.getLogger(LogHubStormSample.class);

      public static void main(String[] args) {
          String mode = "Local"; // Uses the local test mode.
          // Each topology must be assigned a unique consumer group name. The name
          // can be 3 to 63 characters long, may contain lowercase letters, digits,
          // underscores (_), and hyphens (-), cannot be null, and must begin and
          // end with a lowercase letter or digit.
          String consumer_group_name = "";
          String project = "";    // Project of the Log Service.
          String logstore = "";   // Logstore of the Log Service.
          String endpoint = "";   // Endpoint (domain) of the Log Service.
          String access_id = "";  // User's AccessKey ID.
          String access_key = ""; // User's AccessKey secret.
          // Configuration required for creating a LogHub Storm spout.
          LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
                  endpoint, project, logstore, access_id,
                  access_key, LogHubCursorPosition.END_CURSOR);
          TopologyBuilder builder = new TopologyBuilder();
          // Creates a LogHub Storm spout.
          LogHubSpout spout = new LogHubSpout(config);
          // In production, the spout parallelism should equal the number of
          // Logstore shards.
          builder.setSpout("spout", spout, 1);
          builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
          Config conf = new Config();
          conf.setDebug(false);
          conf.setMaxSpoutPending(1);
          // When Kryo is used for data serialization and deserialization, the
          // serializer LogGroupDataSerializSerializer of LogGroupData must be
          // registered explicitly.
          Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
          if (mode.equals("Local")) {
              logger.info("Local mode...");
              LocalCluster cluster = new LocalCluster();
              cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
              try {
                  Thread.sleep(6000 * 1000); // Keeps the local topology running (100 minutes).
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              cluster.killTopology("test-jstorm-spout");
              cluster.shutdown();
          } else if (mode.equals("Remote")) {
              logger.info("Remote mode...");
              conf.setNumWorkers(2);
              try {
                  StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
              } catch (AlreadyAliveException e) {
                  e.printStackTrace();
              } catch (InvalidTopologyException e) {
                  e.printStackTrace();
              }
          } else {
              logger.error("invalid mode: " + mode);
          }
      }
  }

Sample code of bolts that consume data (only the content of each log is printed)

  public class SampleBolt extends BaseRichBolt {
      private static final long serialVersionUID = 4752656887774402264L;
      private static final Logger logger = Logger.getLogger(SampleBolt.class);
      private OutputCollector mCollector;

      @Override
      public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
              OutputCollector collector) {
          mCollector = collector;
      }

      @Override
      public void execute(Tuple tuple) {
          // The shard from which this batch of log groups was read.
          String shardId = (String) tuple
                  .getValueByField(LogHubSpout.FIELD_SHARD_ID);
          @SuppressWarnings("unchecked")
          List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
          for (LogGroupData groupData : logGroupDatas) {
              // Each LogGroup consists of one or more logs.
              LogGroup logGroup = groupData.GetLogGroup();
              for (Log log : logGroup.getLogsList()) {
                  StringBuilder sb = new StringBuilder();
                  // Each log has a time field and multiple key-value pairs.
                  int log_time = log.getTime();
                  sb.append("LogTime:").append(log_time);
                  for (Content content : log.getContentsList()) {
                      sb.append("\t").append(content.getKey()).append(":")
                              .append(content.getValue());
                  }
                  logger.info(sb.toString());
              }
          }
          // LogHub spouts rely on the Storm ACK mechanism to confirm that messages
          // were correctly sent to bolts, so bolts must ack every tuple.
          mCollector.ack(tuple);
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          // Do nothing: this bolt is a terminal node and emits no tuples.
      }
  }

Maven

Use the following dependency for Storm versions earlier than 1.0 (for example, 0.9.6).

  <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-storm-spout</artifactId>
      <version>0.6.5</version>
  </dependency>

Use the following dependency for Storm 1.0 and later versions.

  <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-storm-1.0-spout</artifactId>
      <version>0.1.2</version>
  </dependency>