Flink Ecosystems

How Flink SQL connects to external systems

Before discussing the principles, let's answer a question: why use Flink SQL? SQL is a standardized data query language. With Flink SQL, we can integrate with various systems through Catalogs, and a rich set of built-in operators and functions is available. Flink SQL can also process both batch and streaming data, which can greatly improve the efficiency of data analysis.

So why does Flink SQL need to connect to external systems? Flink SQL itself is a stream computing engine and does not store any data. For Flink SQL, all data, and therefore all tables, live in external systems. Only by connecting to these external systems can we actually read and write data.

Before explaining how Flink SQL interfaces with external systems, let's look at how DataStream and Table are converted inside Flink. Assuming that a DataStream program already exists, we can convert the DataStream into a Table and query it with the powerful features of Flink SQL. This can be understood through the following example, which resembles a connection made inside Flink rather than to an external system.


In Flink SQL, the component that connects to an external system is called a Connector. The following table lists several commonly used connectors supported by Flink SQL. For example, Filesystem connects to file systems and JDBC connects to external relational databases. Each Connector is mainly responsible for implementing a source and a sink: the source reads data from the external system, and the sink writes data to it.


Format specifies how the data is encoded in the external system. Take a Kafka table as an example: its data may be stored in CSV or JSON format. So when we designate a Connector to connect to an external table, we usually also need to specify the Format so that Flink can read and write the data correctly.


A Catalog connects to the metadata of an external system and provides that metadata to Flink, so that Flink can directly access tables or databases already created in the external system. For example, Hive's metadata is stored in the Hive Metastore, so Flink uses a HiveCatalog to connect to that metadata when it accesses Hive tables. A Catalog can also help Flink persist its own metadata. For example, HiveCatalog not only lets Flink access Hive but also stores information about tables created by Flink, so you do not need to re-create the tables every time you start a session; you can directly read the tables already registered in the Hive Metastore.

How do we create a table and specify its external connector? The following example creates a table through DDL. It is a fairly standard CREATE TABLE statement in which all Connector-related parameters are specified in the WITH clause, for example setting the connector to Kafka.
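As a minimal sketch of such a DDL statement, the table below declares a Kafka-backed table whose connector settings all sit in the WITH clause. The table name, columns, topic, consumer group, and broker address are hypothetical placeholders, and the option keys assume the Kafka SQL connector shipped with Flink 1.11 or later.

```sql
-- Hypothetical table: name, columns, topic, group id, and address are illustrative only.
CREATE TABLE user_behavior (
  user_id  BIGINT,
  item_id  BIGINT,
  behavior STRING,
  ts       TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',                              -- which Connector backs the table
  'topic' = 'user_behavior',                          -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'properties.group.id' = 'demo-group',               -- assumed consumer group
  'format' = 'json'                                   -- how records in the topic are encoded
);
```

Everything before WITH describes the schema that Flink sees; everything inside WITH tells Flink where the data lives and how to decode it.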

After a table is created through DDL, how is it used in Flink? A key concept here is the Table Factory. As the yellow box shows, a table can be defined through DDL or obtained from an external system through a Catalog, and it is then converted into a CatalogTable object. When we reference the CatalogTable in a SQL statement, Flink creates a corresponding source or sink for the table. The module that creates the source and sink is called the Table Factory.

There are two ways to obtain the Table Factory. One is that the Catalog itself is bound to a Table Factory; the other is to discover the Table Factory through the Java SPI, in which case an error is reported unless the lookup finds exactly one match.

Connectors commonly used in Flink SQL

Kafka Connector

The Kafka Connector is the most widely used: Flink is a stream computing engine and Kafka is the most popular message queue, so most Flink users also use Kafka. To create a Kafka table, we need to specify some Kafka-specific parameters, such as setting the connector to Kafka and naming the Kafka topic. These parameters and their meanings are shown in the figure below.

To use the Kafka Connector, you need to add the Kafka dependency JARs. The JARs to add differ depending on the Kafka version in use, and they can be downloaded from the official website.
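The sketch below shows how Kafka tables might be used as both source and sink once the connector JAR is on the classpath. All table names, topics, the consumer group, and the broker address are assumptions made for illustration; the option keys are those of the Kafka SQL connector in Flink 1.11 or later.

```sql
-- Source: read a hypothetical 'clicks' topic from its earliest offset.
CREATE TABLE clicks_source (
  user_id BIGINT,
  url     STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'clicks-reader',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Sink: write the filtered rows to another hypothetical topic in CSV format.
CREATE TABLE checkout_clicks_sink (
  user_id BIGINT,
  url     STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'checkout_clicks',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv'
);

-- A continuous query that moves data from one topic to the other.
INSERT INTO checkout_clicks_sink
SELECT user_id, url
FROM clicks_source
WHERE url LIKE '%/checkout%';
```

The same connector type serves both directions; whether a table acts as a source or a sink depends only on how the query references it.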

Elasticsearch Connector

The Elasticsearch Connector implements only a sink, so it can write to ES but cannot read from it. Its connector type can be set to ES6 or ES7. Hosts specifies the ES nodes, each in the form of a domain name plus a port number. Index specifies the ES index to write to, similar to a table in a traditional database. Document Type specifies the Elasticsearch document type; it does not need to be specified in ES7.

The ES sink supports two modes, append and upsert. If a primary key (PK) is specified when the ES table is defined, the sink works in upsert mode; if no PK is specified, it works in append mode. Note that types such as ROW and MAP cannot be used as a PK.

Similarly, using ES requires additional dependencies: add the ES Connector that matches your ES version.
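To illustrate the upsert behavior described above, here is a hedged sketch of an ES7 sink table. The table name, columns, host address, and index name are hypothetical; the option keys assume the Elasticsearch SQL connector of Flink 1.11 or later, where the ES7 connector identifier is 'elasticsearch-7'.

```sql
-- Hypothetical ES7 sink table; host and index are placeholders.
CREATE TABLE user_profile_es (
  user_id   BIGINT,
  user_name STRING,
  city      STRING,
  PRIMARY KEY (user_id) NOT ENFORCED  -- declaring a PK makes the sink work in upsert mode
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_profile'
);
```

Removing the PRIMARY KEY line would leave the sink in append mode. For an ES6 cluster, the connector would be 'elasticsearch-6' and a 'document-type' option would also be needed.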

FileSystem Connector

This Connector connects to a file system and reads and writes files on it. The FileSystem here refers to Flink's FileSystem abstraction, which has many implementations, such as the local file system, Hadoop, S3, and OSS. It also supports partitions, using a partition directory structure similar to Hive's, but the partition information does not need to be registered in a Catalog.
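A partitioned file system table might look like the following sketch. The table name, columns, and path are assumptions chosen for illustration; the option keys assume the filesystem SQL connector of Flink 1.11 or later.

```sql
-- Hypothetical partitioned table stored as CSV files on the local file system.
CREATE TABLE page_views_fs (
  user_id BIGINT,
  url     STRING,
  dt      STRING                        -- partition column
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/page_views',    -- assumed location; could also be an HDFS, S3, or OSS URI
  'format' = 'csv'
);
```

With this layout, rows land in Hive-style subdirectories under the path, such as dt=2020-01-01, and Flink infers the partitions from the directory names instead of from a Catalog.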

Hive Connector

Hive is one of the earliest SQL engines in the big data ecosystem, and most users run it in batch processing scenarios. The Hive Connector works at two levels. At the metadata level, HiveCatalog connects to Hive's metadata. At the data level, HiveTableSource and HiveTableSink read and write Hive table data.

To use Hive Connector, you need to specify the Hive Catalog. Here is an example showing how to specify the Hive Catalog.

Using the Hive Connector also requires some additional dependencies. Choose the JAR package that corresponds to the Hive version you are using.
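One way to register a HiveCatalog is through DDL in the SQL client, as in the sketch below. The catalog name and the configuration directory are hypothetical, and the statement assumes a Flink version that supports CREATE CATALOG with the Hive dependencies already on the classpath.

```sql
-- Register a catalog backed by the Hive Metastore.
-- 'myhive' and the conf directory are placeholders for illustration.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'   -- directory assumed to contain hive-site.xml
);

-- Make it the current catalog, then list the tables Hive already knows about.
USE CATALOG myhive;
SHOW TABLES;
```

After USE CATALOG, existing Hive tables can be queried by name, and tables created from Flink are persisted in the same Metastore.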

In addition to connectors for external systems, Flink also provides built-in Connectors. On the one hand, they help new users get started quickly and experience the powerful features of Flink SQL. On the other hand, they help Flink developers debug their code.

DataGen Connector

The DataGen Connector is a data generator. For example, we can create a DataGen table with several fields and set the connector type to DataGen. When this table is read, the Connector generates the data on the fly; that is, the data is produced at read time rather than stored somewhere in advance. Users can also exercise fine-grained control over the DataGen Connector: for example, they can specify how many rows are generated per second, and they can specify whether a field is generated as a sequence (in ascending order) or at random.
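The controls described above can be sketched as follows. The table and column names are hypothetical; the option keys assume the DataGen connector of Flink 1.11 or later.

```sql
-- Hypothetical generated table: 5 rows per second.
CREATE TABLE orders_gen (
  order_id BIGINT,
  price    INT,
  buyer    STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',               -- generation rate
  'fields.order_id.kind' = 'sequence',   -- ascending values
  'fields.order_id.start' = '1',
  'fields.order_id.end' = '1000',
  'fields.price.kind' = 'random',        -- random values within a range
  'fields.price.min' = '1',
  'fields.price.max' = '100',
  'fields.buyer.length' = '8'            -- length of the random string
);
```

Because order_id is a bounded sequence, the source finishes once the sequence reaches its end value; a table with only random fields keeps generating rows indefinitely.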

Print Connector

The Print Connector provides a sink that prints all data to standard output or standard error. Each printed row is prefixed with its row kind. To create a print table, you only need to set the connector type to print.
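A print table can be tried out with a couple of literal rows, as in this sketch. The table name, columns, and values are made up for illustration.

```sql
-- Hypothetical print sink.
CREATE TABLE print_sink (
  id   INT,
  name STRING
) WITH (
  'connector' = 'print'
);

-- Each inserted row is printed by the task that writes it,
-- prefixed with its row kind (for example +I for an insert).
INSERT INTO print_sink VALUES (1, 'alice'), (2, 'bob');
```

Adding the option 'standard-error' = 'true' to the WITH clause sends the output to standard error instead of standard output.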

BlackHole Connector

The BlackHole Connector is also a sink. It discards all data written to it without doing anything else, and it is mainly used for performance testing. To create a BlackHole table, you only need to set the connector type to blackhole.
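A simple performance-test setup might pair a generated source with a BlackHole sink, as sketched below. The table names, columns, and generation rate are assumptions for illustration only.

```sql
-- Hypothetical high-rate generated source.
CREATE TABLE gen_source (
  id      BIGINT,
  payload STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10000'
);

-- Sink that accepts every row and throws it away.
CREATE TABLE blackhole_sink (
  id      BIGINT,
  payload STRING
) WITH (
  'connector' = 'blackhole'
);

-- The query runs end to end, but nothing is written anywhere.
INSERT INTO blackhole_sink
SELECT id, payload FROM gen_source;
```

Since the sink does no I/O, the measured throughput reflects the source and the query itself rather than an external system.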
