全部產品
Search
文件中心

E-MapReduce:讀寫MongoDB

更新時間:Jul 05, 2025

基於MongoDB官方提供的Spark Connector,EMR Serverless Spark可以在開發時添加對應的配置來串連MongoDB。本文為您介紹在EMR Serverless Spark環境中實現MongoDB的資料讀取和寫入操作。

前提條件

使用限制

僅Serverless Spark以下引擎版本支援本文操作:

  • esr-4.x:esr-4.1.0及之後版本。

  • esr-3.x:esr-3.1.0及之後版本。

  • esr-2.x:esr-2.5.0及之後版本。

操作流程

步驟一:擷取MongoDB Spark Connector JAR並上傳至OSS

  1. 根據MongoDB Spark Connector官方文檔,結合Spark和MongoDB的版本,從Maven下載所需的依賴包,範例中的5.0.1為MongoDB的版本號碼,所需JAR包如下所示:

    • mongo-spark-connector_2.12-10.4.1.jar

    • mongodb-driver-core-5.0.1.jar

    • mongodb-driver-sync-5.0.1.jar

    • bson-5.0.1.jar

  2. 將下載的MongoDB Spark Connector JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳

步驟二:建立網路連接

Serverless Spark需要能夠打通與MongoDB之間的網路才可以正常訪問MongoDB。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連

重要

配置安全性群組規則時,連接埠範圍請根據實際需求選擇性開放必要的連接埠。連接埠範圍的取值為1-65535。

步驟三:Serverless Spark讀取MongoDB

  1. 建立Notebook會話,詳情請參見管理Notebook會話

    建立會話時,在引擎版本下拉式清單中選擇支援本文操作的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入MongoDB Spark Connector。

    spark.mongodb.write.connection.uri                mongodb://<MongoDB地址>:27017
    spark.mongodb.read.connection.uri                 mongodb://<MongoDB地址>:27017
    spark.emr.serverless.user.defined.jars            oss://<bucketname>/path/to/mongo-spark-connector_2.12-10.4.1.jar,oss://<bucketname>/path/to/mongodb-driver-core-5.0.1.jar,oss://<bucketname>/path/to/mongodb-driver-sync-5.0.1.jar,oss://<bucketname>/path/to/bson-5.0.1.jar

    涉及參數說明如下,請根據實際情況替換。

    參數

    描述

    樣本

    spark.mongodb.write.connection.uri

    Spark寫入和讀取資料到MongoDB的串連URI。其中:

    • <MongoDB地址>:MongoDB服務的IP地址。

    • 27017:MongoDB預設監聽的連接埠號碼。

    mongodb://192.168.x.x:27017

    spark.mongodb.read.connection.uri

    spark.emr.serverless.user.defined.jars

    Spark所需的外部依賴項。

    上傳至OSS的檔案,例如,oss://<yourBucketname>/spark/mongodb/mongo-spark-connector_2.12-10.4.1.jar

  2. 資料開發頁面,選擇建立一個Python > Notebook類型的任務,然後在右上方選擇建立的Notebook會話。

    更多操作,請參見管理Notebook會話

  3. 拷貝如下代碼到新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行

    df = spark.read \
        .format("mongodb") \
        .option("database", "<yourDatabase>") \
        .option("collection", "<yourCollection>") \
        .load()
    
    df.printSchema()
    df.show()
    

    涉及參數資訊說明如下,請根據實際情況填寫。

    參數

    描述

    <yourDatabase>

    MongoDB服務中實際的資料庫名稱。例如,本文為mongo_table。

    <yourCollection>

    MongoDB服務中實際的集合名稱。例如,本文為MongoCollection。

    如果能夠正常返回已有的資料,則表明配置正確。

    image

步驟四:Serverless Spark寫入MongoDB

拷貝如下代碼到前一個步驟中新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行

from pyspark.sql import Row

data = [
    Row(name="Sam", age=25, city="New York"),
    Row(name="Charlie", age=35, city="Chicago")
]

df = spark.createDataFrame(data)
df.show()

df.write \
    .format("mongodb") \
    .option("database", "<yourDatabase>") \
    .option("collection", "<yourCollection>") \
    .mode("append") \
    .save()
    

如果能夠正常返回寫入的資料,則表明配置正確。

image