Construction and Application of the Feature Platform at Shuhe

1. Feature Platform Overview

First is an overview of the feature platform. The platform is divided into four layers: data service, storage service, computing engine, and raw storage. The data service layer provides services to the outside and mainly covers four types:

• First, traditional API point queries (spot checks);
• Second, polling queries;
• Third, event messages;
• Fourth, synchronous computation calls.

The synchronous computation call service performs real-time computation, which is equivalent to computing the policy on the spot, while the API point-query service relies on results that are pre-computed and stored. To serve this data, two storage modes are provided, feature row storage and feature column storage, which support API point queries and batch selection (cohort) queries respectively. There are two computing engines: an offline computing engine and a unified streaming-batch computing engine. At the bottom of the feature platform is the raw storage, which supports offline computing, while the event storage supports streaming and batch computing.

Next, we take MySQL as an example to walk through a simplified data flow of the feature platform.

First, the offline part: MySQL data is extracted to EMR through Sqoop or other extraction tools, processed with Hive, and the final results are stored in HBase and ClickHouse, corresponding to feature row storage and feature column storage respectively, to serve API point queries and batch selection queries. At the same time, MySQL's binlog is written to Kafka in real time, and the Kafka data is consumed by the unified streaming-batch Flink computing engine. The Kafka data is also written into the event storage (HBase), whose data in turn feeds the same Flink engine. After computation, the engine writes results to HBase and ClickHouse and also emits an event message. The data landed in the event storage HBase can additionally serve real-time calls.
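To make the real-time half of this flow concrete, below is a minimal PyFlink sketch that consumes a hypothetical binlog topic from Kafka, aggregates a per-user feature, and writes it into feature row storage in HBase. All table, topic, and field names are illustrative, and the Kafka and HBase connector jars are assumed to be available; the platform's actual jobs are more involved.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Source: MySQL binlog changes forwarded to a Kafka topic (illustrative schema).
t_env.execute_sql("""
    CREATE TABLE repayment_binlog (
        user_id STRING,
        amount  DOUBLE,
        ts      TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'mysql_binlog_repayment',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Sink: feature row storage in HBase; "f" is an assumed column family.
t_env.execute_sql("""
    CREATE TABLE feature_rt (
        user_id STRING,
        f ROW<repay_cnt BIGINT, repay_sum DOUBLE>,
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'feature_rt',
        'zookeeper.quorum' = 'zk:2181'
    )
""")

# A per-user aggregation as a stand-in for real feature logic.
t_env.execute_sql("""
    INSERT INTO feature_rt
    SELECT user_id, ROW(repay_cnt, repay_sum)
    FROM (
        SELECT user_id, COUNT(*) AS repay_cnt, SUM(amount) AS repay_sum
        FROM repayment_binlog
        GROUP BY user_id
    )
""")
```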

2. Feature Storage Service

Next, we will introduce the feature storage service.

We divide the features into four categories:

• Synchronous features: written in real time, corrected offline, implemented as a unified streaming-batch link.
• Instantly computed features: computed at API call time and in offline batches, with consistent logic.
• Real-time features: the traditional real-time link, which can implement complex real-time logic and can be replaced by the streaming-batch link.
• Offline features: the traditional offline link, which implements complex offline logic.

There are several reasons why offline links must still exist:

First, the real-time link is a purely incremental link. The link is very long, and problems may occur at any stage. Once an error occurs, the data will not be corrected automatically for a long time.

Second, real-time links demand timeliness, especially when multi-stream joins are involved. Once there is a delay, a downgraded result has to be returned as soon as possible. To control the final error rate of real-time features and confine errors to a small time window, offline correction is required. The feature storage service supports correction in two ways: synchronous features use a streaming-batch link that carries its own correction; other features are generally composed of real-time, offline, and instant computation.

Take MySQL as an example for the overall data flow of the storage service. For the offline part, MySQL data is extracted to EMR through Sqoop, processed with Hive, and the final results are saved to HBase and ClickHouse. At the same time, the binlog is written to Kafka in real time, and the Kafka data is consumed by the unified streaming-batch Flink computing engine, which writes its results to HBase and ClickHouse. HBase and ClickHouse then serve API point queries and batch selection queries.

2.1 Real-Time Feature Data Flow

In the real-time feature data flow, MySQL binlog data written to Kafka, together with Kafka data from other sources such as tracking events, is computed by Flink; the results are written to another Kafka topic, and the consumed results are finally written to HBase and ClickHouse.

2.2 Offline Feature Data Flow

In the offline feature data flow, MySQL data is extracted through Sqoop, OSS data through Spark or other means, and Kafka data is ingested into EMR through Flume; Hive or Spark then performs the computation, and the results are written to HBase and ClickHouse.
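As a minimal illustration of this offline leg, the sketch below uses PySpark (one of the engines mentioned above) to compute the same per-user aggregate as the earlier real-time example, reading from a hypothetical Hive table produced by the extraction step. Table and column names are assumptions; loading the result into HBase and ClickHouse is left to downstream loaders.

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (SparkSession.builder
         .appName("offline_feature_repay_stats")
         .enableHiveSupport()
         .getOrCreate())

# Same caliber as the real-time job: count and sum per user over the full history.
features = (spark.table("ods.repayment")
            .groupBy("user_id")
            .agg(F.count("*").alias("repay_cnt"),
                 F.sum("amount").alias("repay_sum")))

# Persist the corrected features; separate loaders would push this table into
# HBase (row storage) and ClickHouse (column storage).
features.write.mode("overwrite").saveAsTable("dw.feature_repay_offline")
```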

2.3 Synchronous Feature Data Flow

In the synchronous feature data flow, MySQL's binlog is written to Kafka in real time, and the Kafka data is then written into the event storage in real time. Meanwhile, offline correction and offline initialization are also applied from MySQL. Flink processes both the stream and the batch and writes the results to HBase and ClickHouse.
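The point of the synchronous feature link is that one piece of Flink logic serves both the real-time write and the offline correction. The fragment below is only a sketch of how such a mode switch could be wired in PyFlink; how the mode flag is passed in is an assumption, and the sources, sinks, and feature SQL are omitted.

```python
import sys
from pyflink.table import EnvironmentSettings, TableEnvironment

def build_table_env(run_mode: str) -> TableEnvironment:
    settings = (EnvironmentSettings.in_batch_mode()
                if run_mode == "batch"
                else EnvironmentSettings.in_streaming_mode())
    return TableEnvironment.create(settings)

# "stream" consumes the Kafka binlog continuously; "batch" reads the bounded,
# offline-corrected snapshot. The feature SQL registered afterwards is identical.
t_env = build_table_env(sys.argv[1] if len(sys.argv) > 1 else "stream")
```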

2.4 Instant Computation Feature Data Flow

In the instant computation feature data flow, API point queries and batch selection queries are served based on the data in HBase and ClickHouse.

This concludes the introduction of the storage service. It covers most of the feature storage services, as shown in the orange part of the figure.

3. Integrated Streaming-Batch Solution

During the period when we only provided feature storage services, we discovered many problems as well as some business demands. First, the questions:

• Is there data that has not yet been used in refining existing model strategies? For example, the point-in-time data of status changes in MySQL.

• Is the offline logic of input items complete enough? Why do real-time input items have to be rebuilt and supplemented? If offline input items are to be implemented in real time, their logic has to be reorganized, and some are even too complex to be converted to real time in traditional ways.

• When usage scenarios are uncertain and point queries cannot be distinguished from batch runs in advance, can both be covered at the same time? For many business people, it is unknown whether the desired model or strategy will ultimately be run in batch or queried one record at a time. Is there a way to meet both requirements?

• Stream processing logic is hard to understand. Why join streams? Can't we just "fetch the data"? Model developers are not familiar with stream processing, so it is difficult for them to build real-time features.

• The dry-run test of a real-time model strategy takes a long time; can it be shortened?

• Model strategy development and training is fast, but bringing real-time input items online takes a long time; can this be accelerated?

To address these problems, we propose the following solutions:

• [Data] Store state-change data and support restoring a data slice at any point in time. This brings an additional advantage: there is no feature leakage (time-travel) problem when training models with the streaming-batch scheme, because future data simply cannot be obtained.

• [Logic] Unify streaming and batch with streaming as the primary form, so logic stays consistent and no caliber verification is needed. The same data is used for training, online serving, and back-testing, which guarantees consistent final results.

• [Execution] Unify streaming, batch, and call-time execution, adapting to different scenarios.

• [Development] Use "fetching" instead of stream joins, encapsulate the concepts unique to real-time streams, and lower the threshold for real-time development.

• [Testing] Support backtracking tests over any time period to speed up real-time development and testing.

• [Launch] Self-service development and launch of unified streaming-batch models, reducing communication overhead and improving launch efficiency.

Traditional real-time streaming schemes include Lambda and Kappa.

Lambda keeps two sets of logic, real-time and offline, and finally merges them in the database. Its advantages are a simple architecture, a good combination of offline batch processing and real-time stream processing, stable and controllable real-time computing cost, and easy correction of offline data. Its disadvantage is that it is hard to keep real-time and offline results consistent, and two systems must be maintained.

3.1 Streaming-Batch Integration Scheme

Kappa uses real-time logic only, saves historical data, takes one slice of data each time, and finally merges them. Its advantage is that only the real-time processing module needs to be maintained, and messages can be replayed without merging offline and real-time data. Its disadvantage is that it strongly depends on the caching capability of the message middleware, and data loss can occur during real-time processing, which is unacceptable in the financial field.

Because Kappa abandons the offline processing module, it also gives up the reliability and stability of offline computing. Lambda ensures the stability of offline computing, but the maintenance cost of the dual systems is high, and operating two sets of code is complex.

Therefore, we propose a Lambda + Kappa streaming-batch integration scheme. As shown in the figure, the first half of the data flow is the Lambda architecture, centered on an HBase event storage; the second half is the Kappa architecture, where users complete their stream and batch processing.

The figure above shows the overall streaming-batch solution, again using MySQL as an example. First, the MySQL binlog enters Kafka, and the data is sent to the event center, with offline correction and slicing applied; the same Kafka topic is also used to trigger real-time streams. The event center then provides data fetching and offline batch-run services. Finally, a metadata center manages and maintains the data uniformly so that no separate synchronization is needed, and Flink provides the full logic service.

3.2 Event Center

The event center in the figure uses the Lambda architecture to store all changed data, applies daily corrections, and pursues the best cost-performance through a hot-cold hybrid storage and reheating mechanism. In addition, we borrow Flink's watermark mechanism to ensure that current values stay in sync. Finally, the event center provides a message forwarding mechanism and an asynchronous-to-synchronous mechanism, so that stream joins are replaced by "fetching". It supports both trigger-then-receive-message and trigger-then-poll call patterns, and gives the interfaces the ability to be traced back.

The data flow of the event center is as follows. As shown in the figure, multiple data sources such as MySQL and Kafka are forwarded to Kafka through different paths. Flink then consumes Kafka directly and writes to HBase hot storage in real time. Offline-corrected data is also written to the HBase hot storage through EMR. A replica mechanism handles replication between HBase hot storage and HBase cold storage, and data in cold storage can be reheated back into hot storage.

The storage structure of the event center is shown in the figure. The cold storage holds only the main data. The hot storage holds, in addition to the main data, three tables used for different indexing purposes. The TTL of the hot storage is generally 32 days and can be adjusted in special circumstances.
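As a rough illustration of the hot-storage layout, the sketch below uses the happybase Python client to create an event table with the 32-day TTL mentioned above and write one state-change event. The table name, column family, and row-key layout are assumptions for illustration, not the platform's actual schema (which also includes the three index tables).

```python
import happybase

conn = happybase.Connection("hbase-hot")   # hot-storage cluster (assumed host)

# Main event table in hot storage, with the 32-day TTL described above.
conn.create_table(
    "event_hot",
    {"e": dict(time_to_live=32 * 24 * 3600)},
)

# One state-change event, keyed by entity id plus event time so that a data
# slice at any point in time can be restored later.
table = conn.table("event_hot")
table.put(b"user_001|2024-01-01T12:00:00", {b"e:status": b"OVERDUE"})
```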

In the event center's read data flow, real-time triggering comes from Kafka, while backtracking and fetching both read the HBase hot storage. An internal reheating mechanism moves data from HBase cold storage back into hot storage. This logic is transparent to developers, who do not need to care where the data comes from.

The following describes the event center's watermark mechanism and how it replaces stream joins.

Suppose we want to join two streams, which can be simply understood as two tables related by a foreign key. When either table changes, we need to trigger at least one final, complete joined record. We denote the two streams as A and B, and assume A arrives first. With the event center's watermark mechanism enabled, by the time stream A's trigger fires, its current event has already been recorded in the event center. There are then two cases:

If stream B's related data can be obtained from the event center, then between the moment stream A's current event was recorded and the moment stream B's related data is read, stream B has already completed its write to the event center, so the data at this point is complete.

If stream B's related data cannot be obtained from the event center, the watermark mechanism tells us that stream B's related event has not been triggered yet. Since stream A's current event has already been written to the event center, stream A's data will certainly be available when stream B's related event fires, and the data will be complete at that point. Therefore, the event center's watermark mechanism guarantees that, after replacing the stream join with "fetching", there is at least one computation performed on complete data.
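The following plain-Python sketch shows the shape of this "fetch instead of join" logic; the event-center client, its fetch method, and the handler wiring are hypothetical and only illustrate the two cases above.

```python
# Hypothetical event-center client and handlers; emit_feature is a placeholder.
def emit_feature(a, b):
    print("complete record:", a, b)

def on_stream_a_event(event_a, event_center):
    # By the time this handler fires, event_a has already been persisted in
    # the event center (guaranteed by the watermark mechanism).
    related_b = event_center.fetch("stream_b", key=event_a["join_key"])
    if related_b is not None:
        # Case 1: B has already arrived, so both sides are complete -> compute now.
        emit_feature(event_a, related_b)
    # Case 2: B has not arrived yet -> do nothing here. When B's event fires,
    # its handler fetches A (already stored), so at least one computation
    # always runs on complete data.

def on_stream_b_event(event_b, event_center):
    related_a = event_center.fetch("stream_a", key=event_b["join_key"])
    if related_a is not None:
        emit_feature(related_a, event_b)
```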

Triggered message reception is completed through message forwarding. When an external system sends a request, it is forwarded to Kafka, and the Kafka data also enters the event center. The corresponding computation is then triggered, the result is sent through a message queue, and the external system receives the result message.

A polling service is also provided on top of message forwarding. The first part of the mechanism is the same as message reception, except that the event center additionally stores the computation results and then serves them.

3.3 Fetch Consistency

Another problem with fetching data is that the latest data cannot be obtained unless it is read directly from the source database, which is generally forbidden because it puts pressure on the primary database. To ensure data consistency, we took some measures. First, we divide consistency into four levels (see the sketch after this list):

• Final consistency: updated data becomes readable after a period of time. The entire streaming-batch scheme guarantees final consistency by default.

• Trigger-stream strong consistency (delay allowed): guarantees that, when fetching from the trigger stream, data no later than the current event can be obtained. Uses the watermark scheme and delays when the watermark is not satisfied.

• Fetch strong consistency (delay allowed): guarantees that data earlier than the time required by the user can be obtained. Uses the watermark scheme and delays when the watermark is not satisfied.

• Fetch strong consistency (no delay): guarantees that data earlier than the time required by the user can be obtained. When the watermark is not satisfied, the missing increment is pulled directly from the data source, which puts pressure on the source.
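The sketch below shows how these four levels could be expressed on a single fetch call; the event-center client, its watermark and fetch methods, and the level names are assumptions for illustration only.

```python
import time

def fetch_with_consistency(event_center, stream, key, required_ts,
                           level="eventual", timeout_s=2.0):
    """Fetch `key` from `stream` under one of the four consistency levels."""
    if level == "eventual":
        # Default for the whole streaming-batch scheme.
        return event_center.fetch(stream, key)

    if level in ("trigger_strong_with_delay", "fetch_strong_with_delay"):
        # Wait (delay) until the stream's watermark passes the required time,
        # after which the stored data is guaranteed to be new enough.
        deadline = time.time() + timeout_s
        while event_center.watermark(stream) < required_ts and time.time() < deadline:
            time.sleep(0.05)
        return event_center.fetch(stream, key)

    if level == "fetch_strong_no_delay":
        # No waiting allowed: if the watermark lags, pull the missing increment
        # straight from the data source, at the cost of load on the source.
        if event_center.watermark(stream) < required_ts:
            return event_center.fetch_incremental_from_source(stream, key)
        return event_center.fetch(stream, key)

    raise ValueError(f"unknown consistency level: {level}")
```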

3.4 Integrated Streaming-Batch Jobs

We use PyFlink to implement the unified streaming-batch jobs. Python is chosen because model and policy developers are more familiar with it, and Flink guarantees logical consistency. On top of PyFlink, we encapsulate the complex trigger logic and fetch logic, and allow code fragments to be reused.

The code organization of a PyFlink job is shown in the figure and consists of three parts: trigger, main logic, and output. The trigger and output do not need to be implemented by hand; you only need to select from the encapsulated options.

Flink's overall data flow is also simple. At the top is the trigger logic, which then invokes the main logic; inside the main logic there is fetch logic to retrieve data, and finally the output logic. The underlying trigger, fetch, and output logic is encapsulated and adapts itself to streaming or batch execution, so input and output stay the same; in most cases the logic itself does not need to consider whether it runs in a streaming or a batch environment.
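The sketch below shows only the shape of the user-facing part of such a job: a main-logic function that receives the trigger event and a fetch handle, with trigger and output selected from encapsulated options. The platform wrapper calls shown in comments are hypothetical.

```python
# Main logic only: it receives the trigger event and a fetch handle; the same
# function body runs unchanged whether the job executes as a stream, a batch
# backfill, or a synchronous call.
def main_logic(trigger_event, fetch):
    # "Fetching" instead of a stream join: pull the related events on demand.
    orders = fetch("order_events", key=trigger_event["user_id"], days=30)
    repay_cnt = sum(1 for o in orders if o["type"] == "repay")
    return {"user_id": trigger_event["user_id"], "repay_cnt_30d": repay_cnt}

# Trigger and output are selected, not written (hypothetical wrapper calls):
# job = platform.job(trigger="mysql_binlog_repayment", output="feature_store")
# job.run(main_logic)
```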

A typical PyFlink usage flow is as follows. First, select the trigger stream, write the fetch and preprocessing logic (or import already published fetch or processing code), set the sampling logic and do a trial run, obtain the trial results, and analyze and train further on the analysis platform. When the model is ready to be published after training, select the trained model in the job, set initialization-related parameters if necessary, and finally release the model online.



4. Model and Policy Call Schemes

We provide four call schemes:

Feature storage service scheme: the Flink job performs pre-computation, writes the results to the feature storage platform, and serves them externally through the data service platform.

Interface triggering - polling scheme: call the event center's message forwarding interface and poll until the Flink job returns the calculation result.

Interface triggering - message receiving scheme: call the event center's message forwarding interface to trigger the Flink job, then receive the result message returned by the job.

Direct message receiving scheme: directly receive the result message returned by the Flink job.

4.1 Feature Storage Service Scheme

The feature storage service covers three cases: real-time, offline correction, and offline initialization. When a new variable goes online or an old variable's logic changes, the full data needs to be refreshed once; that is when offline initialization is required. The real-time flow is triggered in real time, while offline correction and offline initialization are triggered in batches. If there is fetch logic, data is fetched from HBase; the real-time and offline jobs naturally differ in how they fetch, but developers need not care because this is encapsulated. Results of the real-time Flink job are sent to Kafka, while results of offline correction and initialization go through EMR, and both are finally written into the feature storage, namely HBase and ClickHouse.

The figure above shows the timing of the feature storage service scheme. Kafka triggers Flink, and the Flink results are written to the feature storage. If an external call arrives at trigger time, it cannot get the latest data; the updated data only becomes available after the job finishes and writes to storage.

4.2 Interface Triggering - Polling Scheme

In the interface triggering - polling scheme, the triggering call goes through message forwarding to Kafka, and Flink delivers its results back to Kafka. If this completes within the time of a single request, the result is returned directly, and the trigger-poll pattern degenerates into a single call. Otherwise, the result is written into the event storage HBase and obtained through polling calls.

The timing chart of the interface triggering - polling scheme is shown above. When an external call triggers the flow, message forwarding triggers the Flink job. While Flink is computing and writing to the database, there are multiple polls. If the result cannot be obtained within a fixed time, a timeout is reported; if the data has been written by the next poll, it is fetched successfully.
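The sketch below shows what a caller of the trigger-polling interface could look like, assuming hypothetical HTTP endpoints for the forwarding and result-query services; the real interfaces and payloads will differ.

```python
import time
import requests

def call_with_polling(payload, timeout_s=5.0, interval_s=0.2):
    # 1. Trigger: the forwarding service writes the request to Kafka and returns
    #    a request id, or the result itself if the job finished within a single
    #    request, in which case the pattern degenerates into one call.
    resp = requests.post("http://event-center/forward", json=payload).json()
    if "result" in resp:
        return resp["result"]

    # 2. Poll the result-query service until the Flink job has written its result.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        r = requests.get("http://event-center/result",
                         params={"id": resp["request_id"]}).json()
        if r.get("ready"):
            return r["result"]
        time.sleep(interval_s)
    raise TimeoutError("calculation result not ready within timeout")
```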

4.3 Interface Triggering - Message Receiving Scheme

The interface triggering - message receiving scheme is a simplification of the polling scheme. If the business system supports message reception, the whole link becomes simpler: you only need to trigger the calculation through the message forwarding service and then listen for the result message.

The timing of the interface triggering - message receiving scheme is serial: after the trigger, the Flink job runs, and when it finishes, the result data is delivered to the caller through the message receiving mechanism.
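The receiving side can be as simple as a Kafka consumer on the result topic. The sketch below assumes the kafka-python client and an illustrative topic name.

```python
import json
from kafka import KafkaConsumer

def handle_result(result):
    print("received feature result:", result)   # business callback, placeholder

consumer = KafkaConsumer(
    "feature_result_topic",                      # illustrative topic name
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# After triggering the calculation via the forwarding service, the business
# system only needs to listen for the result message here.
for msg in consumer:
    handle_result(msg.value)
```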

4.4 Direct Message Receiving Scheme

The direct message receiving scheme is purely streaming: Kafka triggers the Flink calculation, and after the calculation the data is written to a message queue, which the other party subscribes to. The overall timing is also very simple, as shown in the figure below.

We divide data usage into three categories: instant calls, real-time streams, and offline batches, with decreasing timeliness in that order. All three cases are registered through the event center, so in the end we only need to use the event center as a transit point to provide Flink with the relevant data; Flink itself does not need to care how it is invoked.

Finally, a summary of the four schemes that integrate streaming, batch, and call:

• Feature storage service scheme: provides persistent feature storage through the feature storage service, offering API point-query (spot check) and feature selection query services.

• Interface triggering - polling scheme: provides synchronous call computation services through the event center's message forwarding and message query services.

• Interface triggering - message receiving scheme: provides event message services through the event center's message forwarding service.

• Direct message receiving scheme: supports complex event triggering and provides event messages.
