Simple Log Service LogHub provides efficient and reliable channels for log collection and consumption. You can collect log data in real time by using methods such as Logtail or SDKs. After the log data is collected to Simple Log Service, you can consume it in real time by using Storm. To reduce consumption costs, Simple Log Service provides the LogHub Storm spout, which reads data from Simple Log Service in real time.
Architecture and implementation
Prerequisites
A Resource Access Management (RAM) user is created, and the required permissions are granted to the RAM user. For more information, see Create a RAM user and grant permissions to the RAM user.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables.
Important: The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
We recommend that you do not save the AccessKey ID or AccessKey secret in your project code. Otherwise, the AccessKey pair may be leaked, and the security of all resources within your account may be compromised.
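The prerequisites above assume that the two environment variables are already set. The following minimal sketch shows one way to read them and fail fast before a topology is submitted. The variable names come from the prerequisites; the `CredentialCheck` class and its fail-fast behavior are our own illustration, not part of the Simple Log Service SDK:

```java
public class CredentialCheck {

    // Validate that a credential value is present. Separated from the
    // environment lookup so the check itself is easy to test.
    static String check(String name, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException(
                    "Environment variable " + name + " is not set.");
        }
        return value;
    }

    // Read an environment variable and fail fast if it is missing, so that
    // misconfiguration is caught before the topology starts.
    static String requireEnv(String name) {
        return check(name, System.getenv(name));
    }
}
```

In the topology's `main` method, `requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")` and `requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")` can then replace the bare `System.getenv` calls, so that a missing credential produces an immediate, descriptive error instead of an authentication failure at runtime.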
Examples
The following code provides an example on how to construct a Storm topology:
```java
public static void main(String[] args) {
    // Use the local test mode.
    String mode = "Local";
    // Use the unique name of a consumer group to identify a topology.
    String consumer_group_name = "";
    String project = "";
    String logstore = "";
    // The Simple Log Service endpoint.
    String endpoint = "";
    // Configure environment variables. In this example, the AccessKey ID and
    // AccessKey secret are obtained from environment variables.
    String access_id = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String access_key = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // Configure settings for a LogHub Storm spout.
    LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
            endpoint, project, logstore, access_id, access_key,
            LogHubCursorPosition.END_CURSOR);
    TopologyBuilder builder = new TopologyBuilder();
    // Create a LogHub Storm spout.
    LogHubSpout spout = new LogHubSpout(config);
    // In actual business scenarios, we recommend that you create the same
    // number of spouts for a Logstore as the number of shards in the Logstore.
    builder.setSpout("spout", spout, 1);
    builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setDebug(false);
    conf.setMaxSpoutPending(1);
    // If you use Kryo to serialize and deserialize data, explicitly specify
    // LogGroupDataSerializSerializer.
    Config.registerSerialization(conf, LogGroupData.class,
            LogGroupDataSerializSerializer.class);
    if (mode.equals("Local")) {
        logger.info("Local mode...");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
        try {
            Thread.sleep(6000 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology("test-jstorm-spout");
        cluster.shutdown();
    } else if (mode.equals("Remote")) {
        logger.info("Remote mode...");
        conf.setNumWorkers(2);
        try {
            StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    } else {
        logger.error("invalid mode: " + mode);
    }
}
```
The following code provides an example on how to consume log data and display the content of each log by using bolts:
```java
public class SampleBolt extends BaseRichBolt {
    private static final long serialVersionUID = 475265688777440****;
    private static final Logger logger = Logger.getLogger(SampleBolt.class);
    private OutputCollector mCollector;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
            TopologyContext context, OutputCollector collector) {
        mCollector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String shardId = (String) tuple.getValueByField(LogHubSpout.FIELD_SHARD_ID);
        @SuppressWarnings("unchecked")
        List<LogGroupData> logGroupDatas =
                (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
        for (LogGroupData groupData : logGroupDatas) {
            // Each log group consists of one or more logs.
            LogGroup logGroup = groupData.GetLogGroup();
            for (Log log : logGroup.getLogsList()) {
                StringBuilder sb = new StringBuilder();
                // Each log has a time field and multiple fields formatted in
                // key-value pairs.
                int log_time = log.getTime();
                sb.append("LogTime:").append(log_time);
                for (Content content : log.getContentsList()) {
                    sb.append("\t").append(content.getKey()).append(":")
                            .append(content.getValue());
                }
                logger.info(sb.toString());
            }
        }
        // LogHub spouts and bolts must use the ack method to check whether log
        // data is sent from spouts to bolts and whether the data is processed
        // by the bolts.
        mCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Do nothing.
    }
}
```
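The formatting performed inside `execute` can be isolated into a small, SDK-free helper. The sketch below reproduces the same logic with plain Java types; the `LogFormatter` class and its `format` method are our own illustration (the real bolt reads the time and key-value pairs from the SDK's `Log` and `Content` objects):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogFormatter {

    // Reproduce the bolt's formatting: the log time first, then each
    // key-value pair appended as "\tkey:value".
    static String format(int logTime, Map<String, String> contents) {
        StringBuilder sb = new StringBuilder();
        sb.append("LogTime:").append(logTime);
        for (Map.Entry<String, String> entry : contents.entrySet()) {
            sb.append("\t").append(entry.getKey()).append(":")
                    .append(entry.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> contents = new LinkedHashMap<>();
        contents.put("level", "INFO");
        contents.put("message", "user login");
        // Prints "LogTime:1700000000" followed by each pair as "\tkey:value".
        System.out.println(format(1700000000, contents));
    }
}
```

A `LinkedHashMap` is used here so that the fields are emitted in insertion order, mirroring the order in which `getContentsList()` returns the fields of a log.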
Add Maven dependencies
The following code provides an example on how to add Maven dependencies for Storm versions earlier than 1.0, such as Storm 0.9.6:
```xml
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-storm-spout</artifactId>
    <version>0.6.6</version>
</dependency>
```
The following code provides an example on how to add Maven dependencies for Storm 1.0 or later:
```xml
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-storm-1.0-spout</artifactId>
    <version>0.1.3</version>
</dependency>
```