How to migrate Hive SQL to Flink SQL
2. Flink Python
1、 Motivation of Hive SQL migration
Flink has become the de facto standard for stream computing. Currently, Flink and Flink SQL are generally selected for real-time computing or stream computing at home and abroad. In addition, Flink is also a well-known streaming batch big data computing engine.
However, Flink also faces challenges. For example, although the current large-scale applications are mainly stream computing, Flink batch computing is not widely used. To further promote the integration of streaming and batch computing in the real sense, we need to promote more Flink batch computing in the industry and more actively embrace the existing offline ecosystem. Currently, the offline ecosystem in the industry is dominated by Hive. Therefore, we have done a lot of Hive related integration in the past versions, including Hive Catalog, Hive syntax compatibility, Hive UDF compatibility, and streaming Hive. In Flink 1.16, we have further improved the compatibility of HiveSQL and supported the protocol compatibility of HiveServer2.
So why does Flink support the migration of Hive SQL? On the one hand, we hope to attract more Hive offline digital warehouse users to constantly polish the batch computing engine and align with the mainstream batch computing engine. On the other hand, it is compatible with Hive SQL to reduce the threshold for existing offline users to use Flink to develop offline businesses. In addition, ecology is the biggest threshold for open source products. Flink already has very rich real-time ecological tools, but offline ecology is still lacking. Through compatibility with Hive ecosystem, you can quickly integrate Hive offline ecological tools and platforms, reducing the cost of user access. Finally, this is also an important part of realizing the integration of streaming and batch. We hope to promote the industry to try a unified streaming and batch computing engine, and then unified streaming and batch computing SQL.
From the user's perspective, why should Hive SQL be migrated to Flink SQL?
For the platform side, the unified flow batch computing engine only needs to maintain a set of Flink engines, which can reduce maintenance costs and improve the team's R&D efficiency. In addition, Flink+Gateway+HiveSQL compatibility can be used to quickly build an OLAP system. Another advantage of Flink is that it has a rich connector ecology, and can use Flink's rich data sources to achieve powerful federated queries. For example, you can not only do ad hoc queries in Hive database, but also do federated queries between Hive table data and MySQL, HBase, Iceberg, Hudi and other data sources.
For offline database users, Hive SQL can be used to write stream computing jobs, greatly reducing the cost of real-time transformation. The previous HiveSQL syntax is still used, but it can be run in streaming mode. On this basis, we can also further explore the construction of streaming batch integrated SQL layer and streaming batch integrated database layer.
2、 Challenges of Hive SQL migration
However, Flink's migration to support HiveSQL faces many challenges, mainly in the following three aspects:
Compatibility: including compatibility of offline digital warehouse operation and Hive platform tools. It is mainly compatible with application layer and platform side.
Stability: The stability of production must be guaranteed first for the job after migration. We also did a lot of work in 1.16, including FLIP-168 prediction execution and Adaptive Hash Join. Later, we will publish more articles to introduce this work.
Performance: Finally, performance is also very important. In 1.16, we also did a lot of work in this area, including Dynamic Partition Pruning (DPP), metadata access acceleration, etc. More articles will be published later to introduce this work.
Next, we will focus on the work related to Hive compatibility.
The compatibility of Hive syntax does not completely create a new SQL engine, but reuses many core processes and codes of Flink SQL. We abstract the pluggable parser layer to support and extend different syntax. Flink SQL will be converted to Flink RelNode by Flink Parser, optimized to Physical Plan by Logical Plan, and finally converted to Job Graph for execution. To support Hive syntax compatibility, we introduced the Hive Parser component to convert Hive SQL into Flink RelNode. In this process, most of Hive's existing SQL parsing logic is reused to ensure the compatibility of the syntax layer (all based on Calcite). Later, the same process and code are reused by RelNode and converted into LogicalPlan, Physical Plan, and JobGraph, and finally submitted for execution.
From the perspective of architecture, Hive syntax compatibility is not complicated, but it is a work of "devil in details". The figure above shows some Flink Hive compatibility related issues in Flink 1.16, involving query compatibility, type system, semantics, behavior, DDL, DML, auxiliary query commands and many other syntax functions. The cumulative number of issues completed reached nearly 100.
Flink 1.16 improves Hive compatibility from 85% to 94.1%. Compatibility testing mainly relies on Hive qtest test set, which contains more than 12000 test cases, covering all the current mainstream syntax functions of Hive. Some parts that are not compatible include the ACID function (less used in the industry). If the ACID function is removed, the compatibility has reached more than 97%.
SQLGateway is the server layer component of Flink SQL. It is a separate process and benchmarking HiveServer2 component. From the overall architecture of Flink, SQLGateway is in the middle.
Down, Flink SQL and Hive SQL encapsulate the user API. Both Flink SQL and Hive SQL are executed using the Flink streaming and batching runtime, which can run in batch mode or stream mode. Flink resources can also be deployed and run on YARN, K8S and Flink standalone clusters.
Upwards, SQLGateway provides pluggable protocol layer Endpoint, and currently provides HiveServer2 and REST protocol implementations. Through the HiveServer2 Endpoint, users can connect many tools and components (Zeppelin, Superset, Beeline, DBeaver, etc.) of the Hive ecosystem to the SQL Gateway, provide unified SQL services for streaming and batching, and are compatible with Hive SQL. Through REST protocol, you can use Postman and curl commands or use Python and Java programming to access it, providing complete and flexible stream computing services. In the future, Endpoint capabilities will continue to expand, for example, it can provide higher performance gRPC protocol or compatible PG protocol.
3、 Hive SQL Migration Practice
At present, Kwai is working closely with Flink community to promote the integration of streaming and batch. At present, Kwai migration of Hive SQL jobs to Flink SQL jobs has made initial progress, and thousands of jobs have been migrated. The main migration strategy of Kwai is the dual running platform. Existing services continue to run. The dual running platform has intelligent routing components, which can identify jobs by specifying rules or patterns and deliver them to MapReduce, Spark or Flink for running. At the initial stage, the operation was cautious. Some jobs were designated to run in Flink first through the white list mechanism to observe their stability and performance, compare the consistency of their results, and then gradually use rules to scale. For more practical experience and details, please pay attention to the "Practice of Hive SQL Migration to Flink SQL in Kwai" shared on Flink Forward Asia 2022.
4、 Hive SQL migration demonstration
Demo1: How to migrate Hive SQL to Flink SQL
Next, we will demonstrate how Hive SQL can be migrated to Flink SQL. We have built a YARN cluster and Hive related components, including HiveServer2 services. We use Zeppelin for data visualization and SQL queries. We will demonstrate that Hive SQL can be migrated to Flink SQL only by changing one line of address. The Zeppelin experience is the same, and the SQL does not need to be modified. For the complete Demo video, please watch the complete speech video: https://www.bilibili.com/video/BV1BV4y1T7d4
First, configure Hive Interpreter in Zeppelin, and fill in the JDBC address, port, user name, password, driver and other information of HiveServer2.
With the current History Interpreter, we can create a widened store with the History DDL command_ sale_ Detail table. Use the History SQL syntax to associate stores_ sales、date_ Dim and store are printed into a wide table and written to the store_ sale_ detail。 After executing the INSERT INTO statement, you can see the MapReduce task running on the Hadoop platform.
store_ sale_ After the production of the detail wide table is completed, we can query and analyze, such as viewing the sales volume of each store on Sundays. After running, the results can be displayed in pie chart and other forms.
The above simple demonstration uses Hive for data production and data analysis. The computing engine uses Hive's native Hadoop MapReduce job, which runs on the YARN cluster. Next, we start to migrate to Flink SQL, and the job is still running on the YARN cluster.
First, set up the Flink SQL cluster and start the SQLGateway. We have downloaded and unzipped Flink version 1.16. The Hive connector, JDBC connector and MySQL driver have also been prepared in advance under the lib folder. In addition, you need to replace the flash table planner loader with the flash table planner JAR package in the opt/directory, and then start the YARN session cluster. After the session cluster is started, you can see the session application of Flink on yarn.
Before starting the SQLGateway, you need to modify the configuration, mainly to configure the information related to HiveServer2 Endpoint.
Here, the endpoint type of SQLGateway is HiveServer2, and three additional configurations need to be set:
HiveServer2's hive-conf-dir, thrift.host, and thrift.port. Note that the port number we started is 20002. Then start the SQL Gateway service through the sql-gateway.sh start command.
Once started, you can migrate. Because HiveServer2 runs on the same machine, you only need to modify the port number. Change the 10000 port number here to the just started 20002 port number, that is, the Flink SQLGateway port. No other changes are required. Restart the interpreter, and the migration is complete!
Then we can re execute the Hive SQL statement in Zeppelin and find that the results are consistent.
As shown in the figure above, it is the result of querying the total sales of each store on Sunday. The pie chart result is completely consistent with the Hive engine query result. The difference is that this query is run on the Flink engine.
After Hive SQL is migrated to Flink SQL, it can not only achieve better performance, but also obtain additional capabilities provided by Flink SQL, including richer federated query and streaming batching capabilities.
We can use Flink DDL to create a new catalog. For example, there are new and additional dimension information in the MySQL table, which is not in Hive, and we want to associate it for new data exploration. You can use Flink's CREATE CATALOG statement to create a MySQL catalog to implement federated queries. At the same time, Flink will push down projects and filters that can be pushed down to MySQL for tailoring.
In addition, Hive SQL can also be used to experience the ability of stream computing. Use Flink syntax to create a datagen table, which will continuously generate random data. Switch back to Hive syntax to create a Hive result table sink. Change the running mode to streaming, execute the insert into statement, and then submit a stream job, which will continuously write the data generated in datagen to Hive.
To verify that the Hive result table is always being written to data by the streaming job, we can also query the written table using Hive syntax. As shown in the figure above, by executing the count (*) statement continuously, you can see that the table has been writing data, so the query results will change constantly.
5、 Future planning
In the future, Flink will continue to evolve in the following three aspects:
First, continue to make more attempts and investments in batch to improve the stability and performance of batch, with the goal of catching up with mainstream batch computing engines in the short term.
Second, improve the analysis of data lakes, such as more efficient batch data lake read/write, query optimization push down, read/write optimization on column storage, and support of Iceberg, Hudi, and Flink Table Store. In addition, it also provides rich lake data query and management functions, such as the ability to query snapshot versions, query metadata, richer DML syntax (UPDATE, DELETE, MERGE INTO), and the CALL command to manage lake data.
Third, Flink Batch ecological construction, including further improvement of Remote Shuffle Service and blood relationship management.
Q: Hive writes are executed through Flink. If Hive has a large amount of data, will there be insufficient memory, OOM and other errors?
A: At present, under the batch mode of Flink, almost all operators have memory management mechanisms. Data is not stored in Flink as Java objects, but a separate memory space is opened in Java memory for its use. If the memory is full, disk dropping and spilling will be performed. The speed may decrease slightly, but generally memory OOM will not occur.
Q: Does Flink support Hive custom UDF functions? What is the migration cost?
A: Yes, it can be migrated directly.
Q: Is there any risk in moving the existing offline digital warehouse from Hive to Flink? What are the precautions for smooth migration?
A: At present, smooth migration mostly uses dual running platforms. Some jobs are selected through the mechanism for migration, and the migrated jobs run on both platforms at the same time. Therefore, it is necessary to verify whether the behavior and results are consistent, and then gradually offline the jobs of the old platform to become single running. The whole process needs to be gradual, usually taking half a year to one year.
Q: In Demo, a SQL query uses the Hive on MR engine. After migration, do you want to use Flink SQLGateway or Hive on MR mode?
A: After migration, because the configured port is the Flink SQL Gateway port, the SQL request goes through the Flink SQL Gateway. The Gateway will compile the Hive SQL into Flink jobs and submit them to the YARN cluster for running.
Q: When Flink runs batch tasks, do we specify or automatically generate the number of TaskManagers?
A: For the standalone mode, including the standalone mode running on the k8s, the number of TMs is specified by the user. In other modes, Flink decides and pulls up the number of TMs, including yarn/k8s application mode, yarn session mode, yarn per job mode, and native k8s session mode. The number of pulled TMs is related to the number of slots requested by the job. The taskmanager.numberOfTaskSlots parameter determines the mapping relationship between the number of slots and the number of TMs. The number of slots is related to the concurrency of the scheduled job node.
Q: When Flink runs on the K8S, if dynamic resource allocation is enabled, will the shuffle data always be saved on the POD disk?
A: You can choose to either on TM or on RemoteShuffleService.
Q: After offline job migration, do you still support with as syntax and partition by syntax?
A: The WITH AS syntax is still supported, and the PARTITIONED BY syntax in CREATE TABLE is also supported.
Knowledge Base Team
Knowledge Base Team
Knowledge Base Team
Knowledge Base Team
Explore More Special Offers
50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00