Stream Processing with Apache Flink

1. Parallel processing and programming paradigms

For compute-intensive or data-intensive work that involves a large amount of calculation, parallel computing, that is, divide and conquer, is a very effective approach. The key question is how to divide an existing task and how to allocate computing resources reasonably.

For example, at school, teachers sometimes ask students to help mark exam papers. If a paper contains three questions, A, B, and C, the students might divide the work in the following ways.

Method 1: Assign each of the three questions to a different student. Each student marks only the question they are responsible for and then passes the paper to the next student, forming a kind of assembly line. Because there are only three questions in total, this pipeline cannot keep scaling as the number of students grows.

Method 2: An extension of Method 1 in which several students are allowed to mark the same question. For example, question A is marked by two students, question B by three students, and question C by only one. In this case we have to think about how to divide the work further: all students are split into three groups, the first responsible for question A, the second for question B, and the third for question C. Within a group the work can be divided again; for example, the first student in group A marks half of the papers and the second student marks the other half. When they are done, they pass their papers on to the next group.

Dividing the work by question and dividing the stack of papers themselves correspond to what we call computational parallelism and data parallelism.

We can represent this kind of parallelism with the directed acyclic graph (DAG) shown above.

In the picture, the students marking question A take on an extra task: fetching the papers from the teacher's office and bringing them to where the marking happens. The students in charge of question C also have an extra task: once every question has been marked, they total the scores and hand in the records. Accordingly, all the nodes in the graph fall into three categories. The first category is the Source, responsible for obtaining data (fetching the papers); the second category is the data processing nodes, which most of the time do not need to interact with external systems; the last category is responsible for writing the result of the whole computation to an external system (totaling the scores and handing in the records). These three kinds of nodes are the Source nodes, Transformation nodes, and Sink nodes. In the DAG, nodes represent computations and the edges between them represent the dependencies between those computations.

A note on programming paradigms

Suppose there is a data set containing the ten numbers from 1 to 10. How would you multiply each number by 2 and then compute the cumulative sum (as shown in the figure above)? There are many ways to do this.

If we solve it with a program, there are two angles. The first is imperative programming: we tell the machine, step by step, which data structures to create, how to use them to store intermediate results, and how to turn those intermediate results into the final result; in other words, we tell the machine exactly how to do it. The second is declarative programming, in which we usually only tell the machine what task to complete rather than spelling out every step. For example, we can convert the original data set into a Stream, map it to an Int Stream in which each number is multiplied by 2, and finally call a sum method to obtain the sum of all the numbers.
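As a rough illustration of the two styles in plain Java (no Flink involved yet; the class name is only for the example):

```java
import java.util.stream.IntStream;

public class SumExample {
    public static void main(String[] args) {
        // Imperative style: spell out every step and keep the intermediate state yourself.
        int imperativeSum = 0;
        for (int i = 1; i <= 10; i++) {
            imperativeSum += i * 2;
        }

        // Declarative style: describe what to compute, not how to iterate.
        int declarativeSum = IntStream.rangeClosed(1, 10)
                .map(x -> x * 2)
                .sum();

        System.out.println(imperativeSum + " " + declarativeSum); // both print 110
    }
}
```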

Declarative code is more concise, and this concise style of development is exactly what a computing engine strives for. Therefore, the task-writing APIs in Flink are all declarative.

2. DataStream API overview and simple application

Before introducing the DataStream API in detail, let's take a look at the logical layering of the Flink APIs.

In older versions of Flink, the APIs follow the four-layer relationship shown on the left side of the figure above. At the top layer we can use the higher-level, more declarative Table API and SQL to write our logic. Everything written in SQL or the Table API is translated and optimized internally by Flink into a program implemented with the DataStream API. At the next layer down, the DataStream API program is expressed as a series of Transformations, which are finally translated into the JobGraph (that is, the DAG introduced above).

In newer versions of Flink, however, this has changed, mainly in the Table API and SQL layer: it is no longer translated into a DataStream API program but goes directly to the underlying Transformation layer. In other words, the DataStream API and the Table API are no longer stacked one above the other but sit side by side, and this simplification brings some benefits for query optimization.

Next, let's walk through a simple DataStream API program that implements the same requirement as above: multiply by 2 and sum.

Expressed in Flink, the basic code is shown in the figure above. It looks a little more complicated than the stand-alone example, so let's break it down step by step.

First of all, to implement anything with Flink you must obtain the corresponding execution environment, that is, a StreamExecutionEnvironment.

Secondly, once you have the environment, you can call its addSource method to attach an initial data source to the logic; after the source is set, you get a reference to it, the DataStreamSource object.

Finally, you can call a series of transformation methods to transform the data coming from the source.

As shown in the figure, the transformation multiplies each number by 2; then, in order to sum, the data must be grouped with keyBy. The constant passed in means that all the data goes into a single group, and the data in that group is then accumulated by its first field to produce the result. Once the result is obtained, it cannot simply be printed the way a stand-alone program would; instead, a Sink node must be added to the logic to write all the data to the target location. After all of the above is in place, the execute method on the Environment must be called, which submits all the logic written above to a remote or local cluster for execution.
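Since the figure is not reproduced here, the sketch below shows roughly what such a program might look like with the DataStream API. It keeps plain integers and a constant key rather than the tuple-based code presumably shown in the original figure, so treat it as an approximation rather than the exact example:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiplyAndSum {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                // 2. Source: the numbers 1 to 10 (a real job would use addSource/fromSource).
                .fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                // 3. Transformation: multiply each number by 2.
                .map(x -> x * 2)
                // 4. keyBy with a constant key puts everything into one group;
                //    sum(0) then keeps a rolling sum of the values in that group.
                .keyBy(x -> 0)
                .sum(0)
                // 5. Sink: print the running result instead of returning it.
                .print();

        // 6. Nothing has run yet; execute() submits the whole DAG for execution.
        env.execute("multiply-and-sum");
    }
}
```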

The biggest difference between a Flink DataStream API program and a stand-alone program is that the first few steps do not trigger any computation on the data; they only draw the DAG graph. Only after the entire logical DAG has been drawn is the whole graph submitted to the cluster for execution by calling the execute method.

This links the Flink DataStream API to the DAG graph. In practice, the generation of a Flink job is much more involved than described above; it is transformed and optimized step by step. The figure below shows the concrete process of generating a Flink job.

Transformation operations provided in the DataStream API

As the sample code above shows, each time a method is called on a DataStream object, a new transformation is generated; correspondingly, a new operator is generated at the bottom layer and added to the existing logical DAG, which is equivalent to adding an edge that points to the last node of the existing graph. Each of these API calls produces a new object, on which the next transformation method can then be called; it is in this chained fashion that the DAG is drawn step by step.

The above involves the idea of higher-order functions: every transformation you call on a DataStream takes a function as a parameter. The transformation determines what kind of operation is performed on the data, while the function passed into the operator determines how that operation is carried out.
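As a small sketch of this division of responsibilities (a standalone toy job; the class name is only for the example):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainedTransformations {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        // Each call below returns a *new* DataStream and adds a new operator
        // (a node plus an edge) to the logical DAG.
        numbers
                // map decides what kind of operation runs (one output per input);
                // the MapFunction passed in decides how each element is transformed.
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                // filter decides that elements are kept or dropped;
                // the lambda decides how that choice is made.
                .filter(value -> value % 4 == 0)
                .print();

        env.execute("chained-transformations");
    }
}
```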

In addition to the APIs listed on the left of the figure above, the Flink DataStream API provides two very important functions: ProcessFunction and CoProcessFunction. They are offered to users as the lowest-level processing primitives; in theory, all of the transformations shown in blue on the left side of the figure can be implemented with the underlying ProcessFunction and CoProcessFunction.
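For instance, a minimal sketch of a ProcessFunction that reproduces the map-plus-filter logic from the previous sketch (one of many possible ways to write it):

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Doubles every element and only emits the results divisible by 4,
// i.e. the same effect as the map + filter chain above.
public class DoubleAndKeepMultiplesOfFour extends ProcessFunction<Integer, Integer> {

    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
        int doubled = value * 2;
        if (doubled % 4 == 0) {
            // Being free to emit zero, one, or many records per input is what makes
            // ProcessFunction general enough to express the other transformations.
            out.collect(doubled);
        }
    }
}
```

It would be applied with something like numbers.process(new DoubleAndKeepMultiplesOfFour()).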

About data partitioning

Data partitioning corresponds to the shuffle operation in traditional batch processing. If we think of playing cards as data, the shuffle in traditional batch processing is like sorting the cards in your hand: as you draw cards, you arrange them in order and put cards with the same number together. The biggest benefit is that when it is your turn to play, you can immediately find the card you want. Shuffle is a batch-processing technique; because all data in stream processing arrives dynamically, the sorting of the cards, that is, the grouping or partitioning of the data, must also be done online.

For example, as shown on the right side of the figure above, upstream operator A has two processing instances and downstream operator B has three. The stream-processing equivalent of shuffle shown here is called data partitioning or data routing: it determines, after A finishes processing a piece of data, which of B's downstream instances the result is sent to.

Partitioning strategies provided in Flink

Figure X shows the partitioning strategies provided by Flink. Note that after keyBy is called on a DataStream, the data can be partitioned by a key value. Strictly speaking, however, keyBy is not an underlying physical partitioning strategy but a transformation, because from the API point of view it converts a DataStream into a KeyedStream, and the operations supported by the two are different.

Among these partitioning strategies, Rescale may be a little hard to understand. Rescale takes the locality of upstream and downstream data into account. It is similar to the traditional Rebalance, that is, Round-Robin distribution in turn; the difference is that Rescale tries to avoid transferring data across the network.

If none of the above partitioning strategies fits, we can also call partitionCustom to define our own partitioning. It is worth noting that partitionCustom only customizes unicast: for each record, only one downstream instance can be chosen as the target; there is no way to duplicate a record and send copies to multiple downstream instances.
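A minimal sketch of partitionCustom (the parity-based routing here is purely for illustration):

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        numbers
                .partitionCustom(
                        new Partitioner<Integer>() {
                            @Override
                            public int partition(Integer key, int numPartitions) {
                                // Each record is routed to exactly one downstream channel (unicast):
                                // even numbers to channel 0, odd numbers to channel 1,
                                // wrapped by the actual downstream parallelism.
                                return (key % 2) % numPartitions;
                            }
                        },
                        new KeySelector<Integer, Integer>() {
                            @Override
                            public Integer getKey(Integer value) {
                                return value;
                            }
                        })
                .print();

        env.execute("custom-partitioning");
    }
}
```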

Connectors supported by Flink

As mentioned above, there are two key nodes in Figure X: node A, the source, which connects to an external system and reads data from it into the Flink cluster; and node C, the sink, which writes the aggregated and processed results back out to some external system. The external system can be a file system, a database, and so on.

A Flink computation does not necessarily have a data output; that is, the final results may never be written out to an external system, because Flink also has the concept of state. Intermediate results can be exposed to external systems through State, so an application is allowed to have no dedicated Sink. However, every Flink application must have a Source: data has to be read in from somewhere before any processing can happen.

The points to pay attention to for Source and Sink connectors are as follows:

For Sources, we usually care about whether the connector supports continuously monitoring the data for updates and then feeding the updated data into the system. For example, Flink has a FileSystem connector for files such as CSV files; when the CSV connector is defined, a parameter can specify whether to continuously monitor a directory for changes and read in the updated files (a sketch appears a little further below).

For Sinks, we usually care about whether the external system being written to supports updating results that have already been written. For example, when writing to Kafka, writes are usually append-only, that is, records already written cannot be modified (the community is working on using Kafka compaction to implement an upsert sink); when writing to a database, updates to existing rows by primary key are generally supported.

These two characteristics determine whether a Flink connector is oriented toward static data or dynamic data.
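Coming back to the source-side monitoring mentioned above, a rough DataStream-level sketch using the newer FileSource connector might look like the following. Class names such as TextLineInputFormat and the exact builder methods differ across Flink versions, so treat this as an approximation:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MonitoredFileSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/input"))
                // Without this call the source reads the directory once and finishes;
                // with it, the directory is re-scanned every 10 seconds for new files.
                .monitorContinuously(Duration.ofSeconds(10))
                .build();

        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "monitored-file-source");

        lines.print();
        env.execute("monitored-file-source");
    }
}
```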

As a reminder, the screenshot above is from documentation for Flink 1.11 or later; the connectors were refactored in Flink 1.11. In addition, connectors at the Table API and SQL level take on more responsibilities than connectors at the DataStream level, such as supporting the pushdown of predicates or projections, features that help improve overall data-processing performance.

3. State and time in Flink

To understand the DataStream API in depth, state and time are the two concepts that must be mastered.

All computation can be broadly divided into stateless and stateful computation. Stateless computation is relatively easy: suppose there is an addition operator; every time a group of data comes in, it adds the numbers up and outputs the result, somewhat like a pure function. A pure function is one whose result depends only on its input; no previous computation or external state has any influence on it.

Here we mainly discuss stateful computation in Flink, using a small branch-picking game as an example. What this game does well, in my opinion, is that it records a lot of state itself. For example, if you have not been online for a few days and then talk to an NPC, it will tell you that you have not been online for a long time. In other words, it records your previous online time as state, and that state influences the dialogue it generates.

To implement this kind of stateful computation, we need to record the previous state and then inject it into a new computation. There are two concrete ways of doing this:

The first is to take the state data out before it enters the operator, combine the state data with the input data, and feed both into the operator at the same time to produce an output. This approach is used in Spark's Structured Streaming; its advantage is that existing stateless operators can be reused.

The second is Flink's current approach: the operator itself is stateful. When the operator performs a computation upon receiving new data, it takes both the new input and the existing state into account, and then outputs the result.

Computing engines should also become smarter, like the game mentioned above: automatically learning the underlying patterns in the data and then adaptively optimizing the computation logic to maintain high processing performance.

Flink's state primitives

Flink's state primitives describe how to use Flink state in code. The basic idea is to stop using the data containers provided by the native language (such as Java or Scala collections) and use Flink's state primitives instead.

As a system with good state support, Flink provides many kinds of state primitives internally. Broadly, they can all be divided into two types: Keyed State and Operator State. Operator State is used relatively rarely, so we will not introduce it here; let's focus on Keyed State.

Keyed State is partitioned state. Its advantage is that existing state can be divided into different blocks according to a logical partition. Computation and state within a block are bound together, while state reads and writes for different key values are isolated from each other. For each key value, you only need to manage its own computation logic and state, without having to consider the logic and state of other keys.

Keyed State can be further divided into the following five categories:

More commonly used: ValueState, ListState, MapState
Less commonly used: ReducingState and AggregatingState
Keyed State can only be used inside a RichFunction. The biggest difference between a RichFunction and an ordinary Function is that it has its own lifecycle. Using Keyed State involves the following steps (a sketch combining them appears after the steps):

The first step is to declare the State as an instance variable of the RichFunction.
The second step is to initialize the State in the RichFunction's open method. The initialization has two parts: first create a StateDescriptor and give the State a name; then call getRuntimeContext().getState(…) in the RichFunction, passing in the StateDescriptor just defined, to obtain the State.
Reminder: If the streaming application is running for the first time, the State obtained will be empty; if the application is restarted from an intermediate point, the State will be restored from the configuration and the previously saved data.

In the third step, once the State object has been obtained, you can read and write the corresponding State inside the RichFunction. For a ValueState, you can call its value method to get the stored value. The Flink framework controls and limits concurrent access to all state, so users do not need to worry about concurrency issues.
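Putting these steps together, a minimal sketch of a per-key running count kept in ValueState might look like this (the class name and the surrounding job are only for illustration):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Long>> {

    // Step 1: declare the State as an instance variable of the RichFunction.
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // Step 2: create a StateDescriptor with a name, then fetch the State
        // from the runtime context. On the first run it is empty; after a
        // restore it is filled from the previously saved data.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Types.LONG);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Long>> out) throws Exception {
        // Step 3: read and write the State; Flink scopes it to the current key.
        Long current = countState.value();
        long updated = (current == null ? 0L : current) + 1;
        countState.update(updated);
        out.collect(Tuple2.of(input.f0, updated));
    }
}
```

It would be applied after a keyBy, for example stream.keyBy(t -> t.f0).flatMap(new CountPerKey()).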

Time in Flink

Time is another very important concept in Flink, and it is complementary to state. Generally speaking, the Flink engine provides two kinds of time: the first is Processing Time, the second Event Time. Processing Time is the time of the real world, while Event Time is the time contained in the data. Data usually carries fields such as timestamps when it is generated, and in many cases the timestamps carried in the data must serve as the reference so that the data can be processed by that time.

Processing Time is relatively simple to handle because there is no need to worry about issues such as out-of-order data, while Event Time is more complex. Because Processing Time simply reads the system clock, and multi-threaded or distributed systems are nondeterministic, the result of each run may differ. Event Time timestamps, on the other hand, are written into each record, so when the same data is replayed and processed multiple times, the timestamps carried do not change; as long as the processing logic does not change, the final result is fairly deterministic.

The difference between Processing Time and Event Time.

Take the data in the figure above as an example, arranged in time order from 1 to 7. For machine time, each machine's clock increases monotonically, so with Processing Time the data obtained is perfectly ordered from the smallest time to the largest. With Event Time, however, due to delays or the nature of distribution, the order in which data arrives may differ from the order in which it was actually generated, and the data may be out of order to some degree. In that case we must make full use of the timestamps carried in the data and divide it at a coarse granularity. For example, the data can be divided into three groups: the minimum time in the first group is 1, in the second group 4, and in the third group 7. After this division, the data is ordered in ascending order across groups.

How do we resolve this degree of disorder so that the whole system sees data that is basically in order? One solution is to insert a kind of metadata called a Watermark into the data stream. In the example above, once the first three records have arrived and we assume that no more data with a timestamp less than or equal to 3 will come, a Watermark of 3 can be inserted into the stream. When the system sees the Watermark of 3, it knows that no more data with a timestamp less than or equal to 3 will arrive, and it can then safely and boldly carry out its own processing logic.

To sum up, Processing Time is strictly increasing when used, while Event Time has a certain amount of disorder, which must be mitigated with Watermarks.

From the API point of view, assigning Timestamps and generating Watermarks is fairly easy. There are two ways:

The first is to call the collectWithTimestamp method provided inside the SourceFunction to emit records that carry a timestamp; the emitWatermark method of the SourceFunction can also be used to generate a Watermark and insert it into the data stream.

Second, outside of the SourceFunction, you can call the DataStream.assignTimestampsAndWatermarks method and pass in one of two types of Watermark generators:

The first type is generated periodically: it is equivalent to configuring a value in the environment specifying how often (in real time) the system automatically invokes the Watermark generation strategy.

The second type is generated from special records: when certain special data is encountered, the AssignerWithPunctuatedWatermarks approach can be used to assign timestamps and Watermarks.

Reminder: Flink has some commonly used built-in assigners. For example, for a fixed out-of-orderness bound, it subtracts that fixed time from the record's timestamp to produce the Watermark. The Timestamp-assignment and Watermark-generation interfaces may change in later versions; note that newer versions of Flink have unified the two types of generators described above.
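With the unified generator mentioned in the reminder above, a minimal sketch might look like this (the SensorReading type and its timestamp field are assumptions for illustration):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkExample {

    // Hypothetical event type carrying its own event-time timestamp in milliseconds.
    public static class SensorReading {
        public String sensorId;
        public long timestampMillis;
        public double value;
    }

    public static DataStream<SensorReading> withEventTime(DataStream<SensorReading> readings) {
        return readings.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        // Watermark = highest timestamp seen so far minus 5 seconds,
                        // i.e. tolerate up to 5 seconds of out-of-orderness.
                        .<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Tell Flink where the event-time timestamp lives in the record.
                        .withTimestampAssigner((reading, previousTimestamp) -> reading.timestampMillis));
    }
}
```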

Time-related APIs

Finally, let's look at the time-related APIs used when writing logic in Flink. The figure below summarizes the APIs corresponding to Event Time and Processing Time.

Through these interfaces, three things can be accomplished in application logic:

First, getting the timestamp of a record. For Event Time you can call context.getTimestamp, or extract the corresponding time from a data field inside a SQL operator. For Processing Time you can simply call currentProcessingTime, which internally just calls the static method that obtains the system time and returns its value.

Second, getting the Watermark. Strictly speaking, the concept of a Watermark exists only in Event Time, not in Processing Time. But if you insist on treating something as a Watermark under Processing Time, it is really the data time itself: the value obtained from the first call to timerService.currentProcessingTime is both the current record's time and the current Watermark, because time always moves forward, so the value returned by the second call will never be smaller than the first.

Third, registering timers. Timers are used for cleanup, for example clearing a cache at some point in the future. Because the cleanup should happen in the future, you can call the timerService.registerEventTimeTimer or registerProcessingTimeTimer method to register a timer, and then implement the timer's callback logic in the corresponding method. When the Event Time or Processing Time passes the time set for the timer, the callback is invoked and runs the cleanup logic you wrote.
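A minimal sketch of such a cleanup timer inside a KeyedProcessFunction (the ten-minute lifetime and the state name are assumptions for illustration, and event-time timestamps are assumed to be assigned):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keeps the latest value per key as a small cache and clears it
// ten minutes of event time after the last record for that key.
public class CacheWithCleanup extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Long> cache;

    @Override
    public void open(Configuration parameters) {
        cache = getRuntimeContext().getState(new ValueStateDescriptor<>("cache", Types.LONG));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        cache.update(value);
        out.collect(value);
        // Register an event-time timer; onTimer fires once the Watermark
        // passes this timestamp (assumes each record carries a timestamp).
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // Cleanup logic: drop the cached value for the current key.
        cache.clear();
    }
}
```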
