Introduction to and Practice of Flink SQL and the Table API

Flink has a very powerful API abstraction. It provides three layers of APIs: from bottom to top, ProcessFunction, the DataStream API, and SQL and the Table API. The three layers target different user groups. The lower the layer, the greater the flexibility but the higher the barrier to entry; the highest layer is the easiest to pick up, at the cost of some flexibility.

Why invest effort in SQL and the Table API?

The DataStream API is powerful: it is very expressive, it lets users maintain and update application state, and it gives flexible control over time. But it is also more complex and has a higher barrier to entry, so it is not suitable for everyone. Many users just want to focus on business logic. For a simpler and easier-to-understand API, SQL is currently the best choice.

Flink SQL and the Table API have many advantages. First, SQL is easy to understand: people across many industries and fields know it, and it has become the standard language of the big data processing ecosystem. Second, SQL is declarative: users only express what they want and do not need to care how it is computed. Third, SQL queries are optimized automatically to produce an efficient execution plan. Fourth, SQL has been in use for more than 30 years and is very stable. Finally, SQL makes it easier to unify streams and batches, because the same system can process both. Together, these let users focus on the core business logic.

Introduction to SQL and the Table API

Flink's relational API comes in two forms: the SQL API and the Table API. The SQL API follows the ANSI SQL standard, so anyone with a SQL background can learn it easily; the Table API can be understood as a SQL-like programming API. Both are unified APIs for batch and stream processing: whether the input is static batch data or an unbounded stream, a query produces the same result. In short, one piece of code yields one result, which is also the most important criterion for stream-batch unification.

Flink's workflow

The following is a high-level overview. After a SQL or Table API program enters Flink, it is translated into a unified representation, the Logical Plan. The Catalog provides metadata for the subsequent optimization. The Logical Plan is the starting point of optimization: Flink applies a series of optimization rules to turn the initial Logical Plan into a Physical Plan, translates it into Transformations through the code generation mechanism, and finally converts it into a JobGraph, which is submitted to the Flink cluster for distributed execution. The whole process has no separate path for stream processing and batch processing, because the optimization steps and extensions are shared.

Understanding streams and batches with examples

For example, suppose a clicks file contains the user, the click time, and the URL. To count the number of clicks per user with batch processing, the data is read in once and the result is output once.

If the clicks arrive as a data stream instead, a result can be output after each input record. For example, Mary's first click is recorded as a count of one, and her second click updates that count incrementally. Input data keeps being read in, and the results keep being updated.

The stream and batch results are the same here, so SQL previously written for batch processing can be migrated to Flink for stream processing, and its results and semantics should match those of the batch job.
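The equivalence described above can be sketched in plain Python. This is only an illustration of the semantics, not Flink code: the click records, the function names `batch_count` and `stream_count`, and the URLs are all made up for the example.

```python
from collections import Counter

# Hypothetical click records (user, click_time, url); values are illustrative only.
CLICKS = [
    ("Mary", "12:00:00", "./home"),
    ("Bob", "12:00:00", "./cart"),
    ("Mary", "12:00:05", "./prod?id=1"),
    ("Liz", "12:01:00", "./home"),
    ("Bob", "12:01:30", "./prod?id=3"),
    ("Mary", "12:01:45", "./prod?id=7"),
]


def batch_count(clicks):
    """Batch style: read the whole input once and emit one final result."""
    return dict(Counter(user for user, _, _ in clicks))


def stream_count(clicks):
    """Stream style: update the count for every record and emit each update."""
    counts = {}
    for user, _, _ in clicks:
        counts[user] = counts.get(user, 0) + 1
        yield user, counts[user]


updates = list(stream_count(CLICKS))
# Keeping only the latest update per user gives the current streaming result.
final_from_stream = dict(updates)
```

Once all records have been consumed, `final_from_stream` holds the same per-user counts as `batch_count(CLICKS)`; the only difference is that the streaming version also exposed every intermediate update along the way.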

Flink SQL and Table application cases

The first typical case is low-latency ETL processing, such as data preprocessing, cleaning, and filtering. The second is data pipelines: Flink can run real-time and offline data pipelines, build low-latency real-time data warehouses, and synchronize data in real time from one data system to another.

The third is streaming and batch data analysis, which calculates and updates offline or real-time data and visualizes it, such as the big dashboard screen for Alibaba's Double 11.

The last is pattern recognition, which identifies event sequences that match a certain pattern in a data stream in real time and then provides corresponding monitoring or alerting services, such as monitoring abnormal events in online ride-hailing.

Flink's core functions

The figure below contains some core functions of Flink. The first is SQL DDL. DDL connects directly to external systems, so its strength determines how well Flink connects to them; for a computing engine, connectivity with external data storage is very important. The second is a complete type system that supports a wide variety of data types, which is essential for a SQL engine. The third is efficient streaming TopN, a powerful stream processing capability used to calculate rankings in real time, such as the sales rankings on Double 11. There is also efficient streaming deduplication to filter out duplicates, because collected data sometimes contains duplicate records, as well as dimension table joins, CDC integration, and more.

In addition, Flink has a large number of built-in functions, supports MiniBatch, and offers several ways to handle hot spots. It also supports complete batch processing, supports languages such as Python, and integrates with Hive. It can not only access Hive data directly but is also compatible with Hive syntax, so users do not need to switch between systems frequently.
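To make the streaming deduplication and TopN ideas mentioned above more concrete, here is a minimal plain-Python sketch. It does not use Flink and says nothing about how Flink implements these features internally; the order records, shop names, and the helpers `deduplicate` and `top_n` are assumptions made for illustration.

```python
import heapq


def deduplicate(events, key):
    """Keep only the first event seen for each key (the state is the set of seen keys)."""
    seen = set()
    for event in events:
        k = key(event)
        if k not in seen:
            seen.add(k)
            yield event


def top_n(totals, n):
    """Return the n (name, total) pairs with the largest totals, highest first."""
    return heapq.nlargest(n, totals.items(), key=lambda kv: kv[1])


# Hypothetical orders (order_id, shop, amount); order 2 was collected twice.
ORDERS = [
    (1, "shop_a", 30),
    (2, "shop_b", 50),
    (2, "shop_b", 50),
    (3, "shop_c", 20),
    (4, "shop_a", 40),
]

sales = {}
for _, shop, amount in deduplicate(ORDERS, key=lambda order: order[0]):
    sales[shop] = sales.get(shop, 0) + amount

ranking = top_n(sales, 2)
```

Without the deduplication step the duplicated order would be counted twice and shop_b's total would be inflated, which is why deduplication usually comes before any ranking.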


The following is a real-time analysis of e-commerce user behavior. We consume user behavior data from Kafka in real time, join it with data in MySQL, write the results into an Elasticsearch index, and use Kibana for visualization. Together this builds an end-to-end real-time application.

The following is the final result displayed in Kibana: a real-time monitoring panel showing the current number of unique users, the category ranking, and the purchase volume for each time period.

The following is the user behavior log from an e-commerce platform. We selected only the behavior on November 27. Each record contains the user ID, product ID, product category ID, behavior type, and timestamp. Among the behavior types, pv stands for a click, buy for a purchase, cart for adding to the shopping cart, and fav for adding to favorites; the timestamp is the time when the event occurred.

Practical exercise

The sample code for the exercise has been uploaded to GitHub. If you are interested, you can follow this document step by step. All you need is a Linux or macOS computer with Docker installed; there is no need to download additional packages.

First, we create a new directory, such as flink-sql-demo, and download the demo's docker-compose file into it. You can open the file to see what it contains.

It contains a datagen data source whose generation speed we can control, for example by changing it from 2000 to 3000.

We start all the containers in Docker with docker-compose up -d. The containers include the two Flink cluster components, JobManager and TaskManager, as well as Kibana, Elasticsearch, Zookeeper, MySQL, Kafka, and others.

We can use a docker-compose command to look at the latest 10 records in Kafka. Each record has a user ID, product ID, category ID, user behavior, and a ts field with the time when the behavior occurred.

Then we start today's protagonist, the SQL Client container, through docker-compose. When the squirrel appears, the SQL Client has started successfully and we can run SQL commands in it.

The first step is to use DDL to create a data source, starting with the user log. We use the CREATE TABLE DDL syntax to create a user behavior table with 5 fields: user ID, product ID, category ID, user behavior, and the ts timestamp. The WITH clause is followed by the properties that describe how to connect to the external system, such as using Kafka and which topic to read.

In addition, we can see the user behavior table with SHOW TABLES, and use DESCRIBE to view the table's structure, fields, computed columns, watermark strategy, and so on.

We can also open port 8081, the Web UI of the Flink cluster started by Docker Compose, and browse its various pages.

Next, we work through three hands-on exercises that draw some charts and give a deeper understanding of some Flink features.

The first is to count the transaction volume per hour. We first use DDL to create an Elasticsearch table, define the hourly transaction volume, and then submit the query for statistical analysis of the hourly transaction volume.

We need a one-hour tumbling window, using the TUMBLE window syntax. The first argument of TUMBLE is the time attribute, which is the ts event time just mentioned, and the window size is one hour. This groups events into consecutive, non-overlapping one-hour windows, and the statistical analysis is then performed on the data in each window.
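The way a one-hour tumbling window assigns events can be sketched in plain Python. This is an illustration of the windowing idea only, not Flink's implementation; the events, the year in the timestamps, and the helper names `tumble_start`, `hourly_buy_counts`, and `utc` are assumptions, as is the choice to count only buy events as transactions.

```python
from collections import Counter
from datetime import datetime, timezone


def tumble_start(ts, size_seconds=3600):
    """Return the start of the fixed-size, non-overlapping window containing ts."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % size_seconds, tz=timezone.utc)


def hourly_buy_counts(events):
    """Count 'buy' events per one-hour window, keyed by the window's hour of day."""
    counts = Counter()
    for behavior, ts in events:
        if behavior == "buy":
            counts[tumble_start(ts).hour] += 1
    return dict(counts)


def utc(hour, minute):
    """Build a UTC timestamp on an arbitrary example date."""
    return datetime(2017, 11, 27, hour, minute, tzinfo=timezone.utc)


# Hypothetical (behavior, ts) events.
EVENTS = [
    ("buy", utc(0, 5)),
    ("pv", utc(0, 10)),
    ("buy", utc(0, 59)),
    ("buy", utc(1, 0)),
    ("cart", utc(1, 30)),
]

result = hourly_buy_counts(EVENTS)
```

Every event belongs to exactly one window: the purchase at 00:59 falls into the window starting at 00:00, while the one at 01:00 opens the next window. A sliding window, by contrast, would let one event belong to several overlapping windows.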

We submit this query and then open Kibana on port 5601 to visualize the result. At first the page is empty, with no data in it, so we need to create an index pattern first: go to Index Patterns under Management, find our index, and click to create it.

After creating the Index Pattern, we can do some discovery or visualization in it. As you can see, these fields are defined in the DDL just now, and they have corresponding values.

Of course, the goal is a visualization, so we need to create a Dashboard. Click Dashboard in the upper left corner of the page, then click Create New to create a new view for the hourly transaction volume.

We draw an area chart. On the Y-axis, select the max of the purchase volume and change the label to "Trading Volume". On the X-axis, which shows time, select "hour-of-day", set Order By to Alphabetical, and change the size to 24. Then click the play button, and the area chart is drawn. We can also click Save to save the area chart to the Dashboard.

Then we draw another chart that shows the cumulative number of unique users (UV) over the day at 10-minute intervals. As before, we need to create an Elasticsearch table in the SQL CLI to store the aggregated results. Its fields include the date-time and the cumulative UV count.

Then we execute the table creation statement in the SQL CLI.

Here the query mainly does one thing: it selects the date and time. The only special part is that, because we need one data point every 10 minutes, we use SUBSTR together with the || string concatenation operator to build the time value. Then we submit the query to the SQL CLI to run, as before.
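The 10-minute bucketing trick can be sketched in plain Python. The source only says that SUBSTR and || are combined; the exact approach shown here (format the time as HH:mm, keep the first four characters, append "0") is an assumption about how that combination works, and the events and the helpers `ten_minute_bucket` and `cumulative_uv` are hypothetical.

```python
from datetime import datetime


def ten_minute_bucket(ts):
    """Truncate a timestamp to its 10-minute bucket, e.g. 12:37 -> '12:30'."""
    hh_mm = ts.strftime("%H:%M")  # like formatting the time as 'HH:mm'
    return hh_mm[:4] + "0"        # like SUBSTR(..., 1, 4) || '0'


def cumulative_uv(events):
    """Yield (bucket, number of distinct users seen so far) after each event."""
    seen = set()
    for user_id, ts in events:
        seen.add(user_id)
        yield ten_minute_bucket(ts), len(seen)


def at(hour, minute):
    """Build a timestamp on an arbitrary example date."""
    return datetime(2017, 11, 27, hour, minute)


# Hypothetical (user_id, ts) events.
EVENTS = [(1, at(0, 3)), (2, at(0, 7)), (1, at(0, 12)), (3, at(0, 19)), (2, at(0, 25))]

# Keep the largest cumulative value seen for each 10-minute point.
points = {}
for bucket, uv in cumulative_uv(EVENTS):
    points[bucket] = max(points.get(bucket, 0), uv)
```

Because the count is cumulative over the day, the value at each 10-minute point never decreases, which is what gives the resulting chart its monotonically rising shape.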

As before, we create a new view; this time we create a line chart.

On the Y axis we take the value of uv and name it "Number of Independent Access Users". On the X axis we select terms, then select time-str, set Order By to Alphabetical, and change the size to 150 points for one day. Then click play, and the chart of the number of unique users appears. As before, we can click Save to add this chart to the Dashboard.

Then let's draw the third chart, the top category ranking. The category attached to a product is very fine-grained, for example a detailed third-level or fourth-level category, so ranking by it may not be meaningful. Instead, we want to roll categories up to the top level for statistical analysis, so before starting we prepared a MySQL container that holds the mapping between subcategories and top-level categories.

We first create a MySQL table in the SQL CLI and later query it as a dimension table. At the same time, we create an Elasticsearch table to store the category statistics. In the query, we use the CREATE VIEW syntax to register a temporary view, because writing the two queries together as one would be more complicated.
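The dimension table lookup followed by a per-category count can be sketched in plain Python. This only illustrates the logic and is not how Flink executes the join; the mapping, category names, events, and the helpers `enrich` and `category_ranking` are hypothetical, and dropping events without a mapping is a simplifying choice made for this sketch.

```python
from collections import Counter

# Hypothetical dimension data: sub-category ID -> top-level category name.
CATEGORY_DIM = {101: "Clothing", 102: "Clothing", 201: "Electronics", 301: "Books"}

# Hypothetical behavior events: (user_id, category_id, behavior).
EVENTS = [
    (1, 101, "buy"),
    (2, 102, "buy"),
    (3, 201, "buy"),
    (4, 201, "pv"),
    (5, 101, "buy"),
    (6, 999, "buy"),  # no mapping in the dimension data
    (7, 301, "pv"),
    (8, 201, "buy"),
]


def enrich(events, dim):
    """Attach the top-level category to each event; skip events with no mapping."""
    for user_id, category_id, behavior in events:
        parent = dim.get(category_id)
        if parent is not None:
            yield user_id, parent, behavior


def category_ranking(events, dim):
    """Count purchases per top-level category, largest first."""
    counts = Counter(
        parent for _, parent, behavior in enrich(events, dim) if behavior == "buy"
    )
    return counts.most_common()


ranking = category_ranking(EVENTS, CATEGORY_DIM)
```

Splitting the work into `enrich` and `category_ranking` mirrors the idea of registering the enriched data as a view first and then running the aggregation on top of it.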

We run this code in the SQL CLI as well, then go to the Kibana page to create the index pattern and add a visualization. Here we use Horizontal Bar to draw a bar chart.

On the Y axis we show the transaction volume of each category, and on the X axis the category name, sorted in descending order. Then click the play button.

Finally, we click Save to add the category ranking to the Dashboard. Together with the two charts made earlier, the Dashboard now has 3 charts, which you can drag and drop to arrange the layout.
