全部產品
Search
文件中心

E-MapReduce:讀寫Kafka

更新時間:Sep 19, 2025

本文介紹了如何在EMR Serverless Spark中開發並運行一個讀寫Kafka的流式任務。通過上傳JAR包、建立流任務並運行,最終可以通過日誌探查或控制台查看結果,驗證資料讀取和寫入的正確性。

前提條件

操作流程

步驟一:上傳Kafka相關JAR至OSS

Serverless Spark引擎在esr-2.8.0、esr-3.4.0、esr-4.4.0及以上版本中內建了Kafka JAR,您可以直接跳過該步驟。如果是其他版本的引擎,請按照以下步驟進行操作:

  1. 需根據所使用的Spark版本手動添加以下依賴。

  2. 解壓後,上傳所有JAR包至OSS。本文以kafka-spark35-jars.zip為例。

    hadoop fs -put /root/spark-sql-kafka-0-10_2.12-3.5.3.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/
    hadoop fs -put /root/kafka-clients-3.4.1.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/
    hadoop fs -put /root/spark-token-provider-kafka-0-10_2.12-3.5.3.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/
    hadoop fs -put /root/commons-pool2-2.11.1.jar oss://<YOUR_BUCKET>.<region>.oss-dls.aliyuncs.com/
    也可以通過OSS控制台或其他方式進行上傳。

步驟二:建立網路連接

Serverless Spark需要能夠打通與Kafka之間的網路才可以正常訪問Kafka服務。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連

步驟三:準備測試檔案

Scala程式碼範例如下,更多樣本參考Spark讀寫Kafka。請單擊下載SparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar,以便直接使用測試JAR包或自行進行打包,pom檔案的相關配置詳見附錄

程式碼範例

讀Kafka

訂閱雲訊息佇列Kafka topic訊息,以[key,value]的形式在控制台輸出。

object StreamingReadKafka {

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession

    val servers = args(0)
    val topic = args(1)

    val spark = SparkSession.builder()
      .appName("test read kafka")
      .getOrCreate()

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .load()

    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}

寫Kfaka

從 Spark 內建的資料來源讀取資料,產生包含 value 和 timestamp 的記錄,並將value 列同時用作 key 和 value 列,以每秒產生 1 條記錄的速率寫入Kafka。

object StreamingWriteKafka {

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger

    val servers = args(0)
    val topic = args(1)
    val checkpointDir = args(2)

    val spark = SparkSession.builder()
      .appName("test write kafka")
      .getOrCreate()

    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .withColumn("key", col("value"))
      .withColumn("value", col("value"))

    val query = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("topic", topic)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}

步驟四:上傳JAR包

  1. 進入資源上傳頁面。

    1. 登入E-MapReduce控制台

    2. 在左側導覽列,選擇EMR Serverless > Spark

    3. Spark頁面,單擊目標工作空間名稱。

    4. 在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理

  2. 檔案管理頁面,單擊上傳檔案

  3. 上傳檔案對話方塊中,單擊待上傳檔案地區選取項目JAR包,或直接拖拽JAR包到待上傳檔案地區。

步驟五:建立流任務並運行

  1. 在EMR Serverless Spark頁面,單擊左側導覽列中的資料開發

  2. 開發目錄頁簽下,單擊image(建立)表徵圖。

  3. 在彈出的對話方塊中,輸入名稱,根據實際需求在流任務中選擇JAR類型,然後單擊確定

  4. 在右上方選擇目標隊列。

    添加隊列的具體操作,請參見管理資源隊列

  5. 在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊發布

    參數

    說明

    主jar資源

    選擇前一個步驟中上傳的JAR包。本文樣本是SparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar

    引擎版本

    選擇合適的Spark版本。本文樣本是esr-4.3.0

    Main Class

    提交Spark任務時所指定的主類。

    • 讀Kafka樣本值:org.example.StreamingReadKafka

    • 寫 Kafka樣本值:org.example.StreamingWriteKafka

    運行參數

    傳遞給主類自訂參數。多個參數使用空格分隔。

    • Kafka存取點資訊,本文樣本值是:alikafka-serverless-cn-xxxxxx-1000-vpc.alikafka.aliyuncs.com:9092

    • 訂閱Kafka topic,本文樣本值是:test

    • checkpoint儲存路徑:oss://<YOUR_BUCKET_PATH>/。僅寫Kafka需要該參數。

    網路連接

    選擇步驟二中建立的網路。

    Spark配置

    通過spark.emr.serverless.user.defined.jars參數指定Kafka相關JAR包,多個JAR用逗號分隔。

    spark.emr.serverless.user.defined.jars  oss://<YOUR_BUCKET_PATH>/commons-pool2-2.11.1.jar,oss://<YOUR_BUCKET_PATH>/kafka-clients-3.4.1.jar,oss://<YOUR_BUCKET_PATH>/spark-sql-kafka-0-10_2.12-3.5.3.jar,oss://<YOUR_BUCKET_PATH>/spark-token-provider-kafka-0-10_2.12-3.5.3.jar
  6. 發布後,單擊前往營運,在跳轉頁面,單擊啟動

步驟六:查看結果

讀Kafka

  1. 啟動任務後,在任務編排 > 流式任務頁簽單擊目標任務。

  2. 日誌探查頁簽,您可以查看相關的日誌資訊。

寫Kafka

  1. 啟動任務後,登入Kafka控制台,進入到Kafka執行個體頁面。

  2. 訊息查詢頁簽,單擊查詢,您可以查看到Spark寫入Kafka的訊息。

附錄

本文樣本pom檔案如下。

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.5.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.5.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.5.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.example.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>