
Data Lake Analytics - Deprecated: Redis

Last Updated: Feb 19, 2024

ApsaraDB for Redis is a database service that is compatible with the open source Redis protocol and supports hybrid storage of memory and disks. Based on reliable hot standby architectures and scalable cluster architectures, ApsaraDB for Redis is suitable for scenarios that require flexible configuration changes, high throughput, and low latency. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ApsaraDB for Redis.

Important

DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use AnalyticDB for MySQL Spark to access ApsaraDB for Redis, see Access ApsaraDB for Redis.

Prerequisites

  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.

  • An ApsaraDB for Redis instance is created. For more information, see Step 1: Create an ApsaraDB for Redis instance.

  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for Redis instance are obtained. For more information, see Configure the network of data sources.

  • The security group ID and vSwitch ID that are used by the serverless Spark engine of DLA to access the ApsaraDB for Redis instance are added to the whitelist of the ApsaraDB for Redis instance. For more information, see Step 2: Configure whitelists.

Procedure

  1. Compile the following test code and download the POM file that contains the dependencies required to access ApsaraDB for Redis. Then, package the test code and dependencies into separate JAR files and upload the JAR files to OSS.

    Sample test code:

    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        // redisHost: the internal endpoint of the ApsaraDB for Redis instance. redisPort: the port number of the ApsaraDB for Redis instance. redisPassword: the password of the database account of the ApsaraDB for Redis instance. 
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        // The name of the table in the ApsaraDB for Redis instance. 
        val redisTableName = args(3)
    
        // The ApsaraDB for Redis instance information that is specified in the Spark configuration. 
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        // Sample data. 
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        // Call the Dataset API operation to write the data to the ApsaraDB for Redis instance. 
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
    
        // Use the default mode to read the hash value of the ApsaraDB for Redis instance. 
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        // Set the infer.schema parameter to true. In this case, Spark infers the schema from the data that is stored in the ApsaraDB for Redis instance. 
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          //        .option("table", redisTableName)
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        // Define the schema. 
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)

    POM file that contains the dependencies of ApsaraDB for Redis:

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.0</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>jedis</artifactId>
                <version>3.0.0-m1</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis</artifactId>
                <version>2.3.1-m3</version>
            </dependency>
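The entries above are only the `<dependency>` elements. For reference, a minimal pom.xml that wraps them might look like the following sketch. The project coordinates and the Spark dependency version are illustrative assumptions, not part of the original instructions; the Spark libraries themselves are supplied by the DLA serverless engine at runtime, so they are marked as provided.

```xml
<!-- Illustrative sketch only: project coordinates and Spark version are assumptions. -->
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun.spark</groupId>
    <artifactId>spark-examples</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- Provided by the DLA serverless Spark engine at runtime. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>com.redislabs</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.0-m1</version>
        </dependency>
        <dependency>
            <groupId>com.redislabs</groupId>
            <artifactId>spark-redis</artifactId>
            <version>2.3.1-m3</version>
        </dependency>
    </dependencies>
</project>
```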
  2. Log on to the DLA console.

  3. In the top navigation bar, select the region in which the ApsaraDB for Redis instance resides.

  4. In the left-side navigation pane, choose Serverless Spark > Submit job.

  5. On the Parameter Configuration page, click Create a job Template.

  6. In the Create a job Template dialog box, configure the parameters as prompted and click OK.


  7. On the Job list tab, click the name of the Spark job that you created and enter the following code in the code editor. Replace the sample values with your own values, and remove the `#` annotations, which are for reference only and are not valid JSON. Then, save and submit the Spark job.

    {
        "args": [
            "r-xxx1.redis.rds.aliyuncs.com",  # The internal endpoint of the ApsaraDB for Redis instance. 
            "6379",  # The port number of the ApsaraDB for Redis instance. 
            "xxx2",  # The password of the database account of the ApsaraDB for Redis instance. 
            "spark-test"  # The name of the table in the ApsaraDB for Redis instance. 
        ],
        "file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar",  # The path of the JAR file that contains the test code. 
        "name": "redis-test",
        "jars": [
            "oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar",  # The path of the spark-redis-2.3.1-m3.jar file in OSS. 
            "oss://spark_test/jars/redis/commons-pool2-2.0.jar",  # The path of the commons-pool2-2.0.jar file in OSS. 
            "oss://spark_test/jars/redis/jedis-3.0.0-m1.jar"  # The path of the jedis-3.0.0-m1.jar file in OSS. 
        ],
        "className": "com.aliyun.spark.SparkOnRedis",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-xxx",  # The vSwitch ID of the ApsaraDB for Redis instance. 
            "spark.dla.eni.security.group.id": "sg-xxx"  # The ID of the security group that is added to the ApsaraDB for Redis instance as a whitelist. 
        }
    }
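As a mental model for what this job writes, the following plain-Scala sketch (no Spark required) illustrates the layout that spark-redis uses in the hash model: each DataFrame row becomes one Redis hash whose key is `<table>:<id>` and whose fields hold the column values. This is why the sample code can read the data back with keys.pattern set to `spark-test:*`. The rowId parameter here is a hypothetical stand-in for the random ID that spark-redis generates.

```scala
// Sketch of the spark-redis hash layout; rowId is a placeholder for the
// random identifier that spark-redis assigns to each row.
object RedisHashLayout {
  // Each row is stored under the key "<table>:<rowId>".
  def rowKey(table: String, rowId: String): String = s"$table:$rowId"

  // The hash fields mirror the columns of the Person case class; Redis
  // stores all values as strings.
  def rowFields(name: String, age: Int, address: String, salary: Double): Map[String, String] =
    Map(
      "name"    -> name,
      "age"     -> age.toString,
      "address" -> address,
      "salary"  -> salary.toString
    )
}
```

For example, the row Person("John", 30, "60 Wall Street", 150.5) in table spark-test is stored as one hash whose key matches the pattern spark-test:*, with the fields name, age, address, and salary.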

Result

After the job is complete, find the job and click Log in the Operation column to view the logs of the job. If the following log appears, the job succeeded.