Consumer library

Last Updated: Nov 29, 2017

The Consumer Library provides an advanced mode of automatic shard allocation, and it introduces the concept of a consumer group to abstract and manage real-time consumption. Spark Streaming and Storm use the consumer group as their base model.

Basic concepts

The LogHub Consumer Library has four important concepts: consumer group, consumer, heartbeat, and checkpoint.

  • consumer group

    Consumer groups are sub-resources of a Logstore. Consumers with the same consumer group name consume data from the same Logstore, and the data consumed by each consumer does not overlap. Up to ten consumer groups can be created under one Logstore, and their names must be unique within the Logstore. Different consumer groups under the same Logstore consume data independently. A consumer group has the following attributes:

        {
            "order": boolean,
            "timeout": integer
        }

    • order: Indicates whether to consume data with identical keys sequentially based on the write time.
    • timeout: Indicates the timeout period (measured in seconds) for consumers in the consumer group. If a consumer does not send a heartbeat packet within this period, Log Service considers the consumer to have gone offline. (A sketch of creating a consumer group with the Java SDK follows this list.)
  • consumer

    Each consumer is allocated several shards and consumes the data in these shards. Consumers in the same consumer group must have unique names.

  • heartbeat

    Consumers must periodically send keep-alive heartbeat packets to Log Service.

  • checkpoint

    Consumers must periodically save the consumption progress (checkpoint) of their shards to Log Service. After a shard is transferred from one consumer to another, the new consumer obtains the consumption checkpoint from Log Service and resumes consumption from that point.
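Consumer groups are usually created implicitly the first time a ClientWorker instance starts. They can also be managed directly with the Log Service Java SDK, as in the following minimal sketch; it assumes the CreateConsumerGroup API and ConsumerGroup model of the SDK, and the endpoint, keys, and names are placeholders:

    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.exception.LogException;

    public class CreateConsumerGroupSample {
        public static void main(String[] args) throws LogException {
            Client client = new Client("<endpoint>", "<accessId>", "<accessKey>");
            // timeout = 300: a consumer that sends no heartbeat for 300s is considered offline.
            // order = true: data with identical keys is consumed sequentially by write time.
            client.CreateConsumerGroup("<project>", "<logstore>",
                    new ConsumerGroup("my-consumer-group", 300, true));
        }
    }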

Structure

The relationships among these concepts are shown in the following figure.

[Figure: relationships among consumer group, consumer, heartbeat, and checkpoint]

Scenarios

The LogHub Consumer Library provides an advanced mode of automatic shard allocation for scenarios in which multiple LogHub consumers simultaneously consume data from a Logstore.

For example, the LogHub Consumer Library automatically handles the shard load balancing logic and consumer failover among multiple consumers in Storm and Spark. You can then focus on your business development without concern for shard allocation, checkpoints, and failover.

For example, three consumption instances A, B, and C are enabled for Stream Compute through Storm. When there are 10 shards, the system allocates three shards each to A and B and four shards to C. Then:

  • If A goes down, the system reallocates the shards that A was consuming to B and C through load balancing. When A recovers, the system rebalances the load among A, B, and C.
  • If two consumption instances named D and E are added, the system performs load balancing to ensure that each instance consumes two shards.
  • If shards are merged or split, the system performs load balancing to account for the merged or split shards.
  • After read-only shards are consumed, the system performs load balancing to account for the remaining shards.

No missing or duplicate data occurs in the preceding processes. You only need to complete the following coding:

  1. Set configuration parameters.
  2. Write the log processing code.
  3. Enable consumption instances.

The LogHub Consumer Library allows you to focus on data processing without concern for issues such as load balancing, consumption breakpoint saving, sequential consumption, and consumption exception handling. Therefore, the LogHub Consumer Library is highly recommended for data consumption.

Best practice: see Processing - Use Consumer Library.

Status and alarm

Reset the consumption point

In some scenarios (for example, backfilling data or repeating a computation), you need to reset the checkpoint of a consumer group to a certain point in time so that its consumers start consuming from the new point. There are two ways:

  1. Delete the consumer group
    • Delete the consumer group in the console, and then restart the consumer group program.
    • The consumer group program starts consuming from the default starting point (configured in the program).
  2. Reset the current consumer group to a certain point in time by using the SDK
    • A Java code example is as follows:
      Client client = new Client(host, accessId, accessKey);
      // Convert the target time to a Unix timestamp measured in seconds.
      long time_stamp = Timestamp.valueOf("2017-08-15 00:00:00").getTime() / 1000;
      // List all shards of the Logstore and move the checkpoint of each shard
      // to the cursor that corresponds to the target time.
      ListShardResponse shard_res = client.ListShard(new ListShardRequest(project, logStore));
      ArrayList<Shard> all_shards = shard_res.GetShards();
      for (Shard shard : all_shards)
      {
          int shardId = shard.GetShardId();
          long cursor_time = time_stamp;
          String cursor = client.GetCursor(project, logStore, shardId, cursor_time).GetCursor();
          client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
      }
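      Note: stop the consumers in the consumer group before resetting checkpoints this way; a consumer that is still running saves its own checkpoints periodically and may overwrite the value written by UpdateCheckPoint. Restart the consumers afterwards so that they resume from the new point.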

Usage Guidelines

Spark Streaming and Storm use the consumer group as their base model, and these clients support all features of the Consumer Library.

Java: Based on the Consumer Library interface

  1. Implement the following two interface classes of the LogHub Consumer Library:

    • ILogHubProcessor: Each shard corresponds to an instance, and each instance consumes the data of a specific shard.
    • ILogHubProcessorFactory: Generates the instance implementing the ILogHubProcessor interface.
  2. Set parameters.

  3. Enable one or more ClientWorker instances.

Maven address

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.8</version>
    </dependency>

Main function

    public static void main(String[] args) throws Exception
    {
        LogHubConfig config = new LogHubConfig(...);
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        // ClientWorker implements the Runnable interface, so the instance runs automatically after the thread starts.
        Thread thread = new Thread(worker);
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the shutdown function of the ClientWorker instance to exit the consumption instance. The associated thread stops automatically.
        worker.shutdown();
        // Multiple asynchronous tasks are generated while the ClientWorker instance is running. We recommend waiting 30s after shutdown so that all tasks can exit.
        Thread.sleep(30 * 1000);
    }

ILogHubProcessor and ILogHubProcessorFactory implementation samples

ILogHubProcessor represents the consumption instance for a specific shard. Pay attention to the data consumption logic during development: a ClientWorker instance consumes data serially and generates only one ILogHubProcessor instance per shard. The ClientWorker instance calls the shutdown function of ILogHubProcessor upon exit.

    import java.util.List;

    // Imports assume the package layout of loghub-client-lib 0.6.x; verify against your library version.
    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

    public class SampleLogHubProcessor implements ILogHubProcessor
    {
        private int mShardId;
        // Records the time when the checkpoint was last persisted.
        private long mLastCheckTime = 0;

        public void initialize(int shardId)
        {
            mShardId = shardId;
        }

        // Main logic of data consumption
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker)
        {
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Writes a checkpoint to Log Service every 60s. If a ClientWorker instance crashes within this
            // 60s window, the new ClientWorker instance resumes from the last checkpoint, so duplicate data may exist.
            if (curTime - mLastCheckTime > 60 * 1000)
            {
                try
                {
                    // When the parameter is true, the checkpoint is immediately updated to Log Service.
                    // When it is false, the checkpoint is cached locally and updated to Log Service
                    // in the background at the default interval of 60s.
                    checkPointTracker.saveCheckPoint(true);
                }
                catch (LogHubCheckPointException e)
                {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            else
            {
                try
                {
                    checkPointTracker.saveCheckPoint(false);
                }
                catch (LogHubCheckPointException e)
                {
                    e.printStackTrace();
                }
            }
            // Returning null indicates that the data was processed successfully. To roll back to the
            // last checkpoint and retry, return checkPointTracker.getCheckpoint() instead.
            return null;
        }

        // The ClientWorker instance calls this function upon exit, during which you can perform cleanup.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker)
        {
            // Saves the consumption breakpoint to Log Service.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

ILogHubProcessorFactory is used to generate ILogHubProcessor.

    public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
    {
        public ILogHubProcessor generatorProcessor()
        {
            // Generates a consumption instance.
            return new SampleLogHubProcessor();
        }
    }

Configuration instructions

    public class LogHubConfig
    {
        // Default interval at which the ClientWorker instance pulls data.
        public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
        // Name of the consumer group. The name can be 3 to 63 characters in length and can contain English letters, digits, underscores (_), and hyphens (-). It cannot be null and must begin and end with a lowercase letter or digit.
        private String mConsumerGroupName;
        // Name of the consumer. Consumers in the same consumer group must have unique names.
        private String mWorkerInstanceName;
        // Address of the LogHub data interface.
        private String mLogHubEndPoint;
        // Project name.
        private String mProject;
        // Logstore name.
        private String mLogStore;
        // AccessKey ID of the cloud account.
        private String mAccessId;
        // AccessKey secret of the cloud account.
        private String mAccessKey;
        // Indicates the start point for consuming a shard when no checkpoint for the shard is recorded in Log Service. The value does not take effect if Log Service has a valid checkpoint. Possible values: BEGIN_CURSOR (start from the first data entry of the shard), END_CURSOR (start from the last data entry at the current time), and SPECIAL_TIMER_CURSOR (used together with mLoghubCursorStartTime to start consumption from a specific time).
        private LogHubCursorPosition mCursorPosition;
        // When mCursorPosition is set to SPECIAL_TIMER_CURSOR, consumption starts from this time, measured in seconds.
        private int mLoghubCursorStartTime = 0;
        // The polling interval (measured in milliseconds) for pulling LogHub data. The smaller the interval, the faster data is pulled. The default value is DEFAULT_DATA_FETCH_INTERVAL_MS. We recommend setting the interval to a value greater than 200 ms.
        private long mDataFetchIntervalMillis;
        // The interval (measured in milliseconds) at which the ClientWorker instance sends heartbeat packets to Log Service. The recommended value is 10,000 ms.
        private long mHeartBeatIntervalMillis;
        // Whether to consume data sequentially.
        private boolean mConsumeInOrder;
    }
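The Main function above elides the LogHubConfig constructor arguments. The following is a minimal sketch of building a configuration; it assumes the 0.6.x constructor overload that takes (consumerGroupName, workerInstanceName, endpoint, project, logstore, accessId, accessKey, cursorPosition), so check the overloads available in your library version. All values are placeholders:

    LogHubConfig config = new LogHubConfig(
            "consumer_group_1",              // consumer group name, shared by cooperating consumers
            "worker_A",                      // consumer name, unique within the consumer group
            "cn-hangzhou.log.aliyuncs.com",  // LogHub endpoint of the project's region
            "my-project",                    // project name
            "my-logstore",                   // Logstore name
            "<accessId>",                    // AccessKey ID
            "<accessKey>",                   // AccessKey secret
            LogHubCursorPosition.END_CURSOR  // start from the latest data when no checkpoint exists
    );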

Precautions

  • consumerGroupName in LogHubConfig indicates a consumer group. Consumers with the same consumerGroupName are differentiated by workerInstanceName and jointly consume the shards of the same Logstore.

    Assume that a Logstore has four shards numbered 0 to 3, and that three ClientWorker instances have the following <consumerGroupName, workerInstanceName> settings:

    <consumer_group_name_1, worker_A>
    <consumer_group_name_1, worker_B>
    <consumer_group_name_2, worker_C>

    The ClientWorker instances are allocated the following shards:

    <consumer_group_name_1, worker_A>: shard_0, shard_1
    <consumer_group_name_1, worker_B>: shard_2, shard_3
    <consumer_group_name_2, worker_C>: shard_0, shard_1, shard_2, shard_3  # ClientWorker instances with different consumer group names consume data independently.
  • Ensure that the process() interface of ILogHubProcessor is correctly implemented and exits properly.

  • Calling the saveCheckPoint() interface of ILogHubCheckPointTracker indicates that data processing is complete, regardless of whether the parameter is set to true or false. If the parameter is true, the checkpoint is immediately persisted to Log Service. If it is false, the checkpoint is cached locally and synchronized to Log Service every 60s.

  • RAM authorization is required if the AccessKey ID and AccessKey secret of a sub-account are configured in LogHubConfig. The following authorization policies are required:

Action                               Resource
log:GetCursorOrData                  acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
log:CreateConsumerGroup              acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ListConsumerGroup                acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:ConsumerGroupHeartBeat           acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
log:GetConsumerGroupCheckPoint       acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

Python: Based on the Consumer Library interface
