By integrating Spark with Object Storage Service (OSS), Alibaba Cloud E-MapReduce (EMR) enables efficient processing and analysis of data in cloud-based data lakes. EMR allows you to read data from and write data to OSS either without specifying an AccessKey pair or by explicitly specifying one. This topic describes how to use Spark to process and analyze OSS data.
Read data from and write data to OSS without specifying an AccessKey pair
Use Spark Resilient Distributed Dataset (RDD) to read data from and write data to OSS
The following example shows how to use Spark to read data from OSS and write the processed data back to OSS without specifying an AccessKey pair:
Log on to the master node of your cluster in SSH mode. For more information, see Log on to the master node of a cluster.
Run the following command to start Spark Shell:
spark-shell

After you modify the parameters in the following code based on your business requirements, run the following Scala code in Spark Shell to read data from and write data to OSS.
import org.apache.spark.{SparkConf, SparkContext}

// In Spark Shell, a SparkContext is already available as the predefined variable sc.
// Create a SparkContext yourself only when you run the code as a standalone application:
// val conf = new SparkConf().setAppName("Test OSS")
// val sc = new SparkContext(conf)
val pathIn = "oss://<yourBucket>/path/to/read"
val inputData = sc.textFile(pathIn)
val cnt = inputData.count
println(s"count: $cnt")
val outputPath = "oss://<yourBucket>/path/to/write"
val outputData = inputData.map(e => s"$e has been processed.")
outputData.saveAsTextFile(outputPath)

In this example, replace the following parameters based on your business requirements:

yourBucket: specifies the name of an OSS bucket.
pathIn: specifies the path of the file from which you want to read data.
outputPath: specifies the path of the file to which you want to write data.
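To confirm that the write succeeded, you can read the output path back in the same Spark Shell session. This is a minimal sketch that reuses the outputPath variable defined above:

// Read back the files that were just written and print a few sample lines.
val written = sc.textFile(outputPath)
written.take(5).foreach(println)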
For the complete sample code, visit GitHub.
Use PySpark to read data from and write data to OSS
The following example shows how to use PySpark to read data from OSS and write the processed data back to OSS without specifying an AccessKey pair:
Log on to the master node of your cluster in SSH mode. For more information, see Log on to the master node of a cluster.
Run the following command to go to the interactive environment of PySpark:
pyspark

After you modify the parameters in the following code based on your business requirements, run the code in PySpark.

from pyspark.sql import SparkSession

# In the PySpark shell, a SparkSession is already available as the predefined
# variable spark; getOrCreate() returns that existing session.
spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
pathIn = "oss://<yourBucket>/path/to/read"
df = spark.read.text(pathIn)
cnt = df.count()
print(cnt)
outputPath = "oss://<yourBucket>/path/to/write"
df.write.format("parquet").mode('overwrite').save(outputPath)

In this example, replace the following parameters based on your business requirements:

yourBucket: specifies the name of an OSS bucket.
pathIn: specifies the path of the file from which you want to read data.
outputPath: specifies the path of the file to which you want to write data.
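Note that this example writes Parquet files rather than plain text. To verify the result, you can read the output path back as Parquet in the same PySpark session. A minimal sketch that reuses the outputPath variable defined above:

# Read back the Parquet files that were just written and display a few rows.
result = spark.read.parquet(outputPath)
result.show(5)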
Execute Spark SQL statements to create a CSV file and store the file in OSS
The following example shows how to execute Spark SQL statements to create a CSV file and store the file in OSS.
Log on to the master node of your cluster in SSH mode. For more information, see Log on to the master node of a cluster.
Run the following command to open the Spark SQL CLI:
spark-sql

After you modify the parameters in the following code based on your business requirements, run the code in Spark SQL.

CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db";
USE test_db;
CREATE TABLE student (id INT, name STRING, age INT) USING CSV OPTIONS ("delimiter"=";", "header"="true");
INSERT INTO student VALUES(1, "ab", 12);
SELECT * FROM student;

Parameters in the code:
yourBucket: specifies the name of an OSS bucket.
delimiter: specifies the delimiter that separates fields in the CSV file.
header: specifies whether the first row in the CSV file contains the table headers. The value true indicates that the first row contains the table headers. The value false indicates that it does not.
After the Spark SQL statements are executed, the following result is returned:
1 ab 12

View the CSV file stored in OSS.

The CSV file contains the following content. The first row contains the table headers, and the fields are separated by semicolons (;):

id;name;age
1;ab;12
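If you want to process the generated file outside the table, you can also read it back directly with the CSV data source. A minimal PySpark sketch that assumes the table data is stored under oss://<yourBucket>/test_db/student, as shown in the directory listing later in this topic:

# Read the CSV files that back the student table, using the same delimiter
# and header options as the table definition.
df = (spark.read
      .option("delimiter", ";")
      .option("header", "true")
      .csv("oss://<yourBucket>/test_db/student"))
df.show()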
Read data from and write data to OSS by explicitly specifying an AccessKey pair
The following example shows how to explicitly specify an AccessKey pair for an EMR cluster to access OSS.
Delete the password-free configuration.
You can access OSS from an EMR cluster without providing credentials. If you want to delete this password-free configuration, remove the fs.oss.credentials.provider parameter from the core-site.xml file of the Hadoop-Common service.
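For reference, the property to remove takes the standard Hadoop form shown in the following sketch. The value is a placeholder; the actual credentials provider class depends on your EMR version:

<!-- Hypothetical core-site.xml entry that enables password-free access;
     delete this property to disable it. The value shown is a placeholder. -->
<property>
  <name>fs.oss.credentials.provider</name>
  <value><yourCredentialsProviderClass></value>
</property>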
Run the following command to check whether password-free access is available:
hadoop fs -ls oss://<yourBucket>/test_db

The following information is returned, which indicates that you cannot access OSS after the fs.oss.credentials.provider parameter is removed:

ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.

Explicitly specify an AccessKey pair for the EMR cluster to access OSS.
To access OSS with an AccessKey pair after you delete the password-free configuration, add the following AccessKey-related parameters to the core-site.xml file of the Hadoop-Common service.
Key                      Value                Description
fs.oss.accessKeyId       LTAI5tM85Z4sc****    The AccessKey ID.
fs.oss.accessKeySecret   HF7P1L8PS6Eqf****    The AccessKey secret.
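For reference, the entries in core-site.xml take the standard Hadoop property form. A minimal sketch with the placeholder values from the preceding table:

<property>
  <name>fs.oss.accessKeyId</name>
  <value>LTAI5tM85Z4sc****</value>
</property>
<property>
  <name>fs.oss.accessKeySecret</name>
  <value>HF7P1L8PS6Eqf****</value>
</property>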
After you explicitly specify the AccessKey pair that is used to access OSS, run the following command to check whether the AccessKey pair takes effect:
hadoop fs -ls oss://<yourBucket>/test_db

If the following information is returned, the AccessKey pair has taken effect and you can view the path of the OSS file:

drwxrwxrwx   - root root          0 2025-02-24 11:45 oss://<yourBucket>/test_db/student

Restart Spark-related services. After the Spark-related services run normally, you can use Spark RDD, PySpark, or Spark SQL to read data from and write data to OSS.
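Alternatively, instead of changing core-site.xml for the whole cluster, you can pass the AccessKey pair to a single Spark job. Spark copies properties prefixed with spark.hadoop. into the job's Hadoop configuration, so the following sketch, with placeholder credentials, should apply the pair to one Spark SQL session only:

# Pass the AccessKey pair to one Spark SQL session; properties prefixed with
# spark.hadoop. are injected into the job's Hadoop configuration.
spark-sql --conf spark.hadoop.fs.oss.accessKeyId=<yourAccessKeyId> \
          --conf spark.hadoop.fs.oss.accessKeySecret=<yourAccessKeySecret>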