ApsaraDB for MongoDB is an online database service developed based on the Apsara distributed operating system and a high-reliability storage engine. ApsaraDB for MongoDB is fully compatible with the MongoDB protocol and provides stable, reliable, and scalable database services. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ApsaraDB for MongoDB.
DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides more features and better performance. For more information about how to use AnalyticDB for MySQL, see Access ApsaraDB for MongoDB.
Prerequisites
Object Storage Service (OSS) is activated. For more information, see Activate OSS.
An ApsaraDB for MongoDB instance is created. For more information, see Create a standalone instance.
The following operations are performed to create the data that is required to access the ApsaraDB for MongoDB instance:
Connect to the ApsaraDB for MongoDB instance by using Data Management (DMS), create the config database, and then run the following commands to insert test data into the database:
db.createCollection("test_collection");
db.test_collection.insert( {"id":"id01","name":"name01"});
db.test_collection.insert( {"id":"id02","name":"name02"});
db.test_collection.insert( {"id":"id03","name":"name03"});
db.test_collection.find().pretty()
The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for MongoDB instance are obtained. For more information, see Configure the network of data sources.
The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for MongoDB instance are added to a whitelist of the ApsaraDB for MongoDB instance. For more information, see Configure a whitelist for an ApsaraDB for MongoDB instance.
Procedure
Compile the following test code into a JAR file and download the dependency JAR files that are required to access the ApsaraDB for MongoDB instance. Then, upload the JAR files to OSS.
Sample test code:
package com.aliyun.spark

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.bson.Document

object SparkOnMongoDB {
  def main(args: Array[String]): Unit = {
    //Obtain the connection string URI, database, and collection of the ApsaraDB for MongoDB instance.
    val connectionStringURI = args(0)
    val database = args(1)
    val collection = args(2)
    //The name of the table in the serverless Spark engine.
    var sparkTableName = if (args.size > 3) args(3) else "spark_on_mongodb_sparksession_test01"

    val sparkSession = SparkSession
      .builder()
      .appName("scala spark on MongoDB test")
      .getOrCreate()

    //The serverless Spark engine can read data from ApsaraDB for MongoDB by using the following methods.
    //Method 1: Call the Dataset API operation.
    //Configure ApsaraDB for MongoDB parameters.
    val sparkConf = new SparkConf()
      .set("spark.mongodb.input.uri", connectionStringURI)
      .set("spark.mongodb.input.database", database)
      .set("spark.mongodb.input.collection", collection)
      .set("spark.mongodb.output.uri", connectionStringURI)
      .set("spark.mongodb.output.database", database)
      .set("spark.mongodb.output.collection", collection)
    val readConf = ReadConfig(sparkConf)
    //Obtain the DataFrame.
    val df = MongoSpark.load(sparkSession, readConf)
    df.show(1)

    //Use the MongoSpark.save method to import data to ApsaraDB for MongoDB.
    val docs = """
                 |{"id": "id105", "name": "name105"}
                 |{"id": "id106", "name": "name106"}
                 |{"id": "id107", "name": "name107"}
                 |"""
      .trim.stripMargin.split("[\\r\\n]+").toSeq
    val writeConfig: WriteConfig = WriteConfig(Map(
      "uri" -> connectionStringURI,
      "spark.mongodb.output.database" -> database,
      "spark.mongodb.output.collection" -> collection))
    MongoSpark.save(sparkSession.sparkContext.parallelize(docs.map(Document.parse)), writeConfig)

    //Method 2: Execute SQL statements with or without a schema specified.
    //If you specify a schema in SQL statements, the fields in the specified schema must be consistent with those in the schema of the collection in ApsaraDB for MongoDB.
    var createCmd =
      s"""CREATE TABLE ${sparkTableName} (
         |  id String,
         |  name String
         |) USING com.mongodb.spark.sql
         |options (
         |  uri '$connectionStringURI',
         |  database '$database',
         |  collection '$collection'
         |)""".stripMargin
    sparkSession.sql(createCmd)
    var querySql = "select * from " + sparkTableName + " limit 1"
    sparkSession.sql(querySql).show

    //If you do not specify a schema in SQL statements, the serverless Spark engine maps its schema to the schema of the collection in ApsaraDB for MongoDB.
    sparkTableName = sparkTableName + "_noschema"
    createCmd =
      s"""CREATE TABLE ${sparkTableName} USING com.mongodb.spark.sql
         |options (
         |  uri '$connectionStringURI',
         |  database '$database',
         |  collection '$collection'
         |)""".stripMargin
    sparkSession.sql(createCmd)
    querySql = "select * from " + sparkTableName + " limit 1"
    sparkSession.sql(querySql).show

    sparkSession.stop()
  }
}
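A minimal sketch of an alternative write path is shown below. This object is not part of the preceding sample: the object name, argument values, and test rows are assumptions. It illustrates how a DataFrame whose columns match the fields of the test collection can be written back to ApsaraDB for MongoDB through the DataFrame writer API of mongo-spark-connector 2.x, instead of the MongoSpark.save method:

package com.aliyun.spark

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkWriteToMongoDB {
  def main(args: Array[String]): Unit = {
    //The same three arguments as in the preceding sample: connection string URI, database, and collection.
    val connectionStringURI = args(0)
    val database = args(1)
    val collection = args(2)

    val spark = SparkSession.builder().appName("scala spark write to MongoDB test").getOrCreate()
    import spark.implicits._

    //Build a small DataFrame whose columns match the fields of the test collection.
    val df = Seq(("id201", "name201"), ("id202", "name202")).toDF("id", "name")

    //com.mongodb.spark.sql is the data source registered by mongo-spark-connector,
    //the same data source that the CREATE TABLE statements in the sample use.
    df.write
      .format("com.mongodb.spark.sql")
      .mode(SaveMode.Append)
      .option("uri", connectionStringURI)
      .option("database", database)
      .option("collection", collection)
      .save()

    spark.stop()
  }
}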
POM file that contains dependencies of ApsaraDB for MongoDB:
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.8.2</version>
</dependency>
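The preceding dependency declarations must be placed inside the <dependencies> element of the project's pom.xml, together with the Spark libraries that the sample code compiles against. The following is a minimal sketch: the Spark version (2.4.3) is an assumption that you should align with your environment, and the provided scope is used because the serverless Spark engine supplies the Spark libraries at runtime.

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.8.2</version>
    </dependency>
</dependencies>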
Log on to the DLA console.
In the top navigation bar, select the region in which the ApsaraDB for MongoDB instance resides.
In the left-side navigation pane, choose Serverless Spark > Submit job.
On the Parameter Configuration page, click Create a job Template.
In the Create a job Template dialog box, configure the parameters as prompted and click OK.
On the Job List tab, click the name of the Spark job that you created and enter the following job content in the code editor. Then, save and submit the Spark job.
{ "args": [ "mongodb://root:xxx@xxx:3717,xxx:3717/xxx", #The connection string URI of the ApsaraDB for MongoDB instance. "config", #The name of the database in the ApsaraDB for MongoDB instance. "test_collection", #The name of the collection of the ApsaraDB for MongoDB instance. "spark_on_mongodb" #The name of the Spark table that is mapped to the collection of the ApsaraDB for MongoDB instance. ], "file": "oss://spark_test/jars/mongodb/spark-examples-0.0.1-SNAPSHOT.jar", #The OSS directory in which the test package is saved. "name": "mongodb-test", "jars": [ "oss://spark_test/jars/mongodb/mongo-java-driver-3.8.2.jar", ##The OSS directory in which the test dependencies are saved. "oss://spark_test/jars/mongodb/mongo-spark-connector_2.11-2.4.2.jar" ##The OSS directory in which the test dependencies are saved. ], "className": "com.aliyun.spark.SparkOnMongoDB", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small", "spark.dla.eni.enable": "true", "spark.dla.eni.vswitch.id": "vsw-xxx", #The ID of the vSwitch in the VPC in which the ApsaraDB for MongoDB instance resides. "spark.dla.eni.security.group.id": "sg-xxx" #The ID of the security group in the VPC in which the ApsaraDB for MongoDB instance resides. } }
Result
After the job is complete, find the job and click Log in the Operation column to view the logs of the job. If the logs contain the query results of the test data, the job succeeded.