本文简单介绍如何使用 Spark RDD API 开发一个离线作业消费 LogService 数据。
示例代码
## TestBatchLoghub.Scala
object TestBatchLoghub {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
| <access key id> <access key secret> <output path> <start time> <end time=now>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val endpoint = args(2)
val accessKeyId = args(3)
val accessKeySecret = args(4)
val outputPath = args(5)
val startTime = args(6).toLong
val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
var rdd:JavaRDD[String] = null
if (args.length > 7) {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
} else {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
}
rdd.saveAsTextFile(outputPath)
}
}
说明 Maven pom 文件可以参见aliyun-emapreduce-demo。
编译运行
## 编译命令
mvn clean package -DskipTests
## 编译完后,作业jar包位于target/shaded/下
## 提交执行
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g
--num-executors 2--class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore>
<sls endpoint> <access key id> <access key secret> <output path> <start time> [<end time=now>]
注意
- x.x.x.TestBatchLoghub 和 xxx.jar 需要替换成真实的类路径和包路径。
- 作业资源需要根据实际数据规模和实际集群规模调整,如果集群太小,直接运行以上命令可能无法执行。