The consumer group is an advanced mode of real-time data consumption, which provides multiple consumption instances for the automatic load balancing of Logstore consumption. Both Spark Streaming and Storm use consumer  group as the basic mode.

View consumption progress in the console

  1. Log on to the Log Service console.
  2. On the Project List page, click the project name.
  3. Click  LogHub - Consume  > Consumerin the left-side navigation pane.
  4. On the Consumer Groups page, select a Logstore to view whether or not the consumer group function is enabled or not.
    Figure 1. Consumer

  5. Click  Status at the right of the consumer group to view the data  consumption progress for each shard.
    Figure 2. Consumption status

    As shown in the preceding figure, the Logstore has  six shards and corresponds to three consumers.  The latest data consumption time for each consumer is shown under the second column. You can use the data consumption time to determine if the current data processing can keep up with data generation. If data processing severely lags behind (that is, data consumption is slower than data generation), we recommend that you increase the number of consumers.

Use APIs/SDKs to view consumption progress

The following commands use Java SDK as an example, which shows how to use APIs to obtain the consumption status:

package test;
import java.util.ArrayList;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
                //Retrieve all consumer groups in this Logstore. If no consumer group exists, the consumerGroups length is 0.
        ArrayList<ConsumerGroup> consumerGroups;
            consumerGroups = client.ListConsumerGroup(project, logstore). GetConsumerGroups();
        catch(LogException e){
            if(e.GetErrorCode() == "LogStoreNotExist")
                System.out.println("this logstore does not have any consumer group");
                //internal server error branch
        for(ConsumerGroup c: consumerGroups){
                        //Print consumer group properties, including names, heartbeat timeout, and whether or not the consumption is in order.
                       System.out.println("Name:" + c.getConsumerGroupName());
                        System.out.println("Heartbeat timeout:" + c.getTimeout());
                        System.out.println("Consumption in order" + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()). GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                // Please format, this time returns the exact time to milliseconds, the length of the integer
                                //Format the returned time to be precise to milliseconds in the long integer.
                                System.out.println("Last data consumption time:" + cp.getUpdateTime());
                String consumerPrg = "";
                                        consumerPrg = "Consumption not started";
                                        //UNIX timestamp. Measured in seconds. Format the value upon output.
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()). GetCursorTime();
                        consumerPrg = "" + prg;
                    catch(LogException e){
                        if(e.GetErrorCode() == "InvalidCursor")
                                                        consumerPrg = "Invalid. The previous consumption time has exceeded the data lifecycle in the Logstore.";
                            //internal server error
                            throw e;
                                System.out.println("Consumption progress:" + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END). GetCursor();
                int endPrg = 0;
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor). GetCursorTime();
                catch(LogException e){
                    //do nothing
                                //UNIX timestamp. Measured in seconds. Format the value upon output.
                                System.out.println("The arrival time of the last piece of data:" + endPrg);