Log Service の LogHub は、Logtail と SDK を使用して、リアルタイムにログデータを収集するための効率的で信頼性の高いログチャネルを提供します。 ログを収集した後、Spark Stream や Storm といったリアルタイムシステムを使用して LogHub に書き込まれたデータを使用することができます。

Log Service は、LogHub Storm spout を使用して LogHub からリアルタイムでデータを読み取り、Storm ユーザーの LogHub 消費コストを削減します。

基本的なアーキテクチャとプロセス

図 1. 基本的なアーキテクチャとプロセス
  • 上の図では、LogHub Storm 吐き出し口が赤い点線で囲まれています。 各 Storm ト ポロジには、Logstore からすべてのデータを読み取るスパウトのグループがあります。 異なるトポロジーのスパウトはお互いに影響しません。
  • 各トポロジは、一意の LogHub コンシューマグループ名によって識別されます。 同じトポロジのスパウトは、コンシューマライブラリを使用して負荷分散と自動フェールオーバーを実現します。
  • Spouts はリアルタイムで LogHub からデータを読み取り、トポロジーのボルトノードにデータを送信し、Checkpoint として定期的に消費端点を LogHub に保存します。

制限

  • 誤使用を防ぐため、各ログストアは最大 5 つのコンシューマグループをサポートしています。 Java SDK の DeleteConsumerGroup インタフェースを使用して、未使用のコンシューマグループを削除できます。 不要なコンシューマグループを削除できます。
  • スパウトの数はシャードの数と同じであることを推奨します。 それ以外の場合、単一のスパウトが大量のデータを処理しないことがあります。
  • シャードに単一スパウトの処理能力を超える大量のデータが含まれている場合は、シャード分割インターフェースを使用してシャードを分割し、各シャードのデータ量を減らすことができます。
  • Storm への依存 LogHub スパウトでスパウトがメッセージを正しくボルトに送信することを確認するには、ACK が必要です。 したがって、ボルトは確認のために ACK を呼び出す必要です。

使用例

  • Spout (トポロジの構築に使用) topology)
         public static void main( String[] args )
        {     
            String mode = "Local"; // Use the local test mode.
               String conumser_group_name = ""; // Specify a unique consumer group name for each topology. The value cannot be empty. The value can be 3–63 characters long, contain lowercase letters, numbers, hyphens (-), and underscores (_), and must begin and end with a lowercase letter or number.
            String project = ""; // The Log Service project. 
            String logstore = ""; // The Log Service Logstore.
            String endpoint = "";   // Domain of the Log Service
            String access_id = "";  // User's access key
            String access_key = "";
            // Configurations required for building a LogHub Storm spout.
            Loghubspoutconfig Config = new loghubspoutconfig (conumser_group_name,
                    endpoint, project, logstore, access_id,
                    access_key, LogHubCursorPosition.END_CURSOR);
            Topologybuilder builder = new topologybuilder ();
            // 构建 loghub storm spout
            Loghubspout spin = new (config );
            // The number of spouts can be the same as that of Logstore shards in actual scenarios.
            builder.setSpout("spout", spout, 1);
            builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
            Config conf = new Config();
            conf.setDebug(false);
            conf.setMaxSpoutPending(1); 
            // The serialization method LogGroupDataSerializSerializer of LogGroupData must be configured explicitly when Kryo is used for data serialization and deserialization.
            Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);
            if (mode.equals("Local")) {
                logger.info("Local mode...") ;
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
                try {
                    Thread.sleep(6000 * 1000); //waiting for several minutes
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  
                cluster.killTopology("test-jstorm-spout");
                cluster.shutdown();  
            } else if (mode.equals("Remote")) {
                logger.info("Remote mode...");
                conf.setNumWorkers(2);
                try {
                    StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } else {
                logger.error("invalid mode: " + mode);
            }
        }
  • 次のボルトコードの例では、データを消費し、 各ログの内容のみを出力します。
    public class SampleBolt extends BaseRichBolt {
        private static final long serialVersionUID = 4752656887774402264L;
        private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);
        private OutputCollector mCollector;
        @Override
        public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
                OutputCollector collector) {
            mCollector = collector;
        }
        @Override
        public void execute(Tuple tuple) {
            String shardId = (String) tuple
                    .getValueByField(LogHubSpout.FIELD_SHARD_ID);
            @SuppressWarnings("unchecked")
            List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
            for (LogGroupData groupData : logGroupDatas) {
                // Each LogGroup consists of one or more logs.
                LogGroup logGroup = groupData.GetLogGroup();
                for (Log log : logGroup.getLogsList()) {
                    StringBuilder sb = new StringBuilder();
                    // Each log has a time field and multiple key: value pairs,
                    int log_time = log.getTime();
                    sb.append("LogTime:").append(log_time);
                    for (Content content : log.getContentsList()) {
                        sb.append("\t").append(content.getKey()).append(":")
                                .append(content.getValue());
                    }
                    logger.info(sb.toString());
                }
            }
            // The dependency on the Storm ACK mechanism is mandatory in LogHub spouts to confirm that spouts send messages correctly
            // to bolts. Therefore, bolts must call ACK for such confirmation.
            mCollector.ack(tuple);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //do nothing
        }
    }

Maven

Storm 1.0 より前のバージョン(たとえば、0.9.6)では、次のコードを使用します。

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>loghub-storm-spout</artifactId>
  <version>0.6.5</version>
</dependency>

Use the following code for Storm 1.0 and later versions:

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>loghub-storm-1.0-spout</artifactId>
  <version>0.1.2</version>
</dependency>