This topic describes how to use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access Tair (Redis OSS-compatible) over an elastic network interface (ENI).
Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is created. For more information, see Create a cluster.
A database account is created.
If you use an Alibaba Cloud account, you need to create only a privileged database account. For more information, see Create a database account.
If you use a Resource Access Management (RAM) user, you must create both a privileged database account and a standard database account and associate the standard account with the RAM user. For more information, see Create a database account and Associate or disassociate a database account with or from a RAM user.
A job resource group is created. For more information, see Create a resource group.
A Tair (Redis OSS-compatible) instance is created in the same region as the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. For more information, see Create an instance.
An Elastic Compute Service (ECS) security group is added to the Tair (Redis OSS-compatible) instance as a whitelist. The security group rules must allow inbound and outbound traffic on the ports of the Tair (Redis OSS-compatible) instance. For more information, see Configure whitelists and Add a security group rule.
Object Storage Service (OSS) is activated and a bucket is created. For more information, see Activate OSS and Create buckets.
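Before you submit a Spark job, it can help to confirm that the security group rules actually allow traffic on the instance port. The following is a minimal sketch, not part of the official procedure. It assumes that you run it from a host inside the same VPC (for example, an ECS instance that belongs to the whitelisted security group). The object name PortCheck and the helper canConnect are hypothetical names chosen for illustration. The check only tests TCP reachability; it does not authenticate against the instance and does not prove that the ENI path used by the Spark job works.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
object PortCheck {

  /** Returns true if a TCP connection to host:port is established within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts, and unresolved host names.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // args(0): internal endpoint of the instance; args(1): port (assumed to default to 6379).
    val host = args(0)
    val port = if (args.length > 1) args(1).toInt else 6379
    println(s"$host:$port reachable: ${canConnect(host, port)}")
  }
}
```

If the check returns false, review the whitelist of the instance and the inbound and outbound rules of the security group before you look at the Spark configuration.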
Procedure
Download the JAR packages that are required for AnalyticDB for MySQL Spark to access Tair (Redis OSS-compatible). For more information, see Spark Redis, Jedis, and Apache Commons Pool.
Add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.redislabs</groupId>
    <artifactId>spark-redis_2.12</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>

Write and package a program. In this example, the generated package is named redis_test.jar. Sample code:

package com.aliyun.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkOnRedis {

  def main(args: Array[String]): Unit = {
    val redisHost = args(0)
    val redisPort = args(1)
    val redisPassword = args(2)
    val redisTableName = args(3)

    // The Tair (Redis OSS-compatible) instance information that is specified in the Spark configuration.
    val sparkConf = new SparkConf()
      .set("spark.redis.host", redisHost)
      .set("spark.redis.port", redisPort)
      .set("spark.redis.auth", redisPassword)

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // Sample data.
    val data = Seq(
      Person("John", 30, "60 Wall Street", 150.5),
      Person("Peter", 35, "110 Wall Street", 200.3)
    )

    // Call the Dataset API operation to write the data to the Tair (Redis OSS-compatible) instance.
    val dfw = sparkSession.createDataFrame(data)
    dfw.write.format("org.apache.spark.sql.redis")
      .option("model", "hash")
      .option("table", redisTableName)
      .save()

    // Use the default mode to read the hash value of the Tair (Redis OSS-compatible) instance.
    var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
      .option("table", redisTableName)
      .load()
      .cache()
    loadedDf.show(10)

    // Set the infer.schema parameter to true. In this case, Spark retrieves the schema of the Tair (Redis OSS-compatible) instance.
    loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
      .option("keys.pattern", redisTableName + ":*")
      .option("infer.schema", "true")
      .load()
    loadedDf.show(10)

    // Define the schema.
    loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
      .option("keys.pattern", redisTableName + ":*")
      .schema(StructType(Array(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("address", StringType),
        StructField("salary", DoubleType)
      )))
      .load()
    loadedDf.show(10)

    sparkSession.stop()
  }
}

case class Person(name: String, age: Int, address: String, salary: Double)

Upload the JAR packages downloaded in Step 1 and the redis_test.jar program to OSS. For more information, see Simple upload.
Go to the Spark JAR Development page.
Log on to the AnalyticDB for MySQL console.
In the upper-left corner of the page, select the region where the cluster resides.
In the left-side navigation pane, click Clusters.
On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
Select a job resource group and a job type for the Spark job. In this example, the batch type is selected.
Enter the following code in the Spark editor:
{
    "name": "<redis-example>",
    "file": "oss://<bucket_name>/redis_test.jar",
    "className": "com.aliyun.spark.<SparkOnRedis>",
    "jars": [
        "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar",
        "oss://<bucket_name>/jedis-3.9.0.jar",
        "oss://<bucket_name>/commons-pool2-2.11.1.jar"
    ],
    "args": [
        -- The internal endpoint of the Tair (Redis OSS-compatible) instance. In the Connection Information section of the Instance Information page, you can view different types of endpoints and port numbers of the instance.
        "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>",
        -- The port number of the Tair (Redis OSS-compatible) instance. Set the value to 6379.
        "6379",
        -- The password of the database account of the Tair (Redis OSS-compatible) instance.
        "<your_password>",
        -- The name of the table in the Tair (Redis OSS-compatible) instance.
        "<redis_table_name>"
    ],
    "conf": {
        "spark.adb.eni.enabled": "true",
        "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
        "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small"
    }
}

The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| name | The name of the Spark job. |
| file | The path of the main file of the Spark job. The main file can be a JAR package that contains the entry class or an executable file that serves as the entry point for the Python program. Note: The main file of a Spark job must be stored in OSS. |
| className | The entry class of the Java or Scala program. The entry class is not required for a Python program. |
| args | The arguments that are required for the use of the JAR packages. Specify the arguments based on your business requirements. Separate multiple arguments with commas (,). |
| spark.adb.eni.enabled | Specifies whether to enable ENI. When you use Data Lakehouse Edition (V3.0) Spark to access Tair (Redis OSS-compatible), you must enable ENI. |
| spark.adb.eni.vswitchId | The vSwitch ID of the Tair (Redis OSS-compatible) instance. On the Instance Information page, you can view the vSwitch ID. |
| spark.adb.eni.securityGroupId | The ID of the ECS security group that is added to the Tair (Redis OSS-compatible) instance as a whitelist. For more information about how to add ECS security groups as whitelists, see the "Method 2: Add ECS security groups as whitelists" section of the Configure whitelists topic. |
| conf | The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key:value format. Separate multiple parameters with commas (,). For more information, see Conf configuration parameters. |

Click Run Now.
After the Spark application enters the Completed state, find the application on the Applications tab and click Log in the Actions column to view the data of the Tair (Redis OSS-compatible) table.
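The sample program in this topic reads args(0) through args(3) directly, so a missing argument surfaces in the log only as an ArrayIndexOutOfBoundsException. The following is a minimal sketch of validating the four positional arguments (host, port, password, table name) before the Spark session is created. The names RedisJobArgs, Parsed, and parse are hypothetical and do not appear in the sample program; the port range check is an assumption about what counts as a valid value.

```scala
import scala.util.Try

// Hypothetical helper: validates the positional arguments passed through "args".
object RedisJobArgs {

  final case class Parsed(host: String, port: Int, password: String, table: String)

  /** Returns the parsed arguments, or a message that describes the first problem found. */
  def parse(args: Array[String]): Either[String, Parsed] =
    args match {
      case Array(host, portText, password, table) =>
        if (host.trim.isEmpty) Left("host must not be empty")
        else if (table.trim.isEmpty) Left("table name must not be empty")
        else
          // Try(...).toOption is used because String.toIntOption requires Scala 2.13.
          Try(portText.toInt).toOption match {
            case Some(p) if p >= 1 && p <= 65535 =>
              Right(Parsed(host, p, password, table))
            case _ =>
              Left(s"port must be an integer between 1 and 65535, got '$portText'")
          }
      case _ =>
        Left(s"expected 4 arguments (host, port, password, table), got ${args.length}")
    }
}
```

In main, the result could be matched once: on Left, print the message and exit with a non-zero status; on Right, pass the host, the port (converted back to a string), and the password to SparkConf as the sample program does.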