Topik ini menjelaskan cara menggunakan Spark untuk mengonsumsi data Layanan Log dalam mode offline.
Gunakan Spark RDD untuk mengonsumsi data Layanan Log
Kode contoh
## TestBatchLoghub.Scala
object TestBatchLoghub {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
| <access key id> <access key secret> <output path> <start time> <end time=now>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val endpoint = args(2)
val accessKeyId = args(3)
val accessKeySecret = args(4)
val outputPath = args(5)
val startTime = args(6).toLong
val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
var rdd:JavaRDD[String] = null
if (args.length > 7) {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
} else {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
}
rdd.saveAsTextFile(outputPath)
}
}Untuk informasi lebih lanjut tentang file model objek proyek Maven (POM), lihat aliyun-emapreduce-demo.
Kompilasi dan jalankan kode
Anda harus mengonfigurasi variabel lingkungan sebelum dapat menjalankan kode contoh. Untuk informasi lebih lanjut tentang cara mengonfigurasi variabel lingkungan, lihat bagian Konfigurasikan Variabel Lingkungan dalam topik ini.
## Jalankan perintah untuk mengompilasi kode.
mvn clean package -DskipTests
## Setelah kode dikompilasi, paket JAR pekerjaan disimpan di direktori target/shaded/.
## Kirim dan jalankan pekerjaan.
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g --num-executors 2 --class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> <sls endpoint> $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET <output path> <start time> [<end time=now>]Anda perlu mengganti
x.x.x.TestBatchLoghubdanxxx.jardengan classpath dan jalur paket aktual berdasarkan kebutuhan bisnis Anda.Anda harus menyesuaikan konfigurasi sumber daya pekerjaan berdasarkan ukuran data aktual dan skala kluster. Jika kluster menggunakan spesifikasi rendah, Anda mungkin gagal menjalankan pekerjaan dengan menjalankan perintah dalam kode sebelumnya.
Gunakan pernyataan SQL Spark untuk mengonsumsi data Layanan Log
Pernyataan SQL contoh
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* berisi tipe sumber data LogHub. Jika kluster E-MapReduce (EMR) Anda menggunakan Spark 2, Anda harus mengubah spark3 dalam pernyataan sebelumnya menjadi spark2.
Jika Anda ingin menggunakan Spark 3 untuk mengonsumsi data Layanan Log dalam lingkungan pengembangan mesin lokal Anda, Anda dapat melakukan langkah-langkah berikut. Langkah-langkah tersebut mirip dengan langkah-langkah yang dapat Anda lakukan saat menggunakan Spark 2.
Unduh paket JAR di direktori
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12kluster Anda ke mesin lokal Anda.Gunakan Maven untuk menginstal paket JAR ke mesin lokal Anda.
mvn install:install-file -DgroupId=com.aliyun.emr -DartifactId=emr-datasources_shaded_2.12 -Dversion=3.0.2 -Dpackaging=jar -Dfile=/Users/zhongqiang.czq/Downloads/tempory/emr-datasources_shaded_2.12-3.0.2.jarTambahkan dependensi berikut ke file pom.xml:
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datasources_shaded_2.12</artifactId> <version>3.0.2</version> </dependency>
Contoh pembuatan tabel dan membaca data dari tabel
create table test_sls
using loghub
options(endpoint='cn-hangzhou-intranet.log.aliyuncs.com',
access.key.id='${hiveconf:accessKeyId}',
access.key.secret='${hiveconf:accessKeySecret}',
sls.project='test_project',
sls.store='test_store',
startingoffsets='earliest'
);
select * from test_sls;Konfigurasikan variabel lingkungan
Berikut ini menjelaskan cara mengonfigurasi variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET di sistem operasi Anda.
Pasangan AccessKey akun Alibaba Cloud dapat digunakan untuk mengakses semua operasi API. Kami merekomendasikan Anda menggunakan pengguna RAM untuk memanggil operasi API atau melakukan O&M rutin. Untuk informasi tentang cara menggunakan pengguna RAM, lihat Buat Pengguna RAM.
Kami merekomendasikan Anda tidak menyertakan pasangan AccessKey Anda dalam file yang mudah diakses oleh orang lain, seperti kode proyek. Jika tidak, pasangan AccessKey Anda mungkin bocor dan sumber daya di akun Anda menjadi tidak aman.
Linux dan macOS
Jalankan perintah berikut untuk mengonfigurasi variabel lingkungan.
Ganti
<access_key_id>dan<access_key_secret>dengan ID AccessKey dan Rahasia AccessKey pengguna RAM Anda.export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>Windows
Buat file variabel lingkungan, tambahkan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET ke file tersebut, lalu atur variabel lingkungan ke ID AccessKey dan Rahasia AccessKey Anda.
Mulai ulang Windows agar pasangan AccessKey berlaku.
Referensi
Untuk informasi lebih lanjut tentang cara menggunakan Spark untuk mengakses Kafka, lihat Panduan Integrasi Structured Streaming + Kafka.