This topic describes how to use Data Lake Analytics (DLA) to access Tablestore.

Prerequisites

  - Object Storage Service (OSS) is activated. The JAR file of the test code and the logs of the Spark job in this topic are stored in OSS.
  - A Tablestore instance is created, and the instance contains a table whose schema matches the schema that is defined in the test code.

Procedure

  1. Compile the following test code for accessing Tablestore, package the code into a JAR file, and then upload this file to OSS (a sample build configuration and a Spark SQL variant of this code are sketched after this procedure):
    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    
    import scala.collection.mutable
    //https://www.alibabacloud.com/help/doc-detail/187100.html
    object SparkTableStore {
      def main(args: Array[String]): Unit = {
        if (args.length < 5) {
          System.err.println(
            """Usage: SparkTableStore <instance-id> <table-name> <VPC endpoint of the Tablestore instance> <ACCESS_KEY_ID>
              |         <ACCESS_KEY_SECRET>
            """.stripMargin)
          System.exit(1)
        }
        val instanceId = args(0)
        val tableName = args(1)
        val endpoint = args(2)
        val accessKeyId = args(3)
        val accessKeySecret = args(4)
        //The table schema. You must create a table with the following schema in Tablestore.
        val catalog =
          """
            |{
            |  "columns": {
            |    "id":{
            |      "col":"id",
            |      "type":"string"
            |    },
            |     "company": {
            |      "col": "company",
            |      "type": "string"
            |    },
            |    "age": {
            |      "col": "age",
            |      "type": "integer"
            |    },
            |
            |    "name": {
            |      "col": "name",
            |      "type": "string"
            |    }
            |  }
            |}
            |""".stripMargin
        val sparkConf = new SparkConf
        val options = new mutable.HashMap[String, String]()
        //The name of the Tablestore instance.
        options.put("instance.name", instanceId)
        //The name of the table that you want to access.
        options.put("table.name", tableName)
        //The endpoint of the Tablestore instance.
        options.put("endpoint", endpoint)
        //The AccessKey ID that is used to access the Tablestore instance.
        options.put("access.key.id", accessKeyId)
        //The AccessKey secret that is used to access the Tablestore instance.
        options.put("access.key.secret", accessKeySecret)
        //The table schema, which is in the JSON format.
        options.put("catalog", catalog)
        //Specifies whether to push down range predicates (>=, >, <, and <=) on LONG-type columns to Tablestore.
        options.put("push.down.range.long", "true")
        //Specifies whether to push down range predicates (>=, >, <, and <=) on STRING-type columns to Tablestore.
        options.put("push.down.range.string", "true")
        //The maximum number of records that each channel can synchronize from Tablestore in one Spark batch. Default value: 10000.
        options.put("maxOffsetsPerChannel", "10000")
        //The name of the search index. This parameter is optional.
        //options.put("search.index.name", "<index_name>")
        //The tunnel ID. This parameter is optional.
        //options.put("tunnel.id", "<tunnel id>")
        val spark = SparkSession.builder.config(sparkConf).appName("Serverless Spark TableStore Demo").getOrCreate
        import spark.implicits._
        val dfReader = spark.read.format("tablestore").options(options)
        val df = dfReader.load()
        //Display the table content.
        df.show()
        df.printSchema()
        //Write data to the table.
        val schema = StructType(Seq(StructField("id", StringType), StructField("company", StringType), StructField("age", IntegerType), StructField("name", StringType)))
        val newData = spark.sparkContext.parallelize(Seq(("1","ant",10,"xxx"))).map(row => Row(row._1, row._2, row._3, row._4))
        val newDataDF = spark.createDataFrame(newData, schema)
        newDataDF.write.format("tablestore").options(options).save()
        val dfReader1 = spark.read.format("tablestore").options(options)
        val df1 = dfReader1.load()
        df1.show()
      }
    }
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where your Tablestore instance resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters as prompted and click OK to create a Spark job.
  7. In the Job List navigation tree, click the name of the Spark job that you created. In the code editor, enter the following job content. Then, save and submit the job.
    {
        "args": [
            "<instanceId>",  #The name of the Tablestore instance.
            "<tableName>",  #The name of the table in Tablestore.
            "<endpoint>",  #The endpoint of the Tablestore instance. You can view the endpoint in the Tablestore console.
            "<access key id>",  #The AccessKey ID that is used to access the Tablestore instance.
            "<access key secret>"  #The AccessKey secret that is used to access the Tablestore instance.
        ],
        "name": "TableStore",
        "className": "com.aliyun.spark.SparkTableStore",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>",  #The directory in which Spark logs are saved.
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"  #The OSS path of the JAR file that you uploaded in step 1.
    }
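
The following build configuration is a minimal sketch of one way to produce the shaded JAR file that step 1 uploads to OSS. It assumes an sbt project with the sbt-assembly plugin; the Spark version, Scala version, and connector coordinates below are placeholders rather than values from this topic, so align them with your Spark runtime and with the Tablestore Spark connector that provides the "tablestore" data source.

    //project/plugins.sbt (assumption: sbt-assembly is used to build the shaded JAR)
    //addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

    //build.sbt
    name := "spark-examples"
    version := "0.0.1-SNAPSHOT"
    //Placeholder: use the Scala version of your Spark runtime.
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      //Spark is provided by the DLA Serverless Spark runtime, so it is not packaged into the JAR.
      "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided",
      //Hypothetical placeholder coordinates: replace with the Tablestore Spark connector used in your environment.
      "com.example" % "tablestore-spark-connector" % "x.y.z"
    )

    //Run "sbt assembly" to produce the shaded JAR, then upload it to OSS.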
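
The sample in step 1 reads and writes through the DataFrame API. If you prefer to query the table with Spark SQL, the following minimal sketch registers the loaded table as a temporary view. The object name SparkTableStoreSql and the example query are hypothetical; the five command-line arguments, the options, and the catalog are the same as in the sample above.

    package com.aliyun.spark

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    import scala.collection.mutable

    object SparkTableStoreSql {
      def main(args: Array[String]): Unit = {
        //Expects the same five arguments as SparkTableStore:
        //<instance-id> <table-name> <VPC endpoint of the Tablestore instance> <ACCESS_KEY_ID> <ACCESS_KEY_SECRET>
        val instanceId = args(0)
        val tableName = args(1)
        val endpoint = args(2)
        val accessKeyId = args(3)
        val accessKeySecret = args(4)
        //The same table schema (catalog) as in the sample above.
        val catalog =
          """
            |{
            |  "columns": {
            |    "id":      { "col": "id",      "type": "string"  },
            |    "company": { "col": "company", "type": "string"  },
            |    "age":     { "col": "age",     "type": "integer" },
            |    "name":    { "col": "name",    "type": "string"  }
            |  }
            |}
            |""".stripMargin
        val options = new mutable.HashMap[String, String]()
        options.put("instance.name", instanceId)
        options.put("table.name", tableName)
        options.put("endpoint", endpoint)
        options.put("access.key.id", accessKeyId)
        options.put("access.key.secret", accessKeySecret)
        options.put("catalog", catalog)
        val spark = SparkSession.builder.config(new SparkConf).appName("Serverless Spark TableStore SQL Demo").getOrCreate()
        //Register the Tablestore table as a temporary view and query it with Spark SQL.
        spark.read.format("tablestore").options(options).load().createOrReplaceTempView("tablestore_test")
        spark.sql("SELECT company, COUNT(*) AS cnt FROM tablestore_test GROUP BY company").show()
        spark.stop()
      }
    }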