Flink SQL platform practice at NetEase Games

1. The development history of Flink SQL at NetEase Games

NetEase Games' real-time computing platform is called Streamfly, a name derived from Stormfly in the movie "How to Train Your Dragon". Since we were already migrating from Storm to Flink, we replaced the Storm in Stormfly with the more general Stream.

The predecessor of Streamfly was a subsystem named Lambda under the offline job platform Omega, which was responsible for scheduling all real-time jobs. It initially supported Storm and Spark Streaming, and was later changed to support only Flink. In 2019, we split Lambda out and built the Streamfly computing platform on top of it. At the end of 2019, we developed and launched the first version of our Flink SQL platform, StreamflySQL. This version provided basic Flink SQL functionality based on a template jar, but the user experience left much to be desired, so in early 2021 we rebuilt StreamflySQL from scratch; the second version is based on SQL Gateway.

To understand the difference between these two versions, we need to review the basic workflow of Flink SQL.

The SQL submitted by the user is first parsed by the Parser into a logical execution plan; the Planner's Optimizer then optimizes the logical plan into a physical execution plan; the Planner's code generation translates the physical plan into the Transformations shared with the DataStream API; finally, the StreamGraphGenerator converts these Transformations into the final representation of a Flink job, the JobGraph, which is submitted to the Flink cluster.
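To make this concrete, here is a minimal sketch of the same workflow from the user-code side, assuming a recent Flink version; the tables are hypothetical and use the datagen and print connectors that ship with Flink. All of the steps above happen inside the final executeSql call:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlWorkflowSketch {
    public static void main(String[] args) {
        // The TableEnvironment hosts the whole pipeline:
        // Parser -> Optimizer -> CodeGen -> StreamGraphGenerator -> JobGraph.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source and sink definitions; pure metadata operations against the Catalog.
        tEnv.executeSql(
            "CREATE TABLE src (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
            "CREATE TABLE snk (id BIGINT, name STRING) WITH ('connector' = 'print')");

        // This call parses the INSERT, optimizes it, generates code, builds the
        // JobGraph, and submits it to the configured cluster in one shot.
        tEnv.executeSql("INSERT INTO snk SELECT id, name FROM src");
    }
}
```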

The above series of steps all take place inside the TableEnvironment. Depending on the deployment mode, the TableEnvironment may run in the Flink Client or in the JobManager. Flink now supports three cluster deployment modes: Application, Per-Job, and Session. In Application mode the TableEnvironment runs on the JobManager side, while in the other two modes it runs on the Client side. In all three modes, however, the TableEnvironment is one-shot: it exits automatically after the JobGraph is submitted.

In order to reuse the TableEnvironment for better efficiency and to provide stateful operations, some projects run the TableEnvironment in a new, independent server process, resulting in a new architecture that we call server-side SQL compilation. In contrast, there is also client-side SQL compilation.

Some readers may ask why there is no SQL compilation on the JobManager side. This is because the JobManager is a relatively closed component that is not well suited for extension, and even if it were done, the result would be basically the same as client-side compilation. So in general there are two common Flink SQL platform architectures: client-side and server-side.

Client-side SQL compilation, as the name implies, means that SQL parsing, translation, and optimization are performed on the client side (a client in the broad sense, not necessarily the Flink Client). Typical cases are the common template jar and Flink's SQL Client. The advantages of this architecture are that it works out of the box, the development cost is low, and it uses only Flink's public APIs, so version upgrades are relatively easy. The disadvantages are that it is difficult to support advanced features, and a relatively heavy TableEnvironment must be started for every statement, so performance is relatively poor.

Then there is server-side SQL compilation. This architecture puts the SQL parsing, translation, and optimization logic into an independent server process, making the client very light and bringing it closer to the architecture of a traditional database. A typical example is Ververica's SQL Gateway. The advantages of this architecture are good scalability, support for many customized functions, and good performance. The disadvantage is that there is no mature solution in the open source world. As mentioned, SQL Gateway is only a relatively early prototype system that lacks many enterprise-level features; to use it in production it needs to be modified to some extent, and these modifications touch many of Flink's internal APIs and require deeper background knowledge of Flink. Generally speaking, the development cost is relatively high, and the workload of subsequent version upgrades is also relatively large.

Back to our own Flink SQL platform: StreamflySQL v1 is based on client-side SQL compilation, while v2 is based on server-side SQL compilation. Let me introduce them one by one.

2. StreamflySQL v1 based on a template jar

There are three main reasons why StreamflySQL v1 chose client-side SQL compilation:

1: The first is platform integration. Unlike many companies whose job schedulers are written in Java, the mainstream language in big data, our Lambda scheduler is developed in Go. This is because Lambda was designed from the start to support a variety of real-time computing frameworks. For the sake of loose coupling, and in line with the company's technology stack, Lambda uses Go as its development language and, in a way similar to YARN, dynamically generates shell scripts to call the command-line interfaces of the different frameworks. Such a loosely coupled interface gives us great flexibility; for example, we can easily support multiple Flink versions without forcing users to upgrade along with the system. At the same time, it also makes it impossible for us to call Flink's native Java APIs directly.

2: The second reason is loose coupling. When we started development, Flink was at version 1.9. At that time the Client API was relatively complicated and not well suited for platform integration, and the community was also pushing a refactoring of the Client, so we tried to avoid relying on the Client API when building the Flink SQL platform.

3: The third reason is practical experience. The template jar + configuration center pattern was already widely used at NetEase Games, so we had accumulated a lot of practical experience with it. All in all, we naturally adopted the template jar + configuration center architecture to implement the v1 version.

The picture above shows the overall architecture of the v1 version. On top of the Lambda job platform, we added the StreamflySQL backend as a configuration center, responsible for generating a Lambda job from the SQL and run configuration submitted by the user, combined with a common template jar.

The overall job submission process is as follows (a sketch of the template jar's entry point follows the list):

*The user submits SQL and the run configuration in the front-end SQL editor.
*The StreamflySQL backend generates a Lambda job upon receiving the request and passes on the configuration ID.
*Lambda then starts the job, which behind the scenes means executing the Flink CLI run command to submit it.
*The Flink CLI run command starts the Flink Client, which loads and executes the main function of the template jar; at this point the SQL and configuration are read and the TableEnvironment is initialized.
*The TableEnvironment reads the necessary meta-information, such as Databases and Tables, from the Catalog. As an aside, at NetEase Games we do not use one unified catalog to maintain the metadata of different components; instead, each component has its own metadata center, corresponding to a different catalog.
*Finally, the TableEnvironment compiles the JobGraph and deploys the job as a Per-Job Cluster.
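As an illustration, here is a minimal sketch of what the template jar's entry point might look like; the configuration-center lookup is a hypothetical placeholder, and the tables referenced by the SQL are assumed to be resolvable via the registered catalogs:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemplateJarMain {

    public static void main(String[] args) {
        // Lambda's generated CLI command passes only a configuration ID (step 2 above).
        String configId = args[0];

        // Hypothetical call to our configuration center: returns the user's SQL
        // and job settings stored by the StreamflySQL backend.
        String sql = fetchSqlFromConfigCenter(configId);

        // One-shot TableEnvironment, created inside the Flink Client (per-job mode).
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Catalogs for the relevant components would be registered here so the
        // planner can resolve table metadata (step 5 above).

        // Compiles the INSERT into a JobGraph and deploys it as a Per-Job Cluster.
        tEnv.executeSql(sql);
    }

    private static String fetchSqlFromConfigCenter(String configId) {
        // Placeholder for the real configuration-center lookup.
        return "INSERT INTO snk SELECT id, name FROM src";
    }
}
```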

StreamflySQL v1 took our Flink SQL platform from zero to one and met a number of business needs, but there were still many pain points.

The first pain point is slow response.

For a fairly typical SQL job, starting it via the template jar requires preparing the TableEnvironment, which may take 5 seconds; then SQL compilation and optimization, including interacting with the Catalog to fetch metadata, may take another 5 seconds; after the JobGraph is compiled, a per-job cluster must be prepared, which generally takes more than 20 seconds; finally, we wait for the Flink job to be scheduled, that is, for its state to change from scheduled to running, which may take another 10 seconds.

In total, it takes at least 40 seconds for the v1 version to start a Flink SQL job, which is rather long. But a careful analysis of these steps shows that only SQL compilation/optimization and job scheduling are unavoidable; the others, such as the TableEnvironment and the Flink cluster, can actually be prepared in advance. The root of the slowness is that resources are initialized lazily and almost never reused.

The second pain point is that debugging is difficult.

Our requirements for SQL debugging are as follows:

*First, the debugged SQL should be basically the same as the online SQL.
*Second, debug SQL must not affect online data; it may read online data, but must not write to it.
*Third, since a debug run usually only needs to extract a small sample of data to verify the correctness of the SQL, we want to limit the resources of debug SQL, partly for cost reasons and partly to prevent debug SQL from competing with online jobs for resources.
*Fourth, since the amount of data processed by debug SQL is small, we want to obtain the results in a faster and more convenient way.

In the v1 version, we designed the following solutions to the above requirements:

*First, for debug SQL, the system replaces the original Sink with a dedicated PrintSink during SQL translation. This addresses the first two requirements.
*Then the PrintSink is rate-limited, which limits the whole job through Flink's back pressure mechanism, and the maximum execution time of the job is capped, after which the system automatically terminates it. This addresses the resource-limit requirement (a minimal sketch of such a throttled print sink follows this list).
*Finally, to respond faster, debug jobs are not submitted to the YARN cluster but executed locally on the Lambda server in a MiniCluster, which also makes it convenient for us to extract the PrintSink results from standard output. This addresses the last requirement.
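The PrintSink and its rate limiting are internal to our platform; the following is a minimal sketch of the idea, assuming a simple sleep-based limiter is enough to trigger Flink's back pressure:

```java
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Minimal sketch of a throttled debug sink: printing to stdout lets the platform
// scrape results, and the sleep caps throughput so Flink's back pressure
// propagates upstream and limits the whole job.
public class ThrottledPrintSink extends RichSinkFunction<Row> {

    private final long minIntervalMillis;

    public ThrottledPrintSink(double maxRecordsPerSecond) {
        this.minIntervalMillis = (long) (1000.0 / maxRecordsPerSecond);
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        System.out.println(value);        // results are read from standard output
        Thread.sleep(minIntervalMillis);  // crude rate limit -> back pressure
    }
}
```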

The architecture of the debugging mode is shown in the figure above. Compared with the general SQL submission process, the main difference is that the job is not submitted to YARN but executed locally on the Lambda server, which saves the overhead of preparing a Flink cluster and makes it easier to control resources and fetch results.

The above debugging solution was basically usable, but many problems surfaced in actual use.

*First, if the SQL submitted by the user is complex, its compilation and optimization may take a long time, which makes the job likely to hit the timeout and be terminated by the system before any result is output. Such SQL also puts a lot of pressure on the server.
*Second, this architecture cannot debug jobs with long time windows or jobs that need to bootstrap state.
*Third, because the execution results are returned in a batch after the job ends, rather than streamed while it runs, users have to wait until the job ends, usually more than 10 minutes, before seeing any results.
*Fourth, replacing the Sink of debug SQL at the SQL translation stage is implemented by modifying Flink's Planner, which effectively pushes business logic into the Planner and is not elegant.

The third pain point is that the v1 version supports only a single kind of DML statement.

Compared with a traditional database, the SQL statements we support are very limited. Take MySQL as an example: its SQL statements can be divided into DML, DQL, DDL, and DCL.

DML is used to manipulate data; common statements include INSERT / UPDATE / DELETE. StreamflySQL v1 only supports INSERT, which is consistent with Flink SQL: Flink SQL uses retract mode, a mechanism similar to a changelog, to represent UPDATE/DELETE, so exposing only INSERT is actually not a problem.

DQL is used to query data; the common statement is SELECT. This is supported in Flink SQL, but StreamflySQL v1 does not support DQL, because without a Sink it cannot generate a meaningful Flink job.

DDL is used to define metadata; common statements are CREATE / ALTER / DROP. This is not supported in StreamflySQL v1, because the template jar's entry point for SQL is sqlUpdate, which does not support pure metadata operations, and it would be completely uneconomical to start a TableEnvironment just for a metadata operation.

Finally, DCL is used to manage data permissions, for example with GRANT and REVOKE statements. Flink SQL does not support this, because Flink is currently only a user of data rather than a manager of it, so DCL is meaningless there.
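As a concrete illustration of these categories in Flink SQL terms (using the modern executeSql entry point rather than v1's sqlUpdate; the table definitions are hypothetical):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StatementCategories {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL: pure metadata operations handled by the Catalog; no job is launched.
        tEnv.executeSql(
            "CREATE TABLE events (id BIGINT, msg STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
            "CREATE TABLE snk (id BIGINT, msg STRING) WITH ('connector' = 'print')");

        // DQL: launches a job and streams the results back to the caller.
        tEnv.executeSql("SELECT id, COUNT(*) FROM events GROUP BY id");

        // DML (INSERT only): the single statement type StreamflySQL v1 supported.
        tEnv.executeSql("INSERT INTO snk SELECT id, msg FROM events");

        // DCL such as GRANT / REVOKE has no counterpart in Flink SQL.
    }
}
```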

Taken together, the v1 version supports only a single kind of DML, which leaves our beautiful SQL editor rather empty. Based on the above pain points, we researched and developed StreamflySQL v2 this year, which uses the server-side SQL compilation architecture.


3. StreamflySQL v2 based on SQL Gateway

Our core requirement was to solve the pain points of the v1 version, including improving the user experience and providing more complete SQL support. The overall idea was to adopt the server-side SQL compilation architecture to improve scalability and performance. In addition, we changed the cluster deployment mode to Session Cluster, which prepares cluster resources in advance and saves the time of starting a YARN application.

There are two key issues here.

*First, should we develop completely by ourselves or build on an open source project? During our research we found that Ververica's SQL Gateway project fit our needs well: it is easy to extend, it is a basic implementation of the Flink community's FLIP-91 SQL Gateway, and it will be easy to keep aligned with the community's future direction.
*Second, SQL Gateway itself has the ability to submit jobs, which overlaps with our existing Lambda platform and would lead to redundant construction and fragmented management; for example, authentication and authorization, resource management, and monitoring and alerting would each have two entry points. So how should responsibilities be divided between the two? Our final solution is the two-stage scheduling of Session Clusters, that is, separating resource initialization from job execution: Lambda is responsible for managing Session Clusters, while StreamflySQL is responsible for managing SQL jobs, which allows us to reuse most of Lambda's basic capabilities.

This is the architecture diagram of StreamflySQL v2. We embedded SQL Gateway into a SpringBoot application and developed a new backend. Overall, it looks more complicated than the v1 version, because the original one-level scheduling has become a two-level scheduling of sessions and jobs.

First, the user needs to create an SQL session, for which the StreamflySQL backend generates a session job. From Lambda's perspective, a session job is a special job that uses the yarn-session script to start a Flink Session Cluster. After the Session Cluster is initialized, the user can submit SQL within the session. The StreamflySQL backend opens a TableEnvironment for each session and uses it to execute SQL statements. If a statement only involves metadata, it calls the Catalog interface directly to complete it; if it is a job-type SQL statement, it is compiled into a JobGraph and submitted to the Session Cluster for execution (a sketch of this per-session dispatch follows).
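Here is a hypothetical sketch of this two-level model (the names are ours for illustration, not SQL Gateway's API): one TableEnvironment per session, through which all of the session's statements are executed:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical sketch: one TableEnvironment per SQL session, pointed at that
// session's Flink Session Cluster; statement execution goes through it.
public class SessionManager {

    private final Map<String, TableEnvironment> sessions = new ConcurrentHashMap<>();

    public TableEnvironment openSession(String sessionId) {
        // In the real backend the environment is configured with the session's
        // cluster ID so job-type SQL is submitted to that Session Cluster.
        return sessions.computeIfAbsent(
            sessionId, id -> TableEnvironment.create(EnvironmentSettings.inStreamingMode()));
    }

    public void executeStatement(String sessionId, String sql) {
        TableEnvironment tEnv = openSession(sessionId);
        // Metadata-only statements (e.g. CREATE TABLE) complete against the Catalog;
        // job-type statements are compiled to a JobGraph and submitted to the cluster.
        tEnv.executeSql(sql);
    }
}
```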


The v2 version largely solves several pain points of the v1 version:

*In terms of response time, v1 usually takes around 1 minute, while v2 usually completes within 10 seconds.
*In terms of debugging and preview, v2 does not wait for the job to end, but returns results through a socket stream while the job is running. This is thanks to a clever design in SQL Gateway: for a SELECT statement, it automatically registers a socket-based temporary table and writes the SELECT results into that table (a sketch of the equivalent streaming retrieval with the public API follows this list).
*In terms of SQL support, v1 only supports DML, while v2, with the help of SQL Gateway, supports DML, DQL, and DDL.
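SQL Gateway's socket-based temporary table is internal to the gateway, but the public Table API offers a comparable streaming retrieval that illustrates the effect: TableResult.collect() returns rows while the job is still running. A minimal sketch, assuming a recent Flink version and a hypothetical datagen table:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class StreamingPreview {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
            "CREATE TABLE events (id BIGINT, msg STRING) WITH ('connector' = 'datagen')");

        // collect() streams rows back while the job is still running, so the
        // first results appear without waiting for the job to finish.
        TableResult result = tEnv.executeSql("SELECT * FROM events");
        try (CloseableIterator<Row> rows = result.collect()) {
            for (int i = 0; i < 10 && rows.hasNext(); i++) {
                System.out.println(rows.next());   // print a small preview sample
            }
        }   // closing the iterator cancels the preview job
    }
}
```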

However, although SQL Gateway's core functionality is solid, our adoption of it was not all smooth sailing, and we encountered some challenges.

The first and most important is the persistence of metadata.


SQL Gateway keeps its metadata only in memory: if the process restarts or crashes, the metadata is lost, which is unacceptable in an enterprise production environment. Therefore, after integrating SQL Gateway into our SpringBoot application, we naturally persisted the metadata to a database.

The metadata is mainly session metadata, including the session's Catalogs, Functions, Tables, and jobs. This metadata can be divided into 4 layers according to scope. The bottom two layers are global configurations that exist as configuration files; the top two layers are metadata dynamically generated at runtime and stored in the database. Higher-layer configuration items have higher priority and can override lower-layer configuration.

Let's look at these layers from the bottom up (a small sketch of the override order follows the list):

*The bottom layer is the global default Flink configuration, i.e. the flink-conf.yaml under our Flink Home.
*Above that is the configuration of the Gateway itself, such as the deployment mode (e.g. YARN or K8s) and the Catalogs and Functions to be registered by default.
*The third layer is the session-level Session Configuration, such as the cluster ID of the session's Session Cluster or the resource configuration of its TaskManagers.
*The top layer is the job-level configuration, including metadata dynamically generated for the job, such as the job ID and the user-set checkpoint interval.
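As an illustration of the override order only (not our actual implementation), here is a minimal sketch of how an effective configuration could be resolved from the four layers:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the four-layer lookup: later puts win,
// so higher layers override lower ones.
public class LayeredConfig {

    public static Map<String, String> effectiveConfig(
            Map<String, String> flinkConf,    // layer 1: flink-conf.yaml under Flink Home
            Map<String, String> gatewayConf,  // layer 2: gateway defaults (deploy mode, catalogs)
            Map<String, String> sessionConf,  // layer 3: per-session (cluster ID, TM resources)
            Map<String, String> jobConf) {    // layer 4: per-job (checkpoint interval, ...)
        Map<String, String> merged = new HashMap<>(flinkConf);
        merged.putAll(gatewayConf);
        merged.putAll(sessionConf);
        merged.putAll(jobConf);
        return merged;
    }
}
```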

Such a flexible design not only solves the problem of metadata persistence, but also lays the foundation for our multi-tenant feature.

Multi-tenancy involves two aspects: resources and authentication.

In terms of resources, StreamflySQL uses the Lambda job platform to start Session Clusters in different queues, so their Master nodes and resources are naturally isolated, and there is no such problem as different users sharing one Master node and mixing resources, as in Spark Thrift Server.
In terms of authentication, because Session Clusters belong to different users, the StreamflySQL backend needs to impersonate different tenants. At NetEase Games, components generally use Kerberos authentication. We implement multi-tenancy with Hadoop's Proxy User mechanism: we first log in as a super user, then impersonate the project user to obtain delegation tokens from the different components, mainly Hive MetaStore and HDFS, and finally store these tokens in a UGI and submit jobs with doAs (see the sketch below).
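Here is a minimal sketch of this impersonation flow using Hadoop's UserGroupInformation API; the principal, keytab path, and the submitJob callback are hypothetical, and fetching the actual delegation tokens is elided:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSubmit {

    public static void submitAs(String projectUser, Runnable submitJob) throws Exception {
        // Log in once as the platform's super user via its keytab (values illustrative).
        UserGroupInformation.loginUserFromKeytab(
            "streamflysql/admin@EXAMPLE.COM", "/etc/security/streamflysql.keytab");

        // Impersonate the project user; requires hadoop.proxyuser.* settings that
        // allow the super user to act on behalf of project users.
        UserGroupInformation proxy = UserGroupInformation.createProxyUser(
            projectUser, UserGroupInformation.getLoginUser());

        // Delegation tokens for HDFS / Hive MetaStore would be fetched into this UGI here.

        proxy.doAs((PrivilegedExceptionAction<Void>) () -> {
            submitJob.run();  // job submission now happens as the project user
            return null;
        });
    }
}
```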

To achieve high availability and scale out its service capacity, StreamflySQL naturally needs a multi-instance deployment. Because we store the main state, the metadata, in the database, we can rebuild a TableEnvironment from the database at any time, so a StreamflySQL instance is as light as an ordinary web service and can easily be scaled up and down.

But not all state can be persisted, and some state is deliberately not persisted. For example, when a user changes TableEnvironment properties with the SET command, such as enabling Table Hints, these are temporary properties that are reset after the TableEnvironment is rebuilt, which is the expected behavior. For another example, when a user submits a SELECT query for debugging and preview, the TaskManager establishes a socket connection with the StreamflySQL backend, and such a socket connection is obviously not persistent. Therefore, we added affinity-based load balancing in front of the StreamflySQL instances, dispatching traffic by Session ID, so that under normal circumstances the requests of the same user land on the same instance, ensuring continuity of the user experience.

In fact, the word state here is a pun with two meanings:

*The first meaning is the running status of jobs. SQL Gateway currently only submits SQL and does not monitor the subsequent running status. Therefore, StreamflySQL maintains a monitoring thread pool that periodically polls and updates job status. Because StreamflySQL runs multiple instances, their monitoring threads might operate on the same job at the same time, which poses a risk of lost updates, so we use CAS-style optimistic locks to ensure that outdated updates do not take effect (a sketch of this optimistic update follows this list). We then raise an alarm when a job exits abnormally or its status cannot be obtained. For example, during a JobManager failover we cannot know the status of the Flink job, so the system raises a disconnected alarm.

*The second meaning is Flink's persistent state, i.e. Flink State. The native SQL Gateway does not manage Flink savepoints and checkpoints, so we added stop and stop-with-savepoint functionality and force-enabled retained checkpoints. This allows the system to automatically find the latest checkpoint when a job terminates abnormally or is simply stopped.
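Here is a minimal sketch of such an optimistic update against the job table; the schema with a version column is illustrative, not our exact one:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Sketch of the CAS-style optimistic lock: the UPDATE only succeeds if the
// version the poller read is still current, so a stale status observed by
// another instance's monitor thread cannot overwrite a newer one.
public class JobStatusDao {

    public boolean casUpdateStatus(Connection conn, long jobId,
                                   String newStatus, long expectedVersion) throws SQLException {
        String sql = "UPDATE job SET status = ?, version = version + 1 "
                   + "WHERE id = ? AND version = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, newStatus);
            ps.setLong(2, jobId);
            ps.setLong(3, expectedVersion);
            return ps.executeUpdate() == 1;  // 0 rows means a concurrent update won
        }
    }
}
```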

Here I can share our lookup algorithm. Lambda also provides a function to automatically find the latest checkpoint, but Lambda assumes jobs run in Per-Job Clusters, so it only needs to find the latest checkpoint in the cluster's checkpoint directory. Such an algorithm does not work for StreamflySQL, because a Session Cluster hosts multiple jobs and the newest checkpoint is not necessarily from the target job. We therefore switched to a search method similar to JobManager HA: first read the metadata from the job archive directory, and extract the latest checkpoint from it (a simplified sketch follows).
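Our production version reads archived job metadata as just described; as a simplified illustration of scoping the search to a single job, here is a sketch that assumes the standard checkpoint directory layout of base-dir/jobId/chk-n/_metadata:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Simplified sketch: pick the completed checkpoint with the highest n for the
// target job only, instead of the newest checkpoint across the whole
// Session Cluster's checkpoint directory.
public class LatestCheckpointFinder {

    public static Optional<Path> find(Path checkpointBase, String jobId) throws IOException {
        Path jobDir = checkpointBase.resolve(jobId);
        if (!Files.isDirectory(jobDir)) {
            return Optional.empty();
        }
        try (Stream<Path> dirs = Files.list(jobDir)) {
            return dirs
                .filter(p -> p.getFileName().toString().startsWith("chk-"))
                .filter(p -> Files.exists(p.resolve("_metadata")))  // completed only
                .max(Comparator.comparingLong(
                    p -> Long.parseLong(p.getFileName().toString().substring(4))));
        }
    }
}
```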

4. Future work

The first problem we have to solve in the future is state migration, that is, how to restore from the existing savepoint after the user changes the SQL. At present we can only warn users about risk based on the type of change; for example, adding or removing fields does not break savepoint compatibility, but the impact of adding a join table is hard to predict. In the future, we plan to analyze the execution plans before and after an SQL change, so as to inform users in advance about state compatibility across the change.

The second problem is fine-grained resource management. At present we cannot specify resources for SQL at job compilation time. For example, the CPU and memory of TaskManagers are determined when the Session Cluster starts and are session-level settings. Currently resources can only be tuned via job parallelism, which is inflexible and prone to waste. Flink 1.14 now supports fine-grained resource management for the DataStream API, where resources can be set at the operator level, but there is no plan for the SQL API yet; we may participate in pushing the related proposals forward.

Finally, community contributions. We have gained experience using SQL Gateway and have made many improvements to it. We hope to contribute these improvements back to the Flink community to advance FLIP-91 SQL Gateway.
