This topic describes how to use Spark to read data from Object Storage Service (OSS).

Background information

E-MapReduce (EMR) provides the following methods for you to access OSS:
  • You can access OSS by using MetaService.
  • You can access OSS without using an AccessKey pair.
  • You can access OSS by explicitly specifying an AccessKey pair and an endpoint. For an example, see the sketch after this list.
    Note You must use an internal endpoint of OSS. For more information about endpoints, see OSS endpoints.
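
The following minimal sketch shows one way to explicitly pass an AccessKey pair and an endpoint to Spark. The fs.oss.accessKeyId and fs.oss.accessKeySecret parameters are the same parameters that are added to the core-site.xml file later in this topic; fs.oss.endpoint is assumed to be the endpoint parameter of the OSS connector, and the placeholder values must be replaced with your own AccessKey pair and internal endpoint.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Test OSS with explicit AccessKey")
val sc = new SparkContext(conf)
// Assumption: fs.oss.accessKeyId, fs.oss.accessKeySecret, and fs.oss.endpoint are the
// connector parameters for the AccessKey pair and the internal OSS endpoint.
sc.hadoopConfiguration.set("fs.oss.accessKeyId", "<yourAccessKeyId>")
sc.hadoopConfiguration.set("fs.oss.accessKeySecret", "<yourAccessKeySecret>")
sc.hadoopConfiguration.set("fs.oss.endpoint", "<yourInternalEndpoint>")
// Read a file from OSS to verify that the explicit credentials work.
val inputData = sc.textFile("oss://bucket/path/to/read")
println(s"count: ${inputData.count()}")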

Example of using Spark RDD to access OSS

The following example shows how to use Spark to read data from OSS and write the processed data back to OSS without using an AccessKey pair:
val conf = new SparkConf().setAppName("Test OSS")
val sc = new SparkContext(conf)
val pathIn = "oss://bucket/path/to/read"
val inputData = sc.textFile(pathIn)
val cnt = inputData.count
println(s"count: $cnt")
val outputPath = "oss://bucket/path/to/write"
val outputData = inputData.map(e => s"$e has been processed.")
outputData.saveAsTextFile(outputPath)

For the complete sample code, visit GitHub.

Example of using PySpark to access OSS

The following example shows how to use PySpark to read data from OSS and write the data back to OSS in the Parquet format without using an AccessKey pair:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
pathIn = "oss://bucket/path/to/read"
df = spark.read.text(pathIn)
cnt = df.count()
print(cnt)
outputPath = "oss://bucket/path/to/write"
df.write.format("parquet").mode('overwrite').save(outputPath)

Example of executing Spark SQL statements to create a CSV file

The following example shows how to execute Spark SQL statements to create a CSV file and store the file in OSS.
  1. Execute the following Spark SQL statements to create a CSV file:
    CREATE DATABASE test_db LOCATION "oss://test_bucket/test_db";
    USE test_db;
    CREATE TABLE student (id INT, name STRING, age INT)
        USING CSV OPTIONS ("delimiter"=";", "header"="true");
    INSERT INTO student VALUES(1,"ab",12);
    SELECT * FROM student;
    The SELECT statement returns the following result:
    1    ab    12
    • test_bucket: specifies the name of the OSS bucket that is used for the test.
    • delimiter: specifies the delimiter that is used to separate fields in the CSV file.
    • header: specifies whether the first row in the CSV file contains column headers. The value true indicates that the first row contains column headers. The value false indicates that it does not.
    For an example that reads this CSV file back with the same delimiter and header options, see the sketch after this procedure.
  2. View the CSV file stored in OSS. The CSV file contains the following information. The first row contains the column headers.
    id;name;age
    1;ab;12
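
The delimiter and header options in the CREATE TABLE statement are standard Spark CSV options, so the generated file can be read back into a DataFrame with the same options. The following is a minimal sketch; the path oss://test_bucket/test_db/student assumes the table location that is created in the preceding example.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Read student CSV").getOrCreate()
// Read the CSV file back with the same delimiter and header options that were
// specified in the CREATE TABLE statement.
val studentDf = spark.read
  .option("delimiter", ";")
  .option("header", "true")
  .csv("oss://test_bucket/test_db/student")
studentDf.show()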

Use an AccessKey pair to access OSS

The following example shows how to specify an AccessKey pair for an EMR cluster to access OSS.
  1. Change the password-free configuration.

    An EMR cluster can access OSS without using a password. Whether password-free access is used is determined by the fs.oss.credentials.provider parameter in the core-site.xml file of HDFS.

    If you want to access OSS by using an AccessKey pair, remove the fs.oss.credentials.provider parameter from the core-site.xml file and add the following parameter settings to the file.
    <property>
      <name>fs.oss.accessKeyId</name>
      <value>LTAI5tM85Z4sc****</value>
    </property>
    <property>
      <name>fs.oss.accessKeySecret</name>
      <value>HF7P1L8PS6Eqf****</value>
    </property>
  2. Run the following commands to check whether the configuration takes effect.
    If you remove the fs.oss.credentials.provider parameter but do not configure an AccessKey pair, you cannot access OSS by running the ls command:
    hadoop fs -ls oss://test_bucket/test_db
    ls: ERROR: without login secrets configured.
    After the AccessKey pair settings are added, you can view the directories of OSS objects:
    hadoop fs -ls oss://test_bucket/test_db
    drwxrwxrwx   - root root          0 2022-11-30 12:51 oss://test_bucket/test_db/student
  3. Restart components related to Spark and check whether the components run as expected.
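    For example, you can run a quick query in spark-shell to confirm that Spark can still read data from OSS. The following minimal sketch assumes that the test_db.student table from the preceding example still exists; spark is the SparkSession that spark-shell provides.
    // Verification sketch: query the table that is stored in OSS to confirm that
    // Spark can read from OSS after the AccessKey pair is configured and the
    // components are restarted.
    spark.sql("SELECT * FROM test_db.student").show()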