This topic describes how to run a Spark MLlib job on the serverless Spark engine of Data Lake Analytics (DLA).
Scenario
This example uses the k-means clustering algorithm on the serverless Spark engine of DLA to group the following data into two clusters, and then predicts which of the two clusters each test vector belongs to.
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
Prerequisites
The test data is uploaded to OSS and stored in the rawdata.csv file.
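Although the file has a .csv extension, the sample job splits each line on a single space, so the file should contain one point per line with space-separated values and no header. The following is a minimal sketch, not part of the original job, that writes the six sample rows to a local file and parses them back the same way the job does. The object name RawDataSketch and the temporary file location are assumptions made for illustration, and uploading the file to OSS is not covered.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical helper for illustration only; not part of the DLA sample project.
object RawDataSketch {
  // The six sample rows from the scenario, one point per line.
  val rows: Seq[String] = Seq(
    "0.0 0.0 0.0", "0.1 0.1 0.1", "0.2 0.2 0.2",
    "9.0 9.0 9.0", "9.1 9.1 9.1", "9.2 9.2 9.2")

  // Writes the rows newline-separated, with no header and no trailing blank line.
  def write(path: Path): Path =
    Files.write(path, rows.mkString("\n").getBytes(StandardCharsets.UTF_8))

  // Parses the file the way the sample job does: split on one space, convert to Double.
  def parse(path: Path): Seq[Array[Double]] =
    new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
      .split("\n").toSeq
      .map(_.split(' ').map(_.toDouble))

  def main(args: Array[String]): Unit = {
    // Assumed location: a temporary file on the local machine.
    val path = write(Files.createTempFile("rawdata", ".csv"))
    parse(path).foreach(p => println(p.mkString(" ")))
  }
}
```

A blank line or a comma-separated row in the file would make the toDouble conversion fail, so checking the file locally before the upload can save a failed job run.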
Procedure
Log on to the DLA console.
In the top navigation bar, select the region where DLA is deployed.
In the left-side navigation pane, choose Serverless Spark > Submit job.
On the Parameter Configuration page, click Create Job.
In the Create Job dialog box, specify the parameters as required.
Click OK.
Expand Job List, click the name of the Spark job, and then enter the following job configuration in the Spark job editor.
{
    "name": "spark-mllib-test",
    "file": "oss://${your oss bucket}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
    "className": "com.aliyun.spark.SparkMLlib",
    "args": ["oss://${your oss bucket}/data/rawdata.csv"],
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium",
        "spark.dla.connectors": "oss"
    }
}
Note: If you submit a Spark job as a RAM user, you must specify the spark.dla.roleArn parameter. For more information, see Grant permissions to a RAM user.
Sample code
The following code is the source of the main class of the Spark MLlib job, com.aliyun.spark.SparkMLlib, which is referenced by the className parameter of the job configuration.
package com.aliyun.spark

import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession

object SparkMLlib {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark MLlib")
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    // Read the space-separated input file whose path is passed as the first job argument.
    val rawDataPath = args(0)
    val data = spark.sparkContext.textFile(rawDataPath)
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

    // Train a k-means model with two clusters and at most 20 iterations.
    val numClusters = 2
    val numIterations = 20
    val model = KMeans.train(parsedData, numClusters, numIterations)
    for (c <- model.clusterCenters) {
      println(s"cluster center: ${c.toString}")
    }

    // Sum of squared distances from each point to its nearest center (computed but not printed).
    val cost = model.computeCost(parsedData)

    // Predict the cluster of each test vector.
    println("Vectors 0.2 0.2 0.2 is belongs to clusters:" +
      model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
    println("Vectors 0.25 0.25 0.25 is belongs to clusters:" +
      model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
    println("Vectors 8 8 8 is belongs to clusters:" +
      model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))
  }
}
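The job calls model.computeCost but does not print the result. That method returns the sum of squared distances from each point to its nearest cluster center. The following plain-Scala sketch reproduces that calculation without Spark, so it can be checked by hand. The object name KMeansCostSketch is hypothetical, and the centers are assumed to be the per-group means of the six sample rows (0.1 and 9.1 in every dimension), not values read from an actual job run.

```scala
// Illustration only: a hand-rolled version of the k-means cost, with no Spark dependency.
object KMeansCostSketch {
  // Squared Euclidean distance between two points of equal dimension.
  def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Sum over all points of the squared distance to the nearest center.
  def cost(centers: Seq[Array[Double]], points: Seq[Array[Double]]): Double =
    points.map(p => centers.map(c => squaredDistance(c, p)).min).sum

  def main(args: Array[String]): Unit = {
    // The six sample rows; each row repeats one value in all three dimensions.
    val points = Seq(0.0, 0.1, 0.2, 9.0, 9.1, 9.2).map(v => Array.fill(3)(v))
    // Assumed centers: the mean of each group of three rows.
    val centers = Seq(0.1, 9.1).map(v => Array.fill(3)(v))
    println(s"cost: ${cost(centers, points)}")
  }
}
```

With these assumed centers, four of the six points lie 0.1 away from their center in each of the three dimensions, so the cost comes to approximately 0.12. A lower cost for the same number of clusters indicates a tighter fit.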
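In the following execution result, the first two test vectors are assigned to cluster 0, and the last test vector is assigned to cluster 1. Because k-means initialization is randomized, the two cluster numbers may be swapped in another run. The grouping itself stays the same.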
Vectors 0.2 0.2 0.2 is belongs to clusters:0
Vectors 0.25 0.25 0.25 is belongs to clusters:0
Vectors 8 8 8 is belongs to clusters:1
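The prediction step assigns each vector to the cluster whose center is nearest. The following plain-Scala sketch shows that rule without Spark. The object name NearestCenterSketch is hypothetical, and the two centers are assumed to be the per-group means of the sample rows rather than values taken from a real job log. The cluster numbers in the sketch simply follow the order of the assumed centers.

```scala
// Illustration only: nearest-center assignment, with no Spark dependency.
object NearestCenterSketch {
  // Squared Euclidean distance between two points of equal dimension.
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "points must have the same dimension")
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
  }

  // Index of the center that is closest to the point.
  def predict(centers: Array[Array[Double]], point: Array[Double]): Int =
    centers.indices.minBy(i => squaredDistance(centers(i), point))

  def main(args: Array[String]): Unit = {
    // Assumed centers: the per-group means of the six sample rows.
    val centers = Array(Array(0.1, 0.1, 0.1), Array(9.1, 9.1, 9.1))
    for (text <- Seq("0.2 0.2 0.2", "0.25 0.25 0.25", "8 8 8")) {
      val point = text.split(' ').map(_.toDouble)
      println(s"$text -> cluster ${predict(centers, point)}")
    }
  }
}
```

The vector 8 8 8 is not part of the training data, but it is much closer to the center near 9.1 than to the center near 0.1, which is why it lands in the same cluster as the rows 9.0, 9.1, and 9.2.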