This topic describes how to use XGBoost to perform distributed training in an E-MapReduce (EMR) Data Science cluster. You can adapt the example in this topic to your own data and modify the provided sample code for custom modeling.

Prerequisites

  • Development tools
    • Java Development Kit (JDK) 8 is installed on your on-premises machine.
    • Maven 3.x is installed on your on-premises machine.
    • An integrated development environment (IDE) for Java or Scala is installed on your on-premises machine and is configured to use the JDK and Maven. We recommend IntelliJ IDEA.
  • An EMR Data Science cluster is created. For more information, see Create a cluster.
  • The dsdemo code is downloaded. To obtain the dsdemo code, join the DingTalk group numbered 32497587.

Background information

Distributed training is based on Spark on YARN. The sample code is written in Scala and uses the XGBoost4J-Spark and Spark MLlib libraries.
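The sample trains a three-class classifier on the classic iris dataset. The input file must be a headerless, comma-separated file whose columns match the schema declared in the code, for example:

5.1,3.5,1.4,0.2,Iris-setosa
7.0,3.2,4.7,1.4,Iris-versicolor
6.3,3.3,6.0,2.5,Iris-virginica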

Sample code

The following sample code shows the content of the SparkTraining.scala file in the IDE:

package ml.dmlc.xgboost4j.scala.example.spark

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object SparkTraining {

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      // scalastyle:off
      println("Usage: program input_path")
      sys.exit(1)
    }
    val spark = SparkSession.builder().getOrCreate()
    val inputPath = args(0)
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    val rawInput = spark.read.schema(schema).csv(inputPath)

    // transform class to index to make xgboost happy
    val stringIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(rawInput)
    val labelTransformed = stringIndexer.transform(rawInput).drop("class")
    // compose all feature columns as vector
    val vectorAssembler = new VectorAssembler().
      setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
      setOutputCol("features")
    val xgbInput = vectorAssembler.transform(labelTransformed).select("features",
      "classIndex")

    // Split the data: 60% train, 20% eval1, 10% eval2, 10% test
    val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1))

    /**
     * Optional parameters, not set in the map below:
     * add "timeout_request_workers" -> 60000L to make the application fail if it
     * cannot get enough resources to start 2 workers within 60000 ms;
     * add "checkpoint_path" -> "/checkpoints" and "checkpoint_interval" -> 2 to
     * save a checkpoint every two iterations.
     */
    val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2,
      "eval_sets" -> Map("eval1" -> eval1, "eval2" -> eval2))
    val xgbClassifier = new XGBoostClassifier(xgbParam).
      setFeaturesCol("features").
      setLabelCol("classIndex")
    val xgbClassificationModel = xgbClassifier.fit(train)
    val results = xgbClassificationModel.transform(test)
    results.show()
  }
}
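The trained model implements the standard Spark ML evaluation and persistence APIs, so it can be scored and saved like any other Spark ML model. The following sketch extends the sample: the two imports belong at the top of the file, and the remaining lines can be appended after results.show(). The accuracy metric and HDFS model path are illustrative choices, not part of the original example.

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Measure accuracy on the held-out test predictions.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("classIndex")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println(s"Test accuracy: ${evaluator.evaluate(results)}")

// Persist the model to HDFS and load it back for later scoring.
// The path below is illustrative; choose any HDFS location you control.
xgbClassificationModel.write.overwrite().save("hdfs://emr-header-1:9000/models/xgb-iris")
val sameModel = XGBoostClassificationModel.load("hdfs://emr-header-1:9000/models/xgb-iris")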

Run code

Package the project into xgboosttraining-0.1-SNAPSHOT.jar (for example, by running mvn clean package), and then run the code in one of the following modes:
  • Local mode
    #!/bin/sh
    hadoop fs -put -f iris.csv hdfs://emr-header-1:9000/
    spark-submit --master 'local[8]' \
    --class ml.dmlc.xgboost4j.scala.example.spark.SparkTraining xgboosttraining-0.1-SNAPSHOT.jar \
    hdfs://emr-header-1:9000/iris.csv
  • Distributed YARN-cluster mode
    #!/bin/sh
    hadoop fs -put -f iris.csv hdfs://emr-header-1:9000/
    spark-submit --master yarn --deploy-mode cluster \
    --class ml.dmlc.xgboost4j.scala.example.spark.SparkTraining xgboosttraining-0.1-SNAPSHOT.jar \
    hdfs://emr-header-1:9000/iris.csv
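
In YARN cluster mode, the driver runs in a YARN container, so the output of results.show() is written to the application logs rather than to your terminal. You can retrieve the logs with the standard YARN CLI, where <application_id> is the ID that spark-submit prints:

    yarn logs -applicationId <application_id>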