ApsaraDB for Redis is a database service that is compatible with the open-source Redis protocol and provides hybrid memory-and-disk storage. Built on a reliable hot-standby architecture and a scalable cluster architecture, ApsaraDB for Redis is suitable for scenarios that require flexible configuration changes, high throughput, and low latency. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ApsaraDB for Redis.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.
  • An ApsaraDB for Redis cluster is created. For more information, see Step 1: Create an ApsaraDB for Redis instance.
  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for Redis cluster are obtained. For more information, see Access your VPC.
  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for Redis cluster are added to a whitelist of the ApsaraDB for Redis cluster. For more information, see Step 2: Set IP address whitelists.

Procedure

  1. Write the following test code and add the dependencies that are required to access the ApsaraDB for Redis cluster to the pom.xml file. Then, package the test code and dependencies into a JAR file and upload the file to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        // redisHost: the internal endpoint that is used to access the ApsaraDB for Redis database.
        // redisPort: the port number that is used to access the ApsaraDB for Redis database.
        // redisPassword: the password that is used to log on to the ApsaraDB for Redis database.
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        // The name of the table in the ApsaraDB for Redis cluster.
        val redisTableName = args(3)
    
        // Configure the connection information of the ApsaraDB for Redis cluster in a SparkConf.
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        // Sample data.
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        // Use the DataFrame API to write data to the ApsaraDB for Redis database.
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
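        // spark-redis persists each row as a Redis hash whose key has the form
        // "<table>:<generated id>", which is why the reads below can match the
        // data with the keys.pattern "<table>:*".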
    
        // Read the hash data back from the ApsaraDB for Redis cluster in the default mode.
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        // Set infer.schema to true so that the serverless Spark engine of DLA infers the schema
        // from the data in the ApsaraDB for Redis database. spark-redis samples a row to infer
        // the schema, and the inferred columns are read as strings.
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        // Define the schema.
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)
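    spark-redis also exposes the written data through Spark SQL. The following snippet is a minimal sketch that could be added inside main before sparkSession.stop(); the view name redis_person is illustrative, and the column list assumes the schema of the Person case class:
        // Register the Redis table as a temporary view and query it with SQL.
        // "redis_person" is an illustrative view name; the columns assume the
        // schema of the Person case class above.
        sparkSession.sql(
          s"""CREATE TEMPORARY VIEW redis_person (name STRING, age INT, address STRING, salary DOUBLE)
             |USING org.apache.spark.sql.redis OPTIONS (table '$redisTableName')""".stripMargin)
        sparkSession.sql("SELECT name, salary FROM redis_person WHERE age > 30").show()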
    Dependencies in the pom.xml file that are required to access the ApsaraDB for Redis cluster:
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.0</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>jedis</artifactId>
                <version>3.0.0-m1</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis</artifactId>
                <version>2.3.1-m3</version>
            </dependency>
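    The preceding dependencies cover only the Redis connector. Compiling the sample also requires the Spark libraries on the classpath. Because the serverless Spark engine of DLA provides Spark at runtime, a minimal sketch declares them with provided scope; the Spark and Scala versions below are assumptions and must be aligned with the Spark version of your DLA environment:
            <!-- Assumption: Spark 2.4.x with Scala 2.11, provided by the DLA runtime. -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.3</version>
                <scope>provided</scope>
            </dependency>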
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where the ApsaraDB for Redis cluster resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  7. In the Job List navigation tree, click the Spark job that you created and enter the following job content in the code editor. Replace the parameter values as described in the comments. Then, click Save and Execute.
    {
        "args": [
            "r-xxx1.redis.rds.aliyuncs.com",  # The internal endpoint  (host) that is used to access the ApsaraDB for Redis database.
            "6379",  # The port number that is used to access the ApsaraDB for Redis database.
            "xxx2",  # The password that is used to log on to the ApsaraDB for Redis database.
            "spark-test"  # The name of the table in the ApsaraDB for Redis database.
        ],
        "file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar",  # The OSS directory where the test code is saved.
        "name": "redis-test",
        "jars": [
            "oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar",  # The OSS directory where the JAR file that contains the dependencies of the test code is saved.
            "oss://spark_test/jars/redis/commons-pool2-2.0.jar",  # The OSS directory where the JAR file that contains the dependencies of the test code is saved.
            "oss://spark_test/jars/redis/jedis-3.0.0-m1.jar"  # The OSS directory where the JAR file that contains the dependencies of the test code is saved.
        ],
        "className": "com.aliyun.spark.SparkOnRedis",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-xxx",  # The ID of the vSwitch to which the ApsaraDB for Redis cluster belongs.
            "spark.dla.eni.security.group.id": "sg-xxx"  # The ID of the security group to which the ApsaraDB for Redis cluster belongs.
        }
    }

Result

After the job succeeds, find the job in the job list and click Log in the Operation column to view the job logs.
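
You can also verify the written data directly in Redis. The following snippet is a minimal sketch that uses the Jedis client from the dependencies above to scan the keys of the table and print the stored hashes. The endpoint, password, and table name are placeholders that must be replaced with your own values, and the code must run from a machine that can reach the internal endpoint, such as an ECS instance in the same VPC:

    import scala.collection.JavaConverters._
    import redis.clients.jedis.{Jedis, ScanParams}

    object VerifyRedisData {
      def main(args: Array[String]): Unit = {
        // Placeholders: replace with your own endpoint, password, and table name.
        val jedis = new Jedis("r-xxx1.redis.rds.aliyuncs.com", 6379)
        jedis.auth("xxx2")
        // spark-redis stores each row as a hash under a key of the form "<table>:<id>".
        // A single SCAN pass is shown for brevity; loop on the returned cursor for a full scan.
        val result = jedis.scan("0", new ScanParams().`match`("spark-test:*").count(100))
        for (key <- result.getResult.asScala) {
          println(s"$key -> ${jedis.hgetAll(key)}")
        }
        jedis.close()
      }
    }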