全部產品
Search
文件中心

E-MapReduce:通過Serverless Spark提交PySpark流任務

更新時間:Aug 28, 2025

在巨量資料快速發展的時代,串流技術對於即時資料分析至關重要。EMR Serverless Spark提供了一個強大而可擴充的平台,它不僅簡化了即時資料處理流程,還免去了伺服器管理的煩惱,提升了效率。本文將指導您使用EMR Serverless Spark提交PySpark流式任務,展示其在流處理方面的易用性和可營運性。

前提條件

已建立工作空間,詳情請參見建立工作空間

操作流程

步驟一:建立即時資料流叢集併產生訊息

  1. 在EMR on ECS頁面,建立包含Kafka服務的即時資料流叢集,詳情請參見建立叢集

  2. 登入EMR叢集的Master節點,詳情請參見登入叢集

  3. 執行以下命令,切換目錄。

    cd /var/log/emr/taihao_exporter
  4. 執行以下命令,建立Topic。

    # 建立名為taihaometrics的Topic,分區數10,副本因子2。
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create
  5. 執行以下命令,發送訊息。

    # 使用kafka-console-producer發送訊息到taihaometrics Topic。
    tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics

步驟二:新增網路連接

  1. 進入網路連接頁面。

    1. 在EMR控制台的左側導覽列,選擇EMR Serverless > Spark

    2. Spark頁面,單擊目標工作空間名稱。

    3. EMR Serverless Spark頁面,單擊左側導覽列中的網路連接

  2. 網路連接頁面,單擊新增網路連接

  3. 新增網路連接對話方塊中,配置以下資訊,單擊確定

    參數

    說明

    串連名稱

    輸入新增串連的名稱。例如,connection_to_emr_kafka。

    專用網路

    選擇與EMR叢集相同的專用網路。

    如果當前沒有可選擇的專用網路,請單擊建立專用網路,前往專用網路控制台建立,詳情請參見建立和管理專用網路

    交換器

    選擇與EMR叢集部署在同一專用網路下的相同交換器。

    如果當前可用性區域沒有交換器,請單擊虛擬交換器,前往專用網路控制台建立,詳情請參見建立和管理交換器

    狀態顯示為已成功時,表示新增網路連接成功。

步驟三:為EMR叢集添加安全性群組規則

  1. 擷取叢集節點交換器的網段。

    您可以在節點管理頁面,單擊節點群組名稱,查看關聯的交換器資訊,然後登入專用網路管理主控台,在交換器頁面擷取交換器的網段。

    image

  2. 添加安全性群組規則。

    1. 叢集管理頁面,單擊目的地組群的叢集ID。

    2. 基礎資訊頁面,單擊叢集安全性群組後面的連結。

    3. 安全性群組詳情頁面的訪問規則地區,單擊增加規則,填寫以下資訊,然後單擊確定

      參數

      說明

      訪問來源

      填寫前一步驟中擷取的指定交換器的網段。

      重要

      為防止被外部的使用者攻擊導致安全問題,授權對象禁止填寫為0.0.0.0/0。

      訪問目的(本執行個體)

      填寫9092連接埠。

步驟四:上傳JAR包至OSS

解壓檔案kafka.zip,並將檔案中的所有JAR包上傳至OSS,上傳操作請參見簡單上傳

步驟五:上傳資源檔

  1. 在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理

  2. 檔案管理頁面,單擊上傳檔案

  3. 上傳檔案對話方塊中,單擊待上傳檔案地區選取項目pyspark_ss_demo.py檔案。

步驟六:建立並啟動流任務

  1. 在EMR Serverless Spark頁面,單擊左側的資料開發

  2. 開發目錄頁簽下,單擊image表徵圖。

  3. 輸入名稱,任務類型選擇流任務 > PySpark,然後單擊確定

  4. 在建立的開發中,配置以下資訊,其餘參數無需配置,然後單擊儲存

    參數

    說明

    主Python資源

    選擇前一個步驟中在資源上傳頁面上傳的pyspark_ss_demo.py檔案。

    引擎版本

    Spark的版本,詳情請參見引擎版本介紹

    運行參數

    EMR叢集core-1-1節點的內網IP地址。您可以在EMR叢集的節點管理頁面的Core節點群組下查看。

    Spark配置

    Spark的配置資訊。本文樣本如下。

    spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
    spark.emr.serverless.network.service.name connection_to_emr_kafka
    說明
    • spark.jars:指定Spark任務運行時需要載入的外部JAR包路徑。請根據實際情況替換為步驟四中上傳的所有JAR包檔案路徑。

    • spark.emr.serverless.network.service.name:指定網路連接的名稱。請根據實際情況替換為步驟二中建立的網路連接名稱。

  5. 單擊發布

  6. 發布任務對話方塊中,單擊確定

  7. 啟動流任務。

    1. 單擊前往營運

    2. 單擊啟動

步驟七:查看日誌

  1. 單擊日誌探查頁簽。

  2. 日誌探查頁簽,您可以看到應用程式執行的相關資訊以及返回的結果。

    image

相關文檔

PySpark的開發流程樣本,請參見PySpark開發快速入門