Consumer library

Last Updated: Jul 05, 2017

View the consumption progress on the console

For more information about how to view the consumption progress in the console, see the document description.

By default, Spark Streaming and the Storm spout use the consumer library to consume LogHub data.

Application scenarios

The LogHub Consumer Library is an advanced consumption mode that automatically allocates shards when multiple LogHub consumers simultaneously consume the same Logstore.

In Storm and Spark, for example, the LogHub Consumer Library automatically handles shard load balancing and consumer failover among multiple consumers. You can then focus on your business development without concern for shard allocation, checkpoints, or failover.

For example, three consumption instances, A, B, and C, are started for stream computing through Storm. When there are 10 shards, the system allocates three shards each to A and B and four shards to C. Then:

  • If A is down, the system reallocates the shards that A was consuming to B and C through load balancing. When A recovers, the system rebalances the loads across A, B, and C.
  • If two consumption instances named D and E are added, the system performs load balancing to ensure that each instance consumes two shards.
  • If shards are merged or split, the system performs load balancing to account for the merged or split shards.
  • After read-only shards are consumed, the system performs load balancing to account for the remaining shards.
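The rebalancing in the cases above amounts to distributing the shards as evenly as possible across the live consumers. A minimal sketch of that arithmetic (illustrative only; `ShardBalancer` is a hypothetical helper, not part of the library, and the library's actual allocation also takes heartbeats and current shard ownership into account):

```java
import java.util.Arrays;

public class ShardBalancer {
    // Distributes shardCount shards as evenly as possible across consumerCount consumers.
    // Returns the number of shards each consumer holds; the first (shardCount % consumerCount)
    // consumers receive one extra shard.
    public static int[] allocate(int shardCount, int consumerCount) {
        int base = shardCount / consumerCount;
        int remainder = shardCount % consumerCount;
        int[] counts = new int[consumerCount];
        Arrays.fill(counts, base);
        for (int i = 0; i < remainder; i++) {
            counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10 shards over 3 consumers: one consumer gets 4 shards, the other two get 3 each.
        System.out.println(Arrays.toString(allocate(10, 3))); // [4, 3, 3]
        // After two more consumers join, each of the 5 consumers gets 2 shards.
        System.out.println(Arrays.toString(allocate(10, 5))); // [2, 2, 2, 2, 2]
    }
}
```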

No missing or duplicate data occurs in the preceding processes. You only need to complete the following coding:

  1. Set configuration parameters.
  2. Write the log processing code.
  3. Enable consumption instances.

The LogHub Consumer Library allows you to focus on data processing without concern for issues such as load balancing, consumption breakpoint saving, sequential consumption, and consumption exception handling. Therefore, the LogHub Consumer Library is highly recommended for data consumption.

Terms

The LogHub Consumer Library has four important concepts: consumer group, consumer, heartbeat, and checkpoint. Their relationships are described as follows.


  • consumer group

    Consumer groups are sub-resources of Logstores. Consumers with the same consumer group name consume data from the same Logstore, but the data consumed by each consumer is different. Up to five consumer groups can be created under one Logstore, but the group names must be unique within the Logstore. Different consumer groups under the same Logstore consume data independently.

    {
        "order": boolean,
        "timeout": integer
    }
    • order: Specifies whether data with identical keys is consumed sequentially based on the write time.
    • timeout: The timeout period (in seconds) for consumers in the consumer group. If a consumer does not send a heartbeat packet within this period, the consumer is considered timed out, and Log Service determines that the consumer is offline.
  • consumer

    Each consumer is allocated with several shards and can consume data in these shards. Consumers in the same consumer group must have unique names.

  • heartbeat

    Consumers must periodically send keep-alive heartbeat packets to Log Service.

  • checkpoint

    Consumers must periodically save their consumption positions for the allocated shards to Log Service. After a shard is transferred from one consumer to another, the new consumer obtains the shard consumption breakpoint from Log Service and continues consuming data from that breakpoint.
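The checkpoint handoff can be pictured as a shared position store: the consumer that holds a shard periodically persists the position it has reached, and the consumer that takes over the shard after a rebalance reads that position and resumes from it. A conceptual sketch (illustrative only; in reality Log Service stores the checkpoints server-side, and `CheckpointStore` is a hypothetical stand-in, not part of the library):

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointStore {
    // Maps shard ID -> last persisted consumption position (cursor).
    private final Map<Integer, String> checkpoints = new HashMap<>();

    // Called periodically by the consumer that currently holds the shard.
    public void saveCheckpoint(int shardId, String cursor) {
        checkpoints.put(shardId, cursor);
    }

    // Called by the consumer that takes over the shard after a rebalance.
    // Returns the saved position, or startCursor if the shard was never consumed.
    public String resumeFrom(int shardId, String startCursor) {
        return checkpoints.getOrDefault(shardId, startCursor);
    }

    public static void main(String[] args) {
        CheckpointStore store = new CheckpointStore();
        // Consumer A consumes shard 0 and persists its position.
        store.saveCheckpoint(0, "cursor-12345");
        // Consumer A fails; consumer B takes over shard 0 and resumes from the checkpoint.
        System.out.println(store.resumeFrom(0, "BEGIN_CURSOR")); // cursor-12345
        // Shard 1 has no checkpoint yet, so consumption starts from the configured position.
        System.out.println(store.resumeFrom(1, "BEGIN_CURSOR")); // BEGIN_CURSOR
    }
}
```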

LogHub Consumer Library usage

  1. Implement the following two interface classes of the LogHub Consumer Library:

    • ILogHubProcessor: Each shard corresponds to an instance, and each instance consumes the data of a specific shard.
    • ILogHubProcessorFactory: Generates the instance implementing the ILogHubProcessor interface.
  2. Set parameters.

  3. Enable one or more ClientWorker instances.

Maven address

  <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.5</version>
  </dependency>

Example

Main function

  public static void main(String[] args) throws InterruptedException
  {
      LogHubConfig config = new LogHubConfig(...);
      ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
      Thread thread = new Thread(worker);
      // ClientWorker implements the Runnable interface, so it starts consuming automatically once the thread runs.
      thread.start();
      Thread.sleep(60 * 60 * 1000);
      // Call the shutdown function of the ClientWorker instance to exit the consumption instance. The associated thread stops automatically.
      worker.shutdown();
      // The ClientWorker instance generates multiple asynchronous tasks while running. Wait about 30 seconds after shutdown so that all tasks can exit.
      Thread.sleep(30 * 1000);
  }

ILogHubProcessor and ILogHubProcessorFactory implementation samples

Each ILogHubProcessor instance corresponds to a specific shard and consumes the data of that shard only. Pay attention to the data consumption logic during development: a ClientWorker instance consumes data serially and generates one ILogHubProcessor instance per shard. The ClientWorker instance calls the shutdown function of ILogHubProcessor upon exit.

  public class SampleLogHubProcessor implements ILogHubProcessor
  {
      private int mShardId;
      // Records the time of the last persisted checkpoint.
      private long mLastCheckTime = 0;

      public void initialize(int shardId)
      {
          mShardId = shardId;
      }

      // Main logic of data consumption
      public String process(List<LogGroupData> logGroups,
              ILogHubCheckPointTracker checkPointTracker)
      {
          for (LogGroupData logGroup : logGroups)
          {
              LogGroup lg = logGroup.GetLogGroup();
              System.out.println("source ip:" + lg.getSource());
              System.out.println("topic: " + lg.getTopic());
              for (Log log : lg.getLogsList())
              {
                  StringBuilder content = new StringBuilder();
                  content.append(log.getTime()).append("\t");
                  for (Content cont : log.getContentsList())
                  {
                      content.append(cont.getKey()).append("=").append(cont.getValue()).append("\t");
                  }
                  System.out.println(content.toString());
              }
          }
          long curTime = System.currentTimeMillis();
          // Writes a checkpoint to Log Service every 60 seconds. If the ClientWorker instance crashes
          // within that period, the new ClientWorker instance resumes consumption from the last
          // checkpoint, so duplicate data may exist.
          if (curTime - mLastCheckTime > 60 * 1000)
          {
              try
              {
                  // When the parameter is true, the checkpoint is immediately updated to Log Service.
                  // When the parameter is false, the checkpoint is cached locally and updated to
                  // Log Service in the background at the default interval of 60 seconds.
                  checkPointTracker.saveCheckPoint(true);
              }
              catch (LogHubCheckPointException e)
              {
                  e.printStackTrace();
              }
              mLastCheckTime = curTime;
          }
          else
          {
              try
              {
                  checkPointTracker.saveCheckPoint(false);
              }
              catch (LogHubCheckPointException e)
              {
                  e.printStackTrace();
              }
          }
          // Returning null indicates that the data was processed successfully. To roll back to the
          // last checkpoint and retry, return checkPointTracker.getCheckpoint() instead.
          return null;
      }

      // The ClientWorker instance calls this function upon exit; you can perform cleanup here.
      public void shutdown(ILogHubCheckPointTracker checkPointTracker)
      {
          // Saves the consumption breakpoint to Log Service.
          try
          {
              checkPointTracker.saveCheckPoint(true);
          }
          catch (LogHubCheckPointException e)
          {
              e.printStackTrace();
          }
      }
  }

ILogHubProcessorFactory is used to generate ILogHubProcessor.

  public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
  {
      public ILogHubProcessor generatorProcessor()
      {
          // Generates a consumption instance.
          return new SampleLogHubProcessor();
      }
  }

Configuration

  public class LogHubConfig
  {
      // Default interval at which the ClientWorker instance pulls data.
      public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
      // Name of the consumer group. The name can be 3 to 63 characters in length and can contain
      // lowercase letters, digits, underscores (_), and hyphens (-). It cannot be null and must
      // begin and end with a lowercase letter or digit.
      private String mConsumerGroupName;
      // Name of the consumer. Consumers in the same consumer group must have unique names.
      private String mWorkerInstanceName;
      // Address of the LogHub data interface.
      private String mLogHubEndPoint;
      // Project name.
      private String mProject;
      // Logstore name.
      private String mLogStore;
      // AccessKey ID of the cloud account.
      private String mAccessId;
      // AccessKey secret of the cloud account.
      private String mAccessKey;
      // The position where consumption starts for a shard when no checkpoint for that shard is
      // recorded in Log Service. If Log Service records a valid checkpoint, this value is ignored.
      // Valid values: BEGIN_CURSOR (consume from the first data entry in the shard), END_CURSOR
      // (consume from the last data entry at the current time), and SPECIAL_TIMER_CURSOR (used
      // together with mLoghubCursorStartTime to start consumption at a specific time).
      private LogHubCursorPosition mCursorPosition;
      // When mCursorPosition is set to SPECIAL_TIMER_CURSOR, the UNIX timestamp (in seconds) at
      // which consumption starts.
      private int mLoghubCursorStartTime = 0;
      // The polling interval (in milliseconds) for pulling LogHub data. The smaller the interval,
      // the faster data is pulled. The default value is DEFAULT_DATA_FETCH_INTERVAL_MS; a value
      // greater than 200 ms is recommended.
      private long mDataFetchIntervalMillis;
      // The interval (in milliseconds) at which the ClientWorker instance sends heartbeat packets
      // to Log Service. The recommended value is 10,000 ms.
      private long mHeartBeatIntervalMillis;
      // Whether to consume data sequentially.
      private boolean mConsumeInOrder;
  }
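The consumer group naming constraint above (3 to 63 characters; lowercase letters, digits, underscores, and hyphens; beginning and ending with a lowercase letter or digit) can be checked before the configuration is submitted. A sketch of one reading of that rule (illustrative; `ConsumerGroupNameCheck` is a hypothetical helper, not part of the library):

```java
public class ConsumerGroupNameCheck {
    // One reading of the documented constraint: 3-63 characters; lowercase letters, digits,
    // underscores (_), and hyphens (-); must begin and end with a lowercase letter or digit.
    private static final String NAME_PATTERN = "^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$";

    public static boolean isValidConsumerGroupName(String name) {
        return name != null && name.matches(NAME_PATTERN);
    }

    public static void main(String[] args) {
        System.out.println(isValidConsumerGroupName("my-consumer-group_1")); // true
        System.out.println(isValidConsumerGroupName("ab"));                  // false: shorter than 3 characters
        System.out.println(isValidConsumerGroupName("-bad-start"));          // false: must begin with a letter or digit
    }
}
```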

Important notes

  • consumerGroupName in LogHubConfig indicates a consumer group. Consumers with the same consumerGroupName consume the shards of the same Logstore and are differentiated by workerInstanceName.

    Assume that a Logstore has four shards numbered 0 to 3, and that three ClientWorker instances are started with the following consumerGroupName and workerInstanceName settings:
    <consumer_group_name_1, worker_A>
    <consumer_group_name_1, worker_B>
    <consumer_group_name_2, worker_C>
    The shards are allocated to the ClientWorker instances as follows:
    <consumer_group_name_1, worker_A>: shard_0, shard_1
    <consumer_group_name_1, worker_B>: shard_2, shard_3
    <consumer_group_name_2, worker_C>: shard_0, shard_1, shard_2, shard_3 # ClientWorker instances with different consumer group names consume data independently.
  • Ensure that the process() interface of ILogHubProcessor is correctly implemented and exits properly.

  • Calling the saveCheckPoint() interface of ILogHubCheckPointTracker indicates that data processing is complete, regardless of whether the passed parameter is true or false. If the parameter is true, the checkpoint is immediately persisted to Log Service. If the parameter is false, checkpoints are synchronized to Log Service every 60 seconds.

  • RAM authorization is required if the AccessKey ID and AccessKey secret of a RAM user (sub-account) are configured in LogHubConfig. The following actions and resources must be authorized.

| Action | Resource |
| --- | --- |
| log:GetCursorOrData | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
| log:CreateConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
| log:ListConsumerGroup | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
| log:ConsumerGroupUpdateCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
| log:ConsumerGroupHeartBeat | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
| log:GetConsumerGroupCheckPoint | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
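For example, the permissions in the table above could be granted to a RAM user with a policy along the following lines (a sketch only; replace the ${...} placeholders with your actual values and verify the policy syntax against the current RAM documentation before use):

```json
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData",
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup",
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
        "acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}",
        "acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*"
      ]
    }
  ]
}
```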