在巨量資料快速發展的時代,串流技術對於即時資料分析至關重要。EMR Serverless Spark提供了一個強大而可擴充的平台,它不僅簡化了即時資料處理流程,還免去了伺服器管理的煩惱,提升了效率。本文將指導您使用EMR Serverless Spark提交PySpark流式任務,展示其在流處理方面的易用性和可營運性。
前提條件
已建立工作空間,詳情請參見建立工作空間。
操作流程
步驟一:建立即時資料流叢集併產生訊息
在EMR on ECS頁面,建立包含Kafka服務的即時資料流叢集,詳情請參見建立叢集。
登入EMR叢集的Master節點,詳情請參見登入叢集。
執行以下命令,切換目錄。
cd /var/log/emr/taihao_exporter執行以下命令,建立Topic。
# 建立名為taihaometrics的Topic,分區數10,副本因子2。 kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create執行以下命令,發送訊息。
# 使用kafka-console-producer發送訊息到taihaometrics Topic。 tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics
步驟二:新增網路連接
進入網路連接頁面。
在EMR控制台的左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的網路連接。
在網路連接頁面,單擊新增網路連接。
在新增網路連接對話方塊中,配置以下資訊,單擊確定。
參數
說明
串連名稱
輸入新增串連的名稱。例如,connection_to_emr_kafka。
專用網路
選擇與EMR叢集相同的專用網路。
如果當前沒有可選擇的專用網路,請單擊建立專用網路,前往專用網路控制台建立,詳情請參見建立和管理專用網路。
交換器
選擇與EMR叢集部署在同一專用網路下的相同交換器。
如果當前可用性區域沒有交換器,請單擊虛擬交換器,前往專用網路控制台建立,詳情請參見建立和管理交換器。
當狀態顯示為已成功時,表示新增網路連接成功。
步驟三:為EMR叢集添加安全性群組規則
擷取叢集節點交換器的網段。
您可以在節點管理頁面,單擊節點群組名稱,查看關聯的交換器資訊,然後登入專用網路管理主控台,在交換器頁面擷取交換器的網段。

添加安全性群組規則。
在叢集管理頁面,單擊目的地組群的叢集ID。
在基礎資訊頁面,單擊叢集安全性群組後面的連結。
在安全性群組詳情頁面的訪問規則地區,單擊增加規則,填寫以下資訊,然後單擊確定。
參數
說明
訪問來源
填寫前一步驟中擷取的指定交換器的網段。
重要為防止被外部的使用者攻擊導致安全問題,授權對象禁止填寫為0.0.0.0/0。
訪問目的(本執行個體)
填寫9092連接埠。
步驟四:上傳JAR包至OSS
步驟五:上傳資源檔
在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理。
在檔案管理頁面,單擊上傳檔案。
在上傳檔案對話方塊中,單擊待上傳檔案地區選取項目pyspark_ss_demo.py檔案。
步驟六:建立並啟動流任務
在EMR Serverless Spark頁面,單擊左側的資料開發。
在開發目錄頁簽下,單擊
表徵圖。輸入名稱,任務類型選擇,然後單擊確定。
在建立的開發中,配置以下資訊,其餘參數無需配置,然後單擊儲存。
參數
說明
主Python資源
選擇前一個步驟中在資源上傳頁面上傳的pyspark_ss_demo.py檔案。
引擎版本
Spark的版本,詳情請參見引擎版本介紹。
運行參數
EMR叢集core-1-1節點的內網IP地址。您可以在EMR叢集的節點管理頁面的Core節點群組下查看。
Spark配置
Spark的配置資訊。本文樣本如下。
spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar spark.emr.serverless.network.service.name connection_to_emr_kafka說明spark.jars:指定Spark任務運行時需要載入的外部JAR包路徑。請根據實際情況替換為步驟四中上傳的所有JAR包檔案路徑。spark.emr.serverless.network.service.name:指定網路連接的名稱。請根據實際情況替換為步驟二中建立的網路連接名稱。
單擊發布。
在發布任務對話方塊中,單擊確定。
啟動流任務。
單擊前往營運。
單擊啟動。
步驟七:查看日誌
單擊日誌探查頁簽。
在日誌探查頁簽,您可以看到應用程式執行的相關資訊以及返回的結果。

相關文檔
PySpark的開發流程樣本,請參見PySpark開發快速入門。