If you use consumer groups to consume log data, you do not need to consider factors such as Log Service implementation, load balancing among consumers, or failovers that may occur. You can focus on business logic during log data consumption.

Prerequisites

An SDK development environment is set up. For more information, see Overview of Log Service SDKs.

Terms

  • consumer group: You can use consumer groups to consume data in Log Service. Each consumer group consists of multiple consumers. All consumers in a consumer group consume data from the same Logstore, and the data consumed by one consumer cannot be consumed by another consumer. You can create up to 30 consumer groups for a Logstore.
  • consumer: The consumers in a consumer group consume data. The name of each consumer in a consumer group must be unique.
  • Logstore: A Logstore in Log Service is a unit that is used to collect, store, and query log data. For more information, see Logstore.
  • shard: A shard is used to control the read and write capacity of a Logstore. In Log Service, data is stored in shards. Each shard has an MD5 hash range, which is a left-closed, right-open interval. The ranges of different shards do not overlap, and each range falls within the overall MD5 hash range [00000000000000000000000000000000,ffffffffffffffffffffffffffffffff). For more information, see Shard.
  • checkpoint: A checkpoint is the position at which a program stops consuming data. If the program is restarted, it resumes consumption from the last checkpoint.
A Logstore has multiple shards. Log Service allocates shards to the consumers in a consumer group based on the following rules:
  • Each shard can be allocated to only one consumer.
  • A consumer can consume data from multiple shards.

After a new consumer is added to a consumer group, the shards allocated to the consumer group are reallocated to each consumer for load balancing. The shards are reallocated based on the preceding rules.
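The two allocation rules can be illustrated with a simplified round-robin sketch. This is not the SDK's internal algorithm, and the class and method names are hypothetical; it only demonstrates that each shard goes to exactly one consumer while a consumer may hold several shards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardAllocationSketch {
    // Hypothetical round-robin allocation: each shard is assigned to exactly
    // one consumer, and a consumer may receive multiple shards.
    static Map<String, List<Integer>> allocate(List<Integer> shards, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        for (int i = 0; i < shards.size(); i++) {
            // Cycle through the consumers so the load stays balanced.
            String c = consumers.get(i % consumers.size());
            result.get(c).add(shards.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three shards, two consumers: one consumer receives two shards.
        System.out.println(allocate(Arrays.asList(0, 1, 2),
                Arrays.asList("consumer_1", "consumer_2")));
    }
}
```

When a consumer joins or leaves, rerunning such an allocation over the new consumer list yields the rebalanced assignment, which is the effect described above.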

When you use consumer groups to consume data, Log Service stores checkpoints. Consumers can resume data consumption from a checkpoint without repeatedly consuming data after a program fault is resolved.

Procedure

You can use Log Service SDK for Java, C++, Python, or Go to create consumer groups and consume data. In this example, Log Service SDK for Java is used.

  1. Add Maven dependencies.
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.33</version>
    </dependency>
  2. Create a Main.java file.
    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The Log Service endpoint. Set the parameter based on your business scenario. For more information, see Endpoints. 
        private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Log Service project. Set the parameter based on your business scenario. 
        private static String sProject = "ali-cn-hangzhou-sls-admin";
        // The name of the Logstore. Set the parameter based on your business scenario. 
        private static String sLogstore = "sls_operation_log";
        // The name of the consumer group. Set the parameter based on your business scenario. 
        private static String sConsumerGroup = "consumerGroupX";
        // The AccessKey pair that is used to consume data. Specify the AccessKey ID and AccessKey secret based on your business scenario. For more information, see AccessKey pair. 
        private static String sAccessKeyId = "";
        private static String sAccessKey = "";
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 is the name of a consumer. The name of each consumer in a consumer group must be unique. If multiple consumers run as separate processes on different servers to consume the data of a Logstore, you can use the server IP address to identify each consumer. 
            // The maxFetchLogGroupSize parameter specifies the maximum number of log groups that can be obtained from the server in a single request. Valid values: 1 to 1000. We recommend that you use the default value. You can call config.setMaxFetchLogGroupSize(100); to change the value.
            LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // ClientWorker implements the Runnable interface, so the instance runs automatically after the thread starts. 
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // The shutdown function of the ClientWorker instance is called to exit the consumption instance. The associated thread is automatically stopped. 
            worker.shutdown();
            // Multiple asynchronous tasks are generated when the ClientWorker instance is running. To ensure that all running tasks exit after shutdown is called, we recommend that the main thread sleep for 30 seconds before it exits. 
            Thread.sleep(30 * 1000);
        }
    }
  3. Create a file named SampleLogHubProcessor.java.
    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time when the last persistent checkpoint was saved. 
        private long mLastCheckTime = 0;
    
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. You must include the code to handle all exceptions that may occur during data consumption. 
        public String process(List<LogGroupData> logGroups,
                              ILogHubCheckPointTracker checkPointTracker) {
            // Display the data that you obtained. 
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Write a checkpoint to the server every 30 seconds. If the ClientWorker instance unexpectedly stops within 30 seconds, a newly started ClientWorker instance continues to consume data from the last checkpoint. A small amount of data may be repeatedly consumed. 
            if (curTime - mLastCheckTime > 30 * 1000) {
                try {
                    // If you set the parameter to true, checkpoints are immediately synchronized to the server. If you set the parameter to false, checkpoints are cached locally and synchronized to the server at a default interval of 60 seconds. 
                    checkPointTracker.saveCheckPoint(true);
                } catch (LogHubCheckPointException e) {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }
    
        // The ClientWorker instance calls this function when it exits. You can persist the checkpoints here. 
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Save checkpoints to the server. 
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumption instance. 
            return new SampleLogHubProcessor();
        }
    }
    For more information, see Java, C++, Python, and Go.
  4. Run Main.java.
    In this example, NGINX logs are consumed and the consumption result is displayed.
    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

Use the Log Service console to view the status of a consumer group

  1. Log on to the Log Service console.
  2. In the Projects section, click the name of the project that you want to view.
  3. Click the Show icon icon of the Logstore that you want to view and click Data Consumption.
  4. Click a consumer group to view the data consumption progress of each shard.

Use code to view the status of a consumer group

You can use an SDK to view the data consumption progress. In this example, Log Service SDK for Java is used, and ConsumerGroupTest.java is executed.
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // Obtain all consumer groups that are created for the Logstore. If no consumer group exists, an empty list is returned. 
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for(ConsumerGroup c: consumerGroups){
            // Display the properties of consumer groups. The properties of a consumer group include the name, heartbeat timeout period, and whether the consumer group consumes data in order. 
            System.out.println("Name: " + c.getConsumerGroupName());
            System.out.println("Heartbeat timeout period: " + c.getTimeout());
            System.out.println("Ordered consumption: " + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                // The consumption time. The time is a long integer and is accurate to microseconds. 
                System.out.println("The time when data was last consumed: " + cp.getUpdateTime());
                System.out.println("Consumer name: " + cp.getConsumer());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "Consumption is not started";
                else{
                    // The UNIX timestamp. Unit: seconds. Format the output value of the timestamp. 
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    }
                    catch(LogException e){
                        if("InvalidCursor".equals(e.GetErrorCode()))
                            consumerPrg = "Invalid. The previous point in time when data was consumed is out of the retention period of the data in the Logstore";
                        else{
                            //internal server error
                            throw e;
                        }
                    }
                }
                System.out.println("Consumption progress: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                }
                catch(LogException e){
                    //do nothing
                }
                // The UNIX timestamp. Unit: seconds. Format the output value of the timestamp. 
                System.out.println("The time when the last data record was received: " + endPrg);
            }
        }
    }
}
The following information is returned:
Name: etl-6cac01c571d5a4b933649c04a7ba215b
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time when data was last consumed: 1639555453575211
Consumer name: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Consumption progress: 1639555453
The time when the last data record was received: 1639555453
shard: 1
The time when data was last consumed: 1639555392071328
Consumer name: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Consumption progress: 1639555391
The time when the last data record was received: 1639555391
Name: etl-2bd3fdfdd63595d56b1ac24393bf5991
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time when data was last consumed: 1639555453256773
Consumer name: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Consumption progress: 1639555453
The time when the last data record was received: 1639555453
shard: 1
The time when data was last consumed: 1639555392066234
Consumer name: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Consumption progress: 1639555391
The time when the last data record was received: 1639555391
Name: consumerGroupX
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time when data was last consumed: 1639555434142879
Consumer name: consumer_1
Consumption progress: 1635615029
The time when the last data record was received: 1639555453
shard: 1
The time when data was last consumed: 1639555437976929
Consumer name: consumer_1
Consumption progress: 1635616802
The time when the last data record was received: 1639555391
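In this output, the "last consumed" values are UNIX timestamps in microseconds, while the progress and "last received" values are in seconds. A minimal, self-contained sketch of formatting both kinds of value (UTC assumed; this helper class is not part of the SDK):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampFormat {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Format a microsecond-precision timestamp, such as getUpdateTime().
    static String fromMicros(long micros) {
        return FMT.format(Instant.ofEpochMilli(micros / 1000));
    }

    // Format a second-precision timestamp, such as the consumption progress.
    static String fromSeconds(long seconds) {
        return FMT.format(Instant.ofEpochSecond(seconds));
    }

    public static void main(String[] args) {
        // Values taken from the sample output above.
        System.out.println(fromMicros(1639555453575211L));
        System.out.println(fromSeconds(1639555453L));
    }
}
```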

Related operations

  • Configure Log4j for troubleshooting.
    We recommend that you configure Log4j for your consumer program so that error messages from the consumer group are logged. This way, you can handle exceptions at the earliest opportunity. The following example shows a log4j.properties configuration file:
    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
    After you configure Log4j, you can view the information of exceptions that occur when you run the consumer program. The following example shows an error message:
    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • Use a consumer group to consume data that is generated from a certain point in time.
    // The consumerStartTimeInSeconds parameter specifies a point in time as a UNIX timestamp in seconds. Data generated after this point in time is consumed. 
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // The value of the position parameter is an enumeration variable. LogHubConfig.ConsumePosition.BEGIN_CURSOR specifies that the consumption starts from the earliest data. LogHubConfig.ConsumePosition.END_CURSOR specifies that the consumption starts from the latest data. 
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    Note
    • You can use different constructors based on your business requirements.
    • If a checkpoint is stored on the server, data consumption starts from this checkpoint.
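As a sketch of deriving the consumerStartTimeInSeconds value, a UNIX timestamp in seconds can be computed from a date string with the standard library. UTC is assumed here, and the helper class and method names are hypothetical:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class StartTimeSketch {
    // Parse an ISO-8601 local date-time string and convert it to UNIX seconds.
    // The UTC offset is an assumption; use the zone that matches your data.
    static int startTimeInSeconds(String dateTime) {
        return (int) LocalDateTime.parse(dateTime).toEpochSecond(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        int start = startTimeInSeconds("2017-11-15T00:00:00");
        // Pass `start` as the consumerStartTimeInSeconds argument of the
        // time-based LogHubConfig constructor shown above.
        System.out.println(start);
    }
}
```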
  • Reset a checkpoint.
    public static void updateCheckpoint() throws Exception {
        Client client = new Client(host, accessId, accessKey);
        // Timestamp.valueOf parses the date string in the local time zone of the JVM.
        long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
        ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
        for (Shard shard : response.GetShards()) {
            int shardId = shard.GetShardId();
            // Obtain the cursor for the point in time, and reset the checkpoint of the shard to this cursor.
            String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
            client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
        }
    }

Authorize a RAM user to access consumer groups

Before you use a RAM user to access consumer groups, you must grant the required permissions to the RAM user. For more information, see Step 2: Grant permissions to the RAM user.

The following list describes the actions that a RAM user can perform.
  • log:GetCursorOrData (GetCursor and PullLogs): Obtains a cursor based on the point in time when log data is generated. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
  • log:CreateConsumerGroup: Creates a consumer group in a specified Logstore. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:ListConsumerGroup: Queries all consumer groups of a specified Logstore. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
  • log:ConsumerGroupUpdateCheckPoint: Updates the consumption checkpoint for a shard of a specified consumer group. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:ConsumerGroupHeartBeat: Sends a heartbeat packet for a consumer to Log Service. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:UpdateConsumerGroup: Modifies the properties of a specified consumer group. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:GetConsumerGroupCheckPoint: Retrieves the consumption checkpoints for one or all shards of a specified consumer group. Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
For example, the project-test project resides in the China (Hangzhou) region. The ID of the Alibaba Cloud account to which the project belongs is 174649****602745. The name of the Logstore from which you want to consume log data is logstore-test, and the consumer group name is consumergroup-test. To allow a RAM user to access the consumer group, you must grant the following permissions to the RAM user:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}