
Simple Log Service: Use Storm to consume log data

Last Updated: Aug 24, 2023

Simple Log Service LogHub provides efficient and reliable log collection and consumption channels. You can use various methods such as Logtail or SDKs to collect log data in real time. After log data is collected to Simple Log Service, you can use Storm to consume the log data in real time. To reduce the costs of consuming data, Simple Log Service provides LogHub Storm spouts to read data from Simple Log Service in real time.

Architecture and implementation

Figure 1 Architecture and implementation

  • In the preceding figure, LogHub Storm spouts are enclosed in red dashed-line boxes. Each Storm topology has a group of spouts that work together to read data from a Logstore. Spouts in different topologies are independent of each other.

  • Each topology is identified by using the unique name of the consumer group that is specified in the topology. Spouts in a topology use a consumer group to consume log data. This helps implement load balancing among consumers and automatic failover. For more information, see Use consumer groups to consume data.

  • Spouts in a topology read data from a Logstore in real time and send the data to bolts in the topology. The spouts save consumption checkpoints to the LogHub server on a regular basis.
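To make load balancing and failover more concrete, the following Java sketch models how a consumer group might spread a Logstore's shards across the spout tasks of one topology and reassign them when a spout task fails. The class `ShardAssignmentSketch` and its round-robin rule are hypothetical. The LogHub server makes the actual assignment, and the spout does not expose it this way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * HYPOTHETICAL model of consumer-group load balancing. Assumes shards are dealt
 * round-robin to live spout tasks; the real assignment is decided by the LogHub server.
 */
final class ShardAssignmentSketch {

    /** Deals shard IDs 0..shardCount-1 to the given spout tasks in round-robin order. */
    static Map<String, List<Integer>> assign(int shardCount, List<String> spouts) {
        if (spouts.isEmpty()) {
            throw new IllegalArgumentException("at least one spout task is required");
        }
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String spout : spouts) {
            result.put(spout, new ArrayList<Integer>());
        }
        for (int shard = 0; shard < shardCount; shard++) {
            result.get(spouts.get(shard % spouts.size())).add(shard);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> spouts = new ArrayList<>(Arrays.asList("spout-0", "spout-1"));
        System.out.println(assign(4, spouts)); // {spout-0=[0, 2], spout-1=[1, 3]}
        spouts.remove("spout-1");              // simulate a failed spout task
        System.out.println(assign(4, spouts)); // {spout-0=[0, 1, 2, 3]}
    }
}
```

In this model, four shards and two spout tasks give each task two shards. After one task fails, the remaining task takes over all four shards.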

Note
  • You can specify up to 10 consumer groups for a Logstore. If you no longer require a consumer group, you can call the DeleteConsumerGroup operation to delete the consumer group.

  • We recommend that you set the number of spouts for a Logstore to the number of shards in the Logstore. If a single spout processes a large amount of data from multiple shards, that spout may become a processing bottleneck.

  • If the amount of data in a shard exceeds the processing capacity of a single spout, you can split the shard into multiple shards. This reduces the amount of data in each shard.

  • LogHub Storm spouts and bolts must use the ack mechanism. After a bolt processes a tuple, the bolt must call the ack method on that tuple. The ack confirms that the log data was sent from the spout to the bolt and that the bolt processed the data.
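The ack requirement exists so that a checkpoint only moves past data that the bolts have finished processing. The following Java sketch models that idea for a single shard, assuming that each spout emission is a numbered batch. The class `AckCheckpointSketch` is hypothetical and does not reflect the internal code of LogHub Storm spouts. It only shows why a missing ack stalls the checkpoint.

```java
import java.util.TreeSet;

/**
 * HYPOTHETICAL model of ack-gated checkpointing for one shard. Not the LogHub spout's code:
 * it only shows that the saved checkpoint can advance past a batch once that batch is acked.
 */
final class AckCheckpointSketch {
    private long nextToEmit = 0;                               // sequence number of the next batch
    private long checkpoint = 0;                               // every batch below this is acked
    private final TreeSet<Long> ackedAhead = new TreeSet<>();  // acked batches beyond a gap

    /** Emits a new batch and returns its sequence number. */
    long emit() {
        return nextToEmit++;
    }

    /** Records an ack, then advances the checkpoint over any contiguous run of acked batches. */
    void ack(long seq) {
        ackedAhead.add(seq);
        while (ackedAhead.remove(checkpoint)) {
            checkpoint++;
        }
    }

    /** The position that would be saved to the server; all earlier batches are processed. */
    long checkpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        AckCheckpointSketch shard = new AckCheckpointSketch();
        long a = shard.emit();
        long b = shard.emit();
        long c = shard.emit();
        shard.ack(b);
        System.out.println(shard.checkpoint()); // 0: batch a is not acked yet
        shard.ack(a);
        System.out.println(shard.checkpoint()); // 2: a and b are acked, c is pending
        shard.ack(c);
        System.out.println(shard.checkpoint()); // 3: everything emitted is acked
    }
}
```

In this model, a batch that is never acked pins the checkpoint. After a failover, data from that batch onward would be read again instead of being skipped.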

Prerequisites

  • A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.

    • We recommend that you do not save the AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources within your account may be compromised.
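The examples in this topic read the AccessKey pair from environment variables. `System.getenv` returns null for an unset variable, so a missing variable is passed silently to the spout configuration. The following hypothetical helper, `CredentialCheck.require`, fails fast with a clear message instead. It takes the environment as a `Map` so that it can be tested without changing real environment variables.

```java
import java.util.Map;

/** HYPOTHETICAL helper that fails fast when a required environment variable is missing. */
final class CredentialCheck {

    /** Returns the value of name from env, or throws if the variable is unset or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            // Report only the variable name; never include credential values in messages.
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }
}
```

In the topology's `main` method, you could call `CredentialCheck.require(System.getenv(), "ALIBABA_CLOUD_ACCESS_KEY_ID")` instead of calling `System.getenv` directly. The same applies to `ALIBABA_CLOUD_ACCESS_KEY_SECRET`.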

Examples

  • The following code provides an example of how to construct a Storm topology:

    // Imports are omitted: Storm classes are in backtype.storm.* before Storm 1.0 and in
    // org.apache.storm.* in Storm 1.0 and later. The LogHub spout classes come from the artifact you use.
    public class SampleTopology {
        private static final Logger logger = Logger.getLogger(SampleTopology.class);

        public static void main(String[] args) throws Exception {
            // Use the local test mode. Set the value to "Remote" to submit the topology to a Storm cluster.
            String mode = "Local";
            // Use the unique name of a consumer group to identify a topology.
            String consumer_group_name = "";
            String project = "";
            String logstore = "";
            // The Simple Log Service endpoint.
            String endpoint = "";
            // Make sure that the environment variables are configured. In this example, the AccessKey ID and AccessKey secret are obtained from environment variables.
            String access_id = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
            String access_key = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            // Configure settings for a LogHub Storm spout.
            LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
                    endpoint, project, logstore, access_id,
                    access_key, LogHubCursorPosition.END_CURSOR);
            TopologyBuilder builder = new TopologyBuilder();
            // Create a LogHub Storm spout.
            LogHubSpout spout = new LogHubSpout(config);
            // In actual business scenarios, we recommend that you create the same number of spouts for a Logstore as the number of shards in the Logstore.
            builder.setSpout("spout", spout, 1);
            builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
            Config conf = new Config();
            conf.setDebug(false);
            // Allow at most one pending (emitted but not yet acked) tuple per spout task.
            conf.setMaxSpoutPending(1);
            // If you use Kryo to serialize and deserialize data, explicitly specify LogGroupDataSerializSerializer.
            Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
            if (mode.equals("Local")) {
                logger.info("Local mode...");
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
                try {
                    // Keep the local topology running for 6,000 seconds (100 minutes).
                    Thread.sleep(6000 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cluster.killTopology("test-jstorm-spout");
                cluster.shutdown();
            } else if (mode.equals("Remote")) {
                logger.info("Remote mode...");
                conf.setNumWorkers(2);
                // In Storm 1.0 and later, submitTopology can also throw AuthorizationException, which main propagates.
                try {
                    StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    // A topology with the same name is already running.
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            } else {
                logger.error("invalid mode: " + mode);
            }
        }
    }
  • The following code provides an example of how to consume log data and display the content of each log by using bolts:

    public class SampleBolt extends BaseRichBolt {
        private static final long serialVersionUID = 475265688777440****;
        private static final Logger logger = Logger.getLogger(SampleBolt.class);
        private OutputCollector mCollector;

        @Override
        public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
                OutputCollector collector) {
            // Keep the collector so that execute() can ack each tuple.
            mCollector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // The ID of the shard from which the log groups in this tuple were read.
            String shardId = (String) tuple.getValueByField(LogHubSpout.FIELD_SHARD_ID);
            @SuppressWarnings("unchecked")
            List<LogGroupData> logGroupDatas = (List<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
            for (LogGroupData groupData : logGroupDatas) {
                // Each log group consists of one or more logs.
                LogGroup logGroup = groupData.GetLogGroup();
                for (Log log : logGroup.getLogsList()) {
                    StringBuilder sb = new StringBuilder();
                    // Each log has a time field and multiple fields formatted in key-value pairs.
                    int log_time = log.getTime();
                    sb.append("LogTime:").append(log_time);
                    for (Content content : log.getContentsList()) {
                        sb.append("\t").append(content.getKey()).append(":")
                                .append(content.getValue());
                    }
                    logger.info(sb.toString());
                }
            }
            // LogHub spouts and bolts must use the ack method to check whether log data is sent from spouts to bolts and whether the data is processed by the bolts.
            mCollector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This bolt does not emit tuples, so it declares no output fields.
        }
    }
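The formatting in `SampleBolt.execute` depends on Storm and LogHub SDK types, so it is awkward to test in isolation. The following sketch moves the same string-building logic into a plain-Java method so that a unit test can check the output format. `LogLineFormatter` is a hypothetical helper, and `Map.Entry` pairs stand in for the SDK's `Content` objects.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * HYPOTHETICAL plain-Java copy of the line format built in SampleBolt.execute:
 * "LogTime:<time>" followed by one tab-separated "key:value" entry per field.
 */
final class LogLineFormatter {

    static String format(int logTime, List<Map.Entry<String, String>> contents) {
        StringBuilder sb = new StringBuilder();
        sb.append("LogTime:").append(logTime);
        for (Map.Entry<String, String> content : contents) {
            sb.append("\t").append(content.getKey()).append(":").append(content.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> contents = Arrays.<Map.Entry<String, String>>asList(
                new SimpleEntry<>("level", "INFO"),
                new SimpleEntry<>("message", "user login"));
        System.out.println(format(1692835200, contents));
    }
}
```

For the sample input, `main` prints `LogTime:1692835200`, `level:INFO`, and `message:user login` separated by tab characters. This is the same line format that `SampleBolt` writes to its logger.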

Add Maven dependencies

  • The following code provides an example of how to add Maven dependencies for Storm versions earlier than 1.0, such as Storm 0.9.6:

    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-storm-spout</artifactId>
      <version>0.6.6</version>
    </dependency>
  • The following code provides an example of how to add Maven dependencies for Storm 1.0 or later:

    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-storm-1.0-spout</artifactId>
      <version>0.1.3</version>
    </dependency>
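The two artifacts target different Storm versions, and Storm 1.0 moved its classes from the `backtype.storm` package to `org.apache.storm`. If you are unsure which Storm version a project pulls in, the following hypothetical Java helper checks which `TopologyBuilder` class is on the classpath and reports the matching artifactId from the dependencies listed for each Storm version. `StormArtifactCheck` is an illustrative name and is not part of either artifact.

```java
/**
 * HYPOTHETICAL helper: reports which LogHub spout artifact matches the Storm version on the
 * classpath, based on the Storm 1.0 package rename (backtype.storm -> org.apache.storm).
 */
final class StormArtifactCheck {

    /** Returns true if the named class can be found, without initializing it. */
    static boolean onClasspath(String className) {
        try {
            Class.forName(className, false, StormArtifactCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Returns the matching artifactId, or null if no Storm classes are found. */
    static String matchingArtifact() {
        if (onClasspath("org.apache.storm.topology.TopologyBuilder")) {
            return "loghub-storm-1.0-spout"; // Storm 1.0 or later
        }
        if (onClasspath("backtype.storm.topology.TopologyBuilder")) {
            return "loghub-storm-spout";     // Storm versions earlier than 1.0
        }
        return null;
    }

    public static void main(String[] args) {
        String artifact = matchingArtifact();
        System.out.println(artifact == null
                ? "No Storm classes found on the classpath"
                : "Use artifactId " + artifact);
    }
}
```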