The Optimization Practice of Spark on K8s in Alibaba Cloud EMR

Basic Concepts and Features of Spark on K8s

First, I'd like to share some background on Spark on K8s.

1. Spark's cluster deployment modes

Spark now supports four deployment modes:

• Standalone: Spark's built-in scheduler, generally used only in test environments. Because it does not integrate with a full big data scheduling framework, cluster resources cannot be fully utilized.

• Hadoop YARN: The most common mode. It originated in the Hadoop ecosystem and has strong community support.

• Apache Mesos: Similar to YARN, it is also a resource management framework, but it has gradually faded from use.

• Kubernetes: That is, Spark on K8s. Spark 3.1.1 officially made this deployment mode generally available, and more and more users are actively trying it.

The advantages of using Spark on K8s are as follows:

• Improved resource utilization: There is no need to deploy separate clusters for different usage scenarios. All Spark jobs share the cluster's resources, which raises overall utilization. In addition, when running in the cloud, container instances can scale elastically and be billed on a pay-as-you-go basis.

• Unified operation and maintenance: The K8s community's ecosystem and tooling can be used to maintain all clusters uniformly, reducing the O&M cost of switching between cluster types.

• Containerization: Managing jobs through container images improves the portability of Spark tasks, avoids conflicts between different Spark versions, and supports A/B testing across multiple versions.

In particular, according to our tests, the performance gap between Spark on K8s and Spark on YARN is negligible under the same cluster resource conditions. Moreover, making full use of elastic resources on K8s can further speed up Spark jobs.

In conclusion, compared with the Spark on YARN mode, the advantages of the Spark on K8s mode outweigh its disadvantages.

2. Spark on K8s deployment architecture

At present, there are two ways to submit Spark jobs to K8s:

• Use native spark-submit

In this way, no components need to be installed in the K8s cluster in advance. As with the familiar YARN submission method, the client that submits jobs needs a Spark environment and a configured kubectl, the tool for connecting to K8s clusters. The K8s cluster address and the Spark image address are then specified in the submission command.

The task running process when submitting with native spark-submit is as follows. After the user executes the spark-submit command on the client, a process starts locally, connects to the K8s API server, and requests the creation of a Driver Pod. During startup, the Driver Pod starts the Spark Context and is responsible for requesting Executor Pods. After the job completes, the Driver Pod cleans up the Executor Pods; the Driver Pod itself, however, is retained for log and status inspection and must be cleaned up manually.
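As a reference, here is a minimal sketch of such a submission; the API server address, namespace, service account, image name, and jar path are placeholders to adapt to your environment:

spark-submit \
  --master k8s://https://<k8s-apiserver-host>:6443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=apache/spark:3.1.1 \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar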

Advantages:

This submission method matches users' existing habits, reduces learning costs, and integrates well with existing big data platforms. Because it can also submit in client mode, it supports local dependencies and the interactive spark-shell mode.
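For example, a minimal sketch of an interactive session in client mode (the address and image are again placeholders):

spark-shell \
  --master k8s://https://<k8s-apiserver-host>:6443 \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.container.image=apache/spark:3.1.1

Note that in client mode the executors must be able to connect back to the driver, so the client machine needs to be reachable from inside the cluster.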

• Use the Spark-on-K8s operator

The Spark-on-K8s operator is a component open-sourced by Google. It requires a resident pod to be deployed in the K8s cluster in advance to provide its services. Unlike the first method, jobs are no longer submitted with a command line; instead, kubectl is used to submit a YAML file describing the job. In essence, the tool is still implemented on top of spark-submit: the information that would appear on the command line is expressed in file form. On top of the first method, however, the Spark-on-K8s operator adds auxiliary facilities such as scheduled execution, monitoring, and job management.

In terms of process, when the user submits a YAML file, the Spark-on-K8s operator resident in the K8s cluster listens for the event, parses the file, and executes a spark-submit command to start the Spark job.
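A minimal sketch of such a submission, using the operator's SparkApplication resource (field values are placeholders; the schema follows the open-source operator's v1beta2 API):

kubectl apply -f - <<'EOF'
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: apache/spark:3.1.1
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: 512m
EOF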

Beyond the different submission method, the tool provides the auxiliary functions just mentioned. The Spark-on-K8s operator intercepts K8s API requests through K8s's Mutating Admission Webhook mechanism, so Driver and Executor Pods can be customized as they are created. The tool also watches Driver and Executor Pod events to track and manage job execution progress.

Advantages:

As a resident tool, it supports job management, including records, retries, and scheduled execution. It exposes job monitoring metrics and can integrate with Prometheus for unified monitoring. It supports automatic cleanup of job resources, and it can automatically configure the Service (and Ingress) for the Spark UI.

3. Community progress of Spark on K8s

Before Spark 2.3, some people tried to support Spark on K8s by deploying YARN on K8s, but in essence Spark's resources were still managed by YARN, so it cannot be called Spark on K8s in the full sense.

Spark 2.3 was the first release in which the community shipped native Spark on K8s support, officially introducing this deployment mode.

Spark 2.4 added a few feature optimizations. The functionality really matured in Spark 3, and Spark 3.1 made it officially generally available (GA). Spark on K8s is currently a very active direction, so interested readers are advised to upgrade directly to Spark 3.1 to try this deployment mode.

4. Key Features of Spark on K8s

• Optimize Spark Pod configuration properties

Pod definitions in K8s are usually described in YAML. Early on, the Driver and Executor Pod definitions could only be set through Spark conf, which was not flexible enough; after all, not every Pod attribute can be expressed as a Spark conf entry. Starting from Spark 3.0, template files are supported: the user creates a template file that defines the Pod's attributes and passes it in through Spark configuration, which is more convenient and flexible than configuring each item individually.
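A minimal sketch: the node label and toleration below are illustrative placeholders, while the podTemplateFile options themselves are standard since Spark 3.0.

cat > /tmp/executor-template.yaml <<'EOF'
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    node-type: spot            # assumed custom node label
  tolerations:
    - key: spot                # assumed taint key
      operator: Exists
      effect: NoSchedule
EOF

spark-submit \
  --master k8s://https://<k8s-apiserver-host>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=apache/spark:3.1.1 \
  --conf spark.kubernetes.executor.podTemplateFile=/tmp/executor-template.yaml \
  --class org.apache.spark.examples.SparkPi \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
# spark.kubernetes.driver.podTemplateFile configures the driver Pod the same way.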

• Dynamic Allocation

In Spark 2, dynamic resource allocation could only be used with the External Shuffle Service (ESS). In that mode, all shuffle data written by an executor is taken over by the ESS, so an executor can be reclaimed at any time once its tasks finish. However, the ESS is generally started and managed by YARN's NodeManager, which makes it difficult to deploy on K8s.

Spark 3 adds the Shuffle Tracking feature: executors manage their own shuffle data, achieving dynamic resource allocation without an ESS. The drawback is that executors cannot be reclaimed dynamically during the shuffle-read phase: they must stay up so reducers can read their shuffle data, and they can only be released once driver-side GC marks their shuffle data as no longer referenced, so resource release efficiency is low.
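A minimal sketch of enabling ESS-free dynamic allocation via shuffle tracking; the executor bounds are illustrative:

cat >> $SPARK_HOME/conf/spark-defaults.conf <<'EOF'
spark.dynamicAllocation.enabled                  true
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors             0
spark.dynamicAllocation.maxExecutors             20
spark.dynamicAllocation.executorIdleTimeout      60s
EOF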

• Node decommissioning

In a K8s environment, node scale-in and preemptible-instance reclamation are common. In particular, in some scenarios, the priority of certain Spark tasks is lowered to make room for higher-priority tasks. When an executor exits abruptly in such scenarios, stage recomputation may occur, which prolongs Spark execution time. Spark 3.1 provides a graceful decommissioning feature: before an executor pod is forcibly taken offline, the driver is notified to stop assigning it new tasks, and its cached data and shuffle files are migrated to other executor pods, which preserves the efficiency of the affected Spark job and avoids recomputation.

At present, this feature is experimental and is not enabled by default.
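It therefore has to be switched on explicitly; a minimal sketch using the Spark 3.1+ decommission settings:

cat >> $SPARK_HOME/conf/spark-defaults.conf <<'EOF'
spark.decommission.enabled                        true
spark.storage.decommission.enabled                true
spark.storage.decommission.rddBlocks.enabled      true
spark.storage.decommission.shuffleBlocks.enabled  true
EOF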

• PersistentVolumeClaim reuse

PersistentVolumeClaim (PVC for short) is K8s's storage claim mechanism; each pod can explicitly request volumes to mount. Spark 3.1 supports creating PVCs dynamically, which means they do not have to be declared in advance: volumes are attached on demand as the application runs. At that point, however, a PVC's life cycle is tied to its executor. If the executor is forcibly closed by preemption as described above, the data saved on its PVC is lost and must be recomputed. Spark 3.2 therefore supports PVC reuse: the PVC's life cycle follows the driver instead, avoiding re-application and recomputation and ensuring overall efficiency.
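A minimal sketch combining on-demand PVC creation (Spark 3.1+) with driver-owned reuse (Spark 3.2+); the storage class and size are placeholders:

cat >> $SPARK_HOME/conf/spark-defaults.conf <<'EOF'
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName     OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass  <your-storage-class>
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit     100Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path            /data
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly        false
spark.kubernetes.driver.ownPersistentVolumeClaim    true
spark.kubernetes.driver.reusePersistentVolumeClaim  true
EOF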

Optimization and Best Practices of Spark on K8s in Alibaba Cloud EMR

Next, let's share Alibaba Cloud EMR's optimization and best practices for Spark on K8s.

1. Introduction to Spark on ACK

ACK: Alibaba Cloud Container Service for Kubernetes, ACK for short.

EMR: Alibaba Cloud's open-source big data platform E-MapReduce, EMR for short.

On Alibaba Cloud's public cloud, we offer the EMR on ACK product, which includes a Spark cluster type, hereinafter referred to as Spark on ACK. Spark on ACK is a semi-managed big data platform. Users first need their own ACK cluster, that is, a K8s cluster. We then create a namespace for Spark jobs in this cluster and install fixed components such as the Spark operator and the HistoryServer. Subsequent Spark job pods also run in this namespace. These pods can run on the user's own ACK nodes, or on our elastic container instances (ECI) to achieve pay-as-you-go billing. What exactly is this elastic instance ECI? Let's introduce it in detail.

2. Elasticity advantages in the cloud

The biggest advantage of Spark in the cloud is better elasticity. Alibaba Cloud's ACK environment provides the Elastic Container Instance (ECI) product. ECI means that when we request a pod, we no longer consume the resources of our own nodes; instead, the pod is created entirely from cloud resources, can be pulled up quickly, and is billed by the second. Using ECI to run Spark jobs is very cost-effective, because Spark jobs typically run batch workloads: there may be a peak in the early morning but only a few queries during the day. This pronounced peak-and-valley pattern is well suited to fast elasticity and pay-as-you-go billing. In addition, ECI can use spot (preemptible) instances, which come with a one-hour protection period; combined with Spark's node decommissioning feature, this can save a lot of cost.

3. RSS optimized shuffle and dynamic resources

Spark Shuffle relies heavily on local storage. In a cloud environment with storage-compute separation, however, local disks on individual machines are hard to guarantee: the required cloud disk size cannot be estimated in advance, and the cost-performance ratio is poor. On the other hand, Spark's native ESS-free dynamic allocation releases executor resources inefficiently, which can waste resources that cannot be reclaimed.

Spark Shuffle itself also has many drawbacks. As mapper output grows, data spills to the local disk, causing extra I/O; reducers pull data from the mapper side concurrently, producing a large number of random reads that reduce efficiency; the shuffle process creates numMapper * numReducer network connections, consuming excessive CPU and causing performance and stability problems; and because shuffle data is kept in a single replica, any loss forces recomputation, wasting resources.

Alibaba Cloud provides an independently deployed RSS (Remote Shuffle Service), which is now open source on GitHub and can connect directly to ACK. With it, users no longer need to care whether local disks are available for shuffle data. Natively, Spark saves shuffle data on the executor's local disk; with RSS, shuffle data is handed over to RSS for management. The industry has in fact reached a consensus on push-based external shuffle services, and many companies are optimizing in this direction. The advantages are numerous: executors can be reclaimed as soon as their tasks finish, saving resources; RSS turns a large number of traditionally random reads into appended writes and sequential reads, further compensating for Spark Shuffle's efficiency problems; and RSS supports HA deployment and multi-replica mode, reducing the chance of recomputation and further ensuring Spark job efficiency.
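Plugging in such a service goes through Spark's pluggable shuffle manager. The class name below is an assumption for illustration only; consult the RSS project's GitHub documentation for the exact settings of the release you deploy:

cat >> $SPARK_HOME/conf/spark-defaults.conf <<'EOF'
# assumed class name; check the RSS release you deploy
spark.shuffle.manager          org.apache.spark.shuffle.rss.RssShuffleManager
spark.shuffle.service.enabled  false
EOF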

4. Enhanced K8s job-level scheduling

The K8s scheduler's default scheduling granularity is the Pod, but a traditional Spark job is scheduled at application granularity, and starting one application entails starting multiple pods. Therefore, when a large number of Spark jobs is submitted at once, many Driver Pods may start at the same time, all waiting for their Executor Pods to start, deadlocking the entire cluster. K8s multi-tenancy is also not well supported: there is no elastic scheduling between tenants, nor dynamic quotas. Compared with YARN's scheduling strategies, K8s has only a single strategy, the default priority + FIFO mode, and cannot achieve fair scheduling.

Alibaba Cloud ACK has been enhanced in this respect:

• When scheduling, first determine whether resources are sufficient for a whole job, which solves the potential deadlock described above.

• Multi-tenant tree-structured queues are implemented based on namespaces. Upper and lower resource limits can be set for each queue, and resource preemption between queues is supported.

• Priority queues schedule Spark jobs at application granularity and support fair scheduling between queues. Based on an extension of the Spark-on-K8s operator, submitted jobs automatically enter their queues.

5. Cloud data lake storage and acceleration

• In a K8s environment, compared with a traditional Hadoop cluster, the data lake storage OSS is a better fit for the storage-compute separation architecture. Spark on ACK has the JindoSDK built in to connect seamlessly to OSS.

• Fluid supports cache acceleration in the Spark on K8s deployment mode and can improve running speed by about 30% in TPC-DS scenarios.

6. Use DLF to build cloud data lake

If you want to use Hadoop-ecosystem components on K8s, you need to deploy them separately. Spark on ACK, however, connects seamlessly to Alibaba Cloud DLF (Data Lake Formation). DLF provides a unified metadata service, supports permission control and auditing, and also provides data ingestion into the lake, interactive Spark SQL analysis, data lake management, and storage analysis and cost optimization.

7. Ease of use improvement

Spark on ACK provides a CLI tool that submits Spark jobs directly in spark-submit syntax while also recording them in the Spark operator for management. We discussed the advantages and disadvantages of the two submission methods above: the Spark operator has good job management capabilities, but its submissions are not compatible with the old command syntax, nor can it run an interactive shell, so users migrating from old clusters have to make more changes. With our tool, you can enjoy the advantages of both methods at the same time, which is a big improvement in usability.

For log collection, Spark on ACK provides a log collection scheme and, through the HistoryServer, lets users view logs in a UI just as they would with Spark on YARN.
