ByteDance Flink State Query Practice and Optimization

1. Background

As we all know, State in Flink stores the intermediate results of operator computations. When a task behaves abnormally, we can obtain valuable clues by querying the State in the task's snapshot.

However, for Flink SQL tasks, querying the job state is usually prohibitively expensive, because users cannot know how the state is defined or what its concrete types are.

To solve this problem, the ByteDance streaming computing team developed an internal solution, State Query on Flink SQL, which lets users query State simply by writing SQL. This article introduces ByteDance's related work on Flink state query.

2. State Processor API Introduction

When it comes to state query, we naturally think of a feature introduced in Flink 1.9: the State Processor API. Using the State Processor API, we can convert a Savepoint produced by a job into a DataSet and then use the DataSet API to query, modify, or initialize the state.

The following briefly shows how to use the State Processor API to complete a state query (a code sketch follows the list):

*First, create an ExistingSavepoint to represent the Savepoint. When initializing the ExistingSavepoint, you need to provide information such as the Savepoint path and the StateBackend;

*Then, implement a ReaderFunction to re-register the state to be queried and define how it is processed. During the query, every key is traversed and its state handled in the way we defined;

*Finally, call Savepoint.readKeyedState, passing in the operator's uid and the ReaderFunction, to complete the state query.
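For reference, here is a minimal sketch of this flow, adapted from the community documentation of the DataSet-based State Processor API (Flink 1.9–1.14). The savepoint path, the uid my-uid, and the ValueState named state are placeholders for illustration:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Create an ExistingSavepoint from the savepoint path and a StateBackend.
        ExistingSavepoint savepoint =
            Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend());

        // Read the keyed state of the operator whose uid is "my-uid".
        DataSet<String> state = savepoint.readKeyedState("my-uid", new ReaderFunction());
        state.print();
    }

    // Re-registers the state and defines how each key's state is processed.
    static class ReaderFunction extends KeyedStateReaderFunction<Integer, String> {
        private ValueState<Integer> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor must match the name and type the original job used.
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state", Types.INT));
        }

        @Override
        public void readKey(Integer key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}
```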

Next, let's briefly describe the principle behind a state query.
The Savepoint directory contains two types of files. One is the state data files, such as opA-1-state in the figure above, which holds the complete state data of the first SubTask of operator A. The other is the metadata file, _metadata, which stores the mapping between each operator and its state files.

When a state query is performed, the client first parses the metadata file according to the Savepoint path; from the operator ID it obtains the handles of the files holding the state to be queried. When the query actually executes, the task responsible for reading the state creates a new StateBackend and restores the data from the state files into it. After restoration completes, it traverses all keys and hands each key's state to the ReaderFunction for processing.

Some readers may ask: since the community already provides the ability to query State, why do the same work again? Mainly because we ran into several problems while using the State Processor API:

*Each State query requires independently developing a Flink Batch job, which imposes a certain development cost on users;
*Implementing a ReaderFunction requires a clear understanding of how the task's state is defined, including the State's name, type, and State Descriptor, which sets a high bar for users;
*The State Processor API can only query the state of a single operator at a time; it cannot query the states of multiple operators simultaneously;
*The meta information of the task's state cannot be queried directly, for example which states the task uses or the type of a particular state.

Overall, we have two goals: first, to reduce the cost of use for users; second, to enhance the state query functionality. We want users to be able to query State in the simplest possible way, without needing to know anything about how the state is defined.

In addition, we also hope users can query the states of multiple operators at once, or directly query which states the job uses and what type each state is.

Therefore, we proposed the State Query on Flink SQL solution. Simply put, it treats State like a database, so users can easily query State by writing SQL.

In this solution, we need to solve two problems:

How to shield the state's internal details from the user: referring to the State Processor API, we know that querying state requires a lot of information, such as the Savepoint path, StateBackend type, operator ID, State Descriptor, and so on. It is clearly difficult to express all of this complex information in a SQL statement. So what exactly is needed to query state, and how do we hide the complex details from the user? This is the first difficulty we face.

How to express a state in SQL: state in Flink is not stored the way tables are stored in a database. How can we express the process of querying a state in SQL? This is the other difficulty we need to solve.

3. StateMeta Snapshot Mechanism

First, let's answer the first question. What information is needed to query a State?

Recall the State Processor API example above: when creating the ExistingSavepoint and the ReaderFunction, we need to provide the Savepoint path, the Backend type, the OperatorID, the type of the operator's key, the State's name, and its Serializer. We can collectively call these the state's meta information.

For a Flink SQL task, expecting users to understand all of this clearly sets a very high bar. Our idea is that users should only need to provide the simplest piece of information, namely the Savepoint ID, while the Flink framework stores the remaining meta information in the Savepoint itself. This shields users from the complex details of the State and lets them complete the state query. To this end, we introduced the StateMeta Snapshot mechanism.

StateMeta Snapshot is simply the process of adding state meta information to the Savepoint Metadata. The specific steps are as follows (a sketch of the recorded meta information follows the list):

*First, when registering state, the task saves meta information such as the operatorName, OperatorID, KeySerializer, and StateDescriptors in the task's memory;
*When a Savepoint is triggered, the task snapshots the state meta information at the same time as it snapshots the state itself; after the snapshot completes, it reports the state meta information together with the state file handles to the JobManager;
*After receiving the StateMeta information reported by all tasks, the JobManager merges it, and finally saves the merged state meta information in a file named stateInfo in the Savepoint directory.
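The exact layout of the stateInfo file is internal and has not been published; purely as an illustration, the meta information recorded for each state can be pictured as a record like the following, with all field names hypothetical:

```java
// Hypothetical model of one entry in the stateInfo file. Field names are
// illustrative only and do not reflect the actual internal format.
public class StateMetaEntry {
    String operatorId;       // ID of the operator that owns the state
    String operatorName;     // human-readable operator name
    String stateName;        // name passed to the StateDescriptor
    String stateType;        // e.g. "ValueState", "ListState", "MapState"
    String keySerializer;    // serializer of the operator key, if keyed
    String valueSerializer;  // serializer of the state value
}
```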

When querying state, we then only need to parse the stateInfo file in the Savepoint, instead of requiring users to supply the state meta information in code. This greatly reduces the cost of querying state.

4. State as Database

Next, let's answer the second question: how do we express State in SQL? In fact, the community already proposed an idea for this when designing the State Processor API: State as Database.

In a traditional database, a table is usually located by three elements: Catalog, Database, and Table. We can map the same logic onto Flink State: we treat Flink State as a special data source, regard each Savepoint produced by a job as an independent DB, and within that DB abstract the state meta information and the state data themselves into tables exposed to users. Users can then query these tables directly to obtain a task's state information.

First, let's look at how to represent a State as a table. As we all know, Flink has two commonly used kinds of State: KeyedState and OperatorState.

For OperatorState, it has only one attribute, Value, which is used to represent the specific value of this State. Therefore, we can represent OperatorState as a table structure containing only one Value field.

For KeyedState, the value of each State may be different under different Key and Namespace, so we can express KeyedState as a table structure containing three fields: Key, Namespace, and Value.

Once a single State has been abstracted this way, representing multiple States is straightforward. In the example above, the operator contains three states: two KeyedStates and one OperatorState. We simply union these tables and use a state_name field to distinguish them, which yields a single table representing all the states in the operator, as sketched below.
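To make this concrete, here is a sketch of what such a union could look like in Flink SQL, embedded in a Java string; the table and column names are assumptions for illustration, not the actual internal schema:

```java
public class StateUnionSketch {
    // Conceptual only: two KeyedStates (key, namespace, value) and one
    // OperatorState (value only) are unioned into a single table, with
    // state_name telling the rows apart. All table names are hypothetical.
    static final String ALL_STATES_OF_OPERATOR =
        "SELECT `key`, namespace, `value`, 'keyed_state_a' AS state_name FROM keyed_state_a "
      + "UNION ALL "
      + "SELECT `key`, namespace, `value`, 'keyed_state_b' AS state_name FROM keyed_state_b "
      + "UNION ALL "
      + "SELECT CAST(NULL AS STRING) AS `key`, CAST(NULL AS STRING) AS namespace, "
      + "       `value`, 'operator_state_a' AS state_name FROM operator_state_a";
}
```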

Finally, one question remains: how do we know which states a task uses, or what the concrete type of each state is?

To solve this problem, we define a special table, StateMeta, which represents the meta information of all States in a Flink task. StateMeta contains the name of each State in the task, the operator ID the State belongs to, the operator name, the type of the Key, and the type of the Value, so users can obtain the meta information of all states in the task by directly querying the StateMeta table.

5. Using Flink Batch SQL to Query Task State

That covers the overall design of the state query solution. So how do we actually query a State? Let's take a Word Count task as an example.

First, we create a Flink SQL task and start it. From the web UI we can see that the task contains three operators: Source, Aggregate, and Sink. We then trigger a Savepoint and obtain the corresponding SavepointID once it completes successfully, and with the SavepointID we can query the job's state.

If we know nothing about how states are used in a Flink SQL task, the first thing to query is which states the task contains and what their types are. We can obtain this information from the StateMeta table. As shown in scenario 1 in the figure above, querying the StateMeta table shows that the task contains a ListState and a ValueState, which live in the Source operator and the Aggregate operator respectively.
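As an illustration of what such a query might look like (the actual syntax and naming are ByteDance-internal; the savepoint_12345 database and state_meta table below are assumptions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class QueryStateMeta {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical: a database exposed per Savepoint that contains a StateMeta table.
        tEnv.executeSql(
            "SELECT operator_id, operator_name, state_name, state_type "
          + "FROM savepoint_12345.state_meta").print();
    }
}
```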

In addition, readers familiar with Flink will know that the State in KafkaSource records the offsets of current consumption. As shown in scenario 2, by querying the state of the Source operator we can obtain the partitions and offsets of the Kafka topic consumed by the task.
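A corresponding sketch, reusing tEnv from the previous snippet; source_state is again an assumed table name, while the state name in the filter is the one the legacy FlinkKafkaConsumer registers for its offsets:

```java
// Hypothetical query against the Source operator's state table.
tEnv.executeSql(
    "SELECT `value` FROM savepoint_12345.source_state "
  + "WHERE state_name = 'topic-partition-offset-states'").print();
```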

Another fairly common scenario: a downstream business team finds that the result for a certain key (say key_662) looks abnormal. To locate the problem, we can directly query the state of the Aggregate operator in the job, specifying key = key_662 as the query condition. As shown in scenario 3 in the figure above, the query result shows that the aggregate value for this key is 11290. Users can thus easily verify whether the state is correct.
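And a sketch of the key-filtered query under the same naming assumptions, again reusing tEnv:

```java
// Hypothetical query against the Aggregate operator's state, filtered by key.
tEnv.executeSql(
    "SELECT `key`, `value` FROM savepoint_12345.aggregate_state "
  + "WHERE `key` = 'key_662'").print();
```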

6. Future Outlook

In the future, we plan to further enrich the State-related functionality. We currently support querying State with SQL, but the community also provides the ability to modify and initialize State, and in some scenarios those capabilities matter just as much. For example, we may discover that some keys in the state were computed incorrectly and want to correct that part of the data; or, after a change, the task logic may no longer be fully compatible with the previous state, and we may want to modify and initialize the state to generate a new Savepoint. On the usage side, we likewise hope users will be able to complete state modification and initialization directly with SQL insert and update syntax.

Second, we will further improve the usability of State. We already use a DAG-editing solution to handle state incompatibility when the job topology changes, but when a Flink SQL task modifies a field, the State Serializer may change, which also causes state incompatibility. For this case we have designed a complete Flink SQL State Schema Evolution solution, which will greatly strengthen the ability of Flink SQL tasks to restore state after a change; the solution is currently being implemented. In addition, we provide a comprehensive pre-check capability for state recovery, which checks whether the state is compatible and notifies the user before a task goes online, avoiding the online impact of job startup failures caused by state incompatibility.
