基於MongoDB官方提供的Spark Connector,EMR Serverless Spark可以在開發時添加對應的配置來串連MongoDB。本文為您介紹在EMR Serverless Spark環境中實現MongoDB的資料讀取和寫入操作。
前提條件
使用限制
僅Serverless Spark以下引擎版本支援本文操作:
esr-4.x:esr-4.1.0及之後版本。
esr-3.x:esr-3.1.0及之後版本。
esr-2.x:esr-2.5.0及之後版本。
操作流程
步驟一:擷取MongoDB Spark Connector JAR並上傳至OSS
根據MongoDB Spark Connector官方文檔,結合Spark和MongoDB的版本,從Maven下載所需的依賴包,範例中的5.0.1為MongoDB的版本號碼,所需JAR包如下所示:
mongo-spark-connector_2.12-10.4.1.jarmongodb-driver-core-5.0.1.jarmongodb-driver-sync-5.0.1.jarbson-5.0.1.jar
將下載的MongoDB Spark Connector JAR上傳至阿里雲OSS中,上傳操作可以參見簡單上傳。
步驟二:建立網路連接
Serverless Spark需要能夠打通與MongoDB之間的網路才可以正常訪問MongoDB。更多網路連接資訊,請參見EMR Serverless Spark與其他VPC間網路互連。
配置安全性群組規則時,連接埠範圍請根據實際需求選擇性開放必要的連接埠。連接埠範圍的取值為1-65535。
步驟三:Serverless Spark讀取MongoDB表
建立Notebook會話,詳情請參見管理Notebook會話。
建立會話時,在引擎版本下拉式清單中選擇支援本文操作的引擎版本,在網路連接中選擇步驟二中建立好的網路連接,並在Spark配置中添加以下參數來載入MongoDB Spark Connector。
spark.mongodb.write.connection.uri mongodb://<MongoDB地址>:27017 spark.mongodb.read.connection.uri mongodb://<MongoDB地址>:27017 spark.emr.serverless.user.defined.jars oss://<bucketname>/path/to/mongo-spark-connector_2.12-10.4.1.jar,oss://<bucketname>/path/to/mongodb-driver-core-5.0.1.jar,oss://<bucketname>/path/to/mongodb-driver-sync-5.0.1.jar,oss://<bucketname>/path/to/bson-5.0.1.jar涉及參數說明如下,請根據實際情況替換。
參數
描述
樣本
spark.mongodb.write.connection.uriSpark寫入和讀取資料到MongoDB的串連URI。其中:
<MongoDB地址>:MongoDB服務的IP地址。27017:MongoDB預設監聽的連接埠號碼。
mongodb://192.168.x.x:27017
spark.mongodb.read.connection.urispark.emr.serverless.user.defined.jarsSpark所需的外部依賴項。
上傳至OSS的檔案,例如,
oss://<yourBucketname>/spark/mongodb/mongo-spark-connector_2.12-10.4.1.jar。在資料開發頁面,選擇建立一個類型的任務,然後在右上方選擇建立的Notebook會話。
更多操作,請參見管理Notebook會話。
拷貝如下代碼到新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
df = spark.read \ .format("mongodb") \ .option("database", "<yourDatabase>") \ .option("collection", "<yourCollection>") \ .load() df.printSchema() df.show()涉及參數資訊說明如下,請根據實際情況填寫。
參數
描述
<yourDatabase>MongoDB服務中實際的資料庫名稱。例如,本文為mongo_table。
<yourCollection>MongoDB服務中實際的集合名稱。例如,本文為MongoCollection。
如果能夠正常返回已有的資料,則表明配置正確。

步驟四:Serverless Spark寫入MongoDB表
拷貝如下代碼到前一個步驟中新增的Notebook頁簽中,並根據需要修改相應的參數資訊,然後單擊運行。
from pyspark.sql import Row
data = [
Row(name="Sam", age=25, city="New York"),
Row(name="Charlie", age=35, city="Chicago")
]
df = spark.createDataFrame(data)
df.show()
df.write \
.format("mongodb") \
.option("database", "<yourDatabase>") \
.option("collection", "<yourCollection>") \
.mode("append") \
.save()
如果能夠正常返回寫入的資料,則表明配置正確。
