From Spark for batch processing to Flink for streaming and batch integration
What are the benefits of integrating flow and batch, especially in the context of BI/AI/ETL. On the whole, if we can help users achieve the integration of flow and batch, there will be the above four obvious benefits:
Can avoid code duplication and reuse code core processing logic
It is best if the code logic can be completely consistent, but this will be difficult. But on the whole, the current business logic is getting longer and more complex, and there are many requirements. If we use different frameworks and different engines, users have to rewrite the logic every time, which is very stressful and difficult to maintain. . So on the whole, it is particularly important to avoid code duplication as much as possible and help users reuse code logic.
Flow batch has two directions
The issues to be considered in these two directions are very different. At present, some frameworks such as Flink for Streaming and Spark for Batch are relatively mature in batch processing or stream processing, and have produced many unilateral users. When we want to help users move to another direction, such as some business requirements, we usually divide them into two categories, whether to start from stream processing to batch processing, or to start from batch processing to stream processing. The two production practice scenarios introduced later correspond to these two directions.
Reduce maintenance workload
Avoid maintaining multiple systems. The differences between the systems may be very large, and the frameworks and engines are different, which will cause more problems. If there are multiple pipelines in the company, one in real time and one offline, it will cause data inconsistency. Therefore, a lot of work will be done in data verification, data accuracy query, data storage, etc., to maintain data consistency as much as possible.
There are many frameworks and engines, and business logic must run both in real time and offline. Therefore, there are many things to learn when supporting users.
2. Current status of the industry
Both Flink and Spark are engines that support both stream processing and batch processing. We agree that Flink's stream processing is better, so how good can its batch processing be? At the same time, Spark's batch processing is relatively good, so can its stream processing be enough to help users solve existing needs?
Now there are various engine frameworks, can there be a unified framework on top of them, similar to federated processing or some simple physical APIs, such as Beam API or custom interfaces.
The question that needs to be considered in Beam is how well can it optimize batch and stream processing? Beam is currently still partial to physical implementation, and we need to study the future plan.
LinkedIn, including other companies, will consider making some custom interface solutions, considering a common SQL layer, a common SQL or API layer, and running different framework engines underneath. The issue that needs to be considered here is that frameworks like Spark and Flink are relatively mature and already have a large number of user groups. When we propose a new API, a new solution, what is the user acceptance? How should a new solution be maintained within the company?
3. Production case scenario
The following content mainly focuses on the effect of Flink as a batch, a simple comparison between Flink and Spark, and some internal solutions of LinkedIn. Share two production example scenarios, one is how to do flow-batch integration when machine learning feature engineering is generated, and the other is how to do flow-batch integration in complex ETL data flow.
3.1 Case A - Machine Learning Feature Engineering
The first type of direction, stream processing -> batch processing, is classified as stream-batch integration.
The main logic of case A is how to integrate stream and batch from stream processing to batch processing when doing feature generation in machine learning. The core business logic is feature transformation. The transformation process and logic are complicated, so use it to do some standardization.
For example, the background of some member information entered on the LinkedIn page, etc., needs to be extracted and standardized in order to make some recommendations, help you find some jobs, and so on. When the member's identity information is updated, there will be filtering and preprocessing logic, including the process of reading Kafka, and there may be some small table queries during the feature conversion process. This logic is very straightforward, without complicated join operations and other data processing processes.
Previously its pipeline was real-time, requiring periodic updates of the stream by reading supplementary information from the offline pipeline. This kind of backfill puts a lot of pressure on the real-time cluster. When backfilling, you need to wait for the backfill to work, and you need to monitor the workflow to prevent the real-time cluster from going down. Therefore, the user asks whether it is possible to do offline backfill, and does not want to do backfill through real-time stream processing.
Currently our users are using Beam on Samza for stream processing. They are very familiar with Beam API and Spark Dataset API, and will also use Dataset API to do some other business processing besides backfill.
What needs to be emphasized is that many Dataset APIs operate directly on Objects and have high requirements for type security. It is impractical to suggest that these users directly change to workflows such as SQL or DataFrame, because their existing business logic is not It is to directly operate and convert Object.
In this case, we can provide users with some options, Imperative API. Take a look at the solutions offered by the industry:
The first choice is the Flink DataStream API that is about to be unified. We also investigated the Flink DataSet API (deprecated) when we were doing program evaluation before. The DataStream API can be unified, and the support for stream processing and batch processing is relatively good. Perfect. But the disadvantage is that, after all, it is an Imperative API, and there may not be many optimizations, and it should continue to be optimized in the future. See FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) and FLIP-134: Batch execution for the DataStream API.
The second choice is Spark Dataset, which is also a natural choice for users. Dataset API can be used for Streaming, which is different from Flink’s Dataset, DataStream API and other physical APIs. It is based on Spark Dataframe SQL engine to do some type safety, and the degree of optimization is relatively better. See the articles Databricks: Introducing Apache Spark Datasets and Spark Structured Streaming Programming Guide: Unsupported-operations.
The third option is Beam On Spark, which currently mainly uses RDD runners. It is still difficult to support runners with optimization. Later, I will talk about some ongoing work of Beam in case B in detail. See Beam Documentation - Using the Apache Spark Runner and BEAM-8470 Create a new Spark runner based on Spark Structured streaming framework.
But the API of Beam is very different from Flink and Spark. It is an ecosystem of Google. We have helped users solve some problems before. Their workflow is on Beam on Samza, and they use p collections or p transformation to write Some business logic, the signatures of output and input methods are very different, we have developed some lightweight converters to help users reuse existing business logic, which can be better used in rewritten Flink or Spark jobs.
From the perspective of DAG, Case A is a very simple business process, which is to convert Objects simply and directly. In this case, the performance of Flink and Spark is very close.
Usually, we use the Flink Dashboard UI to look at some exceptions, business processes, etc., which is an obvious advantage over Spark. Spark to query the Driver log, query exception is more troublesome. But Flink still has several areas that need to be improved:
History Server - supports richer Metrics, etc.
The metrics presented by the Spark History Server UI are relatively rich, which is of great help to users in performance analysis. Whether Flink does batch processing can also allow Spark users to see the same amount of metrics information, so as to reduce the difficulty of user development and improve user development efficiency.
Better batch operation and maintenance tools
Share something that LinkedIn has been doing since two or three years ago. LinkedIn has 200,000 jobs running on the cluster every day, and needs better tools to support batch users to operate and maintain their own jobs. We provide Dr. Elephant and GridBench to help users debug and operate their own jobs.
Dr. Elephant is open source, which can help users better debug jobs, find problems and provide suggestions. In addition, before the test cluster is transferred to the production cluster, it will be decided whether to allow production according to the score of the evaluation result in the report generated by Dr. Elephant.
GridBench mainly does some data statistical analysis, including CPU method hotspot analysis, etc., to help users optimize and improve their jobs. GridBench also plans to open source in the future, which can support various engine frameworks, including adding Flink, and Flink jobs can be better evaluated with GridBench. GridBench Talk: Project Optimum: Spark Performance at LinkedIn Scale.
Users can not only see the reports generated by GridBench and Dr. Elephant, but also see some of the most basic information of the job through the command line, such as application CPU time, resource consumption, etc., and can also compare the differences between different Spark jobs and Flink jobs. Conduct comparative analysis.
The above are the two areas where Flink batch processing needs to be improved.
3.2 Case B - Complex ETL data flow
The second type of direction, batch processing -> stream processing, is classified as stream-batch integration.
The core logic of the ETL data flow is relatively complex, such as including the session window aggregation window, which calculates the user page views every hour, divides into different jobs, shares the page key in the metadata table in the middle, and processes the first job at 00 time points , the second job processes the 01 time point, does some sessionize operations, and finally outputs the results, which are divided into open session and close session, so as to incrementally process the data of each hour.
This workflow was originally offline incremental processing through Spark SQL, which is pure offline incremental processing. When users want to move jobs online to do some real-time processing, they need to rebuild a real-time workflow such as Beam On Samza. During the construction process, we have very close contact and communication with users, and users encounter many problems. Yes, including the reuse of the entire development logic, ensuring that the two business logics produce the same results, and where the data is finally stored, etc. It took a long time to migrate, and the final effect is not very good.
In addition, the user's job logic uses both Hive and Spark to write a lot of large and complex UDFs, and this migration is also a very heavy workload. Users are familiar with Spark SQL and Spark DataFrame API.
The black solid line in the figure above is the real-time processing process, and the gray arrow is mainly the batch processing process, which is equivalent to a Lambda structure.
For case B, the job includes many joins and session windows, and they also used Spark SQL to develop the job before. Obviously we have to start with the Declartive API, which currently provides three solutions:
The first choice is Flink Table API/SQL, which can do both stream processing and batch processing. The same SQL has comprehensive functional support, and stream processing and batch processing are also optimized. You can read the following articles Alibaba Cloud Blog: What's All Involved with Blink Merging with Apache Flink? and FLINK-11439 INSERT INTO flink_sql SELECT * FROM blink_sql.
The second option is Spark DataFrame API/SQL, which can also use the same interface for batch processing and stream processing, but Spark's stream processing support is still not strong enough.
The third option is Beam Schema Aware API/SQL. Beam is more of a physical API. Early work is currently being carried out on Schema Aware API/SQL, so it will not be considered for the time being. Therefore, the main analysis results and experience afterwards are obtained from the comparison between Flink Table API/SQL and Spark DataFrame API/SQL. See the articles Beam Design Document - Schema-Aware PCollections and Beam User Guide - Beam SQL overview.
From the user's point of view, Flink Table API/SQL and Spark DataFrame API/SQL are very close. There are some relatively small differences, such as keywords, rules, how to write join, etc., which will also bring certain benefits to users. If you are troubled, you will wonder if you are using it wrong.
Both Flink and Spark are well integrated with Hive, such as Hive UDF reuse, etc., which reduces the migration pressure by half for the UDF migration in Case B.
The performance of Flink in the pipeline mode is significantly better than Spark. It is conceivable that whether or not to disk will have a relatively large impact on performance. If a large number of disks are required, each stage must drop the data to the disk. If you read it again, the processing performance will definitely be worse than that of the pipeline mode that does not drop the disk. The pipeline is more suitable for short processing, and it still has a relatively large advantage in 20 minutes to 40 minutes. If the pipeline is longer, the fault tolerance cannot be compared with the batch mode. Spark's batch performance is still better than Flink's. This area needs to be evaluated based on the cases within the company.
Flink's support for window is obviously much richer than other engines, such as session window, which is very convenient for users to use. In order to realize the session window, our users specially wrote a lot of UDFs, including doing incremental processing, building all sessions, taking out records for processing, and so on. Now use the session window operator directly, saving a lot of development consumption. At the same time, window operations such as group aggregation are also supported by streaming batches.
UDFs are the biggest hurdle when migrating between engine frameworks. If the UDF is written in Hive, it is easy to migrate, because both Flink and Spark support Hive UDF very well, but if the UDF is written in Flink or Spark, migrating to any engine framework will Encountered very big problems, such as migrating to Presto to do OLAP near real-time query.
In order to realize the reuse of UDF, our LinkedIn has developed a transport project internally, which has been open sourced to github. You can read the blog published by LinkedIn: Transport: Towards Logical Independence Using Translatable Portable UDFs.
transport provides a user-oriented User API for all engine frameworks, provides a common function development interface, and automatically generates UDFs based on different engine frameworks, such as Presto, Hive, Spark, Flink, etc.
Using a common UDF API to connect all engine frameworks allows users to reuse their own business logic. Users can use it easily. For example, the following users develop a
To deal with the user's SQL migration problem, the user used to develop the job with Spark SQL before, and then wanted to use the stream-batch integration and change it to Flink SQL. There are still many engine frameworks at present. LinkedIn has developed a coral solution, which has been open sourced on github, and has also done some talks on facebook, including providing users with an isolation layer together with transport UDF so that users can better Achieve cross-engine migration and reuse your own business logic.
Look at the execution process of Coral. First, the familiar ASCII SQL and table attributes are defined in the job script, and then a Coral IR tree structure is generated, which is finally translated into the physical plan of each engine.
In the analysis of Case B, streams and batches are unified. When the cluster business volume is particularly large, users attach great importance to the performance, stability, and success rate of batch processing. Among them, Shuffle Service has a greater impact on batch processing performance.
4. Comparison of Shuffle Service on Spark and Flink
In-memory Shuffle, supported by Spark and Flink, is faster, but does not support scalability.
Hash-based Shuffle, Spark and Flink all support it. Compared with In-memory Shuffle, the fault tolerance support is better, but it also does not support scalability.
Sort-based Shuffle supports scalability for large Shuffles. It reads from the disk bit by bit for Sort match and then reads it back. It is also supported in FLIP-148: Introduce Sort-Based Blocking Shuffle to Flink.
External Shuffle Service, when the cluster is very busy, for example, when doing dynamic resource scheduling, external services are very important. It has better isolation of Shuffle performance and resource dependence, and can better schedule resources after isolation. FLINK-11805 A Common External Shuffle Service Framework is currently reopening.
Disaggregate Shuffle and the big data field all advocate Cloud Native, and the separation of computing and storage should also be considered in the design of Shuffle Service. FLINK-10653 Introduce Pluggable Shuffle Service Architecture Introduces the pluggable Shuffle Service architecture.
Spark has made a relatively large improvement to Shuffle Service. This work is also a magnet project led by LinkedIn, forming a paper called introducing-magnet (Magnet: A scalable and performant shuffle architecture for Apache Spark), which was included in LinkedIn blog 2020. Magnet obviously improves the efficiency of disk read and write. From a relatively small random range to a relatively large sequential read, it will also do some merging instead of random random read shuffle data, avoiding some problems of random IO.
Shuffle stability and scalability issues are mitigated by the Magent Shuffle Service. Before that, we found a lot of Shuffle problems, such as high job failure and so on. If you want to use Flink for batch processing and help users who used Spark for batch processing before, you really need to spend more effort on Shuffle.
In terms of shuffle availability, the best-effort method will be used to push shuffle blocks, ignoring some large blocks to ensure eventual consistency and accuracy.
Make a copy of the shuffle temporary data to ensure accuracy.
If the push process is particularly slow, there will be an early termination technique.
Compared with Vanilla Shuffle, Magent Shuffle reduces the waiting time for reading Shuffle data by almost 100%, the task execution time by almost 50%, and the end-to-end task duration by almost 30%.
LinkedIn is very recognized and happy to see that Flink has obvious advantages in stream processing and batch processing, and it is more unified and continuously optimized.
Flink's batch processing capabilities need to be improved, such as history server, metrics, and debugging. When users are developing, they need to look at some solutions from the user community. The entire ecosystem must be built so that users can use it conveniently.
Flink needs to invest more energy in the shuffle service and the offline workflow of large clusters to ensure the success rate of the workflow, and how to provide better user support and monitor the health of the cluster if the scale increases.
With the emergence of more and more framework engines, it is best to give users a more unified interface. The challenges in this area are relatively large, including development and operation and maintenance. According to the experience of LinkedIn, we still see many problems , it is not possible to cover all user usage scenarios through a single solution, and it is difficult to fully cover even some functions or expressions. Like coral, transport UDF.
Knowledge Base Team
Knowledge Base Team
Knowledge Base Team
Knowledge Base Team
Explore More Special Offers
50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00