Gunakan Spark untuk membaca data Simple Log Service (SLS) dalam mode batch (offline)—sebagai dataset terbatas dengan rentang waktu mulai dan akhir yang telah ditentukan—bukan sebagai aliran kontinu. EMR mendukung dua pendekatan: Spark RDD dan Spark SQL.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Kluster EMR dengan Spark yang telah diinstal
-
Proyek Simple Log Service dan Logstore yang berisi data yang akan dibaca
-
ID AccessKey dan rahasia AccessKey untuk RAM user yang memiliki akses baca ke Logstore tersebut
-
(Hanya untuk Spark SQL) Akses ke node kluster EMR tempat file JAR berada
Gunakan Spark RDD untuk membaca dari Simple Log Service
Kode contoh
Program Scala berikut membaca data log dari Logstore dalam rentang waktu tertentu dan menyimpan output ke path sistem file.
// TestBatchLoghub.scala
object TestBatchLoghub {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
| <access key id> <access key secret> <output path> <start time> <end time=now>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val endpoint = args(2)
val accessKeyId = args(3) // Dibaca dari variabel lingkungan saat submit
val accessKeySecret = args(4) // Dibaca dari variabel lingkungan saat submit
val outputPath = args(5)
val startTime = args(6).toLong
val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
var rdd: JavaRDD[String] = null
if (args.length > 7) {
// Baca data log antara startTime dan endTime
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
} else {
// Baca data log dari startTime hingga sekarang
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
}
rdd.saveAsTextFile(outputPath)
}
}
LoghubUtils.createRDD() mengembalikan JavaRDD[String], di mana setiap elemen merupakan entri log. Berikan tujuh argumen untuk membaca hingga waktu saat ini atau delapan argumen untuk menentukan waktu akhir secara eksplisit.
Untuk konfigurasi POM Maven, lihat aliyun-emapreduce-demo.
Parameter koneksi
| Parameter | Deskripsi | Contoh |
|---|---|---|
<sls project> |
Nama proyek SLS | my-project |
<sls logstore> |
Nama Logstore dalam proyek | my-logstore |
<sls endpoint> |
Titik akhir SLS untuk wilayah Anda | cn-hangzhou.log.aliyuncs.com |
<access key id> |
ID AccessKey RAM user Anda | Gunakan $ALIBABA_CLOUD_ACCESS_KEY_ID |
<access key secret> |
Rahasia AccessKey RAM user Anda | Gunakan $ALIBABA_CLOUD_ACCESS_KEY_SECRET |
<output path> |
Path output untuk hasil RDD | oss://my-bucket/output/ |
<start time> |
Awal rentang waktu | 1700000000 |
<end time> |
(Opsional) Akhir rentang waktu. Nilai default adalah waktu saat ini. | 1700003600 |
Kompilasi dan jalankan
Langkah 1: Kompilasi kode.
mvn clean package -DskipTests
File JAR hasil kompilasi disimpan ke direktori target/shaded/.
Langkah 2: Kirim pekerjaan.
Gunakan pasangan Kunci Akses RAM user, bukan pasangan Kunci Akses Akun Alibaba Cloud Anda. Pasangan Kunci Akses tingkat akun memberikan akses ke semua operasi API. Untuk cara membuat RAM user, lihat Create a RAM user. Simpan kredensial sebagai variabel lingkungan—jangan hardcode di skrip Anda.
Tetapkan variabel lingkungan sebelum submit:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Kemudian submit pekerjaan:
spark-submit \
--master yarn-cluster \
--executor-cores 2 \
--executor-memory 1g \
--driver-memory 1g \
--num-executors 2 \
--class x.x.x.TestBatchLoghub xxx.jar \
<sls project> <sls logstore> <sls endpoint> \
$ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET \
<output path> <start time> [<end time>]
Ganti x.x.x.TestBatchLoghub dengan nama kelas lengkap yang sebenarnya dan xxx.jar dengan path JAR yang sebenarnya. Sesuaikan --executor-cores, --executor-memory, dan --num-executors berdasarkan volume data dan kapasitas kluster Anda.
Gunakan Spark SQL untuk membaca dari Simple Log Service
Spark SQL menggunakan sumber data loghub, yang termasuk dalam file JAR ekstensi Spark EMR.
Jalankan spark-sql dengan JAR LogHub
Gunakan pasangan Kunci Akses RAM user, bukan pasangan Kunci Akses Akun Alibaba Cloud Anda. Simpan kredensial sebagai variabel lingkungan yang diteruskan melalui --hiveconf—jangan hardcode di skrip Anda.
spark-sql \
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
Jika kluster EMR Anda menggunakan Spark 2, ganti spark3 dengan spark2 pada path JAR:
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*
Buat tabel dan kueri data
CREATE TABLE test_sls
USING loghub
OPTIONS (
endpoint = 'cn-hangzhou-intranet.log.aliyuncs.com',
access.key.id = '${hiveconf:accessKeyId}',
access.key.secret= '${hiveconf:accessKeySecret}',
sls.project = 'test_project',
sls.store = 'test_store',
startingoffsets = 'earliest'
);
SELECT * FROM test_sls;
Parameter koneksi
| Parameter | Wajib | Deskripsi | Contoh |
|---|---|---|---|
endpoint |
Ya | Titik akhir SLS untuk wilayah Anda | cn-hangzhou-intranet.log.aliyuncs.com |
access.key.id |
Ya | ID AccessKey, diteruskan melalui --hiveconf |
${hiveconf:accessKeyId} |
access.key.secret |
Ya | Rahasia AccessKey, diteruskan melalui --hiveconf |
${hiveconf:accessKeySecret} |
sls.project |
Ya | Nama proyek SLS | test_project |
sls.store |
Ya | Nama Logstore | test_store |
startingoffsets |
Tidak | Posisi awal pembacaan. Gunakan earliest untuk membaca semua data yang tersedia sejak awal Logstore. |
earliest |
Gunakan JAR LogHub di lingkungan pengembangan lokal
Untuk mengembangkan dan menguji secara lokal terhadap Spark 3 (langkahnya sama untuk Spark 2), instal file JAR sumber data EMR ke repositori Maven lokal Anda.
Langkah 1: Unduh JAR dari kluster EMR Anda.
Salin file JAR dari path berikut pada node kluster Anda ke mesin lokal:
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12
Langkah 2: Instal JAR ke repositori Maven lokal Anda.
mvn install:install-file \
-DgroupId=com.aliyun.emr \
-DartifactId=emr-datasources_shaded_2.12 \
-Dversion=3.0.2 \
-Dpackaging=jar \
-Dfile=<path-to-downloaded-jar>
Ganti <path-to-downloaded-jar> dengan path lokal tempat Anda menyimpan file JAR tersebut.
Langkah 3: Tambahkan dependensi ke pom.xml Anda.
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-datasources_shaded_2.12</artifactId>
<version>3.0.2</version>
</dependency>
Referensi
Untuk informasi lebih lanjut tentang cara menggunakan Spark untuk mengakses Kafka, lihat Structured Streaming + Kafka Integration Guide.