Building a Large-Scale Real-Time Risk Control System with Flink

At present, Flink serves essentially all business units (BUs) of the group. At the peak of Double Eleven it processes 4 billion events per second across more than 30,000 computing tasks, consuming over 1 million CPU cores in total. It covers almost all concrete businesses within the group, for example: the data platform, the AI platform, the risk control platform, real-time operations and maintenance, and search and recommendation.

1. Building a risk control system based on Flink

Risk control is a big topic involving rule engines, NoSQL databases, CEP, and more. This chapter mainly covers some basic concepts of risk control. On the big data side, we divide risk control into a 3 × 2 matrix:

* 2 means that risk control is either rule-based or algorithm/model-based;
* 3 represents the three types of risk control: pre-event, in-event, and post-event.

1.1 Three types of risk control business

For in-event and post-event risk control, perception on the terminal side is asynchronous; for pre-event risk control, perception on the terminal side is synchronous.

* A brief note on pre-event risk control: it stores trained models or precomputed data in databases such as Redis and MongoDB, and serves requests in one of two ways;

* One way is to run rule engines such as Sidden, Groovy, and Drools on the terminal, which fetch data directly from Redis or MongoDB and return results;

* The other way is based on Kubeflow KFServing: after a request arrives from the terminal, a result is returned by the trained algorithm or model.

Generally speaking, the latency of these two approaches is around 200 milliseconds, so they can serve synchronous RPC or HTTP requests.
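As a hedged illustration of the first path, the sketch below uses the Jedis client to look up a precomputed risk score in Redis and answer synchronously; the key layout, score semantics, and the 0.8 cut-off are illustrative assumptions, not from the original text.

```java
import redis.clients.jedis.Jedis;

public class PreEventCheck {
    // Hypothetical key layout: scores precomputed offline and written to Redis.
    private static final String KEY_PREFIX = "risk:score:";

    public static boolean allow(String userId) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String score = jedis.get(KEY_PREFIX + userId);
            // Unknown users pass; otherwise block when the precomputed score is too high.
            return score == null || Double.parseDouble(score) < 0.8;
        }
    }

    public static void main(String[] args) {
        System.out.println(allow("user-42") ? "pass" : "deny");
    }
}
```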

The big data scenarios where Flink is involved are asynchronous risk control requests, with loose latency requirements of typically one to two seconds. If ultra-low latency is pursued, this can be regarded as a form of in-event risk control, and the risk control decision can be handled by machines.

A very common pattern is to use Flink SQL for indicator threshold statistics and Flink CEP for behavior-sequence rule analysis; another is TensorFlow on Flink, where the algorithm is described in TensorFlow and Flink executes the TensorFlow computation.

1.2 Flink is the best choice for rule-based risk control

At present, Flink is the best choice for rule-based risk control within Alibaba Group. There are three main reasons:

* Event driven
* Millisecond-level latency
* Unified stream and batch processing

1.3 Three elements of rule-based risk control

There are three elements in rule-based risk control, and everything discussed later revolves around them:

* Facts: the risk control events themselves, which may come from business systems or log instrumentation; they are the input of the whole risk control system;
* Rules: often defined by the business side, i.e., the business goal a given rule is meant to meet;
* Thresholds: the severity cut-off corresponding to a rule.
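To make the three elements concrete, here is a minimal sketch of them as plain Java data types; all names and fields are illustrative assumptions, not from the original text (records require Java 16+).

```java
// Fact: an input risk control event, e.g. from business systems or log instrumentation.
record Fact(String userId, String eventType, double amount, long timestamp) {}

// Rule: a business-defined condition, e.g. the aggregate "SUM(amount)" with operator ">".
record Rule(String id, String metric, String operator) {}

// Threshold: the severity cut-off attached to a rule, e.g. 50 for SUM(amount) > 50.
record Threshold(String ruleId, double value) {}
```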

1.4 Enhancing Flink's rule expressiveness

In Flink, rules can be divided into two types, stateless and stateful, of which stateful rules are the core of Flink-based risk control:

Stateless rules: mainly data ETL. One scenario is triggering the current risk control action when a field of an event exceeds some value X. The other scenario is when the downstream of the Flink task is model- or algorithm-based risk control: no rule judgment is needed on the Flink side; instead the data is vectorized and normalized, for example through multi-stream joins and CASE WHEN judgments, turned into 0/1 vectors, and then pushed to a downstream TensorFlow model for prediction (see the sketch below).
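A minimal sketch of this vectorization path, using Flink's Table API from Java; the schemas and feature definitions are illustrative assumptions, with the datagen/print connectors standing in for real Kafka/DWD tables.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VectorizeEvents {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical raw event stream (datagen stands in for a real source).
        tEnv.executeSql("CREATE TABLE events (user_id STRING, device STRING, amount DOUBLE)"
                + " WITH ('connector' = 'datagen')");
        // Hypothetical sink feeding a downstream TensorFlow scorer.
        tEnv.executeSql("CREATE TABLE features (user_id STRING, new_device INT, large_amount INT)"
                + " WITH ('connector' = 'print')");

        // Stateless ETL: CASE WHEN turns raw fields into a 0/1 feature vector.
        tEnv.executeSql(
                "INSERT INTO features "
                + "SELECT user_id, "
                + "       CASE WHEN device = 'unknown' THEN 1 ELSE 0 END, "
                + "       CASE WHEN amount > 1000 THEN 1 ELSE 0 END "
                + "FROM events");
    }
}
```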

Stateful rules:

* Statistical rules: calculation rules based on statistical analysis, for example, if the number of visits within 5 minutes exceeds 100, risk control is considered triggered;
* Sequential rules: in an event sequence, one event influences the events before and after it. For example, the three events click, add-to-cart, and delete form a continuous behavior sequence that is special as a whole: it may be judged malicious and lower the rating of a merchant's products, even though none of the three events is a risk control event on its own (see the CEP sketch after this list);
* Alibaba Cloud Realtime Compute for Apache Flink has enhanced sequence-based rule capabilities, providing technical support for e-commerce transaction scenarios both on the cloud and within the group;

Hybrid rules: a combination of statistical and sequential rules.
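As a hedged illustration of a sequential rule, the sketch below uses Flink CEP to match the click → add-to-cart → delete sequence per user; the Event type, its field names, and the 10-minute window are assumptions for illustration, not from the original text.

```java
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SequenceRule {
    // Hypothetical event type: a user id plus an action such as "click".
    public static class Event {
        public String userId;
        public String action;
    }

    public static PatternStream<Event> build(DataStream<Event> events) {
        Pattern<Event, ?> pattern = Pattern.<Event>begin("click")
                .where(new SimpleCondition<Event>() {
                    @Override public boolean filter(Event e) { return "click".equals(e.action); }
                })
                .next("cart")
                .where(new SimpleCondition<Event>() {
                    @Override public boolean filter(Event e) { return "add_to_cart".equals(e.action); }
                })
                .next("delete")
                .where(new SimpleCondition<Event>() {
                    @Override public boolean filter(Event e) { return "delete".equals(e.action); }
                })
                .within(Time.minutes(10)); // the whole sequence must complete in 10 minutes

        // Key by user so sequences are matched per risk control object.
        return CEP.pattern(events.keyBy(e -> e.userId), pattern);
    }
}
```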


2. Alibaba's risk control practice

This chapter introduces how Alibaba satisfies the three elements of risk control described above from an engineering perspective.

From an overall technical point of view, the system is currently divided into three modules: perception, disposal, and insight:

* Perception: perceive all anomalies and discover problems in advance, for example by capturing data whose distribution differs from the usual one and outputting a list of such anomalies. Another example: one year, an adjustment to riding policy increased helmet sales, so the click-through and conversion rates of related products rose; this needs to be captured in time, because it is normal behavior rather than cheating;
* Disposal: how rules are enforced. There are now three lines of defense: hourly, real-time, and offline. Compared with matching a single policy in isolation, accuracy is higher after association and integration, for example by comprehensively judging a user's continuous behavior over a recent period;
* Insight: discover risk behaviors that are not yet perceived and cannot be directly described by rules. For example, risk control may need to represent samples at a high level of abstraction: first project them into an appropriate subspace, then combine the time dimension to find features in the high-dimensional space and identify new anomalies.

2.1 Phase 1: SQL real-time association & real-time statistics

At this stage, there was a SQL-based risk control evaluation system that used simple SQL for real-time association and statistics, for example an aggregation such as SUM(amount) > 50, where the rule is SUM(amount) and the threshold corresponding to the rule is 50. Suppose four rules with thresholds 10, 20, 50, and 100 are running online at the same time: because a single Flink SQL job can only execute one rule, four Flink jobs must be requested for these four thresholds. The advantage is simple development logic and strong job isolation; the disadvantage is that computing resources are greatly wasted.
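A minimal sketch of one such Phase 1 job, using the SUM(amount) > 50 rule from the text; the 5-minute tumbling window and the datagen/print connectors are assumptions standing in for real tables. Note the threshold is hard-coded, which is exactly the limitation discussed next.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ThresholdRuleJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE events ("
                + " user_id STRING, amount DOUBLE, proctime AS PROCTIME()"
                + ") WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE alerts (user_id STRING, total DOUBLE)"
                + " WITH ('connector' = 'print')");

        // Rule = SUM(amount), threshold = 50; changing the threshold means a new job.
        tEnv.executeSql(
                "INSERT INTO alerts "
                + "SELECT user_id, SUM(amount) AS total "
                + "FROM events "
                + "GROUP BY user_id, TUMBLE(proctime, INTERVAL '5' MINUTE) "
                + "HAVING SUM(amount) > 50");
    }
}
```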

2.2 Phase 2: Broadcast Stream

The main problem of the Phase 1 rules is that rules and thresholds are immutable. The Flink community offers some solutions, for example based on BroadcastStream: in this setup, a Transaction Source handles event ingestion while a Rule Source is a BroadcastStream; when a new threshold arrives, it can be broadcast to every operator through the BroadcastStream.

For example, a rule may judge that a risk control object is triggered when it is accessed more than 10 times within one minute, but on 618 or Double 11 this may have to be changed to 20 or 30 times before the online system downstream of the risk control system is notified.

In Phase 1 there were only two options: run all candidate jobs online at the same time, or stop a Flink job at some moment and start a new job with the new threshold.

With BroadcastStream, rule threshold distribution can be realized, and online thresholds can be modified directly without restarting the job.
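A condensed sketch of this community pattern follows: thresholds arrive on a broadcast Rule stream and are applied to a keyed Transaction stream without restarting the job. The Transaction/Rule types and the "visit_count" rule name are illustrative assumptions.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicThresholdJob {
    // Broadcast state: rule name -> current threshold.
    static final MapStateDescriptor<String, Long> THRESHOLDS =
            new MapStateDescriptor<>("thresholds", Types.STRING, Types.LONG);

    public static DataStream<String> wire(DataStream<Transaction> txs, DataStream<Rule> rules) {
        BroadcastStream<Rule> ruleBroadcast = rules.broadcast(THRESHOLDS);
        return txs.keyBy(t -> t.userId)
                .connect(ruleBroadcast)
                .process(new KeyedBroadcastProcessFunction<String, Transaction, Rule, String>() {
                    private transient ValueState<Long> count;

                    @Override
                    public void open(Configuration conf) {
                        count = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("visits", Types.LONG));
                    }

                    @Override
                    public void processElement(Transaction tx, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        long c = count.value() == null ? 1 : count.value() + 1;
                        count.update(c);
                        Long threshold = ctx.getBroadcastState(THRESHOLDS).get("visit_count");
                        if (threshold != null && c > threshold) {
                            out.collect("ALERT " + tx.userId + " visits=" + c);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // A new threshold arrives: update it without restarting the job.
                        ctx.getBroadcastState(THRESHOLDS).put(rule.name, rule.threshold);
                    }
                });
    }

    public static class Transaction { public String userId; }
    public static class Rule { public String name; public long threshold; }
}
```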

2.3 Phase 3: Dynamic CEP

The main problem in Phase 2 is that only indicator thresholds can be updated. Although this greatly helps the business, it still falls short of upper-layer business needs. There are two main demands: use CEP to perceive behavior sequences; and, with CEP, still be able to dynamically modify thresholds and even the rules themselves.

In Phase 3, Alibaba Cloud Flink built a high-level abstraction around CEP, decoupling CEP rules from CEP execution nodes. Rules can be stored in external third-party storage such as RDS or Hologres; after a CEP job is published, it can load the CEP rules from the database and replace them dynamically, so the expressiveness of the job is enhanced.

Second, job flexibility is enhanced. For example, to observe certain behaviors under a given app and update the indicator threshold of those behaviors, the CEP rules can be updated through the third-party storage rather than through Flink itself.

Another advantage is that rules can be exposed to the upper-layer business side, so the business can truly write its own risk control rules and we become a real rule center. This is the benefit of the dynamic CEP capability. In Alibaba Cloud's services, dynamic CEP has been integrated into the latest version, and Alibaba Cloud's fully managed Flink service greatly shortens the development cycle of risk control scenarios.
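Alibaba Cloud's dynamic CEP API itself is part of the commercial engine, so the sketch below only illustrates the decoupling idea with plain JDBC: a poller periodically reads rule definitions from an RDS table and hands changes to whatever component rebuilds the CEP patterns. The table name, columns, and callback are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.function.BiConsumer;

public class RulePoller implements Runnable {
    private final String jdbcUrl;                    // e.g. an RDS endpoint
    private final BiConsumer<String, String> onRule; // (ruleId, ruleBody) -> rebuild pattern

    public RulePoller(String jdbcUrl, BiConsumer<String, String> onRule) {
        this.jdbcUrl = jdbcUrl;
        this.onRule = onRule;
    }

    @Override
    public void run() {
        // Hypothetical schema: cep_rules(rule_id, rule_body, enabled).
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT rule_id, rule_body FROM cep_rules WHERE enabled = 1")) {
            while (rs.next()) {
                // In the managed service this step is engine-internal; here we just
                // hand the latest rule text to whatever recompiles the CEP pattern.
                onRule.accept(rs.getString("rule_id"), rs.getString("rule_body"));
            }
        } catch (Exception e) {
            e.printStackTrace(); // keep polling on the next tick
        }
    }
}
```

Such a poller could be scheduled with, for example, `Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(poller, 0, 30, TimeUnit.SECONDS)`.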

2.4 Phase 4: Shared Computing

Going one step further from Phase 3, Alibaba Cloud implemented a "shared computing" solution. In it, CEP rules are fully described by a modeling platform that is exposed to upper-layer customers or business parties as a friendly rule-description platform: rules can be composed by drag-and-drop or other means, and the event source a rule runs against is then selected in the scheduling engine. For example, two models that both serve the Taobao APP can fall into the same Flink CEP job over the same Fact stream, so the business side, execution layer, and engine layer are completely decoupled. At present, Alibaba Cloud's shared computing solution is very mature, with rich customer implementation practices.


2.5 Phase 5: Separating business development from platform construction

Among the engine side, platform side, and business side, Phase 4 decouples the engine from the platform, but the business side is still tightly bound to the platform. The working mode of the two remains a Party A / Party B collaboration: the business side owns the business rules, and the platform side accepts the business team's risk control requirements and develops the rules for it. But the platform team's manpower is usually limited, while the business team keeps growing as the business develops.

At this point, the business side can itself abstract some basic concepts, distill common business specifications, assemble them into a friendly DSL, and then submit jobs through Alibaba Cloud's fully decoupled Open API.

Since nearly 100 BUs within the group must be supported at the same time, there is no way to provide customized support for each BU. We can only open up the engine's capabilities as much as possible and let each business side wrap them with its own DSL and submit jobs through the platform; in the final implementation, only one middle platform is exposed to customers.

3. Technical difficulties in large-scale risk control

This chapter introduces some technical difficulties of large-scale risk control and how Alibaba Cloud's fully managed Flink commercial product breaks through them.

3.1 Fine-grained resource adjustment

In a stream computing system, the data source is usually not the blocking node: the upstream reading node has no computing logic and thus no performance problem, while the downstream data processing node is the performance bottleneck of the whole task.

Since Flink jobs divide resources by slot, by default the source node and the worker nodes have the same parallelism. We would like to adjust the parallelism of the source node and the CEP worker nodes separately: for example, a job's CEP worker parallelism can reach 2000 while its source node only needs a parallelism of 2. This can greatly improve CEP node performance.

In addition, the memory and CPU resources of the TMs hosting the CEP worker nodes can be sized separately. In open-source Flink, TMs are homogeneous: the source node and the worker node have exactly the same specification. From a resource-saving perspective, in real production the source node does not need as much memory and CPU as the CEP nodes; a small CPU and memory allocation is enough for data ingestion.

Alibaba Cloud's fully managed Flink lets source nodes and CEP nodes run on heterogeneous TMs, i.e., CEP worker TMs get significantly more resources than source TMs, so CEP execution efficiency becomes higher. Taking into account the optimization brought by fine-grained resource adjustment, the fully managed cloud service can save about 20% of cost compared with self-built IDC Flink.
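In open-source terms, the first half of this (separate parallelism, plus separate slot-sharing groups so source and worker tasks do not share slots) can be sketched as below; heterogeneous TM sizing itself is the managed-service capability. The tiny sequence source and the filter standing in for CEP work are illustrative assumptions, and the parallelism values are scaled down from the 2/2000 example above.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedTopology {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Lightweight source: low parallelism, its own slot-sharing group.
        DataStream<Long> source = env
                .fromSequence(0, 1_000_000)   // stands in for a real connector source
                .setParallelism(2)
                .slotSharingGroup("source");

        // Heavy worker stage (stand-in for CEP): higher parallelism in a separate
        // group, so its tasks get their own slots; in the text's example this was 2000.
        source.rebalance()
                .filter(v -> v % 2 == 0)
                .setParallelism(16)
                .slotSharingGroup("cep")
                .print();

        env.execute("fine-grained-resource-sketch");
    }
}
```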

3.2 Stream-batch unification & adaptive batch scheduler

If the stream engine and the batch engine do not adopt the same execution mode, data calibers will often be inconsistent. The reason is that it is difficult to fully reproduce stream rules under batch semantics; for example, Flink may have a special UDF with no counterpart in the Spark engine. When calibers diverge, choosing which caliber to trust becomes a very important problem.

With Flink's stream-batch unification, CEP rules described in stream mode can be re-run in batch mode with the same semantics and produce the same results, so there is no need to develop separate batch-mode CEP jobs.

On top of this, Alibaba implemented an adaptive batch scheduler. The daily output of CEP rules is not necessarily balanced: for example, if today's behavior sequences contain no anomalies, only a small amount of data flows downstream. An elastic cluster is reserved for batch analysis, and when the CEP results are few, the downstream batch analysis needs only very small resources. The parallelism of each batch analysis node does not even have to be specified up front: nodes automatically adjust their batch-mode parallelism based on upstream data volume and task load, achieving truly elastic batch analysis. This is a unique advantage of the batch scheduler in Alibaba Cloud's stream-batch unified Flink.
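The open-source counterpart of this idea is Flink's adaptive batch scheduler, added in Flink 1.15. Below is a hedged sketch of enabling it programmatically; the configuration keys are the 1.15-era ones and may be renamed in newer releases, and the tiny pipeline is only a stand-in for real downstream batch analysis.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AdaptiveBatchJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Let the scheduler derive per-stage parallelism from actual data volume.
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
        conf.setString("parallelism.default", "-1"); // leave parallelism undecided

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Downstream batch analysis of (possibly very small) CEP output;
        // each stage's parallelism is decided at runtime.
        env.fromSequence(0, 999)
           .filter(v -> v % 7 == 0)
           .print();

        env.execute("adaptive-batch-sketch");
    }
}
```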

3.3 Merged source reading reduces pressure on the common layer

This is a problem encountered in practice. Development today is mostly based on the data platform, for example real-time data warehouses. A real-time warehouse may not have many data sources, but its DWD middle layer grows large; it may evolve into many DWS-layer tables and even many data marts used by various departments. In this situation, the read pressure on a single table becomes very high.

Usually, multiple source tables are joined (widened) to form a DWD layer, so a single source table is depended on by multiple DWD tables, and the DWD layer is in turn consumed by jobs in multiple business domains to form DWS. For this situation, Alibaba implemented source-based merging: the DWD table is read only once, and Flink processes it into multiple DWS tables for the different business domains, which greatly reduces the execution pressure on the common layer (a sketch of the open-source analogue follows).
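In open-source Flink SQL, a related effect can be sketched with a StatementSet: several INSERTs compiled into one job, allowing the planner to reuse the shared DWD scan (actual source reuse depends on version and connector; Alibaba's source-based merging is an engine-level feature). All table names and schemas below are hypothetical, with datagen/print standing in for real storage.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class OneReadManyDws {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE dwd_orders (user_id STRING, shop_id STRING, amount DOUBLE)"
                + " WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE dws_user (user_id STRING, total DOUBLE)"
                + " WITH ('connector' = 'print')");
        tEnv.executeSql("CREATE TABLE dws_shop (shop_id STRING, total DOUBLE)"
                + " WITH ('connector' = 'print')");

        // Both aggregations are planned into ONE job over a single dwd_orders scan.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO dws_user SELECT user_id, SUM(amount) FROM dwd_orders GROUP BY user_id");
        set.addInsertSql("INSERT INTO dws_shop SELECT shop_id, SUM(amount) FROM dwd_orders GROUP BY shop_id");
        set.execute();
    }
}
```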


3.4 A KV-separated state backend design

When a CEP node executes, it reads very large volumes of local data, especially in the behavior-sequence computation mode, because it must cache all preceding data or the behavior sequence within a certain time window.

A big problem here is the very large performance overhead on the state backend (e.g., RocksDB), which hurts CEP node performance. Alibaba has therefore implemented a state backend with a KV-separated design; Alibaba Cloud Flink uses Gemini as the default state backend, and measured performance in CEP scenarios improves by at least 100%.


3.5 Dimensional data partition loading

In many cases, risk control must analyze historical behavior. Historical behavior data is usually stored in Hive or ODPS tables, which may reach the terabyte scale. By default, open-source Flink loads this huge dimension table on every dimension-table node, which is unrealistic. Alibaba Cloud implements shuffle-based partitioning of the in-memory data, so each dimension-table node loads only the data belonging to its own shuffle partition (a conceptual sketch follows).
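As a conceptual sketch of partitioned loading using open-source primitives (Alibaba Cloud's version is implemented inside the engine): key the main stream by the join key, and let each subtask load only the dimension rows that would be routed to it. The `scanDimTable()` reader and row layout are hypothetical stand-ins for a Hive/ODPS scan.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class PartitionedDimEnricher extends RichMapFunction<String, String> {
    private transient Map<String, String> localDim;

    @Override
    public void open(Configuration parameters) {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();

        localDim = new HashMap<>();
        for (String[] row : scanDimTable()) { // hypothetical full scan of the dim table
            // Keep only rows whose key routes to THIS subtask under keyBy(key):
            // the same assignment Flink uses for keyed streams.
            if (KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    row[0], maxParallelism, parallelism) == subtask) {
                localDim.put(row[0], row[1]);
            }
        }
    }

    @Override
    public String map(String key) {
        return key + " -> " + localDim.getOrDefault(key, "unknown");
    }

    private List<String[]> scanDimTable() {
        return List.of(new String[]{"k1", "v1"}, new String[]{"k2", "v2"}); // placeholder
    }
}
```

Used as `events.keyBy(k -> k).map(new PartitionedDimEnricher())`, each subtask then holds roughly a 1/parallelism shard of the table instead of a full copy.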


4. Alibaba Cloud Flink FY23 Risk Control Evolution Plan

For Alibaba Cloud as a whole, the FY23 evolution plan includes the following:

* Enhanced expressiveness
* Enhanced observability
* Enhanced execution capability
* Enhanced performance
