This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Tablestore.

Prerequisites

Procedure

  1. Prepare the following test code for accessing Tablestore, package the test code into a JAR file, and then upload the JAR file to an OSS directory. A short filter sketch that exercises the predicate pushdown options follows the sample code.
    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    
    import scala.collection.mutable
    // Configure predicate pushdown for batch computing.
    object SparkTablestore {
      def main(args: Array[String]): Unit = {
        if (args.length < 5) {
          System.err.println(
            """Usage: SparkTablestore <instance-id> <table-name> <VPC endpoint of the Tablestore instance> <ACCESS_KEY_ID>
              |         <ACCESS_KEY_SECRET>
            """.stripMargin)
          System.exit(1)
        }
        val instanceId = args(0)
        val tableName = args(1)
        val endpoint = args(2)
        val accessKeyId = args(3)
        val accessKeySecret = args(4)
        // The table schema. You must create a table with the following schema in Tablestore. 
        val catalog =
          """
            |{
            |  "columns": {
            |    "id":{
            |      "col":"id",
            |      "type":"string"
            |    },
            |     "company": {
            |      "col": "company",
            |      "type": "string"
            |    },
            |    "age": {
            |      "col": "age",
            |      "type": "integer"
            |    },
            |    "name": {
            |      "col": "name",
            |      "type": "string"
            |    }
            |  }
            |}
            |""".stripMargin
        val sparkConf = new SparkConf
        val options = new mutable.HashMap[String, String]()
        // The name of the Tablestore instance. 
        options.put("instance.name", instanceId)
        // The name of the Tablestore table that you want to access. 
        options.put("table.name", tableName)
        // The endpoint of the Tablestore instance.
        options.put("endpoint", endpoint)
        // The AccessKey ID that is used to access the Tablestore instance. 
        options.put("access.key.id", accessKeyId)
        // The AccessKey secret that is used to access the Tablestore instance. 
        options.put("access.key.secret", accessKeySecret)
        // The table schema, which is in the JSON format. 
        options.put("catalog", catalog)
        // Specifies whether range predicates (≥ > < ≤) on columns of the LONG type are pushed down to Tablestore.
        options.put("push.down.range.long", "true")
        // Specifies whether range predicates (≥ > < ≤) on columns of the STRING type are pushed down to Tablestore.
        options.put("push.down.range.string", "true")
        // The maximum number of data records that can be synchronized by using the Tablestore channel in each Spark batch processing period. The default value is 10000. 
        options.put("maxOffsetsPerChannel", "10000")
        // The name of the search index. This parameter is optional. 
        //options.put("search.index.name", "<index_name>")
        // The tunnel ID. This parameter is optional. 
        //options.put("tunnel.id", "<tunnel id>")
        val spark = SparkSession.builder.config(sparkConf).appName("Serverless Spark Tablestore Demo").getOrCreate
        import spark.implicits._
        val dfReader = spark.read.format("Tablestore").options(options)
        val df = dfReader.load()
        // Display the table content. 
        df.show()
        df.printSchema()
        // Write data to the table. 
        val schema = StructType.apply(Seq(StructField("id", StringType), StructField("company", StringType),StructField("age", IntegerType), StructField("name", StringType)))
        val newData = spark.sparkContext.parallelize(Seq(("1","ant",10,"xxx"))).map(row => Row(row._1, row._2, row._3, row._4))
        val newDataDF = spark.createDataFrame(newData, schema)
        newDataDF.write.format("Tablestore").options(options).save
        val dfReader1 = spark.read.format("Tablestore").options(options)
        val df1 = dfReader1.load()
        df1.show()
      }
    }
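    The push.down.range.long and push.down.range.string options above allow Spark to push range comparisons down to Tablestore instead of filtering rows after a full scan. The following minimal sketch shows a query that can benefit from this. It assumes the spark session, the options map, the import spark.implicits._ statement, and the id/company/age/name schema from the sample above; the DataFrame name, view name, and filter values are only illustrative.
    // Reuse the options defined in the sample to load the table.
    val employees = spark.read.format("Tablestore").options(options).load()
    // With push.down.range.long set to "true", the range condition on the integer
    // column age (stored as LONG in Tablestore) can be evaluated by Tablestore.
    // With push.down.range.string set to "true", the same applies to the string column name.
    employees.filter($"age" >= 10 && $"name" >= "a").show()
    // The same kind of filter expressed through Spark SQL.
    employees.createOrReplaceTempView("employees")
    spark.sql("SELECT id, company, age, name FROM employees WHERE age >= 10").show()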
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region in which your Tablestore instance resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create a Job Template.
  6. In the Create a Job Template dialog box, configure the parameters and click OK to create a Spark job.
  7. In the Job List navigation tree, click the name of the Spark job that you created. Enter the following job configurations in the code editor. Then, click Save and Execute.
    {
        "args": [
            "<instanceId>", 
            "<tableName>",  
            "<endpoint>",  
            "<access key id>", 
            "<access key secret>"  
        ],
        "name": "Tablestore",
        "className": "com.aliyun.spark.SparkTablestore",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>",  
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
    }

    The following table describes the parameters in the preceding code.

    Parameter                  Description
    instanceId                 The name of the Tablestore instance.
    tableName                  The name of the Tablestore table.
    endpoint                   The VPC endpoint of the Tablestore instance. You can view the endpoints of the instance in the Tablestore console and select the VPC endpoint.
    access key id              The AccessKey ID that is used to access the Tablestore instance.
    access key secret          The AccessKey secret that is used to access the Tablestore instance.
    spark.dla.job.log.oss.uri  The OSS directory in which Spark logs are saved.
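    For reference, a filled-in version of this configuration might look like the following. Every value is a hypothetical placeholder: replace the instance name, table name, VPC endpoint (copy the actual endpoint from the Tablestore console), AccessKey pair, OSS log directory, and JAR path with the values of your own environment.
    {
        "args": [
            "my-instance",
            "staff",
            "https://my-instance.cn-hangzhou.vpc.tablestore.aliyuncs.com",
            "yourAccessKeyId",
            "yourAccessKeySecret"
        ],
        "name": "Tablestore",
        "className": "com.aliyun.spark.SparkTablestore",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://my-bucket/spark-logs/",
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://my-bucket/jars/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
    }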