RocketMQ Flink Catalog Design and Practice

Related Tags:1.Realtime Compute for Apache Flink
2. Flink Python

1、 Flink and Flink Catalog

Flink is a distributed computing engine, which has realized batch streaming integration and can process bounded and unbounded data. Computing resources need to be allocated and managed effectively to execute streaming applications.

At present, Flink API is abstracted into four parts:

The top abstraction is SQL. The association between SQL abstraction and Table API abstraction is very close, and SQL query statements can be executed on the table defined in the Table API.

The second layer is abstracted as the Table API. The Table API is a declarative programming (DSL) API centered on a table. For example, in the streaming data scenario, it can represent a dynamically changing table.

The third layer of abstraction is Core APIs. Many programs may not use the lowest level API, but can use Core APIs for programming: including DataStream API (applied to bounded/unbounded data flow scenarios) and DataSet API (applied to bounded data set scenarios).

The fourth layer is abstracted as stateful real-time stream processing.

Flink Catalog provides metadata information, such as databases, tables, partitions, views, and functions and information stored in databases or other external systems. Flink manages metadata in two ways: temporary and persistent. The built-in GenericInMemoryCatalog is a memory based catalog. All metadata is available only during the life cycle of the session. JdbcCatalog and HiveCatalog are catalogs that can persist metadata.

Flink Catalog is extended and supports user customization. In order to use a custom catalog in Flink SQL, you need to implement the corresponding catalog factory by implementing the CatalogFactory interface. The factory was discovered using Java's Service Provider Interface (SPI). You can add classes that implement this interface to META_ INF/services/org.apache.flink.table.factories. FactoryJAR file.

2、 RocketMQ Flink Connector

RocketMQ connector provides Flink with the ability to consume and write data from RocketMQ Topic. Flink's Table API&SQL program can connect to other external systems for reading and writing batch and streaming tables. Source provides access to data stored in external systems such as databases, key value stores, message queues, or file systems. Sink sends data to an external storage system.

The Github warehouse of the project is:


3、 RocketMQ Flink Catalog

3.1 Design and Implementation

3.1.1 The design of RocketMQ Flink Catalog is mainly divided into two steps

Implement a RocketMqCatalogFactory to create a factory with configured catalog instances based on string properties. Add this implementation class to META_ INF/services/org.apache.flink.table.factories. Factory.

Inherit AbstractCatalog to implement RocketMqCatalog, and complete the query operation of database, table, partition and other information by implementing the methods in the Catalog interface.

The class diagram is as follows:

3.1.2 Storage of RocketMQ Flink Catalog

The underlying storage of RocketMQ Flink Catalog uses RocketMQ Schema Registry. When Flink calls the catalog, it interacts with the RocketMQ Schema Registry server through the RocketMQ Schema Registry client in the AbstractCatalog implementation class.

Database: returns the default.

Table: Get the corresponding schema from RocketMQ Schema Registry, and then parse the IDL and convert it to DataType.

Partition: get partition related information from RocketMQ through DefaultMQAdminExt.

RocketMQ Schema Registry is a topic schema management center. It provides a RESTful interface for registration, deletion, update, acquisition and reference modes of Topic (RocketMQ Topic). The New RocketMQ client can directly send structured data by associating Schema with Subject. Users no longer need to care about the details of serialization and deserialization.

3.1.3 API supported by RocketMQ Flink Catalog

At present, RocketMQ Flink Catalog supports the query and determination of the existence of databases, tables and partitions, but does not support the creation, modification and deletion. Therefore, the corresponding schema needs to be created through RocketMQ Schema Registry before use.

3.2 Guidelines

Table Environment is the core concept of integrating Table API&SQL in Flink. It is responsible for:

Register the Table in the internal Catalog.

Register an external catalog.

Load pluggable modules.

Execute SQL query.

Register a custom function (scale, table, or aggregation).

Convert a DataStream or DataSet to a Table.

Holds a reference to the ExecutionEnvironment or StreamExecutionEnvironment.

3.2.1 Create and register a catalog
Table API :
RocketMQCatalog rocketMqCatalog = new RocketMQCatalog("rocketmq_catalog", "default", " http://localhost:9876 ", " http://localhost:8080 ");
tableEnvironment. registerCatalog("rocketmq_catalog", rocketMqCatalog);
TableResult tableResult = tableEnvironment.executeSql(
"CREATE CATALOG rocketmq_catalog WITH (" +
"'type'='rocketmq_catalog'," +
"'nameserver.address'=' http://localhost:9876 '," +
"'schema.registry.base.url'=' http://localhost:8088 '); ");
3.2.2 Modifying the Current Catalog
Table API :
tableEnvironment. executeSql("USE CATALOG rocketmq_catalog");
3.2.3 List Available Catalogs
Table API :
String[] catalogs = tableEnvironment.listCatalogs();
TableResult tableResult = tableEnvironment. executeSql("show catalogs");
3.2.4 List Available Databases
Table API :
String[] databases = tableEnvironment.listDatabases();
TableResult tableResult = tableEnvironment. executeSql("show databases");
3.2.5 List Available Table
Table API:
String[] tables = tableEnvironment.listTables();
TableResult tableResult = tableEnvironment. executeSql("show tables");

3.3 Quick Start
The available RocketMQ and RocketMQ Schema Registry should be prepared in advance:
RocketMQ deployment: Introduction/02quickstart
RocketMQ Schema Registry deployment:
3.3.1 Create Topic
Create two topics, Rocketmq_ Source and rocketmq_ sink。
3.3.2 Register Source Schema
curl -X POST -H "Content-Type: application/json"
-d '{"schemaIdl":"
http://localhost:8088/schema -registry/v1/subject/rocketmq_ source/schema/rocketmq_ source_ schema
3.3.3 Register Sink Schema
curl -X POST -H "Content-Type: application/json"
-d '{"schemaIdl":"
http://localhost:8088/schema -registry/v1/subject/rocketmq_ sink/schema/rocketmq_ sink_ schema
3.3.4 Adding Dependencies
Create a task project and add the dependency of Rocketmq link:


At present, RocketMQ Schema Registry has not released an official version, only a snapshot version. If you find that the jar cannot be found, you can try the following methods:

Apache Snapshot Repository



3.3.5 Create Task
* @author lixiaoshuang
public class RocketMqCatalog {
public static void main(String[] args) {
//Initialize table environment parameters
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
//Create a table environment
TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
//Register Rocketmq catalog
"CREATE CATALOG rocketmq_catalog WITH (" +
"'type'='rocketmq_catalog'," +
"'nameserver.address'=' http://localhost:9876 '," +
"'schema.registry.base.url'=' http://localhost:8088 '); ");
tableEnvironment. executeSql("USE CATALOG rocketmq_catalog");
//From Rocketmq_ Get data from source and write it to Rocketmq_ Sink
TableResult tableResult = tableEnvironment. executeSql("INSERT INTO rocketmq_sink /*+ OPTIONS" +
"('producerGroup'='topic_producer_group') */ select * from rocketmq_source /*+ OPTIONS" +
"('consumerGroup'='topic_consumer_group') */");

After starting the task and running it, open the RocketMQ console and send it to Rocketmq_ Source This topic sends a message.
Then check Rocketmq_ The state of the sink. It will be found that the message has been written to Rocketmq_ Sink

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us