
Use Storm to consume LogHub logs

Last Updated: Mar 19, 2018

LogHub of Log Service provides an efficient and reliable log channel for collecting log data in real time by using Logtail and SDKs. After collecting logs, you can consume the data written to LogHub by using real-time systems such as Spark Streaming and Storm.

Log Service provides LogHub Storm spout to read data from LogHub in real time, reducing the cost of LogHub consumption for Storm users.

Basic architecture and process

(Figure: basic architecture of the LogHub Storm spout)

  • In the preceding figure, the LogHub Storm spout is enclosed in the red dotted box. Each Storm topology has a group of spouts that together read all the data from a Logstore. Spouts in different topologies do not affect each other.
  • Each topology is identified by a unique LogHub consumer group name. Spouts in the same topology use the LogHub client lib for load balancing and automatic failover.
  • Spouts read data from LogHub in real time, send it to the bolt nodes of the topology, and periodically save the consumption position to LogHub as a checkpoint.
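The checkpointing described in the last step can be illustrated with a toy tracker. This is a conceptual sketch in plain Java, not part of the LogHub client lib (which persists checkpoints to the consumer group on the server side); all names below are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of per-shard checkpointing: remember the last cursor fully
// processed per shard, and periodically take a snapshot the way a LogHub
// spout periodically saves its consumption position to LogHub.
public class CheckpointTracker {
    private final Map<Integer, String> lastCursorByShard = new HashMap<>();

    // Called after a batch of log groups from a shard has been acked.
    public void advance(int shardId, String cursor) {
        lastCursorByShard.put(shardId, cursor);
    }

    // Called periodically; in the real spout this writes to LogHub.
    public Map<Integer, String> snapshot() {
        return new HashMap<>(lastCursorByShard);
    }

    public static void main(String[] args) {
        CheckpointTracker t = new CheckpointTracker();
        t.advance(0, "cursorA");
        t.advance(0, "cursorB"); // only the newest cursor per shard survives
        t.advance(1, "cursorC");
        // After a restart, consumption resumes from the snapshot positions.
        System.out.println(t.snapshot());
    }
}
```

Only the newest cursor per shard is kept, which is exactly what restart-and-resume needs.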

Limits

  • To prevent misuse, each Logstore supports up to five consumer groups. You can delete unused consumer groups with the DeleteConsumerGroup interface of the Java SDK.
  • We recommend that the number of spouts equal the number of shards. Otherwise, a single spout may be unable to keep up with a large data volume.
  • If a shard carries more data than a single spout can process, use the shard split interface to split the shard and reduce the data volume per shard.
  • LogHub spouts rely on the Storm ACK mechanism to confirm that messages are correctly sent to bolts, so bolts must call ack to confirm receipt.
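Staying under the five-consumer-group limit usually means pruning groups that no running topology uses. The selection logic below is plain Java; the commented-out line only sketches where the Java SDK's DeleteConsumerGroup interface would be invoked (the client setup and surrounding names are assumptions, not shown here):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Keep only the consumer groups that belong to currently running topologies;
// everything else is a candidate for deletion to stay under the
// five-groups-per-Logstore limit.
public class ConsumerGroupPruner {
    public static List<String> selectUnused(List<String> existingGroups,
                                            Set<String> activeTopologyGroups) {
        List<String> unused = new ArrayList<>();
        for (String group : existingGroups) {
            if (!activeTopologyGroups.contains(group)) {
                unused.add(group);
            }
        }
        return unused;
    }

    public static void main(String[] args) {
        List<String> existing = List.of("storm-topo-a", "storm-topo-b", "old-test");
        Set<String> active = Set.of("storm-topo-a", "storm-topo-b");
        for (String group : selectUnused(existing, active)) {
            System.out.println("would delete: " + group);
            // With the Log Service Java SDK (hypothetical client variable):
            // client.DeleteConsumerGroup(project, logstore, group);
        }
    }
}
```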

Usage example

Spout (used to build a topology)

public static void main(String[] args)
{
    String mode = "Local"; // Use the local test mode.
    // Specify a unique consumer group name for each topology. The value cannot be
    // empty, can be 3–63 characters long, can contain lowercase letters, digits,
    // hyphens (-), and underscores (_), and must begin and end with a lowercase
    // letter or digit.
    String consumer_group_name = "";
    String project = ""; // The Log Service project.
    String logstore = ""; // The Log Service Logstore.
    String endpoint = ""; // The domain name used to access Log Service.
    String access_id = ""; // Your AccessKey ID.
    String access_key = ""; // Your AccessKey secret.
    // Configurations required for building a LogHub Storm spout.
    LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
            endpoint, project, logstore, access_id,
            access_key, LogHubCursorPosition.END_CURSOR);
    TopologyBuilder builder = new TopologyBuilder();
    // Build a LogHub Storm spout.
    LogHubSpout spout = new LogHubSpout(config);
    // In actual scenarios, the number of spouts can equal the number of Logstore shards.
    builder.setSpout("spout", spout, 1);
    builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setDebug(false);
    conf.setMaxSpoutPending(1);
    // When Kryo is used for data serialization and deserialization, the serializer
    // of LogGroupData must be registered explicitly.
    Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
    if (mode.equals("Local")) {
        logger.info("Local mode...");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
        try {
            Thread.sleep(6000 * 1000); // Wait before shutting down the local cluster.
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology("test-jstorm-spout");
        cluster.shutdown();
    } else if (mode.equals("Remote")) {
        logger.info("Remote mode...");
        conf.setNumWorkers(2);
        try {
            StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    } else {
        logger.error("invalid mode: " + mode);
    }
}
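The consumer group naming rule quoted above (3–63 characters; lowercase letters, digits, hyphens, and underscores; must begin and end with a lowercase letter or digit) can be checked up front before submitting a topology. This validator is not part of the spout library; it simply encodes the stated rule:

```java
import java.util.regex.Pattern;

public class ConsumerGroupNameValidator {
    // 3–63 chars total; lowercase letters, digits, '-' and '_' in the middle;
    // must start and end with a lowercase letter or digit.
    private static final Pattern NAME =
            Pattern.compile("^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$");

    public static boolean isValid(String name) {
        return name != null && NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("storm-consumer-1")); // true
        System.out.println(isValid("ab"));               // false: shorter than 3 chars
        System.out.println(isValid("-bad-start"));       // false: must start with a letter or digit
    }
}
```

Validating the name locally surfaces a misconfiguration before the topology is submitted, rather than at spout startup.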

Bolt

The following bolt code example consumes the data and simply prints the content of each log.

public class SampleBolt extends BaseRichBolt {
    private static final long serialVersionUID = 4752656887774402264L;
    private static final Logger logger = Logger.getLogger(SampleBolt.class);
    private OutputCollector mCollector;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
            OutputCollector collector) {
        mCollector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String shardId = (String) tuple
                .getValueByField(LogHubSpout.FIELD_SHARD_ID);
        @SuppressWarnings("unchecked")
        List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
        for (LogGroupData groupData : logGroupDatas) {
            // Each LogGroup consists of one or more logs.
            LogGroup logGroup = groupData.GetLogGroup();
            for (Log log : logGroup.getLogsList()) {
                StringBuilder sb = new StringBuilder();
                // Each log has a time field and multiple key-value pairs.
                int log_time = log.getTime();
                sb.append("LogTime:").append(log_time);
                for (Content content : log.getContentsList()) {
                    sb.append("\t").append(content.getKey()).append(":")
                            .append(content.getValue());
                }
                logger.info(sb.toString());
            }
        }
        // LogHub spouts rely on the Storm ACK mechanism to confirm that messages
        // are correctly sent to bolts, so bolts must call ack to confirm receipt.
        mCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Do nothing.
    }
}
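The formatting loop inside execute() can be exercised without Storm or the LogHub protobuf types. The sketch below reproduces the same tab-separated output using a plain Map in place of the Content list; the class and method names here are illustrative, not from the spout library:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogFormatter {
    // Mirrors the bolt's StringBuilder loop: "LogTime:<t>\t<key>:<value>..."
    public static String format(int logTime, Map<String, String> contents) {
        StringBuilder sb = new StringBuilder();
        sb.append("LogTime:").append(logTime);
        for (Map.Entry<String, String> e : contents.entrySet()) {
            sb.append("\t").append(e.getKey()).append(":").append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, like the Content list.
        Map<String, String> contents = new LinkedHashMap<>();
        contents.put("level", "INFO");
        contents.put("message", "user login");
        System.out.println(format(1521417600, contents));
    }
}
```

Isolating the formatting this way makes it easy to unit-test the bolt's output before wiring it into a topology.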

Maven

Use the following dependency for Storm versions earlier than 1.0 (for example, 0.9.6):

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-storm-spout</artifactId>
    <version>0.6.5</version>
</dependency>

Use the following dependency for Storm 1.0 and later versions:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-storm-1.0-spout</artifactId>
    <version>0.1.2</version>
</dependency>