全部產品
Search
文件中心

AnalyticDB:訪問Redis資料來源

更新時間:Nov 02, 2024

本文介紹如何使用AnalyticDB for MySQLSpark通過ENI網路訪問Redis資料。

前提條件

操作步驟

  1. 下載AnalyticDB for MySQL Spark訪問Redis依賴的Jar包。下載連結,請參見spark-redisjediscommons-pool2

  2. 在pom.xml檔案的dependencies中添加依賴項。

          <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis_2.12</artifactId>
                <version>3.1.0</version>
          </dependency>
          <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.9.0</version>
          </dependency>
          <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.11.1</version>
          </dependency>
  3. 編寫如下樣本程式來訪問Redis,並進行編譯打包。本文產生的Jar包名稱為redis_test.jar。範例程式碼如下:

    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        var redisTableName = args(3)
    
       //Spark Conf中配置的Redis資訊。
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        //範例資料。
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        //通過Dataset API寫入資料。
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
    
        //預設讀取Redis的Hash值。
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        //設定infer.schema=true,Spark會檢索Redis的Schema。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        //指定Schema的方式。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)
  4. 將步驟1下載的Jar包和樣本程式redis_test.jar上傳至OSS。具體操作,請參見簡單上傳

  5. 進入Spark Jar開發。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發Spark Jar 開發

  6. 在編輯器視窗上方,選擇Job型資源群組和作業類型。本文以Batch類型為例。

  7. 在編輯器中輸入以下作業內容。

    {
        "name": "<redis-example>",
        "file": "oss://<bucket_name>/redis_test.jar",
        "className": "com.aliyun.spark.<SparkOnRedis>",
        "jars": [
          "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar",
          "oss://<bucket_name>/jedis-3.9.0.jar",
          "oss://<bucket_name>/commons-pool2-2.11.1.jar"
        ],
        "args": [
          -- Redis執行個體的內網串連地址。在目標執行個體的執行個體資訊頁面的串連資訊地區,可查看到各連線類型的地址和連接埠號碼。
          "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>",
          --Redis執行個體的連接埠號碼,固定為6379。
          "6379",
          -- Redis執行個體的資料庫帳號密碼。
          "<your_password>",
          -- Redis執行個體的表名。
          "<redis_table_name>"
        ],
        "conf": {
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small"
        }
    }

    參數說明如下。

    參數

    說明

    name

    Spark作業名稱。

    file

    Spark作業主檔案的儲存位置。主檔案是入口類所在的Jar包或者Python的入口執行檔案。

    說明

    Spark作業主檔案目前只支援儲存在OSS中。

    className

    Java或者Scala程式入口類名稱。Python不需要指定入口類。

    args

    請根據業務需求,填寫使用Jar包時需要的參數。多個參數之間以英文逗號(,)分隔。

    spark.adb.eni.enabled

    是否開啟ENI訪問。本文需要開啟ENI訪問。

    spark.adb.eni.vswitchId

    交換器ID。在目標Redis執行個體的執行個體資訊頁面擷取交換器ID。

    spark.adb.eni.securityGroupId

    Redis執行個體中添加的安全性群組ID。如未添加安全性群組,請參見設定白名單

    conf其他參數

    與開源Spark中的配置項基本一致,參數格式為key:value形式,多個參數之間以英文逗號(,)分隔。更多conf參數,請參見Conf配置參數

  8. 單擊運行

  9. 應用列表中目標應用的狀態為已完成,您可以單擊操作列的日誌查看Redis表的資料。