Log processing is a broad category that includes real-time computing, data warehousing, and offline computation. This document describes how to preserve order and avoid data loss and duplication during log processing in scenarios such as real-time computing, failures of upstream or downstream services, and sharp fluctuations in service traffic.
To explain the concepts clearly, this article uses a day at the bank as an example. It also describes the LogHub model of Log Service and how to use LogHub with Spark Streaming and Storm Spout to process log data.
Jay Kreps, a former LinkedIn employee, notes in his article that a log is an “append-only, totally-ordered sequence of records ordered by time” (The Log: What every software engineer should know about real-time data’s unifying abstraction).
- Append only: Logs are written in append mode. A log entry cannot be modified once it is generated.
- Totally ordered by time: Log entries are strictly ordered by time. Every log entry is generated at a specific point in time. Different log entries may seem to be generated at the same time, for example, a GET and a SET operation. For the computer, however, they are performed in sequence.
50 years ago, the term “log” was associated with a thick notebook kept by a ship captain or operator. Now, with the rise of computers, logs are produced and consumed everywhere: servers, routers, sensors, GPS, purchase orders, and various devices. The example of a ship captain’s log shows that, besides a timestamp, a log can contain all sorts of information, such as a text record, an image, weather conditions, or the sailing course. Half a century later, the concept of the captain’s log has extended to other fields, such as purchase orders, payment records, user access, and database operations.
In the computer world, we usually use these log types: metric, binlog (database and NoSQL), event, auditing, and access log.
In the following example, each operation that a user performs at the bank is treated as a log entry. An entry consists of the user, account name, operation time, operation type, and amount.
2016-06-28 08:00:00 Michael Jacob Deposit US$1,000
2016-06-27 09:00:00 Shane Lee Withdraw US$20,000
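The sample entries can be turned into structured records with a few lines of code. The following is a minimal sketch in Python, not part of Log Service: the `BankLog` class and the `parse_entry` helper are hypothetical names, and the field layout (date, time, user name, operation type, amount) is inferred from the sample lines above.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class BankLog:
    """One bank operation, modelled as an immutable log entry."""
    time: datetime
    user: str
    operation: str
    amount_usd: int


def parse_entry(line: str) -> BankLog:
    # Assumed layout: "<date> <time> <user name ...> <operation> US$<amount>"
    parts = line.split()
    time = datetime.strptime(" ".join(parts[:2]), "%Y-%m-%d %H:%M:%S")
    amount = int(parts[-1].replace("US$", "").replace(",", ""))
    user = " ".join(parts[2:-2])  # everything between the time and the operation
    return BankLog(time=time, user=user, operation=parts[-2], amount_usd=amount)


entry = parse_entry("2016-06-28 08:00:00 Michael Jacob Deposit US$1,000")
print(entry)
```

The record is frozen because a log entry is append-only: once generated, it is never modified.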
Alibaba Cloud Log Service LogHub is used as the model for demonstration. For more information, see Basic concepts of Alibaba Cloud Log Service.
- Log: Composed of a timestamp and a set of key-value pairs.
- LogGroup: A collection of logs that share the same metadata (IP address, source).
Their relationship is as follows:
- Shard: A partition, the basic unit for reading and writing LogGroups. A shard can be thought of as a FIFO queue with a 48-hour retention cycle. Every shard offers 5 MB/s of write capacity and 10 MB/s of read capacity. Each shard covers a logical segment (from BeginKey to EndKey) that determines which data it holds.
- LogStore: A log library that stores log data of the same type. A LogStore is built from shards whose segments together cover the range [0000, FFFF..). One LogStore may contain one or more shards.
- Project: A container for storing LogStores.
The relationship among these concepts is as follows:
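As a rough illustration of this containment hierarchy, the concepts can be written as nested data structures. This is only a sketch for the purposes of this article: the class and field names are assumptions and do not reproduce the Log Service SDK.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Log:
    time: int                  # Unix timestamp in seconds
    contents: Dict[str, str]   # key-value pairs


@dataclass
class LogGroup:
    source: str                # metadata shared by all logs in the group
    logs: List[Log] = field(default_factory=list)


@dataclass
class Shard:
    begin_key: str             # inclusive start of the segment
    end_key: str               # exclusive end of the segment
    queue: List[LogGroup] = field(default_factory=list)  # consumed in FIFO order


@dataclass
class LogStore:
    name: str
    shards: List[Shard] = field(default_factory=list)


@dataclass
class Project:
    name: str
    logstores: Dict[str, LogStore] = field(default_factory=dict)


# A project with one LogStore that has a single shard covering the whole range.
group = LogGroup(source="10.0.0.1", logs=[
    Log(time=1467100800, contents={"user": "Michael Jacob", "operation": "Deposit"}),
])
store = LogStore(name="bank-lobby", shards=[Shard("0000", "FFFF", [group])])
project = Project(name="bank", logstores={store.name: store})
```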
Take a 19th-century bank as an example. Several users (producers) in a city made withdrawals (user operations) at a bank, where several clerks (consumers) were on duty. Computer systems were not yet available for real-time synchronization in the 19th century. Therefore, each clerk had to record the transactions in an account book and bring it, along with the cash, back to the company for reconciliation.
In the world of distributed systems, think of a clerk as a single server with fixed memory size and computing capacity. Users correspond to requests from different data sources, and the bank’s lobby corresponds to the log library (LogStore) that receives users’ access data.
- Log/LogGroup: Operations like deposit and withdrawal initiated by users.
- User: Log/LogGroup producer.
- Clerk: Log/LogGroup consumer, a bank employee responsible for processing user requests.
- Bank lobby (LogStore): A user’s operation request goes to the bank’s lobby before being handled by a clerk.
- Partition (Shard): The way that the bank lobby organizes user requests.
There were two clerks (Clerk A and Clerk B) at the bank. Michael Jacob entered the bank and asked Clerk A to deposit USD 1,000 into his account, and Clerk A made the deposit and recorded it in her account book. Michael Jacob, who was in need of money in the afternoon, went back to the bank and tried to withdraw some money at Clerk B’s counter. Clerk B checked her account book and found that there was no record of Michael Jacob’s deposit.
This example shows that deposit and withdrawal are operations that must be applied in strict sequence. The same clerk (processor) must handle all operations for the same user to maintain consistency.
The problem can be solved through order preservation: queue user requests in a single shard, and assign Clerk A as the only clerk who handles the requests. User requests are handled on a first-in, first-out (FIFO) basis. Everything works fine, except that efficiency is low. With 1,000 users, assigning 10 clerks instead of one does not improve efficiency at all, because only one clerk can serve the single queue.
What can we do in this case?
- Assume that the bank has 10 clerks and creates 10 shards accordingly.
- How can we guarantee that the operations on the same account stay in order? Users can be mapped to shards using consistent hashing. For example, set up 10 queues (shards), and have every clerk handle one shard. Different bank accounts or user names are mapped to a specific shard. In this case, Michael Jacob’s hash key, Jacob or J, is always mapped to a specific shard (to a segment of that shard), and the processing end is always Clerk A.
If many users’ surnames start with J, you can switch to another policy. For example, users can be hashed by AccountID, ZipCode, or other attributes, for a better balance of operation requests among the shards.
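The mapping can be sketched as follows. This is an illustrative assumption rather than the LogHub implementation: it hashes a key with SHA-256 and divides the hash space into equal segments, one per shard, which is plain range partitioning of a hash space and simpler than a full consistent-hashing ring. The helper names `hash_key`, `make_begin_keys`, and `shard_for` are hypothetical.

```python
import hashlib
from bisect import bisect_right

KEY_SPACE = 1 << 256  # SHA-256 produces 256-bit values


def hash_key(value: str) -> int:
    """Map an attribute such as a user name or AccountID to a point in the key space."""
    return int(hashlib.sha256(value.encode("utf-8")).hexdigest(), 16)


def make_begin_keys(shard_count: int) -> list:
    """Begin key of each shard; shard i covers [begin_keys[i], begin_keys[i + 1])."""
    return [i * KEY_SPACE // shard_count for i in range(shard_count)]


def shard_for(value: str, begin_keys: list) -> int:
    """Index of the shard whose segment contains the hash of the value."""
    return bisect_right(begin_keys, hash_key(value)) - 1


begin_keys = make_begin_keys(10)
first = shard_for("Michael Jacob", begin_keys)
second = shard_for("Michael Jacob", begin_keys)
print(first == second)  # the same user always lands in the same shard
```

Switching the policy only changes the value passed to `shard_for`, for example an account ID instead of a name. Because the hash spreads keys across the whole key space, a cluster of similar surnames does not pile up in one shard.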
Michael Jacob went to Counter A to make a deposit. Halfway through handling his request, Clerk A stepped out to answer a call. When she came back, she thought that Michael’s deposit had already been made and started to handle the next user’s request. As a result, Michael’s deposit request was lost.
Machines do not make human errors, and they offer longer uptime and higher reliability than tellers. However, a business may still be interrupted if the system breaks down or is overloaded. Losing users’ deposits in such a case is unacceptable.
How to solve this problem?
Clerk A can record an entry in her notebook (rather than the account book) to indicate the position in the shard of the request she is currently handling. Only when Michael Jacob’s deposit request is fully confirmed does Clerk A proceed to the next request.
What is the downside? The same request may be handled twice. Suppose that Clerk A finished handling Michael Jacob’s request (with the account book updated) and was about to make a record in her notebook when she was unexpectedly called away. Upon returning, she found that Michael Jacob’s request was not recorded in her notebook, and therefore handled it a second time, which led to a repeated entry.
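This trade-off can be reproduced in a small simulation. The sketch below is a toy model: the `consume` function, the `ledger` (account book), and the `notebook` are hypothetical stand-ins, and the second request amount is made up. Progress is recorded only after a request is handled, so nothing is lost, but an interruption between the two steps causes the request to be handled again.

```python
def consume(requests, ledger, notebook, crash_before_record_at=None):
    """Handle requests starting from the position saved in the notebook."""
    for position in range(notebook["next"], len(requests)):
        user, amount = requests[position]
        ledger[user] = ledger.get(user, 0) + amount  # update the account book
        if position == crash_before_record_at:
            return False                             # called away before noting progress
        notebook["next"] = position + 1              # record progress in the notebook
    return True


requests = [("Michael Jacob", 1000), ("Shane Lee", 500)]
ledger, notebook = {}, {"next": 0}

consume(requests, ledger, notebook, crash_before_record_at=0)  # interrupted run
consume(requests, ledger, notebook)                            # resume from the notebook

print(ledger)  # Michael Jacob's deposit has been applied twice
```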
Will repeated entries cause problems? Not necessarily.
If the operations are idempotent, repeated entries do not affect the results; they only waste some resources. What is idempotence? An operation is idempotent if performing it more than once produces the same result as performing it once. For example, checking a balance is a read-only operation and does not change the results even if it is repeated. Some operations that are not read-only are idempotent as well: logging off a user, for example, can be performed twice in a row with the same outcome.
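The difference is easy to see in code. In this minimal sketch (the function names and in-memory data are illustrative assumptions), logging off is idempotent because it sets a state, whereas a deposit is not because it adds to one.

```python
def log_off(active_sessions: set, user: str) -> None:
    # Idempotent: removing a user who is already logged off changes nothing.
    active_sessions.discard(user)


def deposit(balances: dict, user: str, amount: int) -> None:
    # Not idempotent: every repetition adds the amount again.
    balances[user] = balances.get(user, 0) + amount


sessions = {"Michael Jacob", "Shane Lee"}
log_off(sessions, "Michael Jacob")
log_off(sessions, "Michael Jacob")        # repeated: no further effect

balances = {}
deposit(balances, "Michael Jacob", 1000)
deposit(balances, "Michael Jacob", 1000)  # repeated: the deposit is counted twice
```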
Most operations in the real world, such as deposit and withdrawal, are not idempotent. Repeating such entries may cause catastrophic results. What is the solution then? Clerk A must treat “updating the account book” and “recording completion of shard processing in the notebook” as one atomic operation, and write down the progress (CheckPoint).
If Clerk A leaves temporarily or permanently, any other clerk who takes over follows the same rule: if the request is recorded as completed, move on to the next request; if not, process it again. It is imperative to maintain atomicity during the process.
A CheckPoint can use the position (or time) of an element in a shard as its key and store it in an object that can be persisted. It indicates that the element at that position has been processed.
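One way to obtain this atomicity is to keep the results and the checkpoint in the same transactional store. The following sketch uses an in-memory SQLite database from the Python standard library purely for illustration. The table layout and function names are assumptions, and this is not how LogHub Consumer Library persists checkpoints.

```python
import sqlite3


def open_book() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE balance (user TEXT PRIMARY KEY, amount INTEGER NOT NULL)")
    db.execute("CREATE TABLE checkpoint (shard INTEGER PRIMARY KEY, next_pos INTEGER NOT NULL)")
    db.commit()
    return db


def next_position(db, shard):
    row = db.execute("SELECT next_pos FROM checkpoint WHERE shard = ?", (shard,)).fetchone()
    return row[0] if row else 0


def balance_of(db, user):
    row = db.execute("SELECT amount FROM balance WHERE user = ?", (user,)).fetchone()
    return row[0] if row else 0


def consume(db, shard, requests, crash_at=None):
    for pos in range(next_position(db, shard), len(requests)):
        user, amount = requests[pos]
        try:
            with db:  # one transaction: both writes commit, or both roll back
                db.execute("INSERT OR IGNORE INTO balance (user, amount) VALUES (?, 0)", (user,))
                db.execute("UPDATE balance SET amount = amount + ? WHERE user = ?", (amount, user))
                if pos == crash_at:
                    raise RuntimeError("clerk called away")  # simulated interruption
                db.execute("INSERT OR REPLACE INTO checkpoint (shard, next_pos) VALUES (?, ?)",
                           (shard, pos + 1))
        except RuntimeError:
            return False
    return True


db = open_book()
requests = [("Michael Jacob", 1000), ("Shane Lee", 500)]
consume(db, 0, requests, crash_at=0)  # interrupted: the balance update is rolled back
consume(db, 0, requests)              # resumed by any clerk: each request applied once
print(balance_of(db, "Michael Jacob"), next_position(db, 0))
```

Because the balance update and the checkpoint commit together, an interruption leaves both untouched, and the clerk who takes over starts from a consistent position.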
With the preceding three concepts (order preservation, loss prevention, and duplicate prevention) explained, the principle is not complicated. In the real world, however, changes in scale and uncertainty further complicate these three problems. For example:
- The number of users will surge on pay day.
- After all, the clerks are not robots. They tend to take regular breaks and occasional leaves.
- To improve the overall service experience, bank managers have to improve clerks’ efficiency. What are the criteria of being efficient? What is the processing speed in a shard?
- Can a clerk pass the clerk’s account book and notebook to another during a handover?
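On the question of processing speed, a back-of-the-envelope estimate of the number of shards follows from the per-shard limits stated earlier (5 MB/s write, 10 MB/s read). The sketch below is only an approximation: the traffic figures are hypothetical, and `shards_needed` is not a Log Service API.

```python
import math

WRITE_MB_PER_SHARD = 5   # per-shard write capacity stated in this document
READ_MB_PER_SHARD = 10   # per-shard read capacity stated in this document


def shards_needed(write_mb_per_s: float, read_mb_per_s: float) -> int:
    """Smallest shard count that covers both the write and the read traffic."""
    by_write = math.ceil(write_mb_per_s / WRITE_MB_PER_SHARD)
    by_read = math.ceil(read_mb_per_s / READ_MB_PER_SHARD)
    return max(1, by_write, by_read)


# Hypothetical pay-day peak: 18 MB/s written and 36 MB/s read.
payday_shards = shards_needed(write_mb_per_s=18, read_mb_per_s=36)
print(payday_shards)
```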
At the beginning of the day, there is only one shard, Shard 0. All the user requests go to the queue for Shard 0, and Clerk A can comfortably handle the workload alone.
After 10:00, the bank manager decides to split Shard 0 into two new shards (Shard 1 and Shard 2), and applies a rule that assigns users whose surnames start with a letter from A to W to the queue for Shard 1, and users whose surnames start with X, Y, or Z to the queue for Shard 2. Why are the two shard segments not divided equally? Because surnames are not evenly distributed by first letter. Such a mapping guarantees workload balance between the two shards.
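The split can be modelled as cutting one segment into two at a chosen key. This is a simplified sketch that uses the first letter of the surname as the key, following the example. The `Shard` tuple and the `split` and `route` helpers are hypothetical and do not correspond to the LogHub split API.

```python
from typing import NamedTuple, Sequence, Tuple


class Shard(NamedTuple):
    shard_id: int
    begin: str  # inclusive first letter
    end: str    # exclusive first letter


AFTER_Z = chr(ord("Z") + 1)  # exclusive upper bound just past "Z"


def split(shard: Shard, at: str, new_ids: Tuple[int, int]) -> Tuple[Shard, Shard]:
    """Cut the segment [begin, end) into [begin, at) and [at, end)."""
    if not shard.begin < at < shard.end:
        raise ValueError("split point must fall strictly inside the shard's segment")
    return Shard(new_ids[0], shard.begin, at), Shard(new_ids[1], at, shard.end)


def route(surname: str, shards: Sequence[Shard]) -> int:
    """Return the ID of the shard whose segment contains the surname's first letter."""
    letter = surname[0].upper()
    for shard in shards:
        if shard.begin <= letter < shard.end:
            return shard.shard_id
    raise KeyError(surname)


shard0 = Shard(0, "A", AFTER_Z)
shard1, shard2 = split(shard0, "X", (1, 2))  # Shard 1: A to W, Shard 2: X to Z

print(route("Jacob", [shard1, shard2]), route("Xu", [shard1, shard2]))
```

The two new segments together cover exactly the segment of the original shard, so every request that used to go to Shard 0 still has exactly one destination.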
Consumption of requests between 10:00 and 12:00 is as follows:
Seeing that handling two shards at a time is getting difficult for Clerk A, the bank manager sends Clerks B and C to help. Because there are only two shards, Clerk B takes over one shard from Clerk A, and Clerk C stands by.
Clerk A is still under heavy pressure handling the requests in Shard 1, so the bank manager splits Shard 1 into two new shards (Shard 3 and Shard 4). Clerk A is responsible for Shard 3 and Clerk C is responsible for Shard 4. All the requests that would have been assigned to the queue for Shard 1 after 12:00 are diverted to Shard 3 or Shard 4.
Consumption of requests after 12:00 is as follows:
The bank manager relieves Clerks A and B and lets Clerk C handle the requests in Shard 2, Shard 3, and Shard 4. Then, the bank manager merges Shard 2 and Shard 3 into Shard 5, and finally merges Shard 5 and Shard 4 into one shard. The bank closes once all the requests in the last shard are handled.
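How shards are spread across clerks throughout the day can be sketched with a simple round-robin rule. The rule is an assumption made for illustration and is not the allocation algorithm of any particular library. It shows the two properties that matter in the story: each shard has exactly one clerk at a time, and a clerk without a shard stands by.

```python
from typing import Dict, List


def assign(shards: List[str], clerks: List[str]) -> Dict[str, List[str]]:
    """Give every shard to exactly one clerk, spreading shards evenly."""
    assignment: Dict[str, List[str]] = {clerk: [] for clerk in clerks}
    for index, shard in enumerate(shards):
        assignment[clerks[index % len(clerks)]].append(shard)
    return assignment


# Two shards and three clerks: one clerk has nothing to do and stands by.
morning = assign(["Shard 1", "Shard 2"], ["Clerk A", "Clerk B", "Clerk C"])

# Three shards and a single remaining clerk: that clerk handles all of them.
evening = assign(["Shard 2", "Shard 3", "Shard 4"], ["Clerk C"])

print(morning)
print(evening)
```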
The preceding process can be abstracted into typical scenarios of log processing. To address the business needs of a bank, a log foundation framework capable of automatic scaling and flexible adaptation must be provided, including the following:
- Elastic scaling of shards. For more information, see LogHub Auto Scaling (Merge/Split).
- Automatic adaptation when consumers go online or offline, without data loss during processing. For more information, see Consumer group.
- Order preservation during processing. For more information, see LogHub support for ordering write and consumption.
- Prevention of repeated entries during processing (which requires consumers’ cooperation).
- Visibility into the consumption progress for reasonable allocation of computing resources. For more information, see Using console to view consumer group progress.
- Support for incoming logs from more channels (in the banking sector, channels such as online banking, mobile banking, and cheques bring in more user requests). For more information, see Collection methods.
LogHub and LogHub Consumer Library can help you resolve the typical problems in real-time log processing. You can focus on the business logic without worrying about traffic, resizing, failover, and other issues.
Additionally, APIs for Storm and Spark Streaming are available with Consumer Library. For more technical information and discussion of Log Service, visit Log Service Homepage and Log Processing Community.