Implementation of a large-scale real-time risk control system based on Flink at Alibaba

At present, Flink serves essentially all business units (BUs) of the group. At the peak of the Double 11 shopping festival, its throughput reached 4 billion records per second across more than 30,000 computing tasks, consuming over one million CPU cores in total. It covers almost every concrete business within the group, such as the data middle platform, AI middle platform, risk control middle platform, real-time operations and maintenance, and search and recommendation.

1. Building a risk control system based on Flink

Risk control is a big topic, involving rule engines, NoSQL databases, CEP, and more. This chapter introduces some basic concepts of risk control. On the big data side, we divide risk control along a 2 × 3 grid:

• 2 means that risk control is either rule-based or algorithm/model-based;
• 3 means there are three stages of risk control: pre-event, in-event, and post-event risk control.

1.1 Three types of risk control business

For in-event and post-event risk control, perception on the client side is asynchronous; for pre-event risk control, it is synchronous.

A few words on pre-event risk control. Pre-event risk control stores the trained model or precomputed data in databases such as Redis or MongoDB, and there are then two ways to serve it:

• One way is for client-side rule engines such as Siddhi, Groovy, and Drools to fetch data directly from Redis or MongoDB and return the result;

• The other way is based on Kubeflow KFServing, where the client request is answered by the trained algorithm or model.

Overall, the latency of both methods is around 200 milliseconds, so they can be used as synchronous RPC or HTTP requests.

Flink-related big data scenarios, by contrast, handle risk control requests asynchronously, with a latency of usually one to two seconds. Where ultra-low latency is pursued, this can be regarded as a form of in-event risk control, with the decision-making process handled automatically by machines.

Common patterns include using Flink SQL for threshold statistics over metrics, Flink CEP for analyzing behavior-sequence rules, and TensorFlow on Flink, where the algorithm is described in TensorFlow and Flink executes the model. Concrete sketches of the SQL and CEP styles appear in later sections.

1.2 Flink is the best choice for rule-based risk control

At present, Flink is the best choice for risk control within Alibaba Group for three reasons:

• Event driven

• Millisecond latency

• Unified stream and batch processing

1.3 Three Elements of Rule-Based Risk Control

Rule-based risk control has three elements, and everything that follows revolves around them:

• Facts: the risk control events, which may come from business systems or log instrumentation, and which are the input to the entire risk control system;

• Rules: usually defined by the business side, i.e. what business goal a rule is meant to serve;

• Threshold: the severity level associated with a rule.

1.4 Enhanced rule expression in Flink

In Flink, rules can be divided into stateless and stateful rules, and stateful rules are the core of Flink-based risk control:

• Stateless rules: mainly used for data ETL. One scenario is triggering the risk control action when a field of an event is greater than X. Another is when the downstream of the Flink task is model- or algorithm-based risk control: no rule judgment is made on the Flink side; instead the data is vectorized and normalized, e.g. via multi-stream joins and CASE WHEN logic, turned into a 0/1 vector, and pushed to the downstream TensorFlow model for prediction.

• Stateful rules:

• Statistical rules: calculation rules based on statistical analysis, e.g. risk control is triggered if there are more than 100 visits within 5 minutes;

• Sequential rules: in an event sequence, one event influences the events before and after it. For example, click, add to cart, and delete as three consecutive actions form a distinctive behavior that may indicate malicious downgrading of a merchant's product ratings, although none of the three events is a risk event by itself (see the CEP sketch after this list). Alibaba Cloud Realtime Compute for Apache Flink has enhanced its sequence-based rule capabilities, providing technical support for e-commerce transaction scenarios both on the cloud and within the group;

• Mixed rules: a combination of statistical and sequential rules.
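
As an illustration of a sequential rule, below is a minimal Flink CEP sketch in Java that matches the click → add-to-cart → delete sequence within 10 minutes for the same user. It is a sketch under assumptions: the `UserAction` event type, its fields, and the stand-in source are hypothetical, not Alibaba's actual schema.

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SequenceRuleSketch {

    // Hypothetical event type: a user action from log instrumentation.
    public static class UserAction {
        public String userId;
        public String action; // "click", "cart", or "delete"
        public UserAction() {}
        public UserAction(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }
    }

    private static SimpleCondition<UserAction> is(final String action) {
        return new SimpleCondition<UserAction>() {
            @Override
            public boolean filter(UserAction e) {
                return action.equals(e.action);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; in production this would be the risk control event stream.
        DataStream<UserAction> actions = env.fromElements(
                new UserAction("u1", "click"),
                new UserAction("u1", "cart"),
                new UserAction("u1", "delete"));

        // click -> add to cart -> delete by the same user within 10 minutes.
        Pattern<UserAction, ?> pattern = Pattern.<UserAction>begin("click").where(is("click"))
                .next("cart").where(is("cart"))
                .next("delete").where(is("delete"))
                .within(Time.minutes(10));

        PatternStream<UserAction> matches = CEP.pattern(actions.keyBy(e -> e.userId), pattern);

        matches.select(new PatternSelectFunction<UserAction, String>() {
            @Override
            public String select(Map<String, List<UserAction>> match) {
                return "suspicious sequence for user " + match.get("click").get(0).userId;
            }
        }).print();

        env.execute("sequence-rule-sketch");
    }
}
```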

2. Alibaba Risk Control Practice

This chapter describes how Alibaba engineers the three risk control elements described above.

The overall technology is currently divided into three modules: perception, handling, and insight:

• Perception: the goal is to perceive all anomalies and discover problems in advance, for example by catching data whose distribution deviates from the norm and outputting a list of such anomalies. Another example: one year, an adjustment to cycling regulations drove up helmet sales, which in turn raised the click-through and conversion rates of related products. That situation had to be perceived and caught in time, because it was normal behavior rather than cheating;

• Handling: that is, how rules are enforced. There are now three lines of defense: hourly, real-time, and offline. Compared with the earlier matching of single policies, correlating and integrating them yields higher accuracy, for example by examining the continuous behavior of certain users over a recent period as a whole;

• Insight: aims to discover risk behaviors that are not yet perceived and cannot be described directly by rules. For example, samples are first highly abstracted into representations, projected into a suitable subspace, and then combined with the time dimension to find high-dimensional features that identify new anomalies.

2.1 Phase I: SQL real-time correlation & real-time statistics

At this stage there was an SQL-based risk control system that used simple SQL for real-time correlation and statistics, for example aggregating with SUM(amount) > 50, where the rule is SUM(amount) and the threshold corresponding to the rule is 50. Suppose four thresholds run online at the same time: 10, 20, 50, and 100. Because a single Flink SQL job can only execute one rule, four Flink jobs are needed for these four thresholds (see the sketch below). The advantage is simple development logic and strong job isolation; the disadvantage is that computing resources are badly wasted.
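
A minimal sketch of such a Phase I rule in Flink SQL, with hypothetical table and column names (a source `payments` with an event-time attribute `ts`, a sink `risk_alerts`). Because the threshold is hard-coded into the statement, each of the four thresholds needs a job of its own:

```sql
-- Hypothetical source: payments(user_id, amount, ts), ts being an event-time attribute.
-- One rule = one job: the threshold 50 is baked into the SQL text itself.
INSERT INTO risk_alerts
SELECT
  user_id,
  SUM(amount) AS total_amount,
  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end
FROM payments
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' MINUTE)
HAVING SUM(amount) > 50;  -- thresholds 10, 20, and 100 each require another job
```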

2.2 Phase II: Broadcast Stream

The main problem with the Phase I risk control rules is that rules and thresholds are immutable. The Flink community offers solutions for this, such as an implementation based on BroadcastStream: a Transaction Source is responsible for event ingestion, and a Rule Source is a BroadcastStream through which any new threshold can be broadcast to the operators.

For example, a rule flags a risk control object that is accessed more than 10 times within one minute, but during the 618 or Double 11 festivals this may need to change to 20 or 30 times before the online systems downstream of the risk control system should perceive it.

In Phase I there were only two options: the first was to keep all the candidate jobs running online at once; the second was to stop a Flink job at a certain moment and start a new job based on the new threshold.

With BroadcastStream, the rule threshold can be pushed down and the online threshold modified directly, without restarting the job (see the sketch below).
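
Below is a minimal Java sketch of this community pattern: the rule stream is broadcast, threshold updates land in broadcast state, and the event stream checks the latest threshold on every element, so no restart is needed. The stand-in sources and the `visits-per-minute` key are hypothetical.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastRuleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in sources: per-minute access counts and a stream of threshold updates.
        DataStream<Long> accessCounts = env.fromElements(12L, 25L, 31L);
        DataStream<Integer> thresholdUpdates = env.fromElements(10, 30);

        final MapStateDescriptor<String, Integer> ruleDesc =
                new MapStateDescriptor<>("rules", Types.STRING, Types.INT);
        BroadcastStream<Integer> ruleBroadcast = thresholdUpdates.broadcast(ruleDesc);

        accessCounts
            .connect(ruleBroadcast)
            .process(new BroadcastProcessFunction<Long, Integer, String>() {
                @Override
                public void processElement(Long count, ReadOnlyContext ctx,
                                           Collector<String> out) throws Exception {
                    // Read the latest threshold; ignore events until one has arrived.
                    Integer threshold = ctx.getBroadcastState(ruleDesc).get("visits-per-minute");
                    if (threshold != null && count > threshold) {
                        out.collect("risk: " + count + " visits > threshold " + threshold);
                    }
                }

                @Override
                public void processBroadcastElement(Integer newThreshold, Context ctx,
                                                    Collector<String> out) throws Exception {
                    // E.g. raised from 10 to 20 or 30 during 618 / Double 11; no restart needed.
                    ctx.getBroadcastState(ruleDesc).put("visits-per-minute", newThreshold);
                }
            })
            .print();

        env.execute("broadcast-rule-sketch");
    }
}
```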

2.3 Phase III: Dynamic CEP

The main problem in Phase II is that only the threshold can be updated. Although this greatly helps the business systems, it still cannot satisfy the upper-layer business, which has two main demands: combine with CEP to perceive behavior sequences; and, after combining with CEP, still be able to dynamically modify the threshold and even the rule itself.

In Phase III, Alibaba Cloud Flink built a high-level abstraction around CEP that decouples CEP rules from the CEP execution nodes. Rules can therefore be stored in external third-party storage such as RDS or Hologres; after a CEP job is published, it loads the CEP rules from the database and swaps them dynamically, which strengthens the job's expressive power.
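
Since the rules live in third-party storage, the CEP job only needs to poll a rule table and hot-swap the loaded patterns. The DDL below is an illustrative sketch of such a table in RDS; the column layout is an assumption for illustration, not the exact schema used by Alibaba Cloud:

```sql
-- Illustrative rule table polled periodically by the CEP job.
CREATE TABLE cep_rules (
  rule_id  BIGINT       NOT NULL,
  version  INT          NOT NULL,  -- bump the version to roll out a revised rule
  pattern  TEXT         NOT NULL,  -- the CEP pattern serialized, e.g. as JSON
  status   VARCHAR(16)  NOT NULL,  -- 'ENABLED' / 'DISABLED'
  PRIMARY KEY (rule_id, version)
);
```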

Second, job flexibility is enhanced. For example, to watch certain behaviors in an app and update the threshold for those behaviors, the CEP rules are updated through the third-party storage rather than through Flink itself.

Another advantage of this approach is that the rules can be exposed to the upper-layer business, so the business can truly write its own risk control rules and we become a genuine rule center; that is the benefit of dynamic CEP. Dynamic CEP has been integrated into the latest version of Alibaba Cloud's services, and Alibaba Cloud's fully managed Flink greatly shortens the development cycle for risk control scenarios.

2.4 Phase IV: Shared Computing

Building on Phase III, Alibaba Cloud developed a "shared computing" solution. In it, CEP rules are described entirely on a modeling platform that exposes a very friendly rule-description interface to upper-layer customers or business parties; rules can be assembled by drag-and-drop or other means, and an event source is then selected on the scheduling engine to run them. For example, two models both serving the Taobao app can be applied to the Flink CEP job over the same Fact, so that the business side, the execution layer, and the engine layer are completely decoupled. Alibaba Cloud's shared computing solution is now very mature, with rich customer practice.

2.5 Phase V: Separation of business development and platform construction

Among the engine side, platform side, and business side, Phase IV decouples the engine from the platform, but the platform is still tightly bound to the business side. The two still work as contractor and client: the business side owns the business rules, and the platform side takes the business team's risk control requirements and implements the rules for it. But the platform team's manpower is limited, while the business teams keep growing as the business develops.

At this point, the business side can itself abstract some basic concepts, distill common business specifications, assemble them into a friendly DSL, and then submit jobs through Alibaba Cloud's fully decoupled Open API.

Since nearly 100 BUs across the group must be supported simultaneously, customized support for each BU is impossible. Instead, the engine's capabilities are opened up as far as possible, and each business side encapsulates its own DSL and submits jobs to the platform, so that only a single middle platform is exposed to customers.

3. Technical difficulties of large-scale risk control

This chapter covers the main technical difficulties of large-scale risk control and how Alibaba Cloud overcomes them in its fully managed Flink product.

3.1 Fine-grained resource adjustment

In stream computing systems, the data source is usually not the bottleneck. The upstream read node has no computing logic and therefore no performance problem; it is the downstream data processing node that is the performance bottleneck of the whole task.

Because Flink allocates resources by slot, the source node and the work nodes have the same parallelism by default. We therefore want to adjust the parallelism of the source node and of the CEP work nodes separately: for example, a job's CEP work nodes may need a parallelism of 2,000 while the source node needs only 2, which can greatly improve the performance of the CEP nodes.

In addition, the memory and CPU resources of the TaskManagers (TMs) hosting the CEP work nodes can be sized separately. In open-source Flink, TMs are homogeneous: the source node and the work nodes get the same specification. From a resource-saving perspective, in a real production environment the source node does not need as much memory and CPU as the CEP nodes; a smaller CPU and memory allocation is enough for data ingestion.

Alibaba Cloud's fully managed Flink lets the source node and the CEP nodes run on heterogeneous TMs, with the TM resources of the CEP work nodes significantly larger than those of the source node, so CEP execution becomes more efficient. Counting the optimization brought by fine-grained resource adjustment, the fully managed cloud service can cost about 20% less than self-built IDC Flink.
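
Open-source Flink exposes a related capability through fine-grained resource management (FLIP-156/169): operators can be placed into slot-sharing groups with their own CPU and memory profiles, and given their own parallelism. Below is a minimal Java sketch with illustrative numbers; whether heterogeneous TMs are actually provisioned depends on the deployment and cluster configuration.

```java
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedResourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Small profile for the source, a larger one for the CEP-style work nodes.
        SlotSharingGroup sourceGroup = SlotSharingGroup.newBuilder("source")
                .setCpuCores(0.5)
                .setTaskHeapMemory(MemorySize.ofMebiBytes(256))
                .build();
        SlotSharingGroup workGroup = SlotSharingGroup.newBuilder("work")
                .setCpuCores(2.0)
                .setTaskHeapMemory(MemorySize.ofMebiBytes(2048))
                .build();
        env.registerSlotSharingGroup(sourceGroup);
        env.registerSlotSharingGroup(workGroup);

        env.fromSequence(1, 1_000_000)            // stand-in parallel source
           .setParallelism(2)                     // the source needs only 2 subtasks
           .slotSharingGroup("source")
           .map(v -> v + 1).returns(Types.LONG)   // stand-in for the CEP work node
           .setParallelism(2000)                  // work nodes scaled out independently
           .slotSharingGroup("work")
           .print();

        env.execute("fine-grained-resource-sketch");
    }
}
```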

3.2 Stream-Batch Unification & Adaptive Batch Scheduler

If the stream engine and the batch engine do not use the same execution mode, they often run into inconsistent data semantics ("data caliber"). The reason is that batch rules can rarely describe streaming rules in full; for example, Flink may have a particular UDF that has no counterpart in the Spark engine. When the two results disagree, choosing which side's numbers to trust becomes a very serious question.

With Flink's unified stream-batch execution, CEP rules written for streaming mode can be re-run in batch mode with identical semantics and identical results, so there is no need to develop separate batch-mode CEP jobs.

On this basis, Alibaba implemented an adaptive batch scheduler. The daily output of CEP rules is not necessarily steady: for example, if today's behavior sequences contain no anomalies, only a little data reaches the downstream batch analysis. An elastic cluster is reserved for batch analysis, and when CEP produces few results, the downstream analysis needs only a small amount of resources; the parallelism of each batch work node does not even have to be fixed up front, since in batch mode a node can adjust its parallelism automatically according to the upstream output and its workload. This is truly elastic batch analysis, and a unique advantage of the Alibaba Cloud Flink batch scheduler.
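
In open-source Flink, the adaptive batch scheduler is switched on through configuration and then derives each vertex's parallelism from the actual volume of upstream data. The flink-conf.yaml sketch below uses key names from recent Flink releases (roughly 1.16+); earlier versions spelled them differently, so treat the exact keys as indicative:

```yaml
# Let the scheduler decide batch vertex parallelism at runtime.
jobmanager.scheduler: AdaptiveBatch
execution.batch.adaptive.auto-parallelism.enabled: true
execution.batch.adaptive.auto-parallelism.min-parallelism: 1
execution.batch.adaptive.auto-parallelism.max-parallelism: 128
# Target data volume per subtask; parallelism is derived from it.
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: 1gb
```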

3.3 Merged source reading to reduce pressure on the common layer

This is a problem encountered in practice. Today's development model is generally based on the data middle platform, for example a real-time data warehouse. In a real-time warehouse there may not be many raw data sources, but the middle DWD layer becomes large; it may evolve into many DWS tables, and even into many data marts used by different departments. In that case, the read pressure on a single table becomes very high.

Usually, multiple source tables are joined (widened) into a DWD layer, so from the perspective of a single source table it is depended on by multiple DWD tables; the DWD layer is in turn consumed by jobs in different business domains to form DWS tables. For this situation, Alibaba implemented source-based merging: the DWD is read only once, and Flink fans it out into the multiple DWS tables of a business domain, greatly reducing the execution pressure on the common layer (see the sketch below).
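
In Flink SQL, one way to express this merged reading is a statement set: several INSERTs are compiled into a single job, and the planner can reuse the identical scan of the DWD table. A minimal sketch with hypothetical table names:

```sql
-- One job, one read of dwd_orders, two DWS outputs.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO dws_gmv_by_shop
  SELECT shop_id, SUM(amount) FROM dwd_orders GROUP BY shop_id;

  INSERT INTO dws_gmv_by_category
  SELECT category_id, SUM(amount) FROM dwd_orders GROUP BY category_id;
END;
```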

3.4 A state backend with KV-separated design

When a CEP node executes, it reads local data at a very large scale, especially in the behavior-sequence computing mode, because it must cache all prior data, or at least the behavior sequences within a certain time window.

The big problem here is that the state backend (such as RocksDB) incurs a very large performance overhead, which hurts the performance of the CEP nodes. Alibaba therefore implemented a state backend with a KV-separated design: Alibaba Cloud Flink uses Gemini as the default state backend, and its measured performance in CEP scenarios is improved by at least 100%.

3.5 Partitioned Loading of Dimension Data

Risk control often needs to analyze historical behavior. Historical behavior data is generally stored in Hive or ODPS tables, which can reach terabyte scale. By default, open-source Flink loads the whole of such a huge dimension table on every dimension-table node, which is simply unrealistic. Alibaba Cloud instead partitions the in-memory data via shuffle: each dimension-table node loads only the data belonging to its own shuffle partition (see the sketch below).
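
For reference, a dimension-table (lookup) join in Flink SQL looks like the sketch below. The table names are hypothetical, and the specific switch that enables shuffle-partitioned loading of the dimension table is a platform-side connector option whose name varies, so it is only indicated in a comment rather than spelled out:

```sql
-- Hypothetical fact stream `events` (with a processing-time attribute `proctime`)
-- joined against a TB-scale history table `user_history`. With partitioned loading
-- enabled on the platform side (a connector option, name varies by version), each
-- dim-table subtask caches only the rows belonging to its shuffle partition.
SELECT e.user_id, e.amount, d.risk_score
FROM events AS e
JOIN user_history FOR SYSTEM_TIME AS OF e.proctime AS d
  ON e.user_id = d.user_id;
```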

4. Alibaba Cloud Flink FY23 Risk Control Evolution Plan

For Alibaba Cloud as a whole, the evolution plan of FY23 includes the following:

• Increased expressiveness
• Increased observability
• Enhanced execution capability
• Performance enhancements
