ApsaraDB for MongoDB is an online database service developed based on the Apsara distributed operating system and a high-reliability storage engine. ApsaraDB for MongoDB is fully compatible with the MongoDB protocol and provides stable, reliable, and scalable database services. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ApsaraDB for MongoDB.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • An ApsaraDB for MongoDB instance is created. For more information, see Create an instance.
  • You have performed the following operations to create the test data that is required to access the ApsaraDB for MongoDB instance:
    Connect to the sharded cluster ApsaraDB for MongoDB instance by using Data Management Service (DMS), run the following commands in the DMS console to create the test_collection collection in the config database, and then insert the test data:
    db.createCollection("test_collection");
    db.test_collection.insert( {"id":"id01","name":"name01"});
    db.test_collection.insert( {"id":"id02","name":"name02"});
    db.test_collection.insert( {"id":"id03","name":"name03"});
    db.test_collection.find().pretty()
  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for MongoDB instance are obtained. For more information, see Access your VPC.
  • The CIDR block of the vSwitch that is used by the serverless Spark engine of DLA is added to a whitelist of the ApsaraDB for MongoDB instance. For more information, see Configure a whitelist for an ApsaraDB for MongoDB instance.

Procedure

  1. Compile the following test code into a JAR file and download the JAR files of the dependencies that are required to access the ApsaraDB for MongoDB instance. Then, upload these JAR files to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.config.{ ReadConfig, WriteConfig}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.bson.Document
    
    object SparkOnMongoDB {
    
      def main(args: Array[String]): Unit = {
        //Obtain the connection string URI, database, and collection of the ApsaraDB for MongoDB instance.
        val connectionStringURI = args(0)
        val database = args(1)
        val collection = args(2)
        //The name of the table in the serverless Spark engine.
        var sparkTableName = if (args.size > 3) args(3) else "spark_on_mongodb_sparksession_test01"
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on MongoDB test")
          .getOrCreate()
    
        //The serverless Spark engine can read data from ApsaraDB for MongoDB by using the following methods.
        //Method 1: Use the Dataset API.
        //Configure the ApsaraDB for MongoDB connection parameters.
        val sparkConf = new SparkConf()
          .set("spark.mongodb.input.uri", connectionStringURI)
          .set("spark.mongodb.input.database", database)
          .set("spark.mongodb.input.collection", collection)
          .set("spark.mongodb.output.uri", connectionStringURI)
          .set("spark.mongodb.output.database", database)
          .set("spark.mongodb.output.collection", collection)
        val readConf = ReadConfig(sparkConf)
        //Obtain the DataFrame.
        val df = MongoSpark.load(sparkSession, readConf)
        df.show(1)
    
        //Use the MongoSpark.save method to import data to ApsaraDB for MongoDB.
        val docs =
          """
            |{"id": "id105", "name": "name105"}
            |{"id": "id106", "name": "name106"}
            |{"id": "id107", "name": "name107"}
            |"""
            .trim.stripMargin.split("[\\r\\n]+").toSeq
        val writeConfig: WriteConfig = WriteConfig(Map(
          "uri" -> connectionStringURI,
          "spark.mongodb.output.database" -> database,
          "spark.mongodb.output.collection"-> collection))
        MongoSpark.save(sparkSession.sparkContext.parallelize(docs.map(Document.parse)), writeConfig)
    
        //Method 2: Execute SQL statements, with or without a schema specified.
        //If you specify a schema, the fields in the schema must be consistent with those of the collection in ApsaraDB for MongoDB.
        var createCmd =
        s"""CREATE TABLE ${sparkTableName} (
           |      id String,
           |      name String
           |    ) USING com.mongodb.spark.sql
           |    options (
           |    uri '$connectionStringURI',
           |    database '$database',
           |    collection '$collection'
           |    )""".stripMargin
    
        sparkSession.sql(createCmd)
        var querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
    
        //If you do not specify a schema, the serverless Spark engine infers the schema from the collection in ApsaraDB for MongoDB.
        sparkTableName = sparkTableName + "_noschema"
        createCmd =
          s"""CREATE TABLE ${sparkTableName} USING com.mongodb.spark.sql
             |    options (
             |    uri '$connectionStringURI',
             |    database '$database',
             |    collection '$collection'
             |    )""".stripMargin
    
        sparkSession.sql(createCmd)
        querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
    
        sparkSession.stop()
    
      }
    }
    Dependencies in the POM file that are required to access ApsaraDB for MongoDB:
            <dependency>
                <groupId>org.mongodb.spark</groupId>
                <artifactId>mongo-spark-connector_2.11</artifactId>
                <version>2.4.2</version>
            </dependency>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongo-java-driver</artifactId>
                <version>3.8.2</version>
            </dependency>
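    The preceding snippet lists only the connector and driver artifacts. In a complete Maven project, they belong in the <dependencies> element of the pom.xml file. The following is a minimal sketch; the Spark artifacts and their version (2.4.3) are assumptions that you must align with the Spark version of your job, and they are scoped as provided because the serverless Spark engine supplies the Spark classes at runtime:
        <dependencies>
            <!-- Spark classes are provided by the serverless Spark engine at runtime. -->
            <!-- The version 2.4.3 is an assumption; use the version that matches your job. -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.4.3</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.3</version>
                <scope>provided</scope>
            </dependency>
            <!-- MongoDB Spark connector and Java driver, as listed above. -->
            <dependency>
                <groupId>org.mongodb.spark</groupId>
                <artifactId>mongo-spark-connector_2.11</artifactId>
                <version>2.4.2</version>
            </dependency>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongo-java-driver</artifactId>
                <version>3.8.2</version>
            </dependency>
        </dependencies>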
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the ApsaraDB for MongoDB instance resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters as prompted and click OK to create a Spark job.
  7. In the Job List navigation tree, click the name of the Spark job that you created and enter the following job configuration in the code editor. Replace the parameter values based on the inline comments that follow the number signs (#); the comments are explanatory and are not part of the JSON configuration. Then, save and submit the job.
    {
        "args": [
            "mongodb://root:xxx@xxx:3717,xxx:3717/xxx",  #The connection string URI of the ApsaraDB for MongoDB cluster.
            "config",  #The name of the database in the ApsaraDB for MongoDB cluster.
            "test_collection",  #The name of the collection of the ApsaraDB for MongoDB cluster.
            "spark_on_mongodb"  #The name of the Spark table that is mapped to the collection of the ApsaraDB for MongoDB cluster.
        ],
        "file": "oss://spark_test/jars/mongodb/spark-examples-0.0.1-SNAPSHOT.jar",  #The OSS directory in which the test package is saved.
        "name": "mongodb-test",
        "jars": [
            "oss://spark_test/jars/mongodb/mongo-java-driver-3.8.2.jar",  ##The OSS directory in which the test dependency is saved.
            "oss://spark_test/jars/mongodb/mongo-spark-connector_2.11-2.4.2.jar"  ##The OSS directory in which the test dependency is saved.
        ],
        "className": "com.aliyun.spark.SparkOnMongoDB",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.dla.eni.enable": "true", 
            "spark.dla.eni.vswitch.id": "vsw-xxx",   #The ID of the vSwitch in the VPC where the ApsaraDB for MongoDB cluster resides.
            "spark.dla.eni.security.group.id": "sg-xxx"  #The ID of the security group in the VPC where the ApsaraDB for MongoDB cluster resides.
        }
    }

Result

After the job succeeds, find the job in the job list and click Log in the Operation column to view the logs of the job.