Di era data besar, pemrosesan aliran sangat penting untuk analitik data real-time. EMR Serverless Spark adalah platform andal dan dapat diskalakan yang menyederhanakan pemrosesan data real-time serta meningkatkan efisiensi dengan menghilangkan kebutuhan untuk mengelola server. Topik ini menjelaskan cara mengirim pekerjaan streaming PySpark menggunakan EMR Serverless Spark dan menunjukkan kemudahan penggunaan serta pemeliharaan platform tersebut untuk pemrosesan aliran.
Prasyarat
Ruang kerja telah dibuat. Untuk informasi selengkapnya, lihat Buat ruang kerja.
Prosedur
Langkah 1: Buat kluster Dataflow real-time dan hasilkan pesan
Pada halaman EMR on ECS, buat kluster Dataflow yang mencakup layanan Kafka. Untuk informasi selengkapnya, lihat Buat kluster.
Masuk ke node master kluster EMR. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
Jalankan perintah berikut untuk mengganti direktori.
cd /var/log/emr/taihao_exporterJalankan perintah berikut untuk membuat topik.
# Buat topik bernama taihaometrics dengan 10 partisi dan faktor replikasi 2. kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --createJalankan perintah berikut untuk mengirim pesan.
# Gunakan kafka-console-producer untuk mengirim pesan ke topik taihaometrics. tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics
Langkah 2: Buat koneksi jaringan
Buka halaman Koneksi Jaringan.
Pada panel navigasi di sebelah kiri Konsol EMR, pilih .
Pada halaman Spark, klik nama ruang kerja yang dituju.
Pada halaman EMR Serverless Spark, klik Network Connections pada panel navigasi di sebelah kiri.
Di halaman Network Connections, klik Create Network Connection.
Pada kotak dialog Create Network Connection, konfigurasikan parameter berikut lalu klik OK.
Parameter
Deskripsi
Name
Masukkan nama untuk koneksi baru. Misalnya, connection_to_emr_kafka.
Virtual Private Cloud (VPC)
Pilih VPC yang sama dengan kluster EMR.
Jika tidak tersedia VPC, klik Create VPC untuk membuka Konsol VPC dan membuatnya. Untuk informasi selengkapnya, lihat Buat dan kelola VPC.
VSwitch
Pilih vSwitch yang sama yang berada dalam VPC yang sama dengan kluster EMR.
Jika tidak tersedia vSwitch di zona saat ini, klik VSwitch untuk membuka Konsol VPC dan membuatnya. Untuk informasi selengkapnya, lihat Buat dan kelola vSwitch.
Saat Status berubah menjadi Succeeded, koneksi jaringan telah berhasil dibuat.
Langkah 3: Tambahkan aturan grup keamanan untuk kluster EMR
Dapatkan blok CIDR dari vSwitch untuk node kluster.
Pada halaman Node Management, klik nama grup node untuk melihat informasi vSwitch terkait. Lalu, masuk ke Konsol VPC dan dapatkan blok CIDR vSwitch pada halaman VSwitch.

Tambahkan aturan grup keamanan.
Pada halaman Cluster Management, klik ID kluster yang dituju.
Pada halaman Basic Information, klik tautan di samping Cluster Security Group.
Pada halaman Security Group Details, pada bagian Access Rules, klik Add Rule. Konfigurasikan parameter berikut lalu klik OK.
Parameter
Deskripsi
Source
Masukkan blok CIDR vSwitch yang Anda peroleh pada langkah sebelumnya.
PentingUntuk mencegah risiko keamanan akibat serangan eksternal, jangan mengatur objek otorisasi menjadi 0.0.0.0/0.
Destination (This Instance)
Masukkan Port 9092.
Langkah 4: Unggah paket JAR ke OSS
Ekstrak file kafka.zip dan unggah semua paket JAR ke Object Storage Service (OSS). Untuk informasi selengkapnya, lihat Unggahan sederhana.
Langkah 5: Unggah file sumber daya
Pada halaman EMR Serverless Spark, klik File Management pada panel navigasi di sebelah kiri.
Pada halaman File Management, klik Upload File.
Pada kotak dialog Upload File, klik area unggah dan pilih file pyspark_ss_demo.py.
Langkah 6: Buat dan mulai pekerjaan streaming
Pada halaman EMR Serverless Spark, klik Data Development pada panel navigasi di sebelah kiri.
Pada tab Development, klik ikon
.Masukkan nama, pilih sebagai jenis pekerjaan, lalu klik OK.
Pada tab pengembangan baru, konfigurasikan parameter berikut, biarkan parameter lain pada nilai default-nya, lalu klik Save.
Parameter
Deskripsi
Main Python Resource
Pilih file pyspark_ss_demo.py yang telah Anda unggah pada halaman Resource Upload pada langkah sebelumnya.
Engine Version
Versi Spark. Untuk informasi selengkapnya, lihat Versi mesin.
Runtime Parameters
Alamat IP internal dari node core-1-1 kluster EMR. Anda dapat melihat alamat tersebut di bawah grup node Core pada halaman Node Management kluster EMR.
Spark Configurations
Informasi konfigurasi Spark. Kode berikut memberikan contohnya.
spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar spark.emr.serverless.network.service.name connection_to_emr_kafkaCatatanspark.jars: Menentukan jalur paket JAR eksternal yang akan dimuat saat waktu proses. Ganti jalur tersebut dengan jalur aktual semua paket JAR yang Anda unggah pada Langkah 4.spark.emr.serverless.network.service.name: Menentukan nama koneksi jaringan. Ganti nama tersebut dengan nama koneksi jaringan yang Anda buat pada Langkah 2.
Klik Publish.
Pada kotak dialog Publish Job, klik OK.
Mulai pekerjaan streaming.
Klik Go To O&M.
Klik Start.
Langkah 7: Lihat log
Klik tab Log Exploration.
Pada tab Log Exploration, Anda dapat melihat informasi tentang eksekusi aplikasi dan hasil yang dikembalikan.

Referensi
Untuk informasi selengkapnya tentang proses pengembangan PySpark, lihat Panduan cepat pengembangan PySpark.