本文介紹了如何在EMR Serverless Spark中開發並運行一個讀寫Kafka的流式任務。通過上傳JAR包、建立流任務並運行,最終可以通過日誌探查或控制台查看結果,驗證資料讀取和寫入的正確性。
前提條件
操作流程
步驟一:上傳Kafka相關JAR至OSS
Serverless Spark引擎在esr-2.8.0、esr-3.4.0、esr-4.4.0及以上版本中內建了Kafka JAR,您可以直接跳過該步驟。如果是其他版本的引擎,請按照以下步驟進行操作:
需根據所使用的Spark版本手動添加以下依賴。
Spark 3.5版本:kafka-spark35-jars.zip。
Spark 3.4版本:kafka-spark34-jars.zip。
Spark 3.3版本:kafka-spark33-jars.zip。
解壓後,上傳所有JAR包至OSS。本文以kafka-spark35-jars.zip為例。
hadoop fs -put /root/spark-sql-kafka-0-10_2.12-3.5.3.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/ hadoop fs -put /root/kafka-clients-3.4.1.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/ hadoop fs -put /root/spark-token-provider-kafka-0-10_2.12-3.5.3.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/ hadoop fs -put /root/commons-pool2-2.11.1.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/也可以通過OSS控制台或其他方式進行上傳。
步驟二:建立網路連接
Serverless Spark需要能夠打通與Kafka之間的網路才可以正常訪問Kafka服務。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連。
步驟三:準備測試檔案
Scala程式碼範例如下,更多樣本參考Spark讀寫Kafka。請單擊下載SparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar,以便直接使用測試JAR包或自行進行打包,pom檔案的相關配置詳見附錄。
程式碼範例
讀Kafka
訂閱雲訊息佇列Kafka topic訊息,以[key,value]的形式在控制台輸出。
object StreamingReadKafka {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val servers = args(0)
val topic = args(1)
val spark = SparkSession.builder()
.appName("test read kafka")
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", topic)
.load()
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
.writeStream
.format("console")
.start()
query.awaitTermination()
}
}寫Kfaka
從 Spark 內建的資料來源讀取資料,產生包含 value 和 timestamp 的記錄,並將value 列同時用作 key 和 value 列,以每秒產生 1 條記錄的速率寫入Kafka。
object StreamingWriteKafka {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
val servers = args(0)
val topic = args(1)
val checkpointDir = args(2)
val spark = SparkSession.builder()
.appName("test write kafka")
.getOrCreate()
val df = spark.readStream
.format("rate")
.option("rowsPerSecond", "1")
.load()
.withColumn("key", col("value"))
.withColumn("value", col("value"))
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", topic)
.option("checkpointLocation", checkpointDir)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
query.awaitTermination()
}
}步驟四:上傳JAR包
進入資源上傳頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理。
在檔案管理頁面,單擊上傳檔案。
在上傳檔案對話方塊中,單擊待上傳檔案地區選取項目JAR包,或直接拖拽JAR包到待上傳檔案地區。
步驟五:建立流任務並運行
在EMR Serverless Spark頁面,單擊左側導覽列中的資料開發。
在開發目錄頁簽下,單擊
(建立)表徵圖。在彈出的對話方塊中,輸入名稱,根據實際需求在流任務中選擇JAR類型,然後單擊確定。
在右上方選擇目標隊列。
添加隊列的具體操作,請參見管理資源隊列。
在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊發布。
參數
說明
主jar資源
選擇前一個步驟中上傳的JAR包。本文樣本是
SparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar。引擎版本
選擇合適的Spark版本。本文樣本是
esr-4.3.0。Main Class
提交Spark任務時所指定的主類。
讀Kafka樣本值:
org.example.StreamingReadKafka。寫 Kafka樣本值:
org.example.StreamingWriteKafka。
運行參數
傳遞給主類自訂參數。多個參數使用空格分隔。
Kafka存取點資訊,本文樣本值是:
alikafka-serverless-cn-xxxxxx-1000-vpc.alikafka.aliyuncs.com:9092。訂閱Kafka topic,本文樣本值是:
test。checkpoint儲存路徑:
oss://<YOUR_BUCKET_PATH>/。僅寫Kafka需要該參數。
網路連接
選擇步驟二中建立的網路。
Spark配置
通過
spark.emr.serverless.user.defined.jars參數指定Kafka相關JAR包,多個JAR用逗號分隔。spark.emr.serverless.user.defined.jars oss://<YOUR_BUCKET_PATH>/commons-pool2-2.11.1.jar,oss://<YOUR_BUCKET_PATH>/kafka-clients-3.4.1.jar,oss://<YOUR_BUCKET_PATH>/spark-sql-kafka-0-10_2.12-3.5.3.jar,oss://<YOUR_BUCKET_PATH>/spark-token-provider-kafka-0-10_2.12-3.5.3.jar發布後,單擊前往營運,在跳轉頁面,單擊啟動。
步驟六:查看結果
讀Kafka
啟動任務後,在頁簽單擊目標任務。
在日誌探查頁簽,您可以查看相關的日誌資訊。
寫Kafka
啟動任務後,登入Kafka控制台,進入到Kafka執行個體頁面。
在訊息查詢頁簽,單擊查詢,您可以查看到Spark寫入Kafka的訊息。
附錄
本文樣本pom檔案如下。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.Main</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>