すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark Streaming を使用してリアルタイムでデータを使用する

最終更新日:Jan 11, 2025

このトピックでは、Spark DataFrame API 操作を呼び出して、ログサービスのデータを使用するストリーミングジョブを開発する方法について説明します。

Scala で Spark Structured Streaming を使用して LogHub にアクセスする

サンプルコード

## StructuredLoghubSample.Scala

object StructuredLoghubSample {
  def main(args: Array[String]) {
    if (args.length < 7) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger>[outputPath] [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .getOrCreate()

    import spark.implicits._

    // 入力ストリームを表す DataSet を loghub から作成します
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(__value__ AS STRING)")
      .as[String]

    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    query.awaitTermination()

  }
}
説明

Maven プロジェクト・オブジェクト・モデル(POM)ファイルの詳細については、「aliyun-emapreduce-demo」をご参照ください。

コードをコンパイルして実行する

## コードをコンパイルするコマンドを実行します。
mvn clean package -DskipTests

## コードがコンパイルされると、ジョブの JAR ファイルは target ディレクトリに保存されます。

## ジョブを送信して実行します。
spark-submit --master --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
--num-executors 2 --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --class x.x.x.StructuredLoghubSample xxx.jar \
<logService-project> <logService-store> <access-key-id> <access-key-secret> <endpoint> \
<starting-offsets><max-offsets-per-trigger> <output-path> <checkpoint-location>
説明

ジョブリソースの構成は、実際のデータサイズとクラスタ規模に基づいて調整する必要があります。 クラスタの仕様が低い場合、上記のコードのコマンドを実行してもジョブを実行できない可能性があります。

以下の情報は、実際の環境に基づいて置き換えてください。

  • x.x.x.StructuredLoghubSample: x.x.x は、使用環境における StructuredLoghubSample クラスのパッケージ名を示します。

  • xxx.jar: プロジェクトの JAR ファイル。

  • <starting-offsets>: データの取得を開始する位置。 有効な値: earliest および latest。

  • <max-offsets-per-trigger>: 各トリガー期間中に取得されるメッセージの最大数、またはデータの最大オフセット。

  • <output-path>: 出力データが保存されるディレクトリ。 例: /loghub/data/

  • <checkpoint-location>: チェックポイントが保存されるディレクトリ。 例: /loghub/checkpoint

  • --jars: このパラメーターは必須です。パラメーター値は、LogHub の Spark DataSource の JAR パッケージです。 このパラメーターを指定しないと、Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource エラーが報告されます。

    • Spark 2 の場合、--jars パラメーターを次の値に設定できます。

      --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
    • Spark 3 の場合、--jars パラメーターを次の値に設定できます。

      --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar
    説明

    上記の値のディレクトリがクラスターに存在しない場合は、 /usr/lib/emrsdk-current/ ディレクトリを使用してください。

PySpark Structured Streaming を使用して LogHub にアクセスする

サンプルコード

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("xx") \
    .getOrCreate()

# LogHub データソースからデータを読み取ります。
lines = spark \
    .readStream \
    .format("loghub") \
    .option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
    .option("access.key.id", "LTAI----") \
    .option("access.key.secret", "DTi----") \
    .option("sls.project", "emr-test-hz-1") \
    .option("sls.store", "test1") \
    .option("startingoffsets", "earliest") \
    .load()


# 変換ロジックを処理します。
wordCounts = lines.groupBy("__logStore__").count()

# シンクロジックを処理します。
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

パラメータ

パラメータ

説明

endpoint

LogHub にアクセスするために使用されるエンドポイント。 例: cn-hangzhou-intranet.log.aliyuncs.com。

access.key.id

Alibaba Cloud アカウントの AccessKey ID。

access.key.secret

Alibaba Cloud アカウントの AccessKey シークレット。

sls.project

ログストアの名前。

sls.store

ログサービスプロジェクトの名前。

startingoffsets

データの取得を開始する位置。 有効な値: earliest および latest。

Python スクリプトを実行する

  • Spark 2 の場合、次のコマンドを実行して Python スクリプトを実行できます。

    spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --master local loghub.py
  • Spark 3 の場合、次のコマンドを実行して Python スクリプトを実行できます。

    spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar --master local loghub.py
説明

上記の値のディレクトリがクラスタに存在しない場合は、/usr/lib/emrsdk-current/ ディレクトリを使用してください。