Consumer groups allow you to focus on business logic when you consume log data. You do not need to worry about the implementation details of Log Service, load balancing among consumers, or failover, as you would if you consumed data directly by using SDKs.

Overview

The following list describes the basic terms of consumer groups and consumers.
  • Consumer group: A consumer group consists of multiple consumers. The consumers in a consumer group consume data from the same Logstore, and each consumer consumes different data.
  • Consumer: A consumer is the basic unit of consumption in a consumer group. Each consumer in a consumer group must have a unique name.
A Logstore has multiple shards. The consumer library allocates shards to the consumers in a consumer group based on the following principles:
  • Each shard is allocated to only one consumer.
  • One consumer can consume data from multiple shards.

After a new consumer joins a consumer group, the shards of the Logstore are reallocated among the consumers for load balancing based on the preceding principles. For example, if a Logstore has four shards and the consumer group has two consumers, each consumer consumes two shards; after a third consumer joins, the shards are reallocated so that one consumer consumes two shards and the other two consumers consume one shard each. The reallocation is transparent to users.

The consumer library also stores checkpoints. This allows a consumer to resume consumption from where it stopped after a program failure is resolved, without consuming data repeatedly.

Procedure

Consumer libraries are available in Java, Python, and Go. The following procedure uses Java as an example to describe how to consume log data by using consumer groups. For more information, see the Java, Python, and Golang repositories on GitHub.

  1. Add Maven dependencies.
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.16</version>
    </dependency>
  2. Create a Main.java file.
    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Specify the endpoint of Log Service based on your business needs.
        private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // Specify a Log Service project name based on your business needs.
        private static String sProject = "ali-cn-hangzhou-sls-admin";
        // Specify a Logstore name of Log Service based on your business needs.
        private static String sLogstore = "sls_operation_log";
        // Specify a consumer group name based on your business needs.
        private static String sConsumerGroup = "consumerGroupX";
        // Specify the AccessKey pair for data consumption based on your business needs.
        private static String sAccessKeyId = "";
        private static String sAccessKey = "";
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // The second parameter is the consumer name. Consumer names in the same consumer group must be unique,
            // but different processes can share the same consumer group name. When multiple processes on multiple
            // servers consume the data of a Logstore, you can use the server IP address as the consumer name to
            // distinguish the consumers. The maxFetchLogGroupSize parameter specifies the maximum number of log
            // groups that can be retrieved from the server at a time. Valid values: 1 to 1000. You can use the
            // default value or specify a value based on your needs.
            LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // ClientWorker implements the Runnable interface and runs automatically after the thread starts.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Call the shutdown method of the ClientWorker instance to exit the consumer. The associated thread stops automatically.
            worker.shutdown();
            // The ClientWorker instance creates multiple asynchronous tasks while it is running. We recommend that you wait 30 seconds after shutdown so that all running tasks can exit.
            Thread.sleep(30 * 1000);
        }
    }
  3. Create a SampleLogHubProcessor.java file.
    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Record the last persistent checkpoint time.
        private long mLastCheckTime = 0;
    
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. Catch all exceptions in this method and do not throw them.
        public String process(List<LogGroupData> logGroups,
                              ILogHubCheckPointTracker checkPointTracker) {
            // Display the retrieved data.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Write checkpoints to the server every 30 seconds. If a ClientWorker instance crashes within 30 seconds,
            // a new ClientWorker instance consumes data starting from the last checkpoint. Duplicate data may exist.
            if (curTime - mLastCheckTime > 30 * 1000) {
                try {
                    // If the parameter is set to true, checkpoints are immediately synchronized to the server.
                    // If the parameter is set to false, checkpoints are cached locally and synchronized to the
                    // server at the default interval of 60 seconds.
                    checkPointTracker.saveCheckPoint(true);
                } catch (LogHubCheckPointException e) {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }
    
        // The ClientWorker instance calls this function upon exit. You can perform a cleanup.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Save consumption breakpoints to the server.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumption instance.
            return new SampleLogHubProcessor();
        }
    }
    Note: The preceding code prints all data in a Logstore. If you want multiple consumers to consume the data of a Logstore, modify the code based on the comments: start additional consumption processes that use the same consumer group name but different consumer names.
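    The following lines are a minimal sketch (an addition to the original sample) that shows one way to generate a unique consumer name for each process, as suggested by the comment in Main.java. The IP-plus-timestamp naming scheme is an assumption; any scheme that produces unique names within the consumer group works.
    // Build a consumer name from the local IP address and the process start time so that multiple
    // processes on multiple servers can join the same consumer group with unique consumer names.
    String consumerName;
    try {
        consumerName = java.net.InetAddress.getLocalHost().getHostAddress() + "_" + System.currentTimeMillis();
    } catch (java.net.UnknownHostException e) {
        // Fall back to a random suffix if the local IP address cannot be resolved.
        consumerName = "consumer_" + java.util.UUID.randomUUID();
    }
    LogHubConfig config = new LogHubConfig(sConsumerGroup, consumerName, sEndpoint, sProject, sLogstore,
            sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);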

Limits and troubleshooting

A maximum of 10 consumer groups can be created for each Logstore. The ConsumerGroupQuotaExceed error is reported when the number of consumer groups exceeds 10.
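If you reach this limit, delete consumer groups that are no longer needed in the Log Service console or by using the SDK. The following lines are a minimal sketch based on the Log Service Java SDK; the DeleteConsumerGroup call, the placeholder values, and the consumer group name are assumptions, so verify them against the SDK reference for your SDK version.
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;

public class DeleteConsumerGroupSample {
    public static void main(String[] args) throws LogException {
        // Replace the placeholders with your endpoint, AccessKey pair, project, and Logstore.
        Client client = new Client("cn-hangzhou.log.aliyuncs.com", "<yourAccessKeyId>", "<yourAccessKeySecret>");
        // "obsoleteConsumerGroup" is a placeholder for the consumer group that you want to delete.
        client.DeleteConsumerGroup("<yourProject>", "<yourLogstore>", "obsoleteConsumerGroup");
    }
}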

We recommend that you configure Log4j for the consumer program so that error messages from the consumer group are printed for troubleshooting. After you save a log4j.properties file to the resources directory and run the program, errors such as the following exception are printed:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
A typical log4j.properties configuration file is as follows:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

Advanced operations

The preceding code can help you consume data in common scenarios. The following describes how to perform advanced operations in other scenarios.

  • Consume data that is logged from a certain time point
    The LogHubConfig class in the preceding code has two constructors:
    // The value of the consumerStartTimeInSeconds parameter is a UNIX timestamp representing the number of seconds that have elapsed since 00:00:00 UTC on January 1, 1970.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    // The position parameter is an enumeration variable. LogHubConfig.ConsumePosition.BEGIN_CURSOR indicates that
    // consumption starts from the earliest data, and LogHubConfig.ConsumePosition.END_CURSOR indicates that
    // consumption starts from the latest data.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);

    You can use either constructor based on your needs. However, if a checkpoint is already stored on the server for the consumer group, consumption resumes from that checkpoint, and the configured start position takes effect only the first time the consumer group consumes data.
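    The following lines are a minimal sketch (an addition to the original sample) that shows how you might call the timestamp-based constructor; the date string is an example, and the other variables reuse those defined in Main.java.
    // Convert a date and time to a UNIX timestamp in seconds. java.sql.Timestamp is used here for brevity.
    int startTime = (int) (java.sql.Timestamp.valueOf("2018-01-01 00:00:00").getTime() / 1000);
    // Consumption starts from this time point only if no checkpoint is stored on the server for the consumer group.
    LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore,
            sAccessKeyId, sAccessKey, startTime);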

  • Reset a checkpoint

    In case of data padding or repeated computing, you may need to set the consumption position to a time point for a consumer group to start data consumption. To do this, you can use either of the following two methods.

    • Delete the consumer group.
      1. Stop the consumption processes.
      2. Delete the consumer group from the console.
      3. Modify the code to specify the start time point for data consumption.
      4. Restart the consumption processes.
    • Use the SDK to reset the start time point of data consumption for the consumer group.
      1. Stop the consumption processes.
      2. Use the SDK to modify the checkpoint.
      3. Restart the consumption processes.
    The following example resets the checkpoints of all shards in a consumer group to a specified time point:
    // The host, accessId, accessKey, project, logStore, and consumerGroup variables must be defined in your code.
    public static void updateCheckpoint() throws Exception {
        Client client = new Client(host, accessId, accessKey);
        // Convert the target time to a UNIX timestamp in seconds.
        long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
        ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
        for (Shard shard : response.GetShards()) {
            int shardId = shard.GetShardId();
            // Obtain the cursor that corresponds to the specified time point in each shard.
            String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
            // Reset the checkpoint of the consumer group for the shard to the cursor.
            client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
        }
    }

Access consumer groups as a RAM user

Before a RAM user can access consumer groups, relevant permissions must be granted to the user. For more information about how to grant permissions to a RAM user, see Overview.

The following table lists the actions you can take as a RAM user.
Action Resource
log:GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:UpdateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
For example, a project named project-test resides in China (Hangzhou). The ID of the Alibaba Cloud account to which the project belongs is 1234567. The name of the Logstore to be consumed is logstore-test and the consumer group name is consumergroup-test. To enable a RAM user to access the consumer group, you must grant the following permissions to the user.
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:1234567:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:1234567:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:1234567:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}