すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:オフライン Spark 利用例

最終更新日:Mar 27, 2026

Spark を使用して、Simple Log Service (SLS) のデータをバッチ (オフライン) モードで読み取ります。これは、連続ストリームではなく、開始時刻と終了時刻が定義された有界データセットとして扱われます。EMR は、Spark RDD と Spark SQL の2つのアプローチをサポートしています。

前提条件

開始する前に、以下を準備してください。

  • Spark がインストールされた EMR クラスター

  • 読み取るデータを含む Simple Log Service プロジェクトと Logstore

  • Logstore への読み取りアクセス権を持つ RAM ユーザーの AccessKey ID と AccessKey Secret

  • (Spark SQL のみ) JAR ファイルが配置されている EMR クラスターノードへのアクセス

Spark RDD を使用した Simple Log Service からの読み取り

サンプルコード

以下の Scala プログラムは、指定された時間範囲のログデータを Logstore から読み取り、その出力をファイルシステムパスに保存します。

// TestBatchLoghub.scala

object TestBatchLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
          |  <access key id> <access key secret> <output path> <start time> <end time=now>
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject    = args(0)
    val logStore         = args(1)
    val endpoint         = args(2)
    val accessKeyId      = args(3)     // Read from environment variable at submit time
    val accessKeySecret  = args(4)     // Read from environment variable at submit time
    val outputPath       = args(5)
    val startTime        = args(6).toLong

    val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
    var rdd: JavaRDD[String] = null

    if (args.length > 7) {
      // Read log data between startTime and endTime
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
    } else {
      // Read log data from startTime to now
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
    }

    rdd.saveAsTextFile(outputPath)
  }
}

LoghubUtils.createRDD() は、各要素がログエントリである JavaRDD[String] を返します。現在時刻まで読み取るには7つの引数を渡し、明示的な終了時刻を指定するには8つの引数を渡します。

Maven POM 構成の詳細については、「aliyun-emapreduce-demo」をご参照ください。

接続パラメーター

パラメーター 説明
<sls project> SLS プロジェクト名 my-project
<sls logstore> プロジェクト内の Logstore 名 my-logstore
<sls endpoint> ご利用のリージョンの SLS エンドポイント cn-hangzhou.log.aliyuncs.com
<access key id> ご利用の RAM ユーザーの AccessKey ID $ALIBABA_CLOUD_ACCESS_KEY_ID
<access key secret> ご利用の RAM ユーザーの AccessKey Secret $ALIBABA_CLOUD_ACCESS_KEY_SECRET
<output path> RDD 結果の出力パス oss://my-bucket/output/
<start time> 時間範囲の開始時刻 1700000000
<end time> (オプション) 時間範囲の終了時刻。デフォルトは現在時刻です。 1700003600

コンパイルと実行

ステップ 1: コードのコンパイル。

mvn clean package -DskipTests

コンパイルされた JAR は target/shaded/ ディレクトリに保存されます。

ステップ 2: ジョブの送信。

重要

Alibaba Cloud アカウント (root ユーザー) の AccessKey ペアではなく、RAM ユーザーの AccessKey ペアを使用してください。アカウントレベルの AccessKey ペアは、すべての API オペレーションへのアクセスを許可します。RAM ユーザーの作成方法については、「Create a RAM user」をご参照ください。認証情報は環境変数として保存し、スクリプトにハードコードしないでください。

送信する前に環境変数を設定します。

export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

次に、ジョブを送信します。

spark-submit \
  --master yarn-cluster \
  --executor-cores 2 \
  --executor-memory 1g \
  --driver-memory 1g \
  --num-executors 2 \
  --class x.x.x.TestBatchLoghub xxx.jar \
  <sls project> <sls logstore> <sls endpoint> \
  $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET \
  <output path> <start time> [<end time>]

x.x.x.TestBatchLoghub を実際の完全修飾クラス名に、xxx.jar を実際の JAR パスに置き換えます。--executor-cores--executor-memory、および --num-executors は、データ量とクラスター容量に基づいて調整してください。

Spark SQL を使用した Simple Log Service からの読み取り

Spark SQL は、EMR Spark 拡張 JAR に含まれる loghub データソースを使用します。

LogHub JAR を使用した spark-sql の起動

重要

Alibaba Cloud アカウント (root ユーザー) の AccessKey ペアではなく、RAM ユーザーの AccessKey ペアを使用してください。認証情報は --hiveconf を介して渡される環境変数として保存し、スクリプトにハードコードしないでください。

spark-sql \
  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET

EMR クラスターが Spark 2 を使用している場合は、JAR パスの spark3spark2 に置き換えます。

/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*

テーブルの作成とデータクエリ

CREATE TABLE test_sls
USING loghub
OPTIONS (
  endpoint         = 'cn-hangzhou-intranet.log.aliyuncs.com',
  access.key.id    = '${hiveconf:accessKeyId}',
  access.key.secret= '${hiveconf:accessKeySecret}',
  sls.project      = 'test_project',
  sls.store        = 'test_store',
  startingoffsets  = 'earliest'
);

SELECT * FROM test_sls;

接続パラメーター

パラメーター 必須 説明
endpoint はい ご利用のリージョンの SLS エンドポイント cn-hangzhou-intranet.log.aliyuncs.com
access.key.id はい AccessKey ID。 --hiveconf ${hiveconf:accessKeyId}
access.key.secret はい AccessKey Secret。 --hiveconf ${hiveconf:accessKeySecret}
sls.project はい SLS プロジェクト名 test_project
sls.store はい Logstore 名 test_store
startingoffsets いいえ 読み取り開始位置。earliest を使用して、Logstore の最初から利用可能なすべてのデータを読み取ります。 earliest

オンプレミス開発環境での LogHub JAR の使用

Spark 3 に対してローカルで開発およびテストを行うには (Spark 2 でも手順は同じです)、EMR データソース JAR をローカル Maven リポジトリにインストールします。

ステップ 1: EMR クラスターから JAR をダウンロード。

クラスターノード上の次のパスから、JAR をローカルマシンにコピーします。

/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12

ステップ 2: JAR をローカル Maven リポジトリにインストール。

mvn install:install-file \
  -DgroupId=com.aliyun.emr \
  -DartifactId=emr-datasources_shaded_2.12 \
  -Dversion=3.0.2 \
  -Dpackaging=jar \
  -Dfile=<path-to-downloaded-jar>

<path-to-downloaded-jar> を、JAR を保存したローカルパスに置き換えます。

ステップ 3: pom.xml に依存関係を追加。

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-datasources_shaded_2.12</artifactId>
  <version>3.0.2</version>
</dependency>

参考資料

Spark を使用して Kafka にアクセスする方法の詳細については、「Structured Streaming + Kafka Integration Guide」をご参照ください。