本文介紹如何使用AnalyticDB for MySQLSpark通過ENI網路訪問Redis資料。
前提條件
AnalyticDB for MySQL叢集的產品系列為湖倉版。
已在AnalyticDB for MySQL叢集中建立Job型資源群組。具體操作,請參見建立資源群組。
已建立AnalyticDB for MySQL叢集的資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。具體操作,請參見建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號並且將RAM使用者綁定到普通帳號上。具體操作,請參見建立資料庫帳號和綁定或解除綁定RAM使用者與資料庫帳號。
AnalyticDB for MySQL叢集與Redis執行個體位於同一地區。具體操作,請參見步驟1:建立執行個體。
已將Redis執行個體添加到安全性群組中,且安全性群組規則的入方向與出方向允許存取Redis連接埠的訪問請求。具體操作,請參見設定白名單和添加安全性群組規則。
操作步驟
下載AnalyticDB for MySQL Spark訪問Redis依賴的Jar包。下載連結,請參見spark-redis、jedis和commons-pool2。
在pom.xml檔案的dependencies中添加依賴項。
<dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.11.1</version> </dependency>編寫如下樣本程式來訪問Redis,並進行編譯打包。本文產生的Jar包名稱為
redis_test.jar。範例程式碼如下:package com.aliyun.spark import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ object SparkOnRedis { def main(args: Array[String]): Unit = { val redisHost = args(0) val redisPort = args(1) val redisPassword = args(2) var redisTableName = args(3) //Spark Conf中配置的Redis資訊。 val sparkConf = new SparkConf() .set("spark.redis.host", redisHost) .set("spark.redis.port", redisPort) .set("spark.redis.auth", redisPassword) val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() //範例資料。 val data = Seq( Person("John", 30, "60 Wall Street", 150.5), Person("Peter", 35, "110 Wall Street", 200.3) ) //通過Dataset API寫入資料。 val dfw = sparkSession.createDataFrame(data) dfw.write.format("org.apache.spark.sql.redis") .option("model", "hash") .option("table", redisTableName) .save() //預設讀取Redis的Hash值。 var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("table", redisTableName) .load() .cache() loadedDf.show(10) //設定infer.schema=true,Spark會檢索Redis的Schema。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("keys.pattern", redisTableName + ":*") .option("infer.schema", "true") .load() loadedDf.show(10) //指定Schema的方式。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("keys.pattern", redisTableName + ":*") .schema(StructType(Array( StructField("name", StringType), StructField("age", IntegerType), StructField("address", StringType), StructField("salary", DoubleType) ))) .load() loadedDf.show(10) sparkSession.stop() } } case class Person(name: String, age: Int, address: String, salary: Double)將步驟1下載的Jar包和樣本程式
redis_test.jar上傳至OSS。具體操作,請參見簡單上傳。進入Spark Jar開發。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊作業開發> Spark Jar 開發。
在編輯器視窗上方,選擇Job型資源群組和作業類型。本文以Batch類型為例。
在編輯器中輸入以下作業內容。
{ "name": "<redis-example>", "file": "oss://<bucket_name>/redis_test.jar", "className": "com.aliyun.spark.<SparkOnRedis>", "jars": [ "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar", "oss://<bucket_name>/jedis-3.9.0.jar", "oss://<bucket_name>/commons-pool2-2.11.1.jar" ], "args": [ -- Redis執行個體的內網串連地址。在目標執行個體的執行個體資訊頁面的串連資訊地區,可查看到各連線類型的地址和連接埠號碼。 "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>", --Redis執行個體的連接埠號碼,固定為6379。 "6379", -- Redis執行個體的資料庫帳號密碼。 "<your_password>", -- Redis執行個體的表名。 "<redis_table_name>" ], "conf": { "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }參數說明如下。
參數
說明
nameSpark作業名稱。
fileSpark作業主檔案的儲存位置。主檔案是入口類所在的Jar包或者Python的入口執行檔案。
說明Spark作業主檔案目前只支援儲存在OSS中。
classNameJava或者Scala程式入口類名稱。Python不需要指定入口類。
args請根據業務需求,填寫使用Jar包時需要的參數。多個參數之間以英文逗號(,)分隔。
spark.adb.eni.enabled是否開啟ENI訪問。本文需要開啟ENI訪問。
spark.adb.eni.vswitchId交換器ID。在目標Redis執行個體的執行個體資訊頁面擷取交換器ID。
spark.adb.eni.securityGroupIdRedis執行個體中添加的安全性群組ID。如未添加安全性群組,請參見設定白名單。
conf其他參數
與開源Spark中的配置項基本一致,參數格式為
key:value形式,多個參數之間以英文逗號(,)分隔。更多conf參數,請參見Conf配置參數。單擊運行。
待應用列表中目標應用的狀態為已完成,您可以單擊操作列的日誌查看Redis表的資料。