10 Minutes to Understand Kafka
Abstract: Apache Kafka is a fast, scalable, high-throughput, fault-tolerant distributed "publish-subscribe" messaging system, written in Scala and Java, capable of delivering messages from one endpoint to another.
Compared with traditional message middleware (such as ActiveMQ, RabbitMQ), Kafka has the characteristics of high throughput, built-in partition, support for message replication and high fault tolerance, which is very suitable for large-scale message processing applications.
Kafka official website:
The main design goals of Kafka are as follows:
Provides message persistence capability with a time complexity of O(1), ensuring constant-time access performance even for terabytes of data or more.
High throughput. Even on very cheap commodity machines, a single machine can support the transmission of 100K messages per second.
It supports message partitioning and distributed consumption between Kafka Servers, while ensuring the sequential transmission of messages within each Partition.
Both offline data processing and real-time data processing are supported.
Support online horizontal expansion.
Kafka is commonly used in two broad categories of applications:
Build real-time streaming data pipelines to reliably get data between systems or applications.
Build real-time streaming applications to transform or respond to data streams.
To understand how Kafka performs these operations, let's dive into Kafka's capabilities from the ground up.
First a few concepts:
Kafka runs as a cluster on one or more servers that can span multiple data centers.
Kafka clusters store streams of records in categories called topics.
Each record consists of a key, a value and a timestamp.
The Kafka architecture system:
There are many application scenarios for Kafka. Here are some of our most common scenarios:
①User activity tracking: the different kinds of user activity on a website are published to different topics, where the messages can then be monitored and processed in real time.
Of course, it can also be loaded into Hadoop or offline processing data warehouses to profile users. On large e-commerce platforms like Taobao, Tmall, and JD.com, all activities of users must be tracked.
②Log collection as shown below:
③Rate limiting and peak shaving, as shown in the figure below:
④ High throughput rate realization: Compared with other MQs, the biggest feature of Kafka is high throughput rate. In order to increase storage capacity, Kafka writes all messages to low-speed, large-capacity hard drives.
Arguably, this would result in a performance penalty, but in reality, Kafka can still maintain extremely high throughput rates and its performance is not compromised.
It mainly adopts the following methods to achieve high throughput:
Sequential read and write: Kafka writes messages to a Partition, and the messages within a Partition are read and written sequentially. Sequential disk reads and writes are much faster than random ones.
Zero-copy: Kafka uses zero-copy techniques when moving messages between disk and network for producers and consumers, which avoids extra copies through application memory.
Batch sending: Kafka lets producers send messages in batches rather than one at a time.
Message compression: Kafka allows compression of message collections.
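To see why batching and compression work well together, here is a minimal sketch using Python's standard zlib module. It does not reproduce Kafka's wire format or codecs, and the message contents are made up for illustration.

```python
import json
import zlib

# 200 hypothetical, similar-looking event messages (invented for this sketch).
messages = [
    json.dumps({"user_id": i, "event": "page_view", "page": "/products/list"}).encode("utf-8")
    for i in range(200)
]

# Option A: compress every message on its own.
individual_bytes = sum(len(zlib.compress(m)) for m in messages)

# Option B: join the messages into one batch and compress the batch once.
batch = b"\n".join(messages)
batched_bytes = len(zlib.compress(batch))

print("compressed one by one:", individual_bytes, "bytes")
print("compressed as a batch:", batched_bytes, "bytes")
```

Because the messages share most of their structure, the compressor finds far more redundancy in the batch than in any single message, so the batch is much smaller on the wire.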
The advantages of Kafka are as follows:
① Decoupling: It is extremely difficult to predict what requirements the project will encounter in the future at the beginning of the project.
The message system inserts an implicit, data-based interface layer in the middle of the process, and both processes must implement this interface.
This allows you to extend or modify both processes independently, as long as you make sure they obey the same interface constraints.
② Redundancy (replication): Sometimes a process fails while handling data, and unless the data has been persisted, it is lost.
Message queues avoid the risk of data loss by persisting data until they have been fully processed.
In the "insert-get-delete" paradigm used by many message queues, before removing a message from the queue, your processing system needs to explicitly indicate that the message has been processed, thus ensuring that your data is kept safe until you are done using it.
③Scalability: Because the message queue decouples your processing steps, it is easy to increase the rate at which messages are enqueued and processed: simply add more processing instances. There is no need to change code or tune parameters.
④Flexibility & peak processing capacity: The application must keep functioning when traffic rises sharply, but such bursts are uncommon. Keeping resources on standby just to handle peak traffic would undoubtedly be a huge waste.
Using message queues enables critical components to withstand sudden access pressures without completely crashing due to sudden overloaded requests.
⑤Recoverability: When a part of the system fails, it will not affect the entire system. Message queues reduce the coupling between processes, so even if a process processing a message hangs, messages added to the queue can still be processed after the system is restored.
⑥Order guarantee: In most usage scenarios, the order of data processing is very important. Most message queues are inherently ordered and guarantee that data will be processed in a specific order. Kafka guarantees the ordering of messages within a Partition.
⑦ Buffering: In any important system, there will be elements that require different processing times. For example, loading an image takes less time than applying filters.
The message queue uses a buffer layer to help tasks execute most efficiently, and the processing of writes to the queue is as fast as possible. This buffering helps control and optimize the speed at which data flows through the system.
⑧ Asynchronous communication: In many cases, users do not want or need to process messages immediately. Message queues provide asynchronous processing mechanisms that allow users to put a message on the queue, but not process it immediately. Put as many messages as you want into the queue, and then process them when needed.
The comparison between Kafka and other MQ is as follows:
①RabbitMQ: RabbitMQ is an open source message queue written in Erlang. It supports many protocols: AMQP, XMPP, SMTP, STOMP. Because of this, it is very heavyweight and more suitable for enterprise-level development.
At the same time, the Broker architecture is implemented, which means that messages are queued in the central queue before being sent to the client. It has good support for routing, load balancing or data persistence.
②Redis: Redis is a NoSQL database based on Key-Value pairs, with active development and maintenance.
Although it is a Key-Value database storage system, it supports MQ functions, so it can be used as a lightweight queue service.
For the enqueue and dequeue operations of RabbitMQ and Redis, each is executed 1 million times, and the execution time is recorded every 100,000 times. The test data is divided into four different sizes of 128Bytes, 512Bytes, 1K and 10K.
Experiments show that when enqueuing, Redis outperforms RabbitMQ for relatively small payloads, but becomes unbearably slow once the payload size exceeds 10K; when dequeuing, Redis performs very well regardless of payload size, and RabbitMQ's dequeue performance is much lower than that of Redis.
③ZeroMQ: ZeroMQ is known as the fastest message queue system, especially for high-throughput demand scenarios.
ZeroMQ can implement advanced/complex queues that RabbitMQ is not good at, but developers need to combine multiple technical frameworks by themselves. The technical complexity is a challenge to the successful application of this MQ.
ZeroMQ has a unique non-middleware model, you don't need to install and run a message server or middleware because your application will play the server role.
You simply reference the ZeroMQ library, install it using NuGet, and you can happily send messages between applications.
But ZeroMQ only provides non-persistent queues, which means that data is lost if the process goes down. For example, versions of Twitter's Storm before 0.9.0 used ZeroMQ as the default data stream transport (since version 0.9, Storm has supported both ZeroMQ and Netty as transport modules).
④ActiveMQ: ActiveMQ is a sub-project under Apache. Similar to ZeroMQ, it can implement queues in broker and peer-to-peer technology. At the same time, similar to RabbitMQ, it can efficiently implement advanced application scenarios with a small amount of code.
⑤Kafka/Jafka: Kafka is a sub-project under Apache, a high-performance cross-language distributed publish/subscribe message queue system, and Jafka is incubated on top of Kafka, that is, an upgraded version of Kafka.
Kafka has the following characteristics:
Fast persistence: messages can be persisted with O(1) system overhead.
High throughput: a throughput of 100,000 messages per second can be achieved on an ordinary server.
A completely distributed system: Broker, Producer, and Consumer all natively support distribution and achieve load balancing automatically.
Support for parallel data loading into Hadoop: a viable solution for systems that, like Hadoop, handle log data and offline analysis but also need to meet real-time processing constraints.
Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Apache Kafka is a very lightweight messaging system compared to ActiveMQ, and in addition to its very good performance, it is a distributed system that works well.
Several important roles of Kafka are as follows:
①Kafka as a storage system: Any message queue that allows messages to be published independently of when they are consumed effectively acts as a storage system for in-flight messages. What sets Kafka apart is that it is a very good storage system.
Data written to Kafka is written to disk and replicated for fault tolerance. Kafka allows producers to wait for acknowledgment, so that a write is not considered complete until it is fully replicated and guaranteed to persist even if the server written to fails.
Kafka's on-disk structures scale well: Kafka performs the same whether you have 50KB or 50TB of persistent data on the server.
Because it takes storage seriously and allows clients to control their read position, you can think of Kafka as a special-purpose distributed file system dedicated to high-performance, low-latency commit log storage, replication, and propagation.
② Kafka as a messaging system: How does Kafka's stream concept compare to traditional enterprise messaging systems?
Traditionally, messaging has two models: queuing and publish-subscribe. In a queue, a pool of consumers reads from the server, and each record goes to one of them.
In publish-subscribe, each record is broadcast to all consumers. Each of these two models has strengths and weaknesses.
The advantage of queuing is that it allows you to divide data processing across multiple consumer instances, thus scaling the processing volume.
Unfortunately, queues are not multi-subscriber: once one process reads the data, it is gone. Publish-subscribe allows you to broadcast data to multiple processes, but cannot scale processing since every message is delivered to every subscriber.
Kafka's concept of consumer groups generalizes both models. Like a queue, a consumer group allows you to divide processing over a group of processes (the members of the consumer group). Like publish-subscribe, Kafka allows you to broadcast messages to multiple consumer groups.
The beauty of the Kafka model is that every topic has these properties - can scale processing, and is multi-subscriber, no need to choose one or the other.
Kafka also has stronger ordering guarantees than traditional messaging systems. Traditional queues keep records on the server in order, and if multiple consumers consume from the queue, the server will distribute the records in the order in which they were stored.
However, although the server dispatches records in order, these records are delivered to consumers asynchronously, so they may arrive out of order on different consumers.
This effectively means that the order of records is lost in the case of parallel use. Messaging systems usually solve this problem with the concept of "exclusive consumers", which allow only one process to consume from the queue, however, this of course means there is no parallelism in the processing.
Kafka does better. By having a notion of parallelism (the partition) within a topic, Kafka is able to provide both ordering guarantees and load balancing across a pool of consumer processes.
This is achieved by assigning partitions in a topic to consumers in a consumer group so that each partition is fully consumed by one consumer in the group.
By doing this, we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions, the load is still balanced across many consumer instances. Note, however, that a consumer group cannot have more active consumer instances than partitions; any extra instances sit idle.
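The assignment rule described above can be sketched in a few lines. This is a simplified round-robin illustration, not one of Kafka's real assignment strategies; the function name and the partition and consumer labels are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over consumers round-robin; each partition gets exactly one owner."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = ["P0", "P1", "P2", "P3"]
two_consumers = assign_partitions(partitions, ["C1", "C2"])
five_consumers = assign_partitions(partitions, ["C1", "C2", "C3", "C4", "C5"])

print(two_consumers)   # every consumer owns two partitions
print(five_consumers)  # the fifth consumer owns nothing and stays idle
```

Each partition has a single owner inside the group, which preserves per-partition ordering, and a fifth consumer gains nothing when there are only four partitions.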
③ Kafka is used as stream processing: It is not enough to read, write and store data streams, the purpose is to realize real-time processing of streams.
In Kafka, a stream processor is anything that takes continuous streams of data from input topics, performs some processing on that input, and produces continuous streams of data to output topics.
For example, a retail application could accept an input stream of sales and shipments and output a stream of reorders and price adjustments calculated from this data.
Simple processing can be done directly using the producer and consumer APIs. However, for more complex transformations, Kafka provides a fully integrated Streams API.
This allows building applications that perform non-trivial processing, such as computing aggregations over streams or joining streams together.
This facility helps solve the hard problems such applications face: handling out-of-order data, reprocessing input when code changes, performing stateful computations, and so on.
The Streams API builds on the core primitives that Kafka provides: it uses the producer and consumer APIs for input, uses Kafka for stateful storage, and uses the same group mechanism for fault tolerance among stream processor instances.
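To make the idea of a stream processor concrete, here is a plain-Python sketch of the retail example: it reads a stream of sales and emits a stream of reorder records. It does not use the Kafka Streams API; the function name, the record shapes, and the threshold are assumptions made for illustration.

```python
from collections import defaultdict


def reorder_stream(sales, initial_stock, reorder_threshold):
    """Consume (item, quantity_sold) records and yield a reorder record when stock runs low."""
    stock = defaultdict(int, initial_stock)  # local state kept by the processor
    for item, quantity in sales:
        stock[item] -= quantity
        if stock[item] <= reorder_threshold:
            yield {"item": item, "remaining": stock[item]}


sales = [("apple", 3), ("pear", 1), ("apple", 5), ("pear", 1)]
reorders = list(reorder_stream(sales, {"apple": 10, "pear": 4}, reorder_threshold=2))
print(reorders)
```

The stock dictionary is the kind of state that a real stream processing framework would have to persist and restore after a failure.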
Key terms in Kafka explained
Topic: Kafka classifies messages by category, and each category is called a Topic. A Topic works like a classification label for messages and is a logical concept.
Physically, messages of different topics are stored separately. Logically, although messages of a topic are stored on one or more brokers, users only need to specify the topic of the message to produce or consume data without caring where the data is stored.
Partition: The messages in a Topic are divided into one or more Partitions. A Partition is a physical concept and corresponds to one or more directories on disk. Messages within a Partition are ordered, but messages across Partitions are not.
Segment: A Partition is further subdivided into several Segments, and the Segment files are equal in size.
Broker: A Kafka cluster contains one or more servers, and each server node is called a broker.
Broker stores Topic data. If a Topic has N Partitions and the cluster has N Brokers, then each Broker stores a Partition of the Topic.
If a Topic has N Partitions and the cluster has (N+M) Brokers, then N Brokers store a Partition of the Topic, and the remaining M Brokers do not store Partition data of the Topic.
If a Topic has N Partitions and the number of Brokers in the cluster is less than N, then a Broker stores one or more Partitions of the Topic.
In the actual production environment, try to avoid this situation, which can easily lead to unbalanced data in the Kafka cluster.
Producer: The publisher of messages. Producers publish data to the topics of their choice.
The producer is responsible for choosing which partition within the topic each record is assigned to. That is, every message a producer produces is written to one particular Partition.
Consumer: Reads messages from brokers. A consumer can consume messages from multiple topics, and a consumer can consume messages from multiple Partitions of the same topic. A Partition can also be consumed by multiple consumers at the same time, as long as they belong to different consumer groups.
Consumer Group: Consumer Group is a scalable and fault-tolerant consumer mechanism provided by Kafka.
There can be multiple consumers in a group that share a common ID, the Group ID. All consumers within the group coordinate to consume all partitions of the subscribed topic.
Kafka guarantees that only one Consumer in the same Consumer Group will consume a message.
In fact, Kafka guarantees that each Consumer instance will only consume one or more specific Partitions in a stable state, and the data of a Partition will only be consumed by a specific Consumer instance.
Below we use a picture on the official website to identify the correspondence between the number of consumers and the number of partitions.
A two-server Kafka cluster with four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
Consumer groups can be confusing at first. My own summary is: from a Topic's Partition to a Group, the communication is publish-subscribe.
That is, the messages of a Topic's Partition are consumed by every group, which is a one-to-many mode; from a group to its consumers, the communication is point-to-point, which is a one-to-one mode.
For example: if 10 Consumers consume a Topic without sharing a group, each of the 10 Consumers gets all the data of the Topic, so every message in the Topic is consumed 10 times.
If they share a Group, each of them must supply the same group ID when connecting. The Topic's messages are then distributed among the 10 consumers, and each message is consumed only once.
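The difference between the two setups can be checked with a small simulation. This is a sketch under simplifying assumptions: real Kafka distributes work inside a group by partition rather than message by message, and the function, group, and consumer names here are hypothetical.

```python
from collections import Counter


def deliver(messages, groups):
    """Count how often each message is processed.

    `groups` maps a group ID to its list of consumer names. Every group sees
    every message, but inside a group only one member handles it.
    """
    times_processed = Counter()
    handled_by = {}
    for number, message in enumerate(messages):
        for members in groups.values():
            consumer = members[number % len(members)]  # one member per group
            handled_by.setdefault(consumer, []).append(message)
            times_processed[message] += 1
    return times_processed, handled_by


messages = [f"m{i}" for i in range(20)]
shared_group = {"g": [f"c{i}" for i in range(10)]}          # 10 consumers, one group
separate_groups = {f"g{i}": [f"c{i}"] for i in range(10)}   # 10 consumers, 10 groups

shared_counts, shared_handled = deliver(messages, shared_group)
separate_counts, separate_handled = deliver(messages, separate_groups)
print(set(shared_counts.values()), set(separate_counts.values()))
```

With one shared group every message is handled once and the work is split across the ten consumers; with ten separate groups every consumer sees every message.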
Partition replicas: A replica is a backup of a partition, created to prevent message loss.
Partition Leader: Each Partition has multiple copies, and only one of them is the Leader. The Leader is the Partition that is currently responsible for reading and writing messages. That is, all read and write operations can only occur on the Leader partition.
Partition Follower: All Followers synchronize messages from the Leader so that Followers and the Leader stay in sync. The relationship between Leader and Follower is primary-standby (followers exist to take over), not master-slave (where followers would also serve reads).
ISR, In-Sync Replicas, refers to the replica synchronization list. The ISR list is maintained by the Leader.
AR, Assigned Replicas, refers to all copies of a Partition, that is, the list of allocated copies.
OSR, Out-of-Sync Replicas, is the list of replicas that are not in sync. AR = ISR + OSR.
Offset: Each message has a unique 64-bit Offset within its Partition, which is the message's position relative to the first message in that partition.
Broker Controller: One of the multiple Brokers in the Kafka cluster will be elected as a Controller, which is responsible for managing the status of Partitions and Replicas in the entire cluster.
Only the Broker Controller will register the Watcher with Zookeeper, other brokers and partitions do not need to be registered. That is, Zookeeper only needs to monitor the status changes of the Broker Controller.
HW and LEO:
HW, HighWatermark, high water mark, indicates the highest Partition offset that the Consumer can consume. HW guarantees the consistency of messages in the Kafka cluster. To be precise, it ensures the consistency of data between the Partition's Follower and Leader.
LEO, Log End Offset, is the offset of the last message in the log. Messages are appended to the Partition's log file, and LEO is the current offset of the last message written to the Partition.
For messages newly written by the Leader, the Consumer cannot consume them immediately. The Leader will wait for the message to be synchronized by the Partition Followers in all ISRs before updating the HW, and then the message can be consumed by the Consumer.
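A minimal sketch of how HW follows from the replicas' log end offsets is given here. The replica names and numbers are invented, and for simplicity an offset here counts the messages a replica has written, so a value of 6 means offsets 0 through 5 exist on that replica.

```python
def high_watermark(log_end_offsets, isr):
    """HW is the smallest log end offset among the replicas in the ISR."""
    return min(log_end_offsets[replica] for replica in isr)


# Hypothetical replica state: number of messages each replica has written.
log_end_offsets = {"leader": 8, "follower-1": 6, "follower-2": 3}

assigned_replicas = set(log_end_offsets)                      # AR
in_sync_replicas = {"leader", "follower-1"}                   # ISR
out_of_sync_replicas = assigned_replicas - in_sync_replicas   # OSR

hw = high_watermark(log_end_offsets, in_sync_replicas)
consumable_offsets = list(range(hw))   # what a consumer may read
pending_offsets = list(range(hw, log_end_offsets["leader"]))  # written but not yet visible
print(hw, consumable_offsets, pending_offsets)
```

The lagging follower-2 does not hold back the HW because it is outside the ISR, while the two newest messages on the leader stay invisible to consumers until follower-1 catches up.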
The concepts above can be hard to picture, so the following diagram visualizes the relationship between the two:
Zookeeper: Zookeeper is responsible for maintaining and coordinating the Broker, and is responsible for the election of the Broker Controller. Before Kafka 0.9, Offset was managed by ZK.
Summary: ZK is responsible for Controller election, and Controller is responsible for Leader election.
Coordinator: Generally refers to the Group Coordinator process running on each Broker. It manages the members of Consumer Groups and is mainly responsible for Offset management and Rebalance. A Coordinator can manage multiple consumer groups at the same time.
Rebalance: When the number of consumers in a consumer group changes, or the number of Partitions in a Topic changes, ownership of Partitions is transferred between consumers, that is, Partitions are redistributed. This process is called rebalance.
Rebalancing brings high availability and scalability to consumer groups, but consumers cannot read messages while a rebalance is in progress, so the whole consumer group is unavailable for a short period. Unnecessary rebalancing should therefore be avoided.
Offset Commit: The Consumer takes a batch of messages from the Broker and writes them into the Buffer for consumption. After consuming the messages within the specified time, it will automatically submit the Offset of the consumed messages to the Broker to record which messages have been consumed. Of course, if the consumption is not completed within the time limit, the Offset will not be submitted.
How Kafka works and the process
①Message writing algorithm
The message sender sends the message to the broker, and forms the final log that can be consumed by consumers, which is a relatively complicated process:
The Producer first looks up the Leader of the target Partition (via Zookeeper in older versions; current clients obtain this from broker metadata).
The Producer sends the message to the Leader.
The Leader appends the message to its local log and notifies the Followers in the ISR.
Followers in the ISR pull messages from the Leader, write them to the local log, and send Ack to the Leader.
After the Leader receives the Ack of all Followers in the ISR, it increases HW and sends Ack to the Producer, indicating that the message was written successfully.
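The write path above can be simulated in memory. This is an illustrative sketch only: the Leader and Follower classes are hypothetical, there is no network or failure handling, and HW is tracked as the number of fully replicated messages.

```python
class Follower:
    def __init__(self):
        self.log = []

    def fetch_from(self, leader_log):
        """Pull any messages this follower is missing, then acknowledge."""
        self.log.extend(leader_log[len(self.log):])
        return True  # the Ack sent back to the leader


class Leader:
    def __init__(self, isr_followers):
        self.log = []
        self.high_watermark = 0
        self.isr_followers = isr_followers

    def produce(self, message):
        """Append locally, wait for every ISR follower, then raise HW and ack the producer."""
        self.log.append(message)
        acks = [follower.fetch_from(self.log) for follower in self.isr_followers]
        if all(acks):
            self.high_watermark = len(self.log)
            return True
        return False


followers = [Follower(), Follower()]
leader = Leader(followers)
acked = [leader.produce(m) for m in ["m0", "m1", "m2"]]
print(acked, leader.high_watermark)
```

The producer only sees a successful result once every follower in the ISR holds the message, which is why this path is slower but safer than acknowledging right after the leader's own write.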
②Message routing strategy
When publishing a message through the API, the producer uses the Record as the message to publish.
A Record contains a Key and a Value. The Value is the actual message body, and the Key is used to route the message to the Partition where it will be stored.
The Partition to which the message is written is not random, but has a routing strategy:
If Partition is specified, it will be directly written to the specified Partition.
If no Partition is specified but a Key is, the Hash value of the Key is taken modulo the number of Partitions.
The result is the index of the Partition to write to.
If neither Partition nor Key is specified, a Partition is selected using a round-robin algorithm.
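The three routing rules translate directly into code. The sketch below is an approximation: the Partitioner class is hypothetical, and zlib.crc32 stands in for the hash function of the real client, which differs. Python's built-in hash() is avoided because it is randomized per process for strings.

```python
import itertools
import zlib


class Partitioner:
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._round_robin = itertools.cycle(range(num_partitions))

    def choose(self, key=None, partition=None):
        if partition is not None:       # rule 1: an explicit partition wins
            return partition
        if key is not None:             # rule 2: hash(key) mod partition count
            return zlib.crc32(key.encode("utf-8")) % self.num_partitions
        return next(self._round_robin)  # rule 3: round-robin


partitioner = Partitioner(num_partitions=4)
explicit = partitioner.choose(key="user-42", partition=2)
by_key = [partitioner.choose(key="user-42") for _ in range(3)]
keyless = [partitioner.choose() for _ in range(5)]
print(explicit, by_key, keyless)
```

Because the same Key always maps to the same Partition, all messages for one key keep their relative order.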
③HW truncation mechanism
Suppose the Partition Leader receives a new message and then goes down while the Followers in the ISR are still synchronizing it.
At this point, a new leader needs to be elected. Without the HW truncation mechanism, the Leader and Follower data in the Partition would become inconsistent.
When the original leader recovers from the crash, it rolls its LEO back to the HW it had at the time of the crash and then synchronizes data from the new leader, which guarantees that the old leader and the new leader hold consistent data. This mechanism is called the HW truncation mechanism.
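Here is a small sketch of the truncation step, with logs modeled as Python lists. The function name and the message labels are hypothetical, and HW is expressed as the number of messages known to be replicated.

```python
def recover_old_leader(old_leader_log, hw_at_crash, new_leader_log):
    """Roll the recovered replica back to its HW, then copy the rest from the new leader."""
    truncated = old_leader_log[:hw_at_crash]  # drop the unreplicated tail
    return truncated + new_leader_log[len(truncated):]


old_leader_log = ["m0", "m1", "m2", "m3"]  # m2 and m3 never reached the followers
hw_at_crash = 2
new_leader_log = ["m0", "m1", "n2"]        # the new leader accepted a different message

recovered = recover_old_leader(old_leader_log, hw_at_crash, new_leader_log)
print(recovered)
```

Without the truncation, the recovered replica would keep m2 at the position where the new leader holds n2, and the two logs would disagree.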
④Reliability of message sending
When producers send messages to Kafka, they can choose the level of reliability they want by setting the request.required.acks parameter (called acks in the newer Java producer).
A value of 0: send asynchronously. The producer sends a message to Kafka and does not wait for an Ack from Kafka. This method is the most efficient, but the least reliable.
There may be cases where messages are lost:
Message loss occurs during transmission.
There will be message loss inside the broker.
There will be situations where the order of messages written to Kafka does not match the order of production.
A value of 1: send synchronously. The producer sends a message to Kafka, and the Partition Leader on the Broker returns an Ack as soon as it has received the message (without waiting for the Followers in the ISR to synchronize).
Once the producer receives the Ack, it knows the message was sent successfully and sends the next message. If no Ack arrives from Kafka, the producer considers the send to have failed and resends the message.
In this mode, a Producer that does not receive an Ack treats the send as failed and can resend.
However, even when an Ack is received, the message is not guaranteed to be safe: if the Leader goes down before the Followers have synchronized the message, it is lost. Message loss is therefore still possible in this mode.
A value of -1: send synchronously. The producer sends a message to Kafka, and Kafka returns an Ack to the producer only after all replicas in the ISR list have synchronized the message.
If no Ack arrives from Kafka, the send is considered failed and the message is resent automatically. As a result, the same message may be received more than once.
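The three settings can be compared with a tiny decision function. This is a model of the semantics described above rather than client code; the function and parameter names are hypothetical.

```python
def producer_sees_success(acks, leader_wrote, isr_followers_synced):
    """Return True if the producer treats the send as successful under this acks level."""
    if acks == 0:
        return True  # fire and forget: no Ack is awaited
    if acks == 1:
        return leader_wrote  # the leader's own write is enough
    if acks == -1:
        return leader_wrote and isr_followers_synced
    raise ValueError(f"unsupported acks value: {acks}")


# Scenario: the leader wrote the message but crashed before followers copied it.
leader_only = {acks: producer_sees_success(acks, True, False) for acks in (0, 1, -1)}
# Scenario: the message never reached the broker at all.
never_arrived = {acks: producer_sees_success(acks, False, False) for acks in (0, 1, -1)}
print(leader_only, never_arrived)
```

With 0 the producer reports success even when nothing arrived, with 1 it reports success for a message that can still be lost with the leader, and only -1 withholds success until the ISR has the message.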
⑤Analysis of consumer consumption process
After the producer sends messages to a Topic, consumers can consume them. The consumption process is as follows:
When a Consumer submits a connection request to a Broker, the Broker it connects to returns the communication URL of the Broker Controller, that is, the listeners address in the configuration file.
After the Consumer specifies the Topic to consume, it will send a consumption request to the Broker Controller.
The Broker Controller will assign one or several Partition Leaders to the Consumer, and send the current Offset of the Partition to the Consumer.
The Consumer will consume the messages in it according to the Partition assigned by the Broker Controller.
After the Consumer consumes the message, the Consumer will send a message to the Broker that the message has been consumed, that is, the Offset of the message.
After the Broker receives the Consumer's Offset, it updates the corresponding record in __consumer_offsets.
The above process will repeat until the consumer stops requesting consumption.
The Consumer can reset the Offset, so that it can flexibly consume the messages stored on the Broker.
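The poll, commit, and reset cycle can be sketched with a toy in-memory partition. The PartitionLog class, its methods, and the group name are hypothetical stand-ins, not the Kafka consumer API.

```python
class PartitionLog:
    """A toy partition: an append-only list plus one committed offset per group."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group ID -> next offset to read

    def poll(self, group_id, max_records):
        start = self.committed.get(group_id, 0)
        return start, self.messages[start:start + max_records]

    def commit(self, group_id, offset):
        self.committed[group_id] = offset


def consume_all(log, group_id, batch_size=2):
    seen = []
    while True:
        start, batch = log.poll(group_id, batch_size)
        if not batch:
            return seen
        seen.extend(batch)                        # "process" the batch
        log.commit(group_id, start + len(batch))  # report the new offset


log = PartitionLog(["a", "b", "c", "d", "e"])
first_pass = consume_all(log, "billing")
log.commit("billing", 3)                          # reset the offset to re-read from "d"
replayed = consume_all(log, "billing")
print(first_pass, replayed)
```

Since the messages stay on the broker after being read, moving the committed offset backwards is enough to replay them.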
⑥Partition Leader election scope
When the Leader goes down, the Broker Controller will select a Follower from the ISR to become the new Leader.
What if there are no other copies in the ISR? The leader election range can be set by the value of unclean.leader.election.enable.
False: A new election takes place only once a replica from the ISR list comes back to life. This strategy guarantees reliability, but availability is low.
True: When no replica in the ISR list is alive, any replica on a host that is still up can be chosen as the new Leader. This strategy has high availability, but reliability is not guaranteed.
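The trade-off can be expressed as a short election function. This is a sketch of the rule as described here, not the controller's real algorithm; the function name and broker labels are hypothetical.

```python
def elect_leader(isr, live_replicas, unclean_leader_election_enable):
    """Pick a new leader, or return None if the partition must stay offline for now."""
    live_in_sync = [replica for replica in isr if replica in live_replicas]
    if live_in_sync:
        return live_in_sync[0]
    if unclean_leader_election_enable and live_replicas:
        return sorted(live_replicas)[0]  # may lack recent messages: possible data loss
    return None


normal = elect_leader(["b2", "b3"], {"b3", "b4"}, unclean_leader_election_enable=False)
strict = elect_leader(["b2"], {"b4"}, unclean_leader_election_enable=False)
unclean = elect_leader(["b2"], {"b4"}, unclean_leader_election_enable=True)
print(normal, strict, unclean)
```

In the strict case the partition stays without a leader until b2 returns, while the unclean case promotes b4 immediately and accepts that b4 may be missing messages.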
⑦The solution to the problem of repeated consumption
Repeated consumption by the same Consumer: When a Consumer processes messages slowly and hits the consumption timeout, repeated consumption may occur.
For example, a message has just been consumed and its offset is about to be submitted when the consumption time limit expires, so the broker considers the message not to have been consumed successfully and it is delivered again. Solution: extend the Offset submission timeout.
Repeated consumption by different Consumers: When a Consumer has consumed messages but fails to submit the offset, the already consumed messages are consumed again by whichever Consumer takes over. Solution: change automatic submission to manual submission.
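A short simulation shows what manual submission does and does not solve. The function below is a hypothetical model in which the offset is committed right after each message is processed, and a crash can be injected between processing and committing.

```python
def run_consumer(messages, committed_offset, crash_before_commit_at=None):
    """Process messages from committed_offset, committing manually after each one.

    If crash_before_commit_at is set, the consumer stops right after processing
    that offset and before committing it. Returns (processed, committed_offset).
    """
    processed = []
    for offset in range(committed_offset, len(messages)):
        processed.append(messages[offset])      # business logic runs here
        if offset == crash_before_commit_at:
            return processed, committed_offset  # crash: the commit never happens
        committed_offset = offset + 1           # manual commit after success
    return processed, committed_offset


messages = ["m0", "m1", "m2", "m3"]
first_run, committed = run_consumer(messages, 0, crash_before_commit_at=2)
second_run, committed = run_consumer(messages, committed)
print(first_run, second_run)
```

Committing after every message narrows the duplicate window to the single message in flight, but m2 is still processed twice, which is why the consuming logic itself also needs to tolerate duplicates.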
⑧ Solve the problem of Kafka repeated consumption from the perspective of architecture design
When we design the program, for example, considering some abnormal situations such as network failure, we will set the number of message retries, and there may be other possible message repetitions, so how should we solve it? Three options are provided below:
Option 1: Save and query
Give each message a unique UUID, and store the UUID of every message we have consumed in a persistent system.
When we consume a message, we first check the persistent system to see whether it has been consumed before. If it has not, we consume it; if it has, we simply discard it.
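A minimal sketch of the save-and-query approach follows. An in-memory set stands in for the persistent system, and the class name and message fields are assumptions made for illustration.

```python
import uuid


class DeduplicatingConsumer:
    def __init__(self):
        self.processed_ids = set()  # stand-in for a persistent store
        self.handled = []

    def consume(self, message):
        """Handle the message unless its ID was already recorded."""
        if message["id"] in self.processed_ids:
            return False            # seen before: discard
        self.handled.append(message["body"])
        self.processed_ids.add(message["id"])
        return True


message = {"id": str(uuid.uuid4()), "body": "order created"}
consumer = DeduplicatingConsumer()
results = [consumer.consume(message), consumer.consume(message)]
print(results, consumer.handled)
```

In a real system the lookup and the insert would have to be atomic in the persistent store, otherwise two consumers could both pass the check for the same ID.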
The diagram below illustrates this scenario:
Option 2: Use idempotency
Idempotence is defined mathematically as follows: if a function f(x) satisfies: f(f(x)) = f(x), then the function f(x) satisfies idempotency.
This concept has been extended to the computer domain and is used to describe an operation, method or service. The characteristic of an idempotent operation is that any multiple executions of it have the same effect as a single execution.
Calling an idempotent method several times with the same parameters has the same effect on the system as calling it once. So with idempotent methods there is no need to worry that repeated execution will change the system.
Let's take an example. Leaving concurrency aside, consider the operation "set Teacher X's account balance to 1 million yuan": after one execution, Teacher X's account balance is 1 million yuan.
As long as the parameter provided is 1 million yuan, Teacher X's account balance will be 1 million yuan no matter how many times the operation is executed. This operation is idempotent.
As another example, the operation "add 1 million yuan to Teacher X's balance" is not idempotent. Each execution increases the account balance by 1 million yuan, so executing it several times affects the system (that is, the account balance) differently from executing it once.
From these two examples we can conclude that if the business logic that consumes messages is idempotent, message duplication is no longer a concern, because consuming the same message once or several times has exactly the same effect on the system. In other words, consuming it several times is equivalent to consuming it once.
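The two balance operations are easy to compare in code. In this sketch a dictionary stands in for the account store, and the same message is delivered three times to each handler; the function names are hypothetical.

```python
def set_balance(accounts, account_id, amount):
    accounts[account_id] = amount   # idempotent: the result depends only on the argument


def add_to_balance(accounts, account_id, amount):
    accounts[account_id] += amount  # not idempotent: every call changes the result


accounts_set = {"X": 0}
accounts_add = {"X": 0}
for _ in range(3):                  # the same message delivered three times
    set_balance(accounts_set, "X", 1_000_000)
    add_to_balance(accounts_add, "X", 1_000_000)

print(accounts_set["X"], accounts_add["X"])
```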
So, how to implement idempotent operations? The best way is to start from the business logic design, and design the business logic of consumption as an idempotent operation.
However, not all businesses can be designed to be naturally idempotent, and some methods and techniques are needed here to achieve idempotency.
Below we introduce a common method: using the unique constraint of the database to achieve idempotency.
For example, the example of the transfer that does not have the idempotency feature we just mentioned: add 1 million yuan to Teacher X's account balance. In this example, we can make it idempotent by transforming the business logic.
First, we restrict each transfer order to a single change operation per account. In a distributed system there are many ways to implement this restriction; the simplest is to create a transfer flow table in the database.
This table has three fields: transfer order ID, account ID and change amount. We then create a unique constraint on the combination of transfer order ID and account ID, so that at most one record can exist for any given transfer order ID and account ID.
With this in place, our logic for consuming a message becomes: "Add a transfer record to the transfer flow table, and then update the user's balance asynchronously based on that record."
Because we have defined the unique constraint on "account ID and transfer order ID" in advance, only one record can be inserted for the same account and the same transfer order when adding a record to the transfer flow table. Subsequent repeated inserts fail, which makes the operation idempotent.
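As a rough illustration of the unique-constraint approach, the sketch below uses Python's built-in sqlite3 module with an in-memory database. The table and column names are assumptions, and for brevity the balance is updated in the same transaction as the insert rather than asynchronously.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE account (account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
conn.execute(
    "CREATE TABLE transfer_flow ("
    " transfer_id TEXT NOT NULL,"
    " account_id TEXT NOT NULL,"
    " amount INTEGER NOT NULL,"
    " UNIQUE (transfer_id, account_id))"
)
conn.execute("INSERT INTO account VALUES ('X', 0)")
conn.commit()


def apply_transfer(conn, transfer_id, account_id, amount):
    """Record the transfer and update the balance once; repeats are ignored."""
    try:
        with conn:  # one transaction: both statements commit or neither does
            conn.execute(
                "INSERT INTO transfer_flow VALUES (?, ?, ?)",
                (transfer_id, account_id, amount),
            )
            conn.execute(
                "UPDATE account SET balance = balance + ? WHERE account_id = ?",
                (amount, account_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate (transfer_id, account_id): already applied


# The same transfer message is delivered three times.
results = [apply_transfer(conn, "T-1", "X", 1_000_000) for _ in range(3)]
balance = conn.execute("SELECT balance FROM account WHERE account_id = 'X'").fetchone()[0]
print(results, balance)
```

Only the first delivery changes the balance; the second and third hit the unique constraint and are rolled back, so the non-idempotent "add" operation becomes safe to retry.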
Knowledge Base Team