Spark を使用して、Simple Log Service (SLS) のデータをバッチ (オフライン) モードで読み取ります。これは、連続ストリームではなく、開始時刻と終了時刻が定義された有界データセットとして扱われます。EMR は、Spark RDD と Spark SQL の2つのアプローチをサポートしています。
前提条件
開始する前に、以下を準備してください。
-
Spark がインストールされた EMR クラスター
-
読み取るデータを含む Simple Log Service プロジェクトと Logstore
-
Logstore への読み取りアクセス権を持つ RAM ユーザーの AccessKey ID と AccessKey Secret
-
(Spark SQL のみ) JAR ファイルが配置されている EMR クラスターノードへのアクセス
Spark RDD を使用した Simple Log Service からの読み取り
サンプルコード
以下の Scala プログラムは、指定された時間範囲のログデータを Logstore から読み取り、その出力をファイルシステムパスに保存します。
// TestBatchLoghub.scala
object TestBatchLoghub {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
| <access key id> <access key secret> <output path> <start time> <end time=now>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val endpoint = args(2)
val accessKeyId = args(3) // Read from environment variable at submit time
val accessKeySecret = args(4) // Read from environment variable at submit time
val outputPath = args(5)
val startTime = args(6).toLong
val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
var rdd: JavaRDD[String] = null
if (args.length > 7) {
// Read log data between startTime and endTime
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
} else {
// Read log data from startTime to now
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
}
rdd.saveAsTextFile(outputPath)
}
}
LoghubUtils.createRDD() は、各要素がログエントリである JavaRDD[String] を返します。現在時刻まで読み取るには7つの引数を渡し、明示的な終了時刻を指定するには8つの引数を渡します。
Maven POM 構成の詳細については、「aliyun-emapreduce-demo」をご参照ください。
接続パラメーター
| パラメーター | 説明 | 例 |
|---|---|---|
<sls project> |
SLS プロジェクト名 | my-project |
<sls logstore> |
プロジェクト内の Logstore 名 | my-logstore |
<sls endpoint> |
ご利用のリージョンの SLS エンドポイント | cn-hangzhou.log.aliyuncs.com |
<access key id> |
ご利用の RAM ユーザーの AccessKey ID | $ALIBABA_CLOUD_ACCESS_KEY_ID |
<access key secret> |
ご利用の RAM ユーザーの AccessKey Secret | $ALIBABA_CLOUD_ACCESS_KEY_SECRET |
<output path> |
RDD 結果の出力パス | oss://my-bucket/output/ |
<start time> |
時間範囲の開始時刻 | 1700000000 |
<end time> |
(オプション) 時間範囲の終了時刻。デフォルトは現在時刻です。 | 1700003600 |
コンパイルと実行
ステップ 1: コードのコンパイル。
mvn clean package -DskipTests
コンパイルされた JAR は target/shaded/ ディレクトリに保存されます。
ステップ 2: ジョブの送信。
Alibaba Cloud アカウント (root ユーザー) の AccessKey ペアではなく、RAM ユーザーの AccessKey ペアを使用してください。アカウントレベルの AccessKey ペアは、すべての API オペレーションへのアクセスを許可します。RAM ユーザーの作成方法については、「Create a RAM user」をご参照ください。認証情報は環境変数として保存し、スクリプトにハードコードしないでください。
送信する前に環境変数を設定します。
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
次に、ジョブを送信します。
spark-submit \
--master yarn-cluster \
--executor-cores 2 \
--executor-memory 1g \
--driver-memory 1g \
--num-executors 2 \
--class x.x.x.TestBatchLoghub xxx.jar \
<sls project> <sls logstore> <sls endpoint> \
$ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET \
<output path> <start time> [<end time>]
x.x.x.TestBatchLoghub を実際の完全修飾クラス名に、xxx.jar を実際の JAR パスに置き換えます。--executor-cores、--executor-memory、および --num-executors は、データ量とクラスター容量に基づいて調整してください。
Spark SQL を使用した Simple Log Service からの読み取り
Spark SQL は、EMR Spark 拡張 JAR に含まれる loghub データソースを使用します。
LogHub JAR を使用した spark-sql の起動
Alibaba Cloud アカウント (root ユーザー) の AccessKey ペアではなく、RAM ユーザーの AccessKey ペアを使用してください。認証情報は --hiveconf を介して渡される環境変数として保存し、スクリプトにハードコードしないでください。
spark-sql \
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
EMR クラスターが Spark 2 を使用している場合は、JAR パスの spark3 を spark2 に置き換えます。
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*
テーブルの作成とデータクエリ
CREATE TABLE test_sls
USING loghub
OPTIONS (
endpoint = 'cn-hangzhou-intranet.log.aliyuncs.com',
access.key.id = '${hiveconf:accessKeyId}',
access.key.secret= '${hiveconf:accessKeySecret}',
sls.project = 'test_project',
sls.store = 'test_store',
startingoffsets = 'earliest'
);
SELECT * FROM test_sls;
接続パラメーター
| パラメーター | 必須 | 説明 | 例 |
|---|---|---|---|
endpoint |
はい | ご利用のリージョンの SLS エンドポイント | cn-hangzhou-intranet.log.aliyuncs.com |
access.key.id |
はい | AccessKey ID。 --hiveconf |
${hiveconf:accessKeyId} |
access.key.secret |
はい | AccessKey Secret。 --hiveconf |
${hiveconf:accessKeySecret} |
sls.project |
はい | SLS プロジェクト名 | test_project |
sls.store |
はい | Logstore 名 | test_store |
startingoffsets |
いいえ | 読み取り開始位置。earliest を使用して、Logstore の最初から利用可能なすべてのデータを読み取ります。 |
earliest |
オンプレミス開発環境での LogHub JAR の使用
Spark 3 に対してローカルで開発およびテストを行うには (Spark 2 でも手順は同じです)、EMR データソース JAR をローカル Maven リポジトリにインストールします。
ステップ 1: EMR クラスターから JAR をダウンロード。
クラスターノード上の次のパスから、JAR をローカルマシンにコピーします。
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12
ステップ 2: JAR をローカル Maven リポジトリにインストール。
mvn install:install-file \
-DgroupId=com.aliyun.emr \
-DartifactId=emr-datasources_shaded_2.12 \
-Dversion=3.0.2 \
-Dpackaging=jar \
-Dfile=<path-to-downloaded-jar>
<path-to-downloaded-jar> を、JAR を保存したローカルパスに置き換えます。
ステップ 3: pom.xml に依存関係を追加。
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-datasources_shaded_2.12</artifactId>
<version>3.0.2</version>
</dependency>
参考資料
Spark を使用して Kafka にアクセスする方法の詳細については、「Structured Streaming + Kafka Integration Guide」をご参照ください。