This topic describes how to use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access ApsaraDB for Redis over an elastic network interface (ENI).
Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is created. For more information, see Create a cluster.
A database account is created.
If you use an Alibaba Cloud account, you need to create only a privileged database account. For more information, see Create a database account.
If you use a Resource Access Management (RAM) user, you must create both a privileged database account and a standard database account and associate the standard account with the RAM user. For more information, see Create a database account and Associate or disassociate a database account with or from a RAM user.
A job resource group is created. For more information, see Create a resource group.
An ApsaraDB for Redis instance is created in the same region as the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. For more information, see Create an ApsaraDB for Redis instance.
An Elastic Compute Service (ECS) security group is added to the ApsaraDB for Redis instance as a whitelist, and the rules of the security group allow inbound and outbound traffic on the ports of the ApsaraDB for Redis instance. For more information, see Configure whitelists and Add a security group rule.
Object Storage Service (OSS) is activated and a bucket is created. For more information, see Activate OSS and Create buckets.
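To confirm that the security group rules from the prerequisites allow traffic on the Redis port, a quick TCP reachability check can help. The following is a minimal sketch, not part of the official procedure. It assumes that you run it from a machine in the same VPC and security group (for example, an ECS instance), and the object name RedisPortCheck and its method are hypothetical. It verifies only that a TCP connection can be opened. It does not test authentication.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper: checks TCP reachability only, not Redis authentication.
object RedisPortCheck {

  /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      // Any resolution, timeout, or refusal error is turned into `false`.
      Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: RedisPortCheck <host> [port]")
    val host = args(0)
    // 6379 is the port used in this topic's example.
    val port = if (args.length > 1) args(1).toInt else 6379
    println(s"$host:$port reachable: ${canConnect(host, port)}")
  }
}
```

If the check returns false, review the whitelist of the ApsaraDB for Redis instance and the security group rules before you submit a Spark job.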
Procedure
Download the JAR packages that are required for AnalyticDB for MySQL Spark to access ApsaraDB for Redis. For more information, see Spark Redis, Jedis, and Apache Commons Pool.
Add the following dependencies to the pom.xml file:
<dependency>
    <groupId>com.redislabs</groupId>
    <artifactId>spark-redis_2.12</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
</dependency>
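The JAR file names that appear in the job configuration later in this topic follow Maven's default naming scheme, artifactId-version.jar. The following sketch derives those names and the matching OSS paths from the coordinates above. The object JarNames and its members are hypothetical, and the sketch assumes that the JAR packages are uploaded to the root directory of the bucket.

```scala
// Hypothetical helper that maps Maven coordinates to JAR file names and OSS paths.
object JarNames {

  /** Maven coordinates of a dependency (plain JAR artifacts without a classifier). */
  final case class Coordinate(groupId: String, artifactId: String, version: String) {
    // Maven's default file name for a JAR artifact is <artifactId>-<version>.jar.
    def jarName: String = s"$artifactId-$version.jar"
  }

  // The three dependencies listed in the pom.xml file of this topic.
  val dependencies: Seq[Coordinate] = Seq(
    Coordinate("com.redislabs", "spark-redis_2.12", "3.1.0"),
    Coordinate("redis.clients", "jedis", "3.9.0"),
    Coordinate("org.apache.commons", "commons-pool2", "2.11.1")
  )

  /** Builds OSS paths for the JARs, assuming they sit in the root directory of the bucket. */
  def ossPaths(bucket: String): Seq[String] =
    dependencies.map(d => s"oss://$bucket/${d.jarName}")

  def main(args: Array[String]): Unit =
    ossPaths("<bucket_name>").foreach(println)
}
```

If you upload the JAR packages to a subdirectory instead, adjust the path prefix accordingly.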
Write and package a program. In this example, the generated package is named redis_test.jar. Sample code:
package com.aliyun.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkOnRedis {

  def main(args: Array[String]): Unit = {
    val redisHost = args(0)
    val redisPort = args(1)
    val redisPassword = args(2)
    val redisTableName = args(3)

    // The ApsaraDB for Redis instance information that is specified in the Spark configuration.
    val sparkConf = new SparkConf()
      .set("spark.redis.host", redisHost)
      .set("spark.redis.port", redisPort)
      .set("spark.redis.auth", redisPassword)

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // Sample data.
    val data = Seq(
      Person("John", 30, "60 Wall Street", 150.5),
      Person("Peter", 35, "110 Wall Street", 200.3)
    )

    // Call the Dataset API operation to write the data to the ApsaraDB for Redis instance.
    val dfw = sparkSession.createDataFrame(data)
    dfw.write.format("org.apache.spark.sql.redis")
      .option("model", "hash")
      .option("table", redisTableName)
      .save()

    // Use the default mode to read the hash value of the ApsaraDB for Redis instance.
    var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
      .option("table", redisTableName)
      .load()
      .cache()
    loadedDf.show(10)

    // Set the infer.schema parameter to true. In this case, Spark retrieves the schema of the ApsaraDB for Redis instance.
    loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
      .option("keys.pattern", redisTableName + ":*")
      .option("infer.schema", "true")
      .load()
    loadedDf.show(10)

    // Define the schema.
    loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
      .option("keys.pattern", redisTableName + ":*")
      .schema(StructType(Array(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("address", StringType),
        StructField("salary", DoubleType)
      )))
      .load()
    loadedDf.show(10)

    sparkSession.stop()
  }
}

case class Person(name: String, age: Int, address: String, salary: Double)
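The sample program reads four positional arguments (endpoint, port, password, and table name) without checking them, so a missing or malformed argument surfaces only as a runtime exception. The following sketch shows one way to validate the arguments before Spark is started. The names RedisJobArgs, RedisArgs, and parse are hypothetical and are not part of the sample program or of any library.

```scala
import scala.util.Try

// Hypothetical argument parser for the four positional arguments of the sample program.
object RedisJobArgs {

  final case class RedisArgs(host: String, port: Int, password: String, table: String)

  /** Parses <host> <port> <password> <table> and returns an error message on bad input. */
  def parse(args: Array[String]): Either[String, RedisArgs] =
    args match {
      case Array(host, port, password, table) =>
        // Accept only numeric ports in the valid TCP range.
        Try(port.toInt).toOption.filter(p => p > 0 && p <= 65535) match {
          case Some(p) if host.nonEmpty && table.nonEmpty =>
            Right(RedisArgs(host, p, password, table))
          case Some(_) =>
            Left("host and table name must not be empty")
          case None =>
            Left(s"invalid port: $port")
        }
      case _ =>
        Left(s"expected 4 arguments (host, port, password, table) but got ${args.length}")
    }

  def main(args: Array[String]): Unit =
    parse(args) match {
      case Right(parsed) => println(s"Connecting to ${parsed.host}:${parsed.port}, table ${parsed.table}")
      case Left(message) => sys.error(message)
    }
}
```

In the sample program, the parsed values could replace the direct args(0) to args(3) lookups. The port would then need to be converted back to a string before it is passed to SparkConf.set.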
Upload the JAR packages downloaded in Step 1 and the redis_test.jar program to OSS. For more information, see Simple upload.
Go to the Spark JAR Development page.
Log on to the AnalyticDB for MySQL console.
In the upper-left corner of the page, select the region where the cluster resides.
In the left-side navigation pane, click Clusters.
On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
Select the job resource group and a job type for the Spark job. In this example, the batch type is selected.
Enter the following code in the Spark editor:
{
    "name": "<redis-example>",
    "file": "oss://<bucket_name>/redis_test.jar",
    "className": "com.aliyun.spark.<SparkOnRedis>",
    "jars": [
        "oss://<bucket_name>/spark-redis_2.12-3.1.0.jar",
        "oss://<bucket_name>/jedis-3.9.0.jar",
        "oss://<bucket_name>/commons-pool2-2.11.1.jar"
    ],
    "args": [
        -- The internal endpoint of the ApsaraDB for Redis instance. In the Connection Information section of the Instance Information page, you can view different types of endpoints and port numbers of the instance.
        "<r-bp1qsslcssr****.redis.rds.aliyuncs.com>",
        -- The port number of the ApsaraDB for Redis instance. Set the value to 6379.
        "6379",
        -- The password of the database account of the ApsaraDB for Redis instance.
        "<your_password>",
        -- The name of the table in the ApsaraDB for Redis instance.
        "<redis_table_name>"
    ],
    "conf": {
        "spark.adb.eni.enabled": "true",
        "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
        "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small"
    }
}
The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| name | The name of the Spark job. |
| file | The path of the main file of the Spark job. The main file can be a JAR package that contains the entry class or an executable file that serves as the entry point for the Python program. Note: The main file of a Spark job must be stored in OSS. |
| className | The entry class of the Java or Scala program. The entry class is not required for a Python program. |
| args | The arguments that are required for the use of the JAR packages. Specify the arguments based on your business requirements. Separate multiple arguments with commas (,). |
| spark.adb.eni.enabled | Specifies whether to enable ENI. When you use Data Lakehouse Edition (V3.0) Spark to access ApsaraDB for Redis, you must enable ENI. |
| spark.adb.eni.vswitchId | The vSwitch ID of the ApsaraDB for Redis instance. On the Instance Information page, you can view the vSwitch ID. |
| spark.adb.eni.securityGroupId | The ID of the ECS security group that is added to the ApsaraDB for Redis instance as a whitelist. For more information about how to add ECS security groups as whitelists, see the "Method 2: Add ECS security groups as whitelists" section of the Configure whitelists topic. |
| conf | The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key:value format. Separate multiple parameters with commas (,). For more information, see Conf configuration parameters. |

Click Run Now.
After the state of the Spark application changes to Completed, find the application on the Applications tab and click Log in the Actions column to view the data of the ApsaraDB for Redis table.
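Because the job cannot reach the ApsaraDB for Redis instance unless ENI is enabled, it can be worth checking the ENI entries of the conf object before you submit a job. The following sketch is an optional local check and is not a feature of AnalyticDB for MySQL. The object EniConfCheck is hypothetical, and the sketch assumes that both the vSwitch ID and the security group ID must be present whenever ENI is enabled, as in the example configuration.

```scala
// Hypothetical pre-submission check for the ENI-related entries of the conf object.
object EniConfCheck {

  private val EniEnabled = "spark.adb.eni.enabled"

  // Assumption: both IDs are needed whenever ENI is enabled, as in the example configuration.
  private val RequiredWhenEnabled =
    Seq("spark.adb.eni.vswitchId", "spark.adb.eni.securityGroupId")

  /** Returns the problems found in the conf map. An empty result means none were detected. */
  def problems(conf: Map[String, String]): Seq[String] = {
    val enabled = conf.get(EniEnabled).exists(_.equalsIgnoreCase("true"))
    if (!enabled) {
      Seq(s"$EniEnabled must be set to true to access ApsaraDB for Redis")
    } else {
      RequiredWhenEnabled
        .filter(key => conf.get(key).forall(_.trim.isEmpty))
        .map(key => s"$key is missing or empty")
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values copied from the example configuration in this topic.
    val conf = Map(
      "spark.adb.eni.enabled" -> "true",
      "spark.adb.eni.vswitchId" -> "vsw-bp17jqw3lrrobn6y****",
      "spark.adb.eni.securityGroupId" -> "sg-bp163uxgt4zandx****"
    )
    val found = problems(conf)
    if (found.isEmpty) println("ENI settings look complete") else found.foreach(println)
  }
}
```

The check looks only at whether the keys are present. It cannot confirm that the IDs refer to the vSwitch and the security group of your instance.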