How to implement an algorithm?

1: Background

In cloud computing environments, virtual machine load balancing, automatic scaling, green energy saving, and host machine upgrades all require virtual machine (VM) migration technology, especially live migration. Live migration places strict requirements on downtime: the shorter the downtime, the shorter the interruption to customer workloads and the smaller the impact. If a VM's future workload trend can be predicted from its historical workload, the most suitable time window can be found to complete the live migration operation.

So we began to explore how to use machine learning algorithms to predict the load of ECS virtual machines and the downtime of live migration. For machine learning algorithms to be useful in production, however, many supporting systems are needed. To quickly bring existing algorithms into the production environment and use GPUs to accelerate large-scale computation, we built a GPU-accelerated, large-scale distributed machine learning system named Xiaozhuge, which serves as the heterogeneous machine learning acceleration engine of the ECS data platform. Xiaozhuge, equipped with the algorithms above, has been launched in production and supports large-scale live migration prediction for virtual machines across Alibaba Cloud's entire network.

2: Options
So what components does a complete large-scale distributed machine learning system need?

1 Overall Architecture
Given the large number of virtual machines across the entire Alibaba Cloud network, completing prediction within 24 hours requires optimizing every link of the end-to-end process, so this is necessarily a complex engineering effort. To build the platform efficiently, we assembled it from existing products and services on Alibaba Cloud.

The whole platform includes: a web service, an MQ message queue, a Redis database, SLS/MaxCompute/HybridDB for data acquisition, OSS as a model repository for uploads and downloads, GPU cloud servers, the DASK distributed framework, and the RAPIDS acceleration library.

1) Architecture

The figure below shows the overall architecture of Xiaozhuge.

Xiaozhuge is an end-to-end GPU-accelerated machine learning platform based on RAPIDS and DASK, built entirely on Alibaba Cloud products and services.

On the front end we provide a Tengine + Flask web service that accepts data computation requests from clients and uses a message queue to decouple from the large-scale computing cluster on the back end.

The Dask distributed framework handles data preparation, model training, and the management and scheduling of prediction compute nodes. We use Alibaba Cloud's MaxCompute to process offline data in the training phase, and real-time computing engines such as Blink to process online data in the prediction phase. The HybridDB analytical database stores the processed online data for real-time prediction data pulls, Alibaba Cloud's Object Storage Service (OSS) is used to fetch training data and save trained models, and GPU cloud servers accelerate the machine learning computation.

2) Design Thinking

Let's look at the core design ideas of the platform.

The first is the use of a distributed message queue:

It decouples the front-end business platform from the back-end computing system and enables asynchronous business processing.
It supports high concurrency, allowing the system to handle concurrent reads and writes at a scale of more than one million.
If the back-end system fails, messages accumulate in the queue without being lost; once the back end recovers, it continues processing requests, which provides high availability.
The consumers of the message queue can be multiple computing systems, and those systems can be upgraded in rotation without affecting the front-end business, which provides high scalability.
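The decoupling the points above describe can be sketched in a few lines. Here Python's standard `queue.Queue` stands in for RocketMQ: the front end only enqueues requests and returns immediately, while a back-end consumer drains the backlog at its own pace, so a slow or restarting back end simply lets messages accumulate.

```python
# Tiny sketch of queue-based decoupling; queue.Queue stands in for RocketMQ.
import queue

mq = queue.Queue()

# Front end: accept a burst of requests and return immediately.
for i in range(3):
    mq.put({"request_id": i, "action": "predict"})

# Back end: comes up later and works through the accumulated backlog.
handled = []
while not mq.empty():
    handled.append(mq.get()["request_id"])

print(handled)  # no request was lost while the back end was away: [0, 1, 2]
```

A real deployment adds persistence and acknowledgments, which is exactly what RocketMQ provides on top of this basic pattern.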
The other is the design of a GPU-accelerated distributed parallel computing backend:

The computing resources are Alibaba Cloud GPU cloud servers. For the distributed parallel computing framework we chose the lightweight DASK, which is easier to use and more flexible, allows parallel computing code to be written with a high degree of freedom, and integrates well with the GPU machine learning acceleration library RAPIDS.
At the same time, by building data links to multiple data warehouses such as MaxCompute and HybridDB, we realized an end-to-end computing platform covering data preparation, offline training, and online prediction.
We did a lot of evaluation and optimization work when selecting data warehouses. MaxCompute serves as the offline training data warehouse because of its weak real-time performance. SLS has good real-time performance but is not suitable for large-scale concurrent access, and its read performance cannot meet the demands of real-time prediction, so we selected Cstore (HybridDB for MySQL, since upgraded to AnalyticDB), which offers better performance and concurrency, for real-time prediction.
Building the platform involved cooperation among multiple internal business teams: analyzing business needs determined the final algorithm, shaped the solutions for data ETL and for data source performance and stability, and decided how the prediction results would be applied to live migration tasks. The result is an end-to-end platform that achieves the business goal.

2 Message Queue
The message queue uses Alibaba Cloud's RocketMQ.

The use of message queues needs to consider the following issues:

1) Message idempotence

Idempotence solves the problem of duplicate message delivery.

In a message consumption scenario, a message may have been delivered to the consumer and its business processing completed, but the network disconnects while the client is acknowledging to the server. To guarantee that the message is consumed at least once, the RocketMQ server redelivers the previously processed message after the network recovers, so the consumer subsequently receives two messages with the same content and the same Message ID.

We use a Redis database to record the Message Key of each message for idempotency: if a duplicate message is found during consumption, it is discarded so that the message is not consumed and executed twice.
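The dedup logic can be sketched as follows. In production the seen-set lives in Redis (e.g. `SET key 1 NX EX <ttl>`, whose "set if absent" semantics do the check atomically); here an in-memory dict stands in so the example is self-contained, and the handler name is illustrative.

```python
# Sketch of idempotent consumption keyed on the RocketMQ Message Key.
class DedupStore:
    """In-memory stand-in for Redis; set_if_absent mirrors SET ... NX."""
    def __init__(self):
        self._seen = {}

    def set_if_absent(self, key):
        if key in self._seen:
            return False          # duplicate delivery
        self._seen[key] = True
        return True

def consume(message, store, handler):
    """Process a message at most once per Message Key."""
    if not store.set_if_absent(message["key"]):
        return "discarded"        # already seen: drop the redelivery
    handler(message["body"])
    return "processed"

store = DedupStore()
results = []
for msg in [{"key": "task-1", "body": "predict"},
            {"key": "task-1", "body": "predict"}]:   # simulated redelivery
    results.append(consume(msg, store, handler=lambda body: None))
print(results)   # ['processed', 'discarded']
```

Note that this simple version marks the key before the handler finishes; a production consumer also has to decide what happens if processing fails after the key is recorded.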

2) Whether the messages are unordered or sequential

For the batch messages of network-wide prediction, message order does not matter, so to maximize message processing throughput we chose out-of-order messages.

3 Data Processing and Data Platform Selection
Data is the foundation of everything, so the first task is to solve the storage, analysis, and processing of massive data.

The data we need to process falls into the following categories:
Real-time data and non-real-time data
Formatted and unformatted data
Data that needs to be indexed and data that only needs to be computed
Full data and sampled data
Visualization data and alarm data
Each category corresponds to one or more data processing, analysis, and storage methods.

Multiple dimensions and multiple data sources are another consideration. Relatively complex business scenarios often require data of different dimensions to be considered together; for example, our live migration prediction uses real-time CPU utilization data alongside other dimensions such as virtual machine specifications. Likewise, a workload scenario produces more than one type of data, and not all of it is processed and stored in a unified way, so in practice multiple data sources are usually involved.

Massive data on the public cloud has reached the TB or even PB level, which traditional data storage methods can no longer handle; the Hadoop ecosystem was born for exactly this kind of big data storage. Once the data volume reaches a certain scale, traditional storage brings a series of problems: performance, cost, single points of failure, data accuracy, and so on. These are solved instead by distributed storage systems, represented by HDFS.

In addition to data storage, real-time data collection is also very important. Business systems produce their own real-time logs, and log collection tools are deployed alongside the business services. So as not to compete with online services for resources, log collection must strictly limit the resources it occupies; likewise, log cleaning and analysis should not be performed at the collection end but only after the logs are gathered in one place.

As far as the data warehouse we use is concerned, we initially chose ODPS (MaxCompute, similar to the open source Hadoop system) and SLS (Alibaba Cloud's log service). ODPS can be used as an offline data warehouse to store massive historical data, while SLS stores massive real-time monitoring data (such as the CPU utilization data of the ECS virtual machine we use).

However, so much data causes information overload, so it usually needs to be aggregated before use; for example, our CPU utilization prediction aggregates the original minute-level samples into 5-minute and 1-hour averages. We found that the aggregation built into SLS is very slow because of the computation volume and cannot meet our actual needs, so the data platform uses the real-time computing engine Blink to write the aggregated data into a new SLS store for our computation. Blink is the group's internally customized and optimized stream computing platform based on Apache Flink, the open-source real-time stream processing platform.
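The 5-minute aggregation step described above amounts to averaging consecutive windows of minute-level samples. In production this runs in Blink over SLS streams; the following is a plain-Python sketch of the computation only.

```python
# Average consecutive `window`-sized chunks of minute-level CPU samples.
def aggregate(samples, window=5):
    return [
        sum(samples[i:i + window]) / len(samples[i:i + window])
        for i in range(0, len(samples), window)
    ]

minute_cpu = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]  # ten 1-minute samples
print(aggregate(minute_cpu))  # two 5-minute averages -> [30.0, 80.0]
```

The 1-hour aggregation is the same operation with `window=60`.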

During large-scale online prediction we also found that SLS could not satisfy high-concurrency, low-latency data pulls at all: prediction latency often ballooned because queued requests could not fetch their data, or fetched it too slowly. After evaluation and testing, we chose the Cstore data warehouse provided by the ECS data platform to store the aggregated data and pulled the prediction inputs from Cstore, which solved the problem of high-concurrency, low-latency prediction data pulls.

4 Construction of the GPU-Accelerated Distributed Parallel Computing Back End
The core of the entire distributed parallel computing backend is the selection of parallel computing framework and GPU acceleration.

1) Framework selection

Several considerations went into selecting the distributed parallel computing framework. SPARK is currently the mainstream distributed framework for big data computing, but it is limited by CPU performance and cost, and Spark tasks could not obtain GPU acceleration (when Xiaozhuge was built, Spark did not yet provide comprehensive GPU support; the later SPARK 3.0 preview added it). To meet the needs of large-scale prediction across the entire network, we chose DASK, a lightweight distributed computing framework, combined with the GPU acceleration library RAPIDS to accelerate our algorithms on GPU cloud servers.

We rely on DASK's lazy computation and its ability to package and distribute code to Dask workers: the worker execution code is distributed to each worker node through the Dask Scheduler, and after the back-end message queue consumer receives a computation task it submits the execution task to the Dask Scheduler through the Dask Client; the Scheduler then dispatches the computing task to the designated worker nodes. As you can see, Dask's architecture and execution flow are very similar to Spark's, except that Dask is lighter and belongs to the Python ecosystem, which makes it a natural fit for RAPIDS.
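The consumer-to-scheduler-to-worker flow can be sketched as below. Dask's API is analogous: `dask.distributed.Client(...).submit(fn, *args)` returns a future whose `.result()` blocks until a worker finishes. Here the standard library's thread pool plays the scheduler/worker role so the example is self-contained, and `predict_vm_load` is a made-up stand-in for a real prediction task.

```python
# Sketch of the consumer -> scheduler -> worker submission pattern.
from concurrent.futures import ThreadPoolExecutor

def predict_vm_load(vm_id, history):
    """Hypothetical worker task: trivial 'prediction' = mean of history."""
    return vm_id, sum(history) / len(history)

# The message-queue consumer decodes a task message, then submits it:
with ThreadPoolExecutor(max_workers=4) as scheduler:  # stands in for the Dask Scheduler
    futures = [scheduler.submit(predict_vm_load, vm, data)
               for vm, data in [("vm-1", [10, 20, 30]), ("vm-2", [40, 60])]]
    results = dict(f.result() for f in futures)

print(results)  # {'vm-1': 20.0, 'vm-2': 50.0}
```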

Based on business needs we designed the following computing tasks: data preparation, training, and prediction, and configured dedicated Dask Workers for each kind of task. Correspondingly, the message queue defines a message topic per task type, and the Web Server defines a unified HTTP message format.

Workers for training and prediction tasks are deployed on GPU servers, while the data preparation phase, which does not yet use GPU acceleration, is deployed on CPU servers.

Each task exposes a rich set of parameters, flexibly supporting different prediction targets (cluster dimension, NC dimension, VM dimension), algorithm models (ARIMA, LSTM, XGBoost, etc.), and algorithm task types (regression, classification, etc.).
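A task message carrying these parameters might look like the following. The field names here are illustrative assumptions, not the platform's actual schema.

```python
# Hypothetical unified task message, serialized for the HTTP/MQ boundary.
import json

task = {
    "task_type": "predict",   # data_prep | train | predict
    "target": "vm",           # cluster | nc | vm
    "model": "xgboost",       # arima | lstm | xgboost | ...
    "problem": "regression",  # regression | classification
}

# The web service would serialize this into the unified message body,
# and the back-end consumer would decode it before submitting to Dask.
body = json.dumps(task)
decoded = json.loads(body)
print(decoded["model"], decoded["target"])
```

Keeping one schema across web server, queue, and workers is what lets a single back end serve all task types.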

The computing back end is connected to multiple data sources, automatically pulling offline training data (ODPS) and online prediction data (Cstore) and automatically saving and loading models (OSS), forming a closed-loop end-to-end computing platform.

2) GPU acceleration

In order to improve computing efficiency, we use the RAPIDS acceleration library to achieve GPU acceleration of the core algorithm.

RAPIDS is NVIDIA's suite of open-source, GPU-accelerated libraries for data science and machine learning. With RAPIDS, we can use GPUs to accelerate big data processing and machine learning algorithms.
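A large part of RAPIDS's appeal is that cuDF mirrors the pandas API, so CPU dataframe code often ports by swapping the import. The sketch below falls back to pandas when no GPU stack is present, so it stays runnable either way; the data is made up for illustration.

```python
# RAPIDS-style acceleration: same dataframe code, GPU if cuDF is available.
try:
    import cudf as xdf      # GPU dataframes (RAPIDS)
except ImportError:
    import pandas as xdf    # same API surface for this example

df = xdf.DataFrame({
    "vm":  ["vm-1", "vm-1", "vm-2", "vm-2"],
    "cpu": [10.0, 30.0, 50.0, 70.0],
})

# Group per-VM CPU samples and average them, on GPU when cuDF is installed.
means = df.groupby("vm")["cpu"].mean()
print(float(means.loc["vm-1"]), float(means.loc["vm-2"]))  # 20.0 60.0
```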

Over the course of the project we optimized the algorithms and the computing pipeline extensively. In the end, only 8 small GPU servers carry a prediction task that previously required 50+ large CPU servers (2000+ vCPUs), cutting the cost to roughly one-tenth of the original.

5 Model Update and Evaluation/Release System
A complete machine learning platform also needs to provide an automatic offline training system and a model evaluation and release system.

The Xiaozhuge currently running online performs automatic real-time prediction, but model evaluation, update, and release are not yet fully automated; this is being supplemented and improved.

At present, Xiaozhuge automatically generates and collects online test evaluation data. Combined in the future with an automated model evaluation system and model release system, it will realize full-process automation in the true sense.

3: Summary

Against the backdrop of cloud native, more and more business systems choose to build their platforms on the cloud. With the help of the public cloud's complete technical ecosystem, building an enterprise-grade platform fit for a production environment is no longer so difficult.

At the same time, the Xiaozhuge platform has put GPU-accelerated machine learning into production, and the actual business results demonstrate the huge value potential of GPUs in accelerating data science.
