
AnalyticDB for MySQL: Access ApsaraDB for Redis

Last Updated: May 08, 2023

This topic describes how to use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access ApsaraDB for Redis over an elastic network interface (ENI).

Prerequisites

Procedure

  1. Download the JAR packages that are required for AnalyticDB for MySQL Spark to access ApsaraDB for Redis. For more information, see Spark Redis, Jedis, and Apache Commons Pool.

  2. Add the following dependencies to the pom.xml file:

          <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis_2.12</artifactId>
                <version>3.1.0</version>
          </dependency>
          <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.9.0</version>
          </dependency>
          <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.11.1</version>
          </dependency>
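     If your project is built with sbt instead of Maven, the equivalent dependency declarations might look like the following sketch. The coordinates match the Maven dependencies above and assume Scala 2.12, which corresponds to the spark-redis_2.12 artifact.

          // build.sbt (sketch): the same dependencies as the Maven example above.
          // Assumes scalaVersion is 2.12.x so that %% resolves to spark-redis_2.12.
          libraryDependencies ++= Seq(
            "com.redislabs" %% "spark-redis" % "3.1.0",
            "redis.clients" % "jedis" % "3.9.0",
            "org.apache.commons" % "commons-pool2" % "2.11.1"
          )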
  3. Write and package a program. In this example, the generated package is named redis_test.jar. Sample code:

    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        val redisTableName = args(3)
    
        // Specify the ApsaraDB for Redis instance information in the Spark configuration.
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        // Sample data. 
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        // Call the Dataset API operation to write the data to the ApsaraDB for Redis instance. 
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
    
        // Use the default mode to read the hash value of the ApsaraDB for Redis instance. 
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        // Set the infer.schema parameter to true. In this case, Spark infers the schema from the data in the ApsaraDB for Redis instance.
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        // Define the schema. 
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)
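     The example above lets spark-redis generate the Redis key for each row. As an optional variation that is not part of redis_test.jar, the key.column option of spark-redis uses a column value as the key suffix, which allows a specific row to be looked up directly by key. A minimal sketch, assuming the same Person data and a hypothetical table name:

    // Sketch only (not included in redis_test.jar): use the "name" column as the
    // Redis key suffix, so the row for John is stored under a key such as
    // <redis_table_name>_bykey:John.
    dfw.write.format("org.apache.spark.sql.redis")
      .option("table", redisTableName + "_bykey") // hypothetical table name
      .option("key.column", "name")
      .save()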
  4. Upload the JAR packages downloaded in Step 1 and the redis_test.jar program to OSS. For more information, see Simple upload.

  5. Go to the Spark JAR Development page.

    1. Log on to the AnalyticDB for MySQL console.

    2. In the upper-left corner of the page, select the region where the cluster resides.

    3. In the left-side navigation pane, click Clusters.

    4. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.

    5. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  6. Select a job resource group and a job type for the Spark job. In this example, the Batch type is selected.

  7. Enter the following code in the Spark editor:

    {
        "name": "<redis-example>",
        "file": "oss://<bucket_name>/redis_test.jar",
        "className": "com.aliyun.spark.<SparkOnRedis>",
        "jars": [
          "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar",
          "oss://<bucket_name>/jedis-3.9.0.jar",
          "oss://<bucket_name>/commons-pool2-2.11.1.jar"
        ],
        "args": [
          -- The internal endpoint of the ApsaraDB for Redis instance. In the Connection Information section of the Instance Information page, you can view different types of endpoints and port numbers of the instance. 
          "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>",
          -- The port number of the ApsaraDB for Redis instance. Set the value to 6379. 
          "6379",
          -- The password of the database account of the ApsaraDB for Redis instance. 
          "<your_password>",
          -- The name of the table in the ApsaraDB for Redis instance. 
          "<redis_table_name>"
        ],
        "conf": {
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small"
        }
    }

    The following table describes the parameters.

    Parameter | Description
    name | The name of the Spark job.
    file | The path of the main file of the Spark job. The main file can be a JAR package that contains the entry class or an executable file that serves as the entry point for a Python program. Note: The main file of a Spark job must be stored in OSS.
    className | The entry class of the Java or Scala program. The entry class is not required for a Python program.
    args | The arguments that are required for the use of the JAR package. Specify the arguments based on your business requirements. Separate multiple arguments with commas (,).
    spark.adb.eni.enabled | Specifies whether to enable ENI. When you use Data Lakehouse Edition (V3.0) Spark to access ApsaraDB for Redis, you must enable ENI.
    spark.adb.eni.vswitchId | The vSwitch ID of the ApsaraDB for Redis instance. You can view the vSwitch ID on the Instance Information page.
    spark.adb.eni.securityGroupId | The ID of an ECS security group that is added to a whitelist of the ApsaraDB for Redis instance. For more information about how to add ECS security groups as whitelists, see the "Method 2: Add ECS security groups as whitelists" section of the Configure whitelists topic.
    conf | The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key:value format. Separate multiple parameters with commas (,). For more information, see Conf configuration parameters.

  8. Click Run Now.

  9. After the Spark application enters the Completed state, find the application on the Applications tab and click Log in the Actions column to view the data of the ApsaraDB for Redis table.