Consumer group - Usage

Last Updated: Mar 23, 2018


The consumer library is an advanced mode of log consumption in Log Service. It provides the consumer group concept to abstract and manage consumers. Compared with reading data directly through SDKs, the consumer library lets you focus only on your business logic, without worrying about the implementation details of Log Service or about load balancing and failover between consumers.

Spark Streaming, Storm, and the Flink connector all use the consumer library as their base implementation.


You must understand two concepts before using the consumer library: consumer group and consumer.

  • Consumer group

    A consumer group is composed of multiple consumers. Consumers in the same consumer group consume data from the same Logstore, and the data consumed by each consumer is different.

  • Consumer

    A consumer is the unit that makes up a consumer group and actually consumes the data. The names of consumers in the same consumer group must be unique.

In Log Service, a Logstore can have multiple shards. The consumer library allocates the shards to the consumers in a consumer group according to the following rules:

  • Each shard can only be allocated to one consumer.
  • One consumer can have multiple shards at the same time.

After a new consumer is added to a consumer group, the shard ownership within the consumer group is adjusted to balance the consumption load. The preceding allocation rules still hold, and the allocation process is transparent to users.
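The allocation and rebalancing behavior described above can be sketched with a plain round-robin assignment. This is an illustrative sketch only, not the actual algorithm inside the consumer library; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardAllocationSketch {
    // Hypothetical helper: distribute shard IDs across consumer names
    // round-robin, so each shard belongs to exactly one consumer and a
    // consumer may hold several shards at the same time.
    static Map<String, List<Integer>> allocate(List<Integer> shards, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        for (int i = 0; i < shards.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            result.get(owner).add(shards.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 shards, 2 consumers: each consumer ends up with 2 shards.
        System.out.println(allocate(Arrays.asList(0, 1, 2, 3),
                Arrays.asList("consumer_1", "consumer_2")));
        // After a third consumer joins, shards are rebalanced; no shard
        // is ever owned by two consumers at once.
        System.out.println(allocate(Arrays.asList(0, 1, 2, 3),
                Arrays.asList("consumer_1", "consumer_2", "consumer_3")));
    }
}
```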

The consumer library also saves checkpoints, so that after a program failure is resolved, consumers resume consumption from the last breakpoint instead of starting over.
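The periodic checkpointing that makes resuming possible boils down to a simple interval check, the same pattern the processor example later in this page uses. A minimal, self-contained sketch (class and method names are hypothetical):

```java
public class CheckpointIntervalSketch {
    static final long INTERVAL_MILLIS = 30_000;
    private long lastSaveMillis = 0;

    // Returns true when enough time has passed since the last persisted
    // checkpoint that a new one should be saved.
    boolean shouldSave(long nowMillis) {
        if (nowMillis - lastSaveMillis > INTERVAL_MILLIS) {
            lastSaveMillis = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointIntervalSketch tracker = new CheckpointIntervalSketch();
        System.out.println(tracker.shouldSave(31_000)); // first save: 31 s elapsed
        System.out.println(tracker.shouldSave(45_000)); // only 14 s since last save
        System.out.println(tracker.shouldSave(62_000)); // 31 s since last save
    }
}
```

If the worker crashes between two saves, the next worker resumes from the last persisted checkpoint, which is why a small amount of duplicate consumption can occur.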


  • Add Maven dependencies:

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>aliyun-log</artifactId>
        <version>0.6.11</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>loghub-client-lib</artifactId>
        <version>0.6.15</version>
    </dependency>
  • Main program (Main.java)

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

    public class Main {
        // Enter the endpoint of Log Service according to your actual situation.
        private static String sEndpoint = "";
        // Enter the project name of Log Service according to your actual situation.
        private static String sProject = "ali-cn-hangzhou-sls-admin";
        // Enter the Logstore name of Log Service according to your actual situation.
        private static String sLogstore = "sls_operation_log";
        // Enter the consumer group name according to your actual situation.
        private static String sConsumerGroup = "consumerGroupX";
        // Enter the AccessKey used for data consumption according to your actual situation.
        private static String sAccessKeyId = "";
        private static String sAccessKey = "";

        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException
        {
            // The second parameter is the consumer name. Consumer names within the same
            // consumer group must be unique, while the consumer group name may be shared.
            // Starting processes with different consumer names on multiple machines
            // consumes a Logstore in a load-balanced way; for example, consumer names
            // can be derived from the machine IP address. The parameter
            // maxFetchLogGroupSize is the number of log groups fetched from Log Service
            // per request; keep the default value, or, if you must adjust it, use a
            // value in the range (0,1000].
            LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // ClientWorker implements the Runnable interface and runs automatically
            // once the thread starts.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Call worker.shutdown() to exit the consumption instance; the associated
            // thread stops automatically.
            worker.shutdown();
            // Multiple asynchronous tasks are generated while the ClientWorker runs.
            // We recommend waiting 30 seconds after shutdown so that running tasks can exit.
            Thread.sleep(30 * 1000);
        }
    }
  • Processor (SampleLogHubProcessor.java)

    import java.util.List;

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

    public class SampleLogHubProcessor implements ILogHubProcessor
    {
        private int mShardId;
        // Record the time when the checkpoint was last persisted.
        private long mLastCheckTime = 0;

        public void initialize(int shardId)
        {
            mShardId = shardId;
        }

        // The main logic of data consumption. Catch all exceptions here;
        // do not let exceptions escape this method.
        public String process(List<LogGroupData> logGroups,
                ILogHubCheckPointTracker checkPointTracker)
        {
            // Print the obtained data.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Write a checkpoint to Log Service every 30 seconds. If the worker crashes
            // within those 30 seconds, the newly started worker consumes data starting
            // from the last checkpoint, so a small amount of duplicate data may exist.
            if (curTime - mLastCheckTime > 30 * 1000)
            {
                try
                {
                    // true means the checkpoint is updated to Log Service immediately;
                    // false means the checkpoint is cached locally and flushed to
                    // Log Service every 60 seconds by default.
                    checkPointTracker.saveCheckPoint(true);
                }
                catch (LogHubCheckPointException e)
                {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }

        // The worker calls this function upon exit. You can perform cleanup here.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker)
        {
            // Save the consumption breakpoint to Log Service.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
    {
        public ILogHubProcessor generatorProcessor()
        {
            // Generate a consumption instance.
            return new SampleLogHubProcessor();
        }
    }

Run the preceding code to print all the data in a Logstore. To allow multiple consumers to consume one Logstore together, follow the comments in the program: use the same consumer group name and different consumer names, and start additional consumption processes.

Limits and exception diagnosis

Each Logstore can have at most 10 consumer groups. The error ConsumerGroupQuotaExceed is reported when this limit is exceeded.

We recommend configuring Log4j for the consumer program so that errors occurring inside the consumer group are logged, which helps locate exceptions. Put the log4j.properties file in the resources directory and run the program; when an error occurs, output like the following is logged:

  [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(
  com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]

See the following configuration for reference:

  log4j.rootLogger = info,stdout
  log4j.appender.stdout = org.apache.log4j.ConsoleAppender
  log4j.appender.stdout.Target = System.out
  log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
  log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

Status and alarm

  1. View the consumer group status in the console.
  2. View the latency of the consumer group and configure alarms by using CloudMonitor.

Advanced customization

Most users can consume data with the preceding program as-is. This section introduces some advanced topics.

  • To consume data starting from a specified time

    The LogHubConfig used in the preceding code has two constructors:

    // consumerStartTimeInSeconds is a Unix timestamp (seconds since 1970-01-01 UTC);
    // consumption starts from data written after this time point.
    public LogHubConfig(String consumerGroupName,
                        String consumerName,
                        String loghubEndPoint,
                        String project, String logStore,
                        String accessId, String accessKey,
                        int consumerStartTimeInSeconds);

    // position is an enumeration: LogHubConfig.ConsumePosition.BEGIN_CURSOR starts
    // consumption from the earliest data; LogHubConfig.ConsumePosition.END_CURSOR
    // starts from the latest data.
    public LogHubConfig(String consumerGroupName,
                        String consumerName,
                        String loghubEndPoint,
                        String project, String logStore,
                        String accessId, String accessKey,
                        ConsumePosition position);

    You can choose the constructor that matches your consumption requirements. However, if a checkpoint has already been saved in Log Service, consumption starts from that checkpoint instead.
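    When using the first constructor, consumerStartTimeInSeconds is a Unix timestamp in seconds. A small sketch of computing such a value with the standard java.time API (the date and variable names are only examples):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class StartTimeExample {
    public static void main(String[] args) {
        // Consume data written after 2018-03-01 00:00:00 UTC.
        long startSeconds = LocalDateTime.of(2018, 3, 1, 0, 0, 0)
                .toEpochSecond(ZoneOffset.UTC);
        System.out.println(startSeconds);
        // Pass startSeconds as the consumerStartTimeInSeconds argument
        // of the first LogHubConfig constructor.
    }
}
```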

  • Use Resource Access Management (RAM) sub-account to access Log Service

    Sub-accounts must be granted the RAM permissions related to consumer group operations, listed as Action/Resource pairs. For how to configure these permissions, see the RAM documentation.
  • Reset checkpoint

    In some scenarios (such as backfilling data or repeated computation), you must reset the checkpoint of a consumer group to a specified time point so that consumption starts from a new position. Use either of the following two methods:

    1. Delete the consumer group.
      • Stop the consumption program and delete the consumer group in the console.
      • Modify the code to consume data from a specified time point, and restart the program.
    2. Use SDKs to reset the consumer group to a time point.
      • Stop the consumption program.
      • Use SDKs to modify the checkpoint position, and then restart the consumption program.
    Client client = new Client(host, accessId, accessKey);
    // Reset the checkpoint to 2017-11-15 00:00:00 (local time), as Unix seconds.
    long time_stamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
    ListShardResponse shard_res = client.ListShard(new ListShardRequest(project, logStore));
    ArrayList<Shard> all_shards = shard_res.GetShards();
    for (Shard shard : all_shards)
    {
        int shardId = shard.GetShardId();
        // Get the cursor corresponding to the time point on this shard,
        // then move the consumer group checkpoint to that cursor.
        String cursor = client.GetCursor(project, logStore, shardId, time_stamp).GetCursor();
        client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
    }