This topic describes how to use the Spark RDD API to develop an offline job that consumes Log Service data.

Sample code

## TestBatchLoghub.scala

import org.apache.spark.aliyun.logservice.LoghubUtils // import path assumes the aliyun-emapreduce-sdk (emr-logservice) layout; verify against the demo project
import org.apache.spark.{SparkConf, SparkContext}

object TestBatchLoghub {
  def main(args: Array[String]): Unit = {
    // Seven arguments are required; the eighth (<end time>) is optional.
    if (args.length < 7) {
      System.err.println(
        """Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
          |  <access key id> <access key secret> <output path> <start time> [<end time=now>]
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject = args(0)
    val logStore = args(1)
    val endpoint = args(2)
    val accessKeyId = args(3)
    val accessKeySecret = args(4)
    val outputPath = args(5)
    val startTime = args(6).toLong

    val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
    // Read the Logstore as an RDD. When <end time> is supplied, read that range;
    // otherwise read from <start time> up to now.
    val rdd = if (args.length > 7) {
      LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
    } else {
      LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
    }

    rdd.saveAsTextFile(outputPath)
  }
}
Note: For more information about the Maven project object model (POM) file, see aliyun-emapreduce-demo.
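
The demo project manages this dependency with Maven. For illustration only, a rough sbt-style equivalent is sketched below; the artifact coordinates and version numbers are assumptions, so take the authoritative values from the demo project's POM file.

// build.sbt sketch (coordinates and versions are assumptions; see the demo project's POM for the authoritative values)
libraryDependencies ++= Seq(
  // Spark core is provided by the EMR cluster at run time
  "org.apache.spark" %% "spark-core" % "2.4.8" % "provided",
  // Log Service (Loghub) connector from the aliyun-emapreduce-sdk
  "com.aliyun.emr" % "emr-logservice_2.11" % "2.2.0"
)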

Compilation and running

## Compile and package the job.
mvn clean package -DskipTests

## After compilation, the JAR file of the job is generated in the target/shaded/ directory.

## Submit and run the job.

spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
  --num-executors 2 --class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> \
  <sls endpoint> <access key id> <access key secret> <output path> <start time> [<end time=now>]
Notice
  • Replace x.x.x.TestBatchLoghub and xxx.jar with the actual class name (including its package path) and the path of your JAR file.
  • Adjust the job resources based on the actual data size and cluster scale. If the allocated resources are insufficient, the job may fail to run.
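
The <start time> and optional <end time> arguments are passed to the job as plain Long values and forwarded to LoghubUtils.createRDD unchanged. The following minimal sketch, assuming Log Service expects Unix timestamps in seconds, shows one way to compute such a value from a human-readable time; the TimeArg object and its toUnixSeconds helper are illustrative and not part of the SDK.

import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

object TimeArg {
  // Illustrative helper: convert a local date-time string such as
  // "2024-01-01 00:00:00" into the Long passed as <start time>.
  // Assumption: Log Service interprets the value as a Unix timestamp in seconds.
  def toUnixSeconds(s: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long =
    LocalDateTime.parse(s, DateTimeFormatter.ofPattern(pattern))
      .atZone(ZoneId.systemDefault())
      .toEpochSecond

  def main(args: Array[String]): Unit =
    println(toUnixSeconds("2024-01-01 00:00:00"))  // value to pass as <start time>
}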