Alibaba's optimization and improvement of Flink

Introduction: With the rise of artificial intelligence and the explosion of data volume, Alibaba's commodity data processing often has to run two separate business pipelines, one incremental and one full. This led Alibaba to ask whether a single, unified pipeline was possible: with an advanced big data engine, users would write only one set of code for their business logic, and that one solution would cover full data, incremental data, and real-time processing alike. This is the background and original motivation behind Alibaba's choice of Flink.

At that time, Flink had not yet been proven in production in terms of scale or stability, and its maturity was still open to question. Alibaba's real-time computing team therefore decided to create Blink, an internal branch of Flink, and made extensive modifications and improvements to adapt it to Alibaba's ultra-large-scale business scenarios. So what optimizations did Alibaba make to Flink?

Apache Flink overview

Apache Flink (hereinafter Flink) originated as a big data research project in Europe, formerly known as Stratosphere. It began as a research project at the Technical University of Berlin and focused on batch computing in its early days. In 2014, core members of the Stratosphere project created Flink from it and donated it to Apache in the same year; Flink later became a top-level Apache project. At the same time, Flink positioned stream computing as its main direction, that is, using stream computing to do all big data computing work. This is the background to the birth of Flink.

In 2014, Flink began to emerge in the open-source big data industry as an engine focused mainly on stream computing. What sets it apart from Storm, Spark Streaming, and other stream computing engines is that it is not only a high-throughput, low-latency engine but also provides many advanced features. For example, it provides stateful computing with state management, supports strongly consistent data semantics, and supports event time, with watermarks to handle out-of-order messages.

Flink's popularity also owes much to its many strengths, including excellent performance (especially in stream computing), high scalability, and fault tolerance. It is a purely in-memory computing engine that manages its own memory. It also supports event-time processing, jobs with very large state (at Alibaba it is very common for a job's state to exceed a terabyte), and exactly-once processing.

Alibaba and Flink

With the rise of artificial intelligence and the explosion of data volume, the most common practice in typical big data business scenarios is to process full data with batch processing and real-time incremental data with stream computing. In most business scenarios, the user's business logic is the same for batch and for stream processing, yet the computing engines used for the two are different.

Users therefore usually have to write two sets of code, which undoubtedly brings extra burden and cost. Alibaba's commodity data processing often has to handle two separate business pipelines, incremental and full, so Alibaba asked: could there be a unified big data engine for which users develop just one set of code according to their business logic? One solution could then support every scenario, whether full data, incremental data, or real-time processing. This is the background and original motivation behind Alibaba's choice of Flink.

Alibaba's Flink-based platform was officially launched in 2016, starting with two scenarios: search and recommendation. Today, all Alibaba businesses, including all Alibaba subsidiaries, use the Flink-based real-time computing platform. The platform runs on an open-source Hadoop cluster, with Hadoop's YARN for resource management and scheduling and HDFS for data storage, so Flink integrates seamlessly with the open-source Hadoop software.

At present, this Flink-based real-time computing platform not only serves the Alibaba Group, but also provides Flink-based cloud product support to the entire developer ecosystem through Alibaba Cloud's cloud product API.

At that time, Flink had not yet been proven in production in terms of scale or stability, and its maturity was still open to question. Alibaba's real-time computing team therefore decided to create Blink, an internal branch of Flink, and made extensive modifications and improvements to adapt it to Alibaba's ultra-large-scale business scenarios. In the process, the team not only improved Flink's performance and stability considerably, but also made many innovations in its core architecture and functionality, which are being contributed back to the community step by step. Examples include Flink's new distributed architecture, the incremental Checkpoint mechanism, the credit-based network flow control mechanism, and Streaming SQL. Next, we analyze in depth, at two levels, the optimizations Alibaba has made to Flink.

Taken from open source, given back to open source

1. SQL layer

To truly let users write one set of code for their business logic and run it in many different scenarios, Flink first needs to offer a unified API. After some research, Alibaba's real-time computing team concluded that SQL is a very suitable choice. In batch processing, SQL has been tested for decades and is recognized as a classic. In stream computing, theories such as stream-table duality and the view of a stream as the changelog of a table have emerged in recent years. Building on these foundations, Alibaba proposed the concept of dynamic tables, so that stream computing can be described in SQL just like batch processing, with logically equivalent results. Users can therefore describe their business logic in SQL. The same query can be executed as a batch task, as a high-throughput, low-latency stream computing task, or even first as a batch computation over historical data that then automatically converts into a stream computing task to process the latest real-time data. Under such a declarative API, the engine has more choices and more room for optimization. Next, we introduce some of the more important optimizations.
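
The stream-table duality described above can be illustrated with a small sketch. This is plain Python, not Flink code: the event layout, the function names (batch_count, stream_count, materialize) and the changelog labels are assumptions made for illustration. It evaluates the same grouped count once over a complete data set and once incrementally, and shows that replaying the streaming changelog yields the same table as the batch run.

```python
from collections import Counter

def batch_count(rows):
    """Batch: evaluate the GROUP BY count over the complete input at once."""
    return dict(Counter(user for user, _item in rows))

def stream_count(rows):
    """Streaming: consume rows one by one and emit a changelog of the result table.

    Each entry is (op, user, count). The labels are illustrative:
    "+I" inserts a row, "-U" retracts the old value, "+U" adds the updated value.
    """
    counts = {}
    changelog = []
    for user, _item in rows:
        old = counts.get(user)
        new = (old or 0) + 1
        if old is None:
            changelog.append(("+I", user, new))
        else:
            changelog.append(("-U", user, old))
            changelog.append(("+U", user, new))
        counts[user] = new
    return changelog

def materialize(changelog):
    """Replay a changelog into a table: the dynamic table at that point in time."""
    table = {}
    for op, user, count in changelog:
        if op == "-U":
            table.pop(user, None)
        else:
            table[user] = count
    return table

clicks = [("alice", "a"), ("bob", "b"), ("alice", "c")]
changelog = stream_count(clicks)
```

Replaying the changelog with materialize(changelog) gives the same table as batch_count(clicks), which is the sense in which one query can describe both execution modes.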

The first is the upgrade and replacement of the SQL layer's technical architecture. Developers who have studied or used Flink will know that it has two basic APIs: DataStream, provided for stream processing users, and DataSet, provided for batch processing users. The execution paths of the two APIs are completely different, and they even generate different kinds of tasks for execution. After a series of optimizations, Flink's native SQL layer calls either the DataSet or the DataStream API, depending on whether the user chooses batch or stream processing. As a result, users often face two almost completely independent technology stacks in daily development and tuning, much work has to be done twice, and an optimization made on one stack cannot benefit the other. Alibaba therefore proposed a new Query Processor at the SQL layer. It mainly consists of an optimization layer (Query Optimizer) that is shared between stream and batch as far as possible, and an operator layer (Query Executor) built on a common interface. In this way, more than 80% of the work can be reused on both sides, such as common optimization rules and basic data structures. At the same time, stream and batch each retain their own specific optimizations and operators to suit their different job behaviors.

After unifying the technical architecture of the SQL layer, Alibaba began looking for a more efficient basic data structure to make Blink's SQL execution faster. Native Flink SQL uniformly uses a data structure called Row, which represents a row of a relational table entirely with Java objects. If a row consists of an integer, a floating-point number, and a string, the Row holds a Java Integer, Double, and String. As is well known, such Java objects carry considerable extra overhead on the heap, and accessing the data introduces unnecessary boxing and unboxing. To address these problems, Alibaba proposed a new data structure, BinaryRow. Like the original Row, it represents one row of relational data, but it stores the data entirely in binary form. In the example above, the three fields of different types are all represented in a Java byte[]. This brings many benefits:

First of all, in terms of storage space, a lot of unnecessary extra consumption is removed, making the storage of objects more compact;
Secondly, when dealing with network or state storage, you can also omit a lot of unnecessary serialization and deserialization overhead;
Finally, after removing various unnecessary boxing and unboxing operations, the entire execution code is also more GC friendly.
By introducing such an efficient basic data structure, the execution efficiency of the entire SQL layer has been more than doubled.
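
To make the idea of a binary row more concrete, here is a minimal sketch in Python. The layout is an assumption chosen for illustration (one fixed 8-byte slot per field, with the string stored as an offset and length pointing into a trailing variable-length area); it is not claimed to be Blink's actual BinaryRow format. It shows how an integer, a floating-point number, and a string can live in one byte buffer and be read individually without building an object per field.

```python
import struct

FIELD_WIDTH = 8  # one fixed 8-byte slot per field (an assumption of this sketch)

def write_row(an_int, a_double, a_string):
    """Encode (int, float, str) into a single bytes object."""
    text = a_string.encode("utf-8")
    var_offset = 3 * FIELD_WIDTH                 # variable-length data starts after the slots
    fixed = struct.pack("<q", an_int)            # slot 0: 64-bit integer
    fixed += struct.pack("<d", a_double)         # slot 1: 64-bit float
    fixed += struct.pack("<ii", var_offset, len(text))  # slot 2: (offset, length) of the string
    return fixed + text

def read_int(buf):
    return struct.unpack_from("<q", buf, 0 * FIELD_WIDTH)[0]

def read_double(buf):
    return struct.unpack_from("<d", buf, 1 * FIELD_WIDTH)[0]

def read_string(buf):
    offset, length = struct.unpack_from("<ii", buf, 2 * FIELD_WIDTH)
    return buf[offset:offset + length].decode("utf-8")

row = write_row(42, 3.5, "flink")
```

Because the row is already a flat byte sequence, it could be written to the network or to state storage as-is, which is the serialization saving the list above refers to.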

At the operator implementation level, Alibaba introduced code generation on a much wider scale. Thanks to the unified technical architecture and basic data structure, many code generation techniques can be reused more broadly. In addition, because SQL is strongly typed, the types of the data an operator will process are known in advance, so more targeted and efficient execution code can be generated. In native Flink SQL, code generation is applied only to simple expressions such as a > 2 or c + d. After Alibaba's optimization, some operators, such as sort and aggregation, are generated as a whole. This allows the operator's logic to be controlled more flexibly, and the final running code can be embedded directly into the class, eliminating expensive function call overhead. Some basic data structures and algorithms that rely on code generation, such as the sorting algorithm and a HashMap based on binary data, can also be shared between stream and batch operators, so that users truly enjoy the benefits of a unified technology and architecture: once a data structure or algorithm has been optimized for a batch scenario, stream computing performance can improve as well. Next, let's look at the substantial improvements Alibaba has made to Flink at the Runtime layer.
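
As a rough illustration of expression-level code generation, the sketch below builds the source of a filter function for an expression such as a > 2 and compiles it once, so that the column position and the constant are fixed in the generated code instead of being looked up for every row. It is written in Python for brevity and is only an analogy; Flink and Blink generate Java code, and the names generate_filter and generated_filter are hypothetical.

```python
def generate_filter(schema, field, op, constant):
    """Generate and compile a predicate specialized for one comparison expression."""
    if op not in (">", "<", ">=", "<=", "==", "!="):
        raise ValueError(f"unsupported operator: {op}")
    index = schema.index(field)      # column position is resolved at generation time
    source = (
        "def generated_filter(row):\n"
        f"    return row[{index}] {op} {constant!r}\n"
    )
    namespace = {}
    exec(source, namespace)          # compile the generated source once
    return namespace["generated_filter"], source

schema = ["a", "b", "c"]
predicate, source = generate_filter(schema, "a", ">", 2)
rows = [(1, "x", 0.5), (3, "y", 1.5), (7, "z", 2.5)]
kept = [row for row in rows if predicate(row)]
```

Whole-operator generation, as described above for sort and aggregation, applies the same idea to an entire operator body rather than to a single expression.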

2. Runtime layer

For Flink to take root in Alibaba's large-scale production environment, the real-time computing team inevitably ran into a series of challenges. The first was how to integrate Flink with other cluster management systems. Flink's native cluster management model was not yet mature, and it could not natively use other, more mature cluster management systems. This raised a string of thorny questions: how should resources be coordinated among multiple tenants? How can resources be requested and released dynamically? How can different resource types be specified?

To solve these problems, the real-time computing team carried out extensive research and analysis. The final solution was to rework Flink's resource scheduling system so that Flink runs natively on a YARN cluster, and to restructure the Master architecture so that each Job has its own Master, which means the Master is no longer a cluster-wide bottleneck. Taking this as an opportunity, Alibaba and the community jointly launched the new FLIP-6 architecture, which makes Flink's resource management pluggable and lays a solid foundation for Flink's sustainable development. Flink can now run seamlessly on YARN, Mesos, and K8s, which clearly illustrates the importance of this architecture.

With large-scale deployment of Flink clusters solved, the next concerns were reliability and stability. To ensure high availability in production, Alibaba focused on improving Flink's FailOver mechanism. The first improvement concerns Master FailOver: in native Flink, a Master FailOver restarts all jobs, whereas after the improvement a Master FailOver no longer affects running jobs. The second is region-based task FailOver, which minimizes the impact of any single task's failure on users. With these improvements in place, a large number of Alibaba business teams began migrating their real-time computing to Flink.
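
The article does not define how regions are formed, so the following sketch rests on an assumption: tasks connected by pipelined data exchanges must restart together, while a blocking (fully materialized) exchange marks a region boundary. Under that assumption, regions are the connected components of the pipelined edges, and a failure restarts only the region that contains the failed task. All names here are hypothetical, and a real scheduler would also have to check whether upstream results are still available.

```python
def failover_regions(tasks, edges):
    """Group tasks into regions: tasks joined by pipelined edges share a region.

    edges is a list of (upstream, downstream, kind),
    where kind is "pipelined" or "blocking".
    """
    parent = {task: task for task in tasks}

    def find(task):
        while parent[task] != task:
            parent[task] = parent[parent[task]]   # path halving
            task = parent[task]
        return task

    for upstream, downstream, kind in edges:
        if kind == "pipelined":
            parent[find(upstream)] = find(downstream)

    regions = {}
    for task in tasks:
        regions.setdefault(find(task), set()).add(task)
    return list(regions.values())

def tasks_to_restart(failed_task, regions):
    """In this simplified model only the failed task's own region restarts."""
    for region in regions:
        if failed_task in region:
            return region
    raise KeyError(failed_task)

tasks = ["source", "map", "agg", "sink"]
edges = [
    ("source", "map", "pipelined"),
    ("map", "agg", "blocking"),      # materialized result: a region boundary
    ("agg", "sink", "pipelined"),
]
regions = failover_regions(tasks, edges)
```

In this example a failure of sink restarts only agg and sink, while source and map keep their results.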

Stateful streaming is Flink's biggest highlight. The Checkpoint mechanism, based on the Chandy-Lamport algorithm, gives Flink exactly-once consistency. In early Flink versions, however, Checkpoint performance became a bottleneck at large data volumes. Alibaba has also made numerous improvements to Checkpoint, such as:

Incremental Checkpoint mechanism: large jobs with dozens of TB of state are common in Alibaba's production environment, and taking a full checkpoint (CP) of that much state every time is earth-shakingly expensive. Alibaba therefore developed an incremental Checkpoint mechanism, which turns each CP into a small, steady trickle;
Checkpoint small-file merging: this problem, too, is caused by scale. As the number of Flink jobs in the cluster grew, so did the number of CP files, until the HDFS NameNode was overwhelmed. By merging several small CP files into one large file, Alibaba ultimately reduced the pressure on the NameNode by dozens of times.
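
The incremental idea can be sketched as follows. The sketch assumes that state is kept as a set of immutable, uniquely named files, so a file already stored by an earlier checkpoint never needs to be uploaded again; the class and file names are hypothetical, and the article does not describe Blink's actual mechanism at this level of detail.

```python
class IncrementalCheckpointer:
    """Uploads only the state files that earlier checkpoints have not already stored."""

    def __init__(self):
        self.remote = {}        # file name -> content, stands in for durable storage
        self.checkpoints = []   # per checkpoint: the file names it references

    def checkpoint(self, local_files):
        """local_files maps immutable file names to their content.

        Returns the names that actually had to be uploaded.
        """
        uploaded = [name for name in sorted(local_files) if name not in self.remote]
        for name in uploaded:
            self.remote[name] = local_files[name]
        self.checkpoints.append(sorted(local_files))
        return uploaded

    def restore(self, checkpoint_id):
        """Rebuild the full state from the files a checkpoint references."""
        return {name: self.remote[name] for name in self.checkpoints[checkpoint_id]}

cp = IncrementalCheckpointer()
state = {"sst-1": b"aaa", "sst-2": b"bbb"}
first = cp.checkpoint(state)        # uploads both files
state["sst-3"] = b"ccc"             # only new data since the last checkpoint
second = cp.checkpoint(state)       # uploads just the new file
```

The second checkpoint still describes the complete state, but its cost is proportional to what changed rather than to the total state size.
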

Although all data can be kept in State, for historical reasons users still have some data that must live in external KV stores such as HBase, and they need to access it from within Flink jobs. Because Flink has always used a single-threaded processing model, the latency of accessing external data became the bottleneck of the whole system. Asynchronous access is the obvious remedy, but it is not easy for users to write multi-threaded code in a UDF while still guaranteeing exactly-once semantics. Alibaba proposed AsyncOperator in Flink, which makes writing asynchronous calls in a Flink job as easy as writing "Hello World" and has given Flink job throughput a great leap.
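
The benefit of asynchronous access can be shown with a small sketch using Python's asyncio. It is not the AsyncOperator API: lookup is a hypothetical stand-in for a remote KV read, and checkpointing and exactly-once handling are left out entirely. The point is only that many lookups are in flight at once, up to a capacity limit, while results are still emitted in input order.

```python
import asyncio

async def lookup(key):
    """Hypothetical stand-in for a remote KV read, for example an HBase get."""
    await asyncio.sleep(0.01)
    return f"value-of-{key}"

async def async_enrich(keys, capacity=10):
    """Run up to `capacity` lookups concurrently; results keep the input order."""
    semaphore = asyncio.Semaphore(capacity)

    async def guarded(key):
        async with semaphore:
            return key, await lookup(key)

    # gather returns results in the order of its arguments, not completion order
    return await asyncio.gather(*(guarded(key) for key in keys))

enriched = asyncio.run(async_enrich(["k1", "k2", "k3"]))
```

With a synchronous loop the total time would be roughly the sum of all lookup latencies; here it is closer to the latency of a single lookup, as long as the number of keys stays within the capacity.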

Flink is designed as a unified computing engine for batch and stream. Having experienced lightning-fast stream computing, batch users have also become interested in moving into the Flink community. Batch computing, however, brings new challenges. The first is task scheduling: Alibaba introduced a more flexible scheduling mechanism that schedules more efficiently according to the dependencies between tasks. The second is data shuffle: Flink's native Shuffle Service is bound to the TaskManager (TM), so the TM cannot release its resources after its tasks finish, and the original batch shuffle does not merge files, so it cannot be used in production. Alibaba developed a Yarn Shuffle Service that solves both problems. While building it, Alibaba found that developing a new Shuffle Service was very inconvenient and required intrusive changes in many places in the Flink code, so Flink's Shuffle was turned into a pluggable architecture. Alibaba's search business already uses Flink batch jobs in production.

After more than three years of polishing, Blink has begun to thrive at Alibaba. But optimizing and improving the Runtime never ends, and a large wave of further improvements is on the way.

The future direction of Flink

Flink has become a mainstream stream computing engine. The community's next important task is to achieve breakthroughs in batch computing, so that Flink becomes a mainstream batch computing engine in more scenarios. Beyond that, the goal is to switch seamlessly between stream and batch, blurring the boundary between them more and more, so that a single computation in Flink can include both stream and batch computation.

Next, Alibaba will also work to make Flink available in more languages across the ecosystem: not only Java and Scala, but also Python and Go, which are used in machine learning.

Another point that must be mentioned is AI. Much big data computing demand and data volume exists to support today's very popular AI scenarios, so Flink will continue to improve its upper-level machine learning algorithm library on top of a more complete stream and batch ecosystem. Flink will also integrate with more mature machine learning and deep learning systems. For example, TensorFlow on Flink could combine big data ETL processing with machine learning feature computation, training computation, and so on, so that developers can enjoy the benefits of several ecosystems at once.

Finally, in terms of ecosystem and community activity, Alibaba is currently preparing the first Flink Forward China summit (on the scale of a thousand attendees), to be held at the National Convention Center on December 20 and 21, 2018. Participants will have the opportunity to learn why companies such as Alibaba, Tencent, Didi, Meituan, and ByteDance use Flink as their preferred stream processing engine.
