すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:オフラインモードでSparkを使用してデータを使用する

最終更新日:Jan 11, 2025

このトピックでは、Sparkを使用してログサービスのデータをオフラインモードで使用する方法について説明します。

Spark RDDを使用してログサービスのデータを使用する

サンプルコード

## TestBatchLoghub.Scala

object TestBatchLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
          |  <access key id> <access key secret> <output path> <start time> <end time=now>
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject = args(0)
    val logStore = args(1)
    val endpoint = args(2)
    val accessKeyId = args(3)
    val accessKeySecret = args(4)
    val outputPath = args(5)
    val startTime = args(6).toLong

    val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
    var rdd:JavaRDD[String] = null
    if (args.length > 7) {
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
    } else {
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
    }

    rdd.saveAsTextFile(outputPath)
  }
}
説明

Mavenプロジェクトオブジェクトモデル(POM)ファイルの詳細については、「aliyun-emapreduce-demo」をご参照ください。

コードのコンパイルと実行

説明

サンプルコードを実行する前に、環境変数を設定する必要があります。環境変数の設定方法の詳細については、このトピックの環境変数の設定セクションをご参照ください。

## Run a command to compile the code.
mvn clean package -DskipTests

## After the code is compiled, the JAR package of the job is stored in the target/shaded/ directory. 

## Submit and run the job.
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g --num-executors 2 --class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> <sls endpoint> $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET <output path> <start time> [<end time=now>]
重要
  • x.x.x.TestBatchLoghubxxx.jar は、実際のクラスパスとパッケージパスに置き換える必要があります。

  • ジョブリソースの設定は、実際のデータサイズとクラスタ規模に基づいて調整する必要があります。クラスタの仕様が低い場合、上記のコードのコマンドを実行してもジョブを実行できない場合があります。

Spark SQLステートメントを使用してログサービスのデータを使用する

サンプル SQL ステートメント

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
説明

/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* には、LogHubデータソースのタイプが含まれています。 E-MapReduce(EMR)クラスタで Spark 2 を使用している場合は、上記のステートメントの spark3spark2 に変更する必要があります。

オンプレミスマシンの開発環境で Spark 3 を使用してログサービスのデータを使用する場合は、次の手順を実行できます。これらの手順は、Spark 2 を使用する場合に実行できる手順と似ています。

  1. クラスタの /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12 ディレクトリにある JAR パッケージをオンプレミスマシンにダウンロードします。

  2. Maven を使用して、JAR パッケージをオンプレミスマシンにインストールします。

    mvn install:install-file -DgroupId=com.aliyun.emr -DartifactId=emr-datasources_shaded_2.12 -Dversion=3.0.2 -Dpackaging=jar -Dfile=/Users/zhongqiang.czq/Downloads/tempory/emr-datasources_shaded_2.12-3.0.2.jar
  3. pom.xml ファイルに次の依存関係を追加します。

    <dependency>
      <groupId>com.aliyun.emr</groupId>
      <artifactId>emr-datasources_shaded_2.12</artifactId>
      <version>3.0.2</version>
    </dependency>

テーブルを作成し、テーブルからデータを読み取る例

create table test_sls
using loghub
  options(endpoint='cn-hangzhou-intranet.log.aliyuncs.com',
          access.key.id='${hiveconf:accessKeyId}',
          access.key.secret='${hiveconf:accessKeySecret}',
          sls.project='test_project',
          sls.store='test_store',
          startingoffsets='earliest'
);

select * from test_sls;

環境変数の設定

このセクションでは、オペレーティングシステムで ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数を構成する方法について説明します。

重要
  • Alibaba Cloud アカウントの AccessKey ペアを使用して、すべての API 操作にアクセスできます。 RAM ユーザーを使用して API 操作を呼び出したり、日常の O&M を実行することをお勧めします。 RAM ユーザーの使用方法については、「RAMユーザーの作成」をご参照ください。

  • プロジェクトコードなど、他の人が簡単にアクセスできるファイルに AccessKey ペアを含めないことをお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のリソースが安全でなくなる可能性があります。

  • Linux および macOS

    次のコマンドを実行して、環境変数を設定します。

    <access_key_id> および <access_key_secret> を、RAM ユーザーの AccessKey ID と AccessKey シークレットに置き換えます。

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows

    1. 環境変数ファイルを作成し、ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数をファイルに追加し、環境変数に AccessKey ID と AccessKey シークレットを設定します。

    2. AccessKey ペアを有効にするには、Windows を再起動します。

参照

Spark を使用して Kafka にアクセスする方法の詳細については、「Structured Streaming + Kafka Integration Guide」をご参照ください。