本ページでは、Spark RDD API 操作を呼び出して、Log Service データを利用するオフラインジョブを開発する方法について説明します。
サンプルコード
## TestBatchLoghub.Scala
object TestBatchLoghub {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
| <access key id> <access key secret> <output path> <start time> <end time=now>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val endpoint = args(2)
val accessKeyId = args(3)
val accessKeySecret = args(4)
val outputPath = args(5)
val startTime = args(6).toLong
val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
var rdd:JavaRDD[String] = null
if (args.length > 7) {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
} else {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
}
rdd.saveAsTextFile(outputPath)
}
}
注 Maven プロジェクトオブジェクトモデル (POM) ファイルの詳細については、「 aliyun-emapreduce-demo」 をご参照ください。
コンパイルと実行
## Compile and run a command.
mvn clean package -DskipTests
## After the compiled command is run, the JAR file of the job is stored in the target/shaded/ directory.
## Submit and run the job.
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g
--num-executors 2--class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore>
<sls endpoint> <access key id> <access key secret> <output path> <start time> [<end time=now>]
重要
- 実際の状況に基づいて、クラスパスとパッケージパスを x.x.x.TestBatchLoghub と xxx.jar 形式で指定する必要があります。
- 実際のデータサイズとクラスタースケールに基づいてジョブリソースを調整する必要があります。 クラスターが小さすぎると、ジョブの実行が失敗する可能性があります。