Flink Connectors Explained in Detail

One. Connector overview: the bridge between Flink and external systems

1. Connectors

Important sources and destinations of Flink's data

Connectors are the bridge between Flink and external systems. For example, we may read data from Kafka, process it in Flink, and then write the results to external systems such as Hive or Elasticsearch.

Beyond moving data in and out, a connector also has to take care of:
Event control in the processing flow: emitting event-time watermarks and taking part in checkpoint alignment.
Load balancing: distributing data partitions sensibly across parallel instances according to their load.
Data parsing and serialization: data may be stored as raw bytes in the external system, or as columns in a database. After reading it into Flink, we must parse (deserialize) it before any further processing. Likewise, when writing back to the external system, we must serialize the data into the storage format that system expects.

The figure above shows a very typical example.

We first read records from Kafka through a Source and send them to Flink operators for processing. We then write the results to Elasticsearch through a Sink. The Source and Sink therefore act as the interfaces at either end of a Flink job.

Two. Source API: the entry point for Flink data

1. Source interface evolution

Before Flink 1.10, there were two Source interfaces, shown on the left: the SourceFunction API (for streaming data) and the InputFormat API (for batch data). After Flink 1.10, the community introduced a new Source API and refactored Source as a whole. Why did the community take on this work?

Inconsistent batch and streaming implementations: as the ecosystem grew, problems in the old API surfaced. The most visible one was that batch and streaming sources had to be implemented differently.
Simple interface, complex implementation: the old interfaces looked simple. In practice, however, developers had to implement all of the logic themselves, which was complicated and not developer-friendly.
To address these problems, FLIP-27 proposed a new Source API design with two characteristics:
Unified batch and streaming: streaming and batch processing no longer need two separately maintained codebases; one is enough.
Simple to implement: the Source API defines a number of conceptual abstractions. They may look complicated at first, but they actually simplify the developer's work.

2. Core abstraction

1) Record Split (Split)

A numbered collection of records

Take Kafka as an example. A Kafka split can be an entire partition or part of one. For example, the records from offset 100 to 200 can form one split and those from offset 201 to 300 another. Any collection of records that has a unique ID can be defined as a record split.

Progress can be tracked

We need to record the current processing position within each split. When we take a checkpoint, we need to know how far processing has got. After a failure, we can then resume directly from that position.

Contains all the information about the split

Taking Kafka as an example, a split must carry information such as its start and end offsets within the partition. Checkpoints also use the split as their unit, so each split must be self-contained.
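The three split properties above (unique ID, trackable progress, self-contained information) can be illustrated with a small plain-Java model. This is a sketch under assumptions. `OffsetRangeSplit` and `OffsetRangeSplitState` are hypothetical names, not Flink's Kafka split classes, and the inclusive end offset is a choice made here for illustration.

```java
import java.util.Objects;

/**
 * Hypothetical Kafka-like split (not Flink's KafkaPartitionSplit): a uniquely named,
 * self-contained range of offsets in one partition. endOffset is inclusive.
 */
record OffsetRangeSplit(String splitId, String topic, int partition,
                        long startOffset, long endOffset) {
    OffsetRangeSplit {
        Objects.requireNonNull(splitId, "splitId");
        if (endOffset < startOffset) {
            throw new IllegalArgumentException("endOffset must be >= startOffset");
        }
    }
}

/** Hypothetical mutable progress of one split: what a checkpoint would record. */
final class OffsetRangeSplitState {
    private final OffsetRangeSplit split;
    private long nextOffset; // first offset that has not been processed yet

    OffsetRangeSplitState(OffsetRangeSplit split) {
        this.split = split;
        this.nextOffset = split.startOffset();
    }

    /** Records that the record at {@code offset} has been emitted downstream. */
    void markProcessed(long offset) {
        nextOffset = offset + 1;
    }

    boolean isFinished() {
        return nextOffset > split.endOffset();
    }

    /** Builds a split covering only the unprocessed records, e.g. for restoring after a failure. */
    OffsetRangeSplit toRemainingSplit() {
        if (isFinished()) {
            throw new IllegalStateException("split " + split.splitId() + " is already finished");
        }
        return new OffsetRangeSplit(split.splitId(), split.topic(), split.partition(),
                nextOffset, split.endOffset());
    }
}
```

Because the remaining range is expressed as another split, recovery needs nothing beyond the split itself. This is the self-containment the text asks for.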

2) Split Enumerator

Discover splits: detect which splits exist in the external system.
Assign splits: the enumerator acts as a coordinator and assigns splits to the Source Readers.
Coordinate Source Readers: for example, if some readers progress too quickly, the enumerator can tell them to slow down so that watermarks stay roughly aligned.
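The three enumerator duties above can be sketched in plain Java as follows. This is a hypothetical model, not Flink's `SplitEnumerator` interface. Splits are plain string IDs, readers pull work on request, and "slow down" is modeled as a list of readers whose watermark runs more than an assumed `maxDriftMs` ahead of the slowest reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical coordinator-side enumerator: discover, assign, coordinate. */
final class SimpleSplitEnumerator {
    private final Deque<String> unassigned = new ArrayDeque<>();
    private final Map<Integer, List<String>> assignments = new HashMap<>();
    private boolean discoveryFinished = false;

    /** Discovery: called when the external system reveals new splits (e.g. new partitions). */
    void onSplitsDiscovered(List<String> splitIds, boolean noMoreSplitsExpected) {
        unassigned.addAll(splitIds);
        discoveryFinished = noMoreSplitsExpected;
    }

    /** Assignment: hand the next split to the requesting reader, if there is one. */
    Optional<String> handleSplitRequest(int readerId) {
        String next = unassigned.pollFirst();
        if (next == null) {
            return Optional.empty();
        }
        assignments.computeIfAbsent(readerId, id -> new ArrayList<>()).add(next);
        return Optional.of(next);
    }

    /** True once discovery is over and every split has been handed out. */
    boolean shouldSignalNoMoreSplits() {
        return discoveryFinished && unassigned.isEmpty();
    }

    List<String> assignedTo(int readerId) {
        return assignments.getOrDefault(readerId, List.of());
    }

    /** Coordination: readers whose watermark is more than maxDriftMs ahead of the slowest one. */
    static List<Integer> readersToPause(Map<Integer, Long> watermarks, long maxDriftMs) {
        long slowest = watermarks.values().stream().mapToLong(Long::longValue).min().orElse(0L);
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (e.getValue() - slowest > maxDriftMs) {
                result.add(e.getKey());
            }
        }
        result.sort(null); // natural order, so the output does not depend on HashMap iteration
        return result;
    }
}
```

A pull model (readers ask, the enumerator answers) is used here because it naturally spreads work toward readers with spare capacity. A push model would work just as well for this sketch.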

3) Source Reader

Read data from splits: read the data in the splits assigned by the enumerator.
Event-time watermark handling: extract the event time from the data read from the external system, then emit watermarks accordingly.
Data parsing: deserialize the data read from the external system and send it to the downstream operators.
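The reader duties above (deserialize, extract event time, emit watermarks) can be sketched as follows. This is a plain-Java model, not Flink's `SourceReader` API. It makes three assumptions: records arrive as UTF-8 bytes of the form `timestampMs,payload`; the watermark follows a simple bounded-out-of-orderness rule (largest timestamp seen minus a fixed bound); and "emitting" means appending to a list.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Parsed record: event time plus payload. */
record Event(long timestampMs, String payload) {}

/** Hypothetical reader-side logic: deserialize, extract event time, derive a watermark. */
final class ToySourceReader {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private final List<Event> emitted = new ArrayList<>();

    ToySourceReader(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Deserializes one raw record (assumed format "timestampMs,payload") and emits it. */
    void processRaw(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("malformed record: " + text);
        }
        long ts = Long.parseLong(text.substring(0, comma));
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        emitted.add(new Event(ts, text.substring(comma + 1)));
    }

    /** Watermark: events older than this are no longer expected. MIN_VALUE until data arrives. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
    }

    List<Event> emitted() {
        return emitted;
    }
}
```

For example, with a bound of 500 ms and events at 1000, 3000 and 2000 ms, the watermark is 2500. The late event at 2000 ms is still forwarded, but the watermark never moves backwards.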

3. Enumerator-Reader Architecture

The split enumerator runs on the Job Master and the source readers run on the Task Executors. The enumerator therefore plays the role of leader and coordinator, while the readers are the executors.
Their checkpoint state is also stored separately, but they do need to communicate. For example, the enumerator must assign splits to the readers and tell them when no more splits will arrive. Because the two run in different environments, this communication inevitably goes over the network, which leads to the following communication stack.

The stack defines events at several layers, and the top layer is left for developers to implement themselves.

At the top is the SourceEvent, which developers define to implement custom operations. For example, suppose a Source is designed so that readers suspend reading under certain conditions. The SplitEnumerator can then send that instruction to the SourceReader as a SourceEvent.

Below that is the Operator Coordinator layer. The coordinator communicates with the operators that actually execute tasks through OperatorEvents. Some operator events are predefined, such as adding splits or notifying the reader that there are no more splits. These events are common to all Sources, so they are abstracted at the OperatorEvent level.

Address Lookup determines which operator a message should be sent to. A Flink job runs as a directed acyclic graph (DAG), and different operators may run on different TaskManagers. This layer is responsible for finding the right task and the right operator.

Because the Job Master and the Task Executors communicate over the network, there is an RPC Gateway between them. All events are ultimately transmitted over the network through this gateway as RPC calls.
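The layering described above can be sketched in-process: framework-defined operator events, developer-defined source events wrapped inside them, and an address lookup that picks the target subtask before "sending". Several type names here deliberately echo the text: `OperatorEvent`, `AddSplitsEvent`, `NoMoreSplitsEvent` and `SourceEventWrapper`. They are standalone hypothetical definitions, not Flink's classes. The RPC gateway is replaced by a plain callback.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Framework layer (hypothetical): events every source needs, predefined once. */
interface OperatorEvent {}
record AddSplitsEvent(List<String> splitIds) implements OperatorEvent {}
record NoMoreSplitsEvent() implements OperatorEvent {}

/** Developer layer (hypothetical): custom events, wrapped so they travel like any other. */
interface CustomSourceEvent {}
record PauseReadingEvent(String reason) implements CustomSourceEvent {}
record SourceEventWrapper(CustomSourceEvent event) implements OperatorEvent {}

/**
 * Address lookup plus transport, simulated in one JVM. In a cluster the Consumer
 * would be an RPC gateway to the TaskExecutor running that subtask.
 */
final class EventRouter {
    private final Map<String, Consumer<OperatorEvent>> gateways = new HashMap<>();

    private static String address(String operatorId, int subtask) {
        return operatorId + "#" + subtask;
    }

    void register(String operatorId, int subtask, Consumer<OperatorEvent> gateway) {
        gateways.put(address(operatorId, subtask), gateway);
    }

    /** Finds the running subtask for (operatorId, subtask) and delivers the event to it. */
    void send(String operatorId, int subtask, OperatorEvent event) {
        Consumer<OperatorEvent> gateway = gateways.get(address(operatorId, subtask));
        if (gateway == null) {
            throw new IllegalStateException("no running subtask " + address(operatorId, subtask));
        }
        gateway.accept(event);
    }
}
```

Wrapping custom events in a framework event means the lower layers only ever route one type. Developers can add new event kinds without touching the routing or transport code.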

4. Source reader design

To make SourceReaders easier to implement and reduce developers' workload, the community provides SourceReaderBase. Developers can extend SourceReaderBase directly, which greatly simplifies development. Let's analyze SourceReaderBase next. The figure appears to contain many components, but it can be understood as two parts.

The elementQueue in the middle is the boundary. The components marked in blue to the left of the queue interact with the external system, while the components marked in orange to its right interact with the Flink engine.

First, the left side consists of one or more SplitReaders. Each SplitReader is driven by a Fetcher, and the Fetchers are managed by a SplitFetcherManager. Many implementations are possible here. For example, a single thread with one SplitReader can consume multiple partitions. Alternatively, several threads can be started as needed, each running one Fetcher and one SplitReader, with each reader consuming one partition in parallel. This is entirely up to the implementer.
For performance, each SplitReader fetches a batch of data from the external system at a time and puts it into the elementQueue. In the figure, each blue box is one fetched batch, and each orange box is an individual record within that batch.

Second, the right side of the elementQueue consists of the RecordEmitter and the SourceOutput. The RecordEmitter takes a batch from the elementQueue and hands its records one by one to the SourceOutput, which emits them downstream. The RecordEmitter is driven by the task's main thread. That thread currently uses a lock-free mailbox model: the work to be done is divided into individual mails, and the thread repeatedly takes one mail from the mailbox and processes it. The code on this side must therefore be non-blocking.

Each time the RecordEmitter sends data downstream, it also reports whether more data will follow. At the same time, the processing progress of each split is recorded in SplitStates: its current state and how far it has been processed.
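The elementQueue hand-off described above can be modeled in plain Java. A fetcher thread pushes whole batches, and it is allowed to block when the queue is full. The task thread drains them one record per call, never blocks, and reports whether more data may follow. `BatchHandOff` and `PollStatus` are hypothetical names for this sketch, not Flink's `SourceReaderBase` internals. A single counter stands in for SplitStates.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical status returned to the caller after each poll. */
enum PollStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

/** Hypothetical model of the fetcher -> elementQueue -> emitter hand-off. */
final class BatchHandOff {
    private final BlockingQueue<List<String>> elementQueue = new ArrayBlockingQueue<>(4);
    private final AtomicBoolean fetcherDone = new AtomicBoolean(false);
    private Iterator<String> currentBatch = null;
    private long recordsEmitted = 0; // stands in for per-split progress in SplitStates

    /** Fetcher side: runs on its own thread and may block when the queue is full. */
    Thread startFetcher(List<List<String>> batchesFromExternalSystem) {
        Thread t = new Thread(() -> {
            try {
                for (List<String> batch : batchesFromExternalSystem) {
                    elementQueue.put(batch); // blocks only the fetcher thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                fetcherDone.set(true);
            }
        }, "split-fetcher");
        t.start();
        return t;
    }

    /** Emitter side: called from the task (mailbox) thread; never blocks. */
    PollStatus pollNext(List<String> output) {
        if (currentBatch == null || !currentBatch.hasNext()) {
            boolean done = fetcherDone.get(); // read before polling so a final batch is not missed
            List<String> next = elementQueue.poll(); // non-blocking
            if (next == null) {
                return done ? PollStatus.END_OF_INPUT : PollStatus.NOTHING_AVAILABLE;
            }
            currentBatch = next.iterator();
            if (!currentBatch.hasNext()) {
                return PollStatus.MORE_AVAILABLE; // empty batch: try again on the next call
            }
        }
        output.add(currentBatch.next());
        recordsEmitted++;
        return PollStatus.MORE_AVAILABLE;
    }

    long recordsEmitted() {
        return recordsEmitted;
    }
}
```

The emitter reads the "fetcher done" flag before polling, and this ordering matters. If it polled first, the fetcher could enqueue its last batch and finish in between, and the emitter would wrongly report end of input.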

When the SplitEnumerator discovers a new split in the external system, it calls the reader's addSplits method via RPC to add the split to the reader. The SplitFetcherManager then allocates the new split according to the chosen threading model. If there is only one thread, that thread gets a new task and its reader is asked to read the new split as well. If the implementation is multi-threaded, a new thread with a new reader is created to process the split separately. In both cases, the processing progress of the split is recorded in SplitStates.
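The two threading models described above differ only in where `addSplits` routes a new split. The sketch below models that routing without starting real threads. Each "fetcher" entry stands for one thread driving one SplitReader. `ToyFetcherManager` and `ThreadingModel` are hypothetical names, not Flink's `SplitFetcherManager` subclasses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical choice between the two threading models described in the text. */
enum ThreadingModel { SINGLE_FETCHER, FETCHER_PER_SPLIT }

/** Hypothetical fetcher manager: records which fetcher (thread + SplitReader) owns each split. */
final class ToyFetcherManager {
    private final ThreadingModel model;
    private final Map<Integer, List<String>> fetchers = new LinkedHashMap<>();
    private final Map<String, Long> splitStates = new LinkedHashMap<>(); // splitId -> next offset
    private int nextFetcherId = 0;

    ToyFetcherManager(ThreadingModel model) {
        this.model = model;
    }

    /** Entry point for splits sent by the enumerator (via RPC in a real cluster). */
    void addSplits(List<String> splitIds) {
        for (String splitId : splitIds) {
            splitStates.put(splitId, 0L); // start tracking progress right away
            if (model == ThreadingModel.SINGLE_FETCHER) {
                // reuse the single fetcher; its SplitReader now reads this split too
                fetchers.computeIfAbsent(0, id -> new ArrayList<>()).add(splitId);
            } else {
                // a fresh fetcher dedicated to this split
                fetchers.put(nextFetcherId++, new ArrayList<>(List.of(splitId)));
            }
        }
    }

    int fetcherCount() {
        return fetchers.size();
    }

    Map<Integer, List<String>> assignments() {
        return fetchers;
    }

    Map<String, Long> splitStates() {
        return splitStates;
    }
}
```

The single-fetcher model keeps the thread count constant no matter how many splits arrive. One fetcher per split buys parallel I/O at the cost of one thread per split.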

5. Create a checkpoint

Next, let's look at how checkpoints are handled in the new Source API.
First, consider the coordinator on the left, the split enumerator. As shown in the figure, it still has one split (Split_5) that has not been assigned. The arrows in the middle are splits in transit, and the dashed line is the boundary of this checkpoint. The second split is already ahead of the checkpoint boundary, while the fourth split is behind it. The bottom reader is requesting a new split from the SplitEnumerator. Turning to the readers, each of the three has been assigned splits and has already processed part of them, so each has a position. So what do the enumerator and the readers need to store at checkpoint time?
Enumerator: unassigned splits (Split_5), and splits that have been assigned but are not yet covered by the checkpoint (Split_4)

Reader: assigned splits (Split_0, 1, 3), and the split state of Split_2, which reached the reader ahead of the checkpoint boundary
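The split of responsibilities above can be modeled with three collections: unassigned splits, splits in transit, and splits the reader has received. The snapshot methods then fall out directly. This is a hypothetical plain-Java model; the record and class names are illustrative, not Flink's. It assumes that a split still in transit when the barrier passes belongs to the enumerator's snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** What the enumerator would snapshot (hypothetical layout). */
record EnumeratorSnapshot(List<String> unassigned, List<String> assignedNotYetCheckpointed) {}

/** What a reader would snapshot: each owned split and its current position. */
record ReaderSnapshot(Map<String, Long> splitPositions) {}

/** Hypothetical model of where each split lives when a checkpoint is taken. */
final class ToyCheckpointing {
    private final List<String> unassigned = new ArrayList<>();
    private final List<String> inTransit = new ArrayList<>();             // sent, not yet received
    private final Map<String, Long> readerPositions = new TreeMap<>();    // received by the reader

    ToyCheckpointing(List<String> discoveredSplits) {
        unassigned.addAll(discoveredSplits);
    }

    /** The enumerator sends a split toward a reader. */
    void assign(String splitId) {
        if (!unassigned.remove(splitId)) throw new IllegalArgumentException(splitId);
        inTransit.add(splitId);
    }

    /** The split reaches the reader ahead of the barrier, so the reader now owns its state. */
    void receive(String splitId) {
        if (!inTransit.remove(splitId)) throw new IllegalArgumentException(splitId);
        readerPositions.put(splitId, 0L);
    }

    /** The reader has processed the split up to newPosition. */
    void advance(String splitId, long newPosition) {
        readerPositions.computeIfPresent(splitId, (id, old) -> Math.max(old, newPosition));
    }

    EnumeratorSnapshot snapshotEnumerator() {
        return new EnumeratorSnapshot(List.copyOf(unassigned), List.copyOf(inTransit));
    }

    ReaderSnapshot snapshotReader() {
        return new ReaderSnapshot(Map.copyOf(readerPositions));
    }
}
```

To replay the figure, assign Split_0 through Split_4, then let Split_0, 1, 3 and 2 arrive. The enumerator snapshot then holds Split_5 (unassigned) and Split_4 (in transit), and the reader snapshot holds the other four splits with their positions. Every split appears in exactly one snapshot.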

6. Three simple steps to implement Source

1) Split/SplitState

Split: a split of the data in the external system

SplitSerializer: Serialize/deserialize Split to pass to SourceReader

SplitState: Split state, used for Checkpoint and recovery
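A split serializer only has to turn a split into bytes and back, and it should carry a version so that older checkpoints stay readable. The sketch below uses a hypothetical file-region split. Its method shape (`getVersion`, `serialize`, `deserialize(version, bytes)`) is modeled on Flink's `SimpleVersionedSerializer`, but the class does not implement that interface and needs no Flink dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical split: a region of a file to read. */
record FileRegionSplit(String splitId, String path, long offset, long length) {}

/** Hypothetical versioned serializer for FileRegionSplit. */
final class FileRegionSplitSerializer {
    static final int VERSION = 1;

    int getVersion() {
        return VERSION;
    }

    byte[] serialize(FileRegionSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.splitId());
            out.writeUTF(split.path());
            out.writeLong(split.offset());
            out.writeLong(split.length());
        }
        return bytes.toByteArray();
    }

    FileRegionSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("unknown split serializer version " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            // Java evaluates arguments left to right, matching the write order above
            return new FileRegionSplit(in.readUTF(), in.readUTF(), in.readLong(), in.readLong());
        }
    }
}
```

The same pattern applies to the EnumStateSerializer: write the enumerator state in a fixed field order, and reject or migrate versions the serializer does not recognize.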

2) SplitEnumerator

Discover and subscribe to splits

EnumState: the state of the enumerator, used for checkpointing and recovery

EnumStateSerializer: serialize/deserialize EnumState

3) SourceReader

SplitReader: an interface for data interaction with external systems

FetcherManager: selects the threading model (from the models currently available)

RecordEmitter: converts the message type and handles event time

Looking at this list closely, most of these components deal with the external system, and very few deal with the Flink engine itself. Users no longer need to worry about checkpoint locks, multi-threading issues and the like. They can focus their effort on interacting with the external system. Through these abstractions, the new Source API greatly simplifies connector development.

Three. Sink API: the exit for Flink data

If you are familiar with Flink, you will know that it can provide exactly-once semantics: data is neither duplicated nor lost. Flink does a lot of work to achieve this, and one of the most important pieces is two-phase commit on the sink side.

1. Pre-commit phase

A distributed system usually follows a "1 coordinator + n executors" pattern. In the pre-commit phase, the coordinator first requests a commit: it sends a commit request to all executors, which starts the two-phase commit.
When an executor receives the commit request, it prepares for the commit. Once its preparation is complete, it replies to the coordinator that it is ready to commit. When the coordinator has received a "ready" reply from every executor, the pre-commit phase ends and the second phase, the commit phase, begins.

2. Commit phase

The coordinator sends a commit decision to the executors. Each executor then performs the actual commit using what it prepared earlier. When it is done, it reports back to the coordinator that the commit completed normally.

Once the coordinator decides to enter the commit phase, every executor must carry out the decision unconditionally. Even if an executor fails during this phase, it must still perform the commit after it recovers. Once the decision to commit has been made, the executors must commit.

Now suppose an executor fails during the pre-commit phase and cannot prepare correctly. It may reply to the coordinator with an error. Alternatively, its reply may never arrive, for example because the network is broken, so that after a timeout the coordinator still has no response from executor No. 3. In either case, the coordinator triggers a rollback in the second phase. It tells all executors that this commit attempt has failed and that everyone must roll back to the previous state. Each executor then rolls back, undoing its earlier operations.
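The protocol described in this section can be reduced to a small coordinator loop. Phase 1 collects votes, where an error or a missing reply counts as "no" and triggers a rollback everywhere. Phase 2 retries each commit until it succeeds, because the commit decision is final. This is a plain-Java sketch, not Flink's own implementation. `Participant` and `TwoPhaseCommitCoordinator` are hypothetical names, and a thrown exception stands in for both an error reply and a timeout.

```java
import java.util.List;

/** One executor in the protocol (hypothetical interface). */
interface Participant {
    /** Phase 1: prepare everything needed to commit later; return false to vote "no". */
    boolean prepare() throws Exception;

    /** Phase 2: make the prepared work visible. Must be idempotent so it can be retried. */
    void commit() throws Exception;

    /** Rollback path: undo the prepared work. */
    void abort();
}

/** Hypothetical coordinator for a single two-phase commit round. */
final class TwoPhaseCommitCoordinator {
    enum Outcome { COMMITTED, ROLLED_BACK }

    Outcome run(List<? extends Participant> participants) {
        // Phase 1: ask every participant to pre-commit.
        for (Participant p : participants) {
            boolean ready;
            try {
                ready = p.prepare();
            } catch (Exception errorOrTimeout) {
                ready = false; // an error or a missing reply counts as a "no" vote
            }
            if (!ready) {
                for (Participant q : participants) {
                    q.abort(); // everyone rolls back to the previous state
                }
                return Outcome.ROLLED_BACK;
            }
        }
        // Phase 2: the decision is final, so every participant must eventually commit.
        for (Participant p : participants) {
            while (true) {
                try {
                    p.commit();
                    break;
                } catch (Exception transientFailure) {
                    Thread.onSpinWait(); // keep retrying (e.g. after recovery); never roll back here
                }
            }
        }
        return Outcome.COMMITTED;
    }
}
```

The retry loop has no backoff or limit, which keeps the sketch short. A real system would persist the decision and retry after the participant restarts, rather than spin.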

3. The practice of two-phase commit in Flink

1) Pre-commit phase

Take a file system Sink as an example.
After receiving the checkpoint barrier, the file system Sink performs a pre-commit: it writes the current data to a temporary file on disk. When the pre-commit is complete, every operator replies "ready to commit" to the coordinator.

2) Commit phase

Then the second phase, the commit phase, starts. The JobManager sends a commit instruction to all operators, and on receiving it, the Sink performs the final commit.

Again taking the file system as an example: during the pre-commit phase, the data was written to a temporary file. At commit time, the temporary file is renamed to its predefined final name, and this rename is the commit.

Note that the temporary file is not wasted work: it is what makes a later rollback possible, because an uncommitted temporary file can simply be discarded. In this way, the two-phase commit mechanism is used to guarantee exactly-once semantics.
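The file-based commit described above can be sketched with `java.nio.file`. Pre-commit writes to a hidden temporary file, commit atomically renames it to the final name, and rollback deletes it. `TempFileCommitter` and the `.<name>.inprogress` naming scheme are assumptions for this sketch, not Flink's `FileSink` behavior. `ATOMIC_MOVE` also assumes both paths are in the same directory on a file system that supports atomic renames.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Hypothetical per-checkpoint file committer: temp file on pre-commit, rename on commit. */
final class TempFileCommitter {
    private final Path tempFile;
    private final Path finalFile;

    TempFileCommitter(Path dir, String finalName) {
        this.finalFile = dir.resolve(finalName);
        this.tempFile = dir.resolve("." + finalName + ".inprogress"); // assumed naming scheme
    }

    /** Phase 1: persist the data where readers of the final file cannot see it yet. */
    void preCommit(List<String> lines) throws IOException {
        Files.write(tempFile, lines, StandardCharsets.UTF_8);
    }

    /** Phase 2: the atomic rename publishes the data; safe to call again after a crash. */
    void commit() throws IOException {
        if (Files.notExists(tempFile) && Files.exists(finalFile)) {
            return; // already committed before a failure
        }
        Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Rollback: the temporary file is simply discarded. */
    void abort() throws IOException {
        Files.deleteIfExists(tempFile);
    }

    Path finalFile() {
        return finalFile;
    }

    Path tempFile() {
        return tempFile;
    }
}
```

The idempotent `commit` matters because of the rule from the commit phase: an executor that fails after the decision must commit again after recovery. Here, a second call after a successful rename is a no-op.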

4. Sink model

1) Writer: during the write (pre-commit) phase, writes the continuous stream of upstream data into an intermediate state.

2) Committable: the "intermediate state" mentioned above, that is, the object on which the commit operation is performed.

3) Committer: actually commits the Committables.

4) Global Committer: an optional component that performs a global commit. Whether it is needed depends on your external system; Iceberg is one example.
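The four roles above can be wired together in a small plain-Java model. In this model a writer stages one "file" per checkpoint as its Committable. A Committer makes each staged file visible on its own. A Global Committer instead combines all subtasks' files and publishes them as one atomic snapshot, loosely in the spirit of Iceberg's table commits. All interfaces and classes here are hypothetical and are not Flink's Sink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical Writer role: buffers input and turns it into committables at a checkpoint. */
interface SinkWriter<InputT, CommT> {
    void write(InputT element);

    List<CommT> prepareCommit();
}

/** Hypothetical Committer role: commits committables one by one. */
interface SinkCommitter<CommT> {
    void commit(List<CommT> committables);
}

/** Hypothetical Global Committer role: combines all committables and commits them once. */
interface SinkGlobalCommitter<CommT, GlobalT> {
    GlobalT combine(List<CommT> committables);

    void commit(GlobalT globalCommittable);
}

/** The Committable in this sketch: a staged, not-yet-visible data file. */
record StagedFile(String name, List<String> rows) {}

/** Writer that stages one file per checkpoint for its subtask. */
final class StagingWriter implements SinkWriter<String, StagedFile> {
    private final String subtask;
    private final List<String> buffer = new ArrayList<>();
    private int checkpoint = 0;

    StagingWriter(String subtask) {
        this.subtask = subtask;
    }

    @Override public void write(String element) {
        buffer.add(element);
    }

    @Override public List<StagedFile> prepareCommit() {
        if (buffer.isEmpty()) {
            return List.of();
        }
        StagedFile file = new StagedFile(subtask + "-ckp" + (++checkpoint), List.copyOf(buffer));
        buffer.clear();
        return List.of(file);
    }
}

/** Committer that makes each staged file visible individually. */
final class VisibleFilesCommitter implements SinkCommitter<StagedFile> {
    final List<String> visible = new ArrayList<>();

    @Override public void commit(List<StagedFile> committables) {
        for (StagedFile f : committables) {
            visible.add(f.name());
        }
    }
}

/** Global committer that publishes all subtasks' files in one snapshot. */
final class SnapshotGlobalCommitter implements SinkGlobalCommitter<StagedFile, List<String>> {
    final List<List<String>> publishedSnapshots = new ArrayList<>();

    @Override public List<String> combine(List<StagedFile> committables) {
        List<String> names = new ArrayList<>();
        for (StagedFile f : committables) {
            names.add(f.name());
        }
        return names;
    }

    @Override public void commit(List<String> snapshot) {
        publishedSnapshots.add(List.copyOf(snapshot));
    }
}
```

A given sink would usually use one of the last two roles, not both. Per-file commits suit systems where each file stands alone. A global commit suits table formats where all files from a checkpoint must appear together.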

Four. Future development

Improve the new Source

Because the new Source and Sink APIs were released only recently, some problems remain, and developers may bring new requirements, updates and enhancements. The APIs are already considered relatively stable, but they still need continuous improvement.

Migrating existing connectors to the new API

As work on unified batch/streaming connectors advances, all connectors will be migrated to the new API.

Connector Testing Framework

The connector testing framework aims to provide a consistent, unified testing standard for all connectors. Developers no longer need to write their own test cases or set up the various test environments and scenarios themselves. Instead, they can combine scenarios and use cases like building blocks to test their code quickly. This lets them concentrate on the connector logic itself and greatly reduces the testing burden. It is also the common goal of the Source API, the Sink API and the framework work that follows: to make connector development easier and lower the barrier to entry, so that more developers contribute to the Flink ecosystem.
