
Spark MLlib

Last Updated: May 29, 2020

This topic describes how to run a Spark MLlib task in Data Lake Analytics (DLA) serverless Spark.

Scenario

This example uses the k-means clustering algorithm in DLA serverless Spark to divide the following data into two clusters and then checks which cluster each test vector belongs to.

  0.0 0.0 0.0
  0.1 0.1 0.1
  0.2 0.2 0.2
  9.0 9.0 9.0
  9.1 9.1 9.1
  9.2 9.2 9.2
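
Because k is set to 2, k-means is expected to separate the three small vectors from the three large ones, and each cluster center is the component-wise mean of the points assigned to it. The following plain Scala sketch is not part of the sample job; it assumes that obvious grouping and only illustrates the centers that the training step should approximately produce:

  object ExpectedCenters {
    def main(args: Array[String]): Unit = {
      // The two groups that the sample data naturally falls into (an assumption
      // about how k-means with k = 2 partitions these points).
      val lowGroup = Seq(Seq(0.0, 0.0, 0.0), Seq(0.1, 0.1, 0.1), Seq(0.2, 0.2, 0.2))
      val highGroup = Seq(Seq(9.0, 9.0, 9.0), Seq(9.1, 9.1, 9.1), Seq(9.2, 9.2, 9.2))

      // A k-means cluster center is the component-wise mean of its members.
      def center(points: Seq[Seq[Double]]): Seq[Double] =
        points.transpose.map(col => col.sum / col.length)

      val low = center(lowGroup).mkString("[", ",", "]")
      val high = center(highGroup).mkString("[", ",", "]")
      println(s"expected center: $low")  // approximately [0.1,0.1,0.1]
      println(s"expected center: $high") // approximately [9.1,9.1,9.1]
    }
  }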

Prerequisites

MLlib is short for Machine Learning Library. Before you run a Spark MLlib task in DLA serverless Spark, make sure that the JAR package that contains the sample code and the rawdata.csv test data file are uploaded to Object Storage Service (OSS).

Procedure

  1. Log on to the DLA console.

  2. In the top navigation bar, select the region where DLA is deployed.

  3. In the left-side navigation pane, choose Serverless Spark > Submit job.

  4. On the Parameter Configuration page, click Create Job.

  5. In the Create Job dialog box, specify the parameters as required.

    Parameter      Description
    File Name      The name of the file or folder.
    Data Format    The type of the object that you want to create. Valid values: File and Folder.
    Parent         The parent directory of the file or folder.
                   • The job list is the root directory, and all jobs must be created in this directory.
                   • You can first create a folder in the job list and then create jobs in that folder. Alternatively, you can create jobs directly in the root directory.
  6. After you set the parameters, click OK.

  7. Click the name of the Spark job and enter the following content for the Spark MLlib task in the Spark job edit box.

    {
        "name": "spark-mllib-test",
        "file": "oss://${your oss bucket}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
        "className": "com.aliyun.spark.SparkMLlib",
        "args": ["oss://${your oss bucket}/data/rawdata.csv"],
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    Parameter      Description
    name           The name of the Spark MLlib task.
    file           The storage path of the JAR package that is used to run the Spark MLlib task.
                   Note: The JAR packages on which a Spark MLlib task depends must be stored in OSS.
    className      The entry class (main class) of the task. In this topic, com.aliyun.spark.SparkMLlib is used. For more information, see Sample code.
    args           The parameters that you want to pass to the Spark MLlib task. Separate multiple parameters with commas (,). In the preceding example, the value of args is the OSS storage path of the rawdata.csv file.
    conf           The configuration of the Spark MLlib task.
                   • "spark.driver.resourceSpec": "medium" indicates that the driver runs on the medium specification with 2 vCPUs and 8 GB of memory.
                   • "spark.executor.instances": 2 indicates that two executors are started.
                   • "spark.executor.resourceSpec": "medium" indicates that each executor runs on the medium specification with 2 vCPUs and 8 GB of memory.
                   If you do not specify the conf parameter, the default values that were specified when the virtual cluster was created are used.
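
    To confirm that these settings take effect, you can print the resolved Spark configuration from inside a job and check the values in the job log. The following is a minimal sketch, not part of the sample JAR; the object name PrintEffectiveConf is made up for illustration:

    import org.apache.spark.sql.SparkSession

    object PrintEffectiveConf {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("print-conf").getOrCreate()
        // Dump every resolved Spark property. spark.driver.resourceSpec,
        // spark.executor.instances, and spark.executor.resourceSpec should show
        // the values from the "conf" section of the job.
        spark.sparkContext.getConf.getAll.sortBy(_._1).foreach { case (k, v) =>
          println(s"$k=$v")
        }
        spark.stop()
      }
    }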

  8. After the Spark MLlib task is created, click Execute. You can check the execution status in real time.

    • STARTING: The task is being submitted.
    • RUNNING: The task is running.
    • SUCCESS: The task is executed successfully.
    • DEAD: An error occurs during the task execution. You can view the log to troubleshoot the error.
    • KILLED: The task is manually killed.

Sample code

The following code is the source code of the main class com.aliyun.spark.SparkMLlib.

  package com.aliyun.spark

  import org.apache.spark.SparkConf
  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.sql.SparkSession

  object SparkMLlib {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("Spark MLlib")
      val spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate()

      // args(0) is the OSS path of the raw data file, for example
      // oss://${your oss bucket}/data/rawdata.csv.
      val rawDataPath = args(0)
      val data = spark.sparkContext.textFile(rawDataPath)

      // Parse each line of space-separated numbers into a dense vector.
      val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

      // Train a k-means model with two clusters and at most 20 iterations.
      val numClusters = 2
      val numIterations = 20
      val model = KMeans.train(parsedData, numClusters, numIterations)

      // Print the center of each cluster.
      for (c <- model.clusterCenters) {
        println(s"cluster center: ${c.toString}")
      }

      // Within set sum of squared errors (WSSSE) of the trained model.
      val cost = model.computeCost(parsedData)

      // Predict the cluster to which each test vector belongs.
      println("Vectors 0.2 0.2 0.2 is belongs to clusters:" +
        model.predict(Vectors.dense("0.2 0.2 0.2".split(' ').map(_.toDouble))))
      println("Vectors 0.25 0.25 0.25 is belongs to clusters:" +
        model.predict(Vectors.dense("0.25 0.25 0.25".split(' ').map(_.toDouble))))
      println("Vectors 8 8 8 is belongs to clusters:" +
        model.predict(Vectors.dense("8 8 8".split(' ').map(_.toDouble))))
    }
  }

The execution result of the preceding code is as follows. The first two vectors belong to cluster 0 and the last vector belongs to cluster 1.

  Vectors 0.2 0.2 0.2 is belongs to clusters:0
  Vectors 0.25 0.25 0.25 is belongs to clusters:0
  Vectors 8 8 8 is belongs to clusters:1
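
The cost value that the sample code computes (the within set sum of squared errors, WSSSE) is never printed, and the trained model is discarded when the job ends. If you want to inspect the clustering quality or reuse the model, lines such as the following could be appended to the main method of SparkMLlib. This is only a sketch: the OSS output path is a placeholder, and it assumes that the job can write to that bucket.

  // Continued from the main method of SparkMLlib above, where `cost`,
  // `model`, and `spark` are already defined.

  // Print the within set sum of squared errors; lower values mean tighter clusters.
  println(s"Within Set Sum of Squared Errors = $cost")

  // Optionally persist the trained model to OSS for later reuse.
  // The output path below is a placeholder, not part of the original sample.
  model.save(spark.sparkContext, "oss://${your oss bucket}/models/kmeans-test")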