全部产品
Search
文档中心

E-MapReduce:Kirim pekerjaan streaming PySpark menggunakan Serverless Spark

更新时间:Nov 10, 2025

Di era data besar, pemrosesan aliran sangat penting untuk analitik data real-time. EMR Serverless Spark adalah platform andal dan dapat diskalakan yang menyederhanakan pemrosesan data real-time serta meningkatkan efisiensi dengan menghilangkan kebutuhan untuk mengelola server. Topik ini menjelaskan cara mengirim pekerjaan streaming PySpark menggunakan EMR Serverless Spark dan menunjukkan kemudahan penggunaan serta pemeliharaan platform tersebut untuk pemrosesan aliran.

Prasyarat

Ruang kerja telah dibuat. Untuk informasi selengkapnya, lihat Buat ruang kerja.

Prosedur

Langkah 1: Buat kluster Dataflow real-time dan hasilkan pesan

  1. Pada halaman EMR on ECS, buat kluster Dataflow yang mencakup layanan Kafka. Untuk informasi selengkapnya, lihat Buat kluster.

  2. Masuk ke node master kluster EMR. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.

  3. Jalankan perintah berikut untuk mengganti direktori.

    cd /var/log/emr/taihao_exporter
  4. Jalankan perintah berikut untuk membuat topik.

    # Buat topik bernama taihaometrics dengan 10 partisi dan faktor replikasi 2.
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create
  5. Jalankan perintah berikut untuk mengirim pesan.

    # Gunakan kafka-console-producer untuk mengirim pesan ke topik taihaometrics.
    tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics

Langkah 2: Buat koneksi jaringan

  1. Buka halaman Koneksi Jaringan.

    1. Pada panel navigasi di sebelah kiri Konsol EMR, pilih EMR Serverless > Spark.

    2. Pada halaman Spark, klik nama ruang kerja yang dituju.

    3. Pada halaman EMR Serverless Spark, klik Network Connections pada panel navigasi di sebelah kiri.

  2. Di halaman Network Connections, klik Create Network Connection.

  3. Pada kotak dialog Create Network Connection, konfigurasikan parameter berikut lalu klik OK.

    Parameter

    Deskripsi

    Name

    Masukkan nama untuk koneksi baru. Misalnya, connection_to_emr_kafka.

    Virtual Private Cloud (VPC)

    Pilih VPC yang sama dengan kluster EMR.

    Jika tidak tersedia VPC, klik Create VPC untuk membuka Konsol VPC dan membuatnya. Untuk informasi selengkapnya, lihat Buat dan kelola VPC.

    VSwitch

    Pilih vSwitch yang sama yang berada dalam VPC yang sama dengan kluster EMR.

    Jika tidak tersedia vSwitch di zona saat ini, klik VSwitch untuk membuka Konsol VPC dan membuatnya. Untuk informasi selengkapnya, lihat Buat dan kelola vSwitch.

    Saat Status berubah menjadi Succeeded, koneksi jaringan telah berhasil dibuat.

Langkah 3: Tambahkan aturan grup keamanan untuk kluster EMR

  1. Dapatkan blok CIDR dari vSwitch untuk node kluster.

    Pada halaman Node Management, klik nama grup node untuk melihat informasi vSwitch terkait. Lalu, masuk ke Konsol VPC dan dapatkan blok CIDR vSwitch pada halaman VSwitch.

    image

  2. Tambahkan aturan grup keamanan.

    1. Pada halaman Cluster Management, klik ID kluster yang dituju.

    2. Pada halaman Basic Information, klik tautan di samping Cluster Security Group.

    3. Pada halaman Security Group Details, pada bagian Access Rules, klik Add Rule. Konfigurasikan parameter berikut lalu klik OK.

      Parameter

      Deskripsi

      Source

      Masukkan blok CIDR vSwitch yang Anda peroleh pada langkah sebelumnya.

      Penting

      Untuk mencegah risiko keamanan akibat serangan eksternal, jangan mengatur objek otorisasi menjadi 0.0.0.0/0.

      Destination (This Instance)

      Masukkan Port 9092.

Langkah 4: Unggah paket JAR ke OSS

Ekstrak file kafka.zip dan unggah semua paket JAR ke Object Storage Service (OSS). Untuk informasi selengkapnya, lihat Unggahan sederhana.

Langkah 5: Unggah file sumber daya

  1. Pada halaman EMR Serverless Spark, klik File Management pada panel navigasi di sebelah kiri.

  2. Pada halaman File Management, klik Upload File.

  3. Pada kotak dialog Upload File, klik area unggah dan pilih file pyspark_ss_demo.py.

Langkah 6: Buat dan mulai pekerjaan streaming

  1. Pada halaman EMR Serverless Spark, klik Data Development pada panel navigasi di sebelah kiri.

  2. Pada tab Development, klik ikon image.

  3. Masukkan nama, pilih Streaming Job > PySpark sebagai jenis pekerjaan, lalu klik OK.

  4. Pada tab pengembangan baru, konfigurasikan parameter berikut, biarkan parameter lain pada nilai default-nya, lalu klik Save.

    Parameter

    Deskripsi

    Main Python Resource

    Pilih file pyspark_ss_demo.py yang telah Anda unggah pada halaman Resource Upload pada langkah sebelumnya.

    Engine Version

    Versi Spark. Untuk informasi selengkapnya, lihat Versi mesin.

    Runtime Parameters

    Alamat IP internal dari node core-1-1 kluster EMR. Anda dapat melihat alamat tersebut di bawah grup node Core pada halaman Node Management kluster EMR.

    Spark Configurations

    Informasi konfigurasi Spark. Kode berikut memberikan contohnya.

    spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
    spark.emr.serverless.network.service.name connection_to_emr_kafka
    Catatan
    • spark.jars: Menentukan jalur paket JAR eksternal yang akan dimuat saat waktu proses. Ganti jalur tersebut dengan jalur aktual semua paket JAR yang Anda unggah pada Langkah 4.

    • spark.emr.serverless.network.service.name: Menentukan nama koneksi jaringan. Ganti nama tersebut dengan nama koneksi jaringan yang Anda buat pada Langkah 2.

  5. Klik Publish.

  6. Pada kotak dialog Publish Job, klik OK.

  7. Mulai pekerjaan streaming.

    1. Klik Go To O&M.

    2. Klik Start.

Langkah 7: Lihat log

  1. Klik tab Log Exploration.

  2. Pada tab Log Exploration, Anda dapat melihat informasi tentang eksekusi aplikasi dan hasil yang dikembalikan.

    image

Referensi

Untuk informasi selengkapnya tentang proses pengembangan PySpark, lihat Panduan cepat pengembangan PySpark.