
E-MapReduce: Use Spark to access OSS

Last Updated: Jan 17, 2024

You can use Spark to access Object Storage Service (OSS) and process and analyze OSS data in an efficient and flexible manner. This way, big data processing is integrated with cloud storage. This topic describes how to use Spark to process and analyze OSS data.

Background information

E-MapReduce (EMR) provides the following methods for you to access OSS:

  • You can use MetaService to access OSS. With this method, you do not need to configure an AccessKey pair.

  • You can access OSS by explicitly specifying an AccessKey pair and an endpoint. A minimal configuration sketch follows the note below.

    Note

    You must use an internal endpoint of OSS. For more information about endpoints, see OSS endpoints.
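
    The following snippet sketches the second method. It is not part of the original examples: it assumes the Hadoop OSS connector keys fs.oss.accessKeyId, fs.oss.accessKeySecret, and fs.oss.endpoint (the first two also appear in the core-site.xml example later in this topic), passed to Hadoop through Spark's spark.hadoop.* configuration passthrough. The bucket path and endpoint values are placeholders.

    # Sketch: configure an explicit AccessKey pair and an endpoint for the
    # Hadoop OSS connector through Spark's spark.hadoop.* passthrough.
    # Replace the placeholder values with your own.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("OSS with explicit credentials")
             .config("spark.hadoop.fs.oss.accessKeyId", "<yourAccessKeyId>")
             .config("spark.hadoop.fs.oss.accessKeySecret", "<yourAccessKeySecret>")
             # Use an internal OSS endpoint, as the note above requires.
             .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
             .getOrCreate())

    df = spark.read.text("oss://bucket/path/to/read")
    print(df.count())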

Example of using Spark RDD to access OSS

The following example shows how to use Spark to read data from OSS and write the processed data back to OSS without using an AccessKey pair:

Run the following code in Spark Shell:

// In spark-shell, a SparkContext is already available as sc.
// Create a SparkContext manually only if you submit this code as a standalone application:
// val conf = new SparkConf().setAppName("Test OSS")
// val sc = new SparkContext(conf)

// Read the input data from OSS.
val pathIn = "oss://bucket/path/to/read"
val inputData = sc.textFile(pathIn)
val cnt = inputData.count()
println(s"count: $cnt")

// Process the data and write the result back to OSS.
val outputPath = "oss://bucket/path/to/write"
val outputData = inputData.map(e => s"$e has been processed.")
outputData.saveAsTextFile(outputPath)

For the complete sample code, visit GitHub.

In this example, replace the following parameters based on your business requirements:

  • pathIn: specifies the path of the file from which you want to read data.

  • outputPath: specifies the path of the file to which you want to write data.

Example of using PySpark to access OSS

The following example shows how to use PySpark to read data from OSS and write the processed data back to OSS without using an AccessKey pair:

from pyspark.sql import SparkSession

# In the pyspark shell, a SparkSession is already available as spark;
# getOrCreate() returns the existing session in that case.
spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()

# Read text data from OSS.
pathIn = "oss://bucket/path/to/read"
df = spark.read.text(pathIn)
cnt = df.count()
print(cnt)

# Write the processed data back to OSS in Parquet format.
outputPath = "oss://bucket/path/to/write"
df.write.format("parquet").mode("overwrite").save(outputPath)
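
To verify the write, you can read the Parquet output back. The following check is an addition to the original example and reuses the outputPath variable defined above:

# Optional check: load the Parquet output and inspect a few rows.
result = spark.read.parquet(outputPath)
result.show(5, truncate=False)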

Example of executing Spark SQL statements to create a CSV file

The following example shows how to execute Spark SQL statements to create a CSV file and store the file in OSS.

  1. Execute the following Spark SQL statements to create a CSV file:

    CREATE DATABASE test_db LOCATION "oss://test_bucket/test_db";
    USE test_db;
    CREATE TABLE student (id INT, name STRING, age INT)
        USING CSV OPTIONS ("delimiter" = ";", "header" = "true");
    INSERT INTO student VALUES(1, "ab", 12);
    SELECT * FROM student;     

    After the Spark SQL statements are executed, the following result is returned:

    1    ab    12     

    Parameters in the Spark SQL statements:

    • test_bucket: specifies the name of the OSS bucket.

    • delimiter: specifies the delimiter that is used to separate data in the CSV file.

    • header: specifies whether the first row of the CSV file contains the column headers. The value true indicates that the first row contains the column headers. The value false indicates that it does not.

  2. View the CSV file stored in OSS.

    The following content is stored in the CSV file. The first row contains the column headers, and the fields are separated by semicolons (;).

    id;name;age
    1;ab;12
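
    You can also load the file back with the PySpark DataFrame reader. The following sketch is an addition to the original procedure; it matches the delimiter and header options that were used when the table was created:

    # Read the CSV data back from the table location, using the same options.
    df = (spark.read
          .option("delimiter", ";")
          .option("header", "true")
          .csv("oss://test_bucket/test_db/student"))
    df.show()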

Use an AccessKey pair to access OSS

The following example shows how to specify an AccessKey pair for an EMR cluster to access OSS.

  1. Change the password-free configuration.

    An EMR cluster can access OSS in password-free mode. Whether password-free access is used is determined by the fs.oss.credentials.provider parameter in the core-site.xml file of Hadoop Distributed File System (HDFS).

    If you want to access OSS by using an AccessKey pair instead, remove the setting of the fs.oss.credentials.provider parameter from the core-site.xml file and add the settings of the AccessKey pair:

    <!-- In core-site.xml: delete the fs.oss.credentials.provider property,
         then add the following properties with your own AccessKey pair. -->
    <property>
      <name>fs.oss.accessKeyId</name>
      <value>LTAI5tM85Z4sc****</value>
    </property>
    <property>
      <name>fs.oss.accessKeySecret</name>
      <value>HF7P1L8PS6Eqf****</value>
    </property>
  2. Run the following command to check whether the configuration takes effect:

    hadoop fs -ls oss://test_bucket/test_db

    If the AccessKey pair is not yet added, the following information is returned, which indicates that OSS cannot be accessed after the setting of the fs.oss.credentials.provider parameter is removed:

    ls: ERROR: without login secrets configured.

    After the settings of the AccessKey pair are added, you can run the hadoop fs -ls oss://test_bucket/test_db command again to view the directories of OSS objects.

    drwxrwxrwx   - root root          0 2022-11-30 12:51 oss://test_bucket/test_db/student
  3. Restart the components that are related to Spark and check whether the components run as expected. A quick smoke test is sketched below.
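
    The following smoke test is an addition to the original procedure. It assumes that you run it in the pyspark shell and that the test_db.student table created earlier is still registered; a successful query indicates that Spark can reach OSS with the new credentials.

    # Smoke test: query the table that was created in OSS earlier.
    spark.sql("SELECT * FROM test_db.student").show()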