If you use consumer groups to consume data, you do not need to consider factors such as the Log Service implementation, load balancing among consumers, or failovers that may occur. You can focus only on the business logic of log data consumption.
Prerequisites
An SDK development environment is set up. For more information, see Overview of Log Service SDK.
Terms
Term | Description |
---|---|
consumer group | A consumer group consists of multiple consumers. The consumers in a consumer group jointly consume the data in a Logstore, and no two consumers consume the same data. Important: You can create up to 30 consumer groups for a Logstore. |
consumer | A consumer is a member of a consumer group that consumes data. Important: The names of the consumers in a consumer group must be unique. |
Logstore | A Logstore is used to collect, store, and query data. For more information, see Logstore. |
Shard | A shard is used to control the read and write capacity of a Logstore. In Log Service, data is stored in shards. For more information, see Shard. |
Checkpoint | A checkpoint is the position at which a program stops consuming data. If the program is restarted, it resumes consumption from the last checkpoint. Note: If you use consumer groups to consume data, Log Service stores the checkpoints. This way, consumers can resume consumption from the last checkpoint without consuming data repeatedly after the program recovers. |
Allocation rules
- Each shard can be allocated to only one consumer.
- A consumer can consume data from multiple shards.
After a new consumer is added to a consumer group, shards that are allocated to the consumers in the consumer group are reallocated to each consumer for load balancing. The shards are reallocated based on the preceding rules.
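The allocation rules above can be sketched in code. The following is an illustrative round-robin allocation written for this article, not the actual algorithm that Log Service uses internally; the class and method names are made up for the example:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardAllocationSketch {
    // Distribute shards across consumers round-robin so that each shard is
    // allocated to exactly one consumer and the load stays balanced.
    static Map<String, List<Integer>> allocate(List<Integer> shards, List<String> consumers) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int i = 0; i < shards.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            assignment.get(consumer).add(shards.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Five shards, two consumers: consumer_1 gets shards 0, 2, 4 and
        // consumer_2 gets shards 1, 3. Adding a third consumer and calling
        // allocate again models the reallocation that happens when a new
        // consumer joins the group.
        Map<String, List<Integer>> assignment =
                allocate(List.of(0, 1, 2, 3, 4), List.of("consumer_1", "consumer_2"));
        System.out.println(assignment);
    }
}
```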
Step 1: Consume data
You can use Log Service SDK for Java, C++, Python, or Go to create consumer groups and consume data. In the following example, Log Service SDK for Java is used.
- Add Maven dependencies. Open the pom.xml file in the root directory of the Java project and add the following code:
```xml
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.33</version>
</dependency>
```
- Create a Main.java file.
```java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

public class Main {
    // The Log Service endpoint. Enter an endpoint based on your business requirements. For more information, see Endpoints.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    // The name of the Log Service project. Enter the name of an existing project.
    private static String Project = "ali-cn-hangzhou-sls-admin";
    // The name of the Logstore. Enter the name of an existing Logstore.
    private static String Logstore = "sls_operation_log";
    // The name of the consumer group. You do not need to create the consumer group in advance. It is automatically created when the program runs.
    private static String ConsumerGroup = "consumerGroupX";
    // The AccessKey ID and AccessKey secret of your Alibaba Cloud account. For more information, see AccessKey pair.
    private static String AccessKeyId = "LTAI5t****";
    private static String AccessKeySecret = "w0cRy0****";

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // consumer_1 is the name of the consumer. The name of each consumer in a consumer group must be unique. If different consumers start processes on different machines to consume data in a Logstore, you can use the machine IP addresses to identify each consumer.
        // The last parameter (1000) is maxFetchLogGroupSize, the maximum number of log groups that can be obtained from Log Service at a time. Retain the default value. You can call config.setMaxFetchLogGroupSize(100); to change it. Valid values: (0,1000].
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        // ClientWorker implements the Runnable interface and runs automatically after the thread starts.
        Thread thread = new Thread(worker);
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the shutdown function of the ClientWorker instance to exit the consumer. The thread then stops automatically.
        worker.shutdown();
        // Multiple asynchronous tasks are generated while the ClientWorker instance is running. To ensure that all running tasks stop safely after the shutdown, we recommend that you wait for 30 seconds.
        Thread.sleep(30 * 1000);
    }
}
```
- Create a SampleLogHubProcessor.java file.
For more information, see Java, C++, Python, and Go.
```java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

import java.util.List;

public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // The time when the last checkpoint was saved.
    private long mLastCheckTime = 0;

    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // The main logic of data consumption. Include code to handle all exceptions that may occur during data consumption.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        // Display the obtained data.
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup flg = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
            }
            for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                FastLog log = flg.getLogs(lIdx);
                System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                    FastLogContent content = log.getContents(cIdx);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // A checkpoint is written to Log Service at an interval of 30 seconds. If the ClientWorker instance unexpectedly stops within the 30 seconds, a newly started ClientWorker instance resumes consumption from the last checkpoint. A small amount of data may be consumed repeatedly.
        if (curTime - mLastCheckTime > 30 * 1000) {
            try {
                // If true is passed to saveCheckPoint, checkpoints are immediately synchronized to Log Service. If false is passed, checkpoints are cached on your machine and synchronized to Log Service at the default interval of 60 seconds.
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        }
        return null;
    }

    // This function is called when the ClientWorker instance shuts down. You can manage the checkpoints here.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        // Immediately save the checkpoints to Log Service.
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // Generate a consumer.
        return new SampleLogHubProcessor();
    }
}
```
- Run Main.java. In the following example, NGINX logs are consumed and the consumption result is displayed.
```
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
```
Step 2: View the status of a consumer group
View the status of a consumer group in the Log Service console
- Log on to the Log Service console.
- In the Projects section, click the project that you want to manage.
- On the Logstores tab, click the expand icon of the Logstore that you want to manage, and then click the expand icon of Data Consumption.
- In the consumer group list, click the consumer group that you want to manage.
- On the Consumer Group Status page, view the data consumption progress of each shard.
View the status of a consumer group by using an SDK
```java
import java.util.List;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;

public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";

    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // Obtain all consumer groups that are created for the Logstore. If no consumer groups exist, an empty list is returned.
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for (ConsumerGroup c : consumerGroups) {
            // Display the attributes of each consumer group, including the name, the heartbeat timeout period, and whether data is consumed in order.
            System.out.println("Name: " + c.getConsumerGroupName());
            System.out.println("Heartbeat timeout period: " + c.getTimeout());
            System.out.println("Ordered consumption: " + c.isInOrder());
            for (ConsumerGroupShardCheckPoint cp : client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()) {
                System.out.println("shard: " + cp.getShard());
                // The returned time is a long integer that is accurate to the microsecond.
                System.out.println("The time at which the progress of data consumption was last updated: " + cp.getUpdateTime());
                System.out.println("Consumer name: " + cp.getConsumer());
                String consumerPrg = "";
                if (cp.getCheckPoint().isEmpty()) {
                    consumerPrg = "Consumption is not started";
                } else {
                    // The UNIX timestamp. Unit: seconds. Format the timestamp before you display it.
                    try {
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    } catch (LogException e) {
                        // Compare error codes with equals, not ==.
                        if ("InvalidCursor".equals(e.GetErrorCode())) {
                            consumerPrg = "Invalid. The time at which the progress of data consumption was last updated is beyond the retention period of the data";
                        } else {
                            // An internal server error occurred.
                            throw e;
                        }
                    }
                }
                System.out.println("Consumption progress: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try {
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                } catch (LogException e) {
                    // Do nothing.
                }
                // The UNIX timestamp. Unit: seconds. Format the timestamp before you display it.
                System.out.println("The time when the last data record was received: " + endPrg);
            }
        }
    }
}
```
```
Name: etl-6cac01c571d5a4b933649c04a7ba215b
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the progress of data consumption was last updated: 1639555453575211
Consumer name: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Consumption progress: 1639555453
The time when the last data record was received: 1639555453
shard: 1
The time at which the progress of data consumption was last updated: 1639555392071328
Consumer name: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Consumption progress: 1639555391
The time when the last data record was received: 1639555391
Name: etl-2bd3fdfdd63595d56b1ac24393bf5991
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the progress of data consumption was last updated: 1639555453256773
Consumer name: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Consumption progress: 1639555453
The time when the last data record was received: 1639555453
shard: 1
The time at which the progress of data consumption was last updated: 1639555392066234
Consumer name: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Consumption progress: 1639555391
The time when the last data record was received: 1639555391
Name: consumerGroupX
Heartbeat timeout period: 60
Ordered consumption: false
shard: 0
The time at which the progress of data consumption was last updated: 1639555434142879
Consumer name: consumer_1
Consumption progress: 1635615029
The time when the last data record was received: 1639555453
shard: 1
The time at which the progress of data consumption was last updated: 1639555437976929
Consumer name: consumer_1
Consumption progress: 1635616802
The time when the last data record was received: 1639555391
```
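Note that the update time in the output (for example, 1639555453575211) is a UNIX timestamp in microseconds, while the consumption progress is in seconds. The following is a minimal sketch, written for this article without any SDK dependency, of converting both to readable times; the class and method names are made up for the example:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampFormat {
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Convert a microsecond-precision UNIX timestamp, such as the update time
    // in the output above, to a readable UTC string.
    static String fromMicros(long micros) {
        return FMT.format(Instant.ofEpochMilli(micros / 1000));
    }

    // Convert a second-precision UNIX timestamp, such as the consumption
    // progress in the output above, to a readable UTC string.
    static String fromSeconds(long seconds) {
        return FMT.format(Instant.ofEpochSecond(seconds));
    }

    public static void main(String[] args) {
        System.out.println(fromMicros(1639555453575211L)); // 2021-12-15 08:04:13
        System.out.println(fromSeconds(1639555453L));      // 2021-12-15 08:04:13
    }
}
```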
Authorize a RAM user to perform operations on consumer groups
Before you can use a RAM user to perform operations on consumer groups, you must grant the required permissions to the RAM user. For more information, see Step 2: Grant permissions to the RAM user.
Action | Description | Resource |
---|---|---|
log:GetCursorOrData (GetCursor and PullLogs) | Queries cursors based on the time when logs are generated. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup | Creates a consumer group for a specified Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ListConsumerGroup | Queries all consumer groups of a specified Logstore. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint | Updates the checkpoint for a shard that is allocated to a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat | Sends a heartbeat message for a specified consumer to Log Service. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup | Modifies the attributes of a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint | Queries the checkpoints for one or all shards that are allocated to a specified consumer group. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
The following list provides resource information about a consumer group. To allow a RAM user to perform operations on the consumer group, you can refer to the following code to grant the required permissions to the RAM user:
- ID of the Alibaba Cloud account to which the project belongs: 174649****602745
- ID of the region where the project resides: cn-hangzhou
- Name of the project: project-test
- Name of the Logstore: logstore-test
- Name of the consumer group: consumergroup-test
```json
{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "log:GetCursorOrData"
            ],
            "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
            "Effect": "Allow",
            "Action": [
                "log:CreateConsumerGroup",
                "log:ListConsumerGroup"
            ],
            "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "log:ConsumerGroupUpdateCheckPoint",
                "log:ConsumerGroupHeartBeat",
                "log:UpdateConsumerGroup",
                "log:GetConsumerGroupCheckPoint"
            ],
            "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
    ]
}
```
What to do next
- Configure Log4j for troubleshooting. We recommend that you configure Log4j for the consumer program to return error messages when consumer groups encounter exceptions. This helps you troubleshoot the errors. The following code shows a log4j.properties configuration file that is commonly used:
```
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
```
After you configure Log4j, you can view information about exceptions when you run the consumer program. The following example shows an exception message:
```
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
```
- Use a consumer group to consume data that is generated from a point in time
```java
// consumerStartTimeInSeconds specifies a point in time. The data that is generated after the point in time is consumed.
public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds);

// position is an enumeration variable. LogHubConfig.ConsumePosition.BEGIN_CURSOR specifies that consumption starts from the earliest data. LogHubConfig.ConsumePosition.END_CURSOR specifies that consumption starts from the latest data.
public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);
```
Note:
- You can use different constructors based on your business requirements.
- If a checkpoint is stored in Log Service, data consumption starts from the checkpoint.
- A stored checkpoint takes precedence when data consumption starts. If you want data consumption to start from a specified point in time, make sure that the value of consumerStartTimeInSeconds falls within the time-to-live (TTL) period of the data. Otherwise, Log Service cannot consume data based on your configuration.
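For example, to start consumption from data generated during the last hour, you can compute consumerStartTimeInSeconds from the current time and pass it to the first constructor above. The following sketch only computes the timestamp; the class and method names are made up for this article:

```java
public class StartTime {
    // Compute a UNIX timestamp in seconds for "one hour ago", suitable for
    // the consumerStartTimeInSeconds parameter of LogHubConfig.
    static int oneHourAgoInSeconds(long nowMillis) {
        return (int) (nowMillis / 1000 - 3600);
    }

    public static void main(String[] args) {
        int start = oneHourAgoInSeconds(System.currentTimeMillis());
        System.out.println("consumerStartTimeInSeconds: " + start);
        // Then, for example:
        // new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore,
        //                  AccessKeyId, AccessKeySecret, start);
    }
}
```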
- Reset a checkpoint.
```java
public static void updateCheckpoint() throws Exception {
    Client client = new Client(host, accessId, accessKey);
    long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
    ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
    for (Shard shard : response.GetShards()) {
        int shardId = shard.GetShardId();
        String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
        client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
    }
}
```
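The checkpoint reset above relies on converting a date string to a UNIX timestamp in seconds. A standalone sketch of that conversion follows; note that java.sql.Timestamp.valueOf interprets the string in the JVM's default time zone, so the resulting value differs across machines. The class and method names here are made up for the example:

```java
import java.sql.Timestamp;

public class CheckpointTime {
    // Convert a "yyyy-MM-dd HH:mm:ss" date string to a UNIX timestamp in
    // seconds, interpreted in the JVM's default time zone.
    static long toUnixSeconds(String dateTime) {
        return Timestamp.valueOf(dateTime).getTime() / 1000;
    }

    public static void main(String[] args) {
        // The printed value depends on the local time zone.
        System.out.println(toUnixSeconds("2017-11-15 00:00:00"));
    }
}
```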
References
- API operations
Operation | API operation |
---|---|
Create a consumer group | CreateConsumerGroup |
Query a consumer group | ListConsumerGroup |
Delete a consumer group | DeleteConsumerGroup |
Update a consumer group | UpdateConsumerGroup |
Send a heartbeat message for a consumer | ConsumerGroupHeartBeat |
Query the checkpoints of a consumer group | GetCheckPoint |
Update the checkpoints of a consumer group | ConsumerGroupUpdateCheckPoint |
- SDKs

Programming language | Documentation link |
---|---|
Java | SDK for Java |
Python | SDK for Python |
- Log Service CLI

Operation | CLI command |
---|---|
Create a consumer group | create_consumer_group |
Query a consumer group | list_consumer_group |
Update a consumer group | update_consumer_group |
Delete a consumer group | delete_consumer_group |
Query the checkpoints of a consumer group | get_check_point |
Update the checkpoints of a consumer group | update_check_point |