全部產品
Search
文件中心

E-MapReduce:Spark對接OSS

更新時間:Mar 28, 2025

通過整合Spark與OSS,阿里雲EMR實現了對雲端資料湖的高效處理與分析。EMR支援通過免AccessKey和顯式AccessKey兩種方式讀寫OSS資料。本文主要介紹Spark如何處理和分析OSS中的資料。

免AccessKey方式讀寫OSS

通過Spark RDD讀寫OSS資料

本樣本為您展示,Spark如何以免AccessKey方式讀取OSS中資料,並將處理完的資料寫回至OSS。

  1. 通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點

  2. 執行以下命令,啟動Spark Shell。

    spark-shell
  3. 根據實際情況修改下面代碼中的參數後,在Spark Shell中運行以下Scala代碼讀寫OSS資料。

    import org.apache.spark.{SparkConf, SparkContext}
    val conf = new SparkConf().setAppName("Test OSS")
    val sc = new SparkContext(conf)
    val pathIn = "oss://<yourBucket>/path/to/read"
    val inputData = sc.textFile(pathIn)
    val cnt = inputData.count
    println(s"count: $cnt")
    val outputPath = "oss://<yourBucket>/path/to/write"
    val outputData = inputData.map(e => s"$e has been processed.")
    outputData.saveAsTextFile(outputPath)

    本文樣本中以下參數請根據您的實際情況替換:

    • yourBucket:OSS Bucket的名稱。

    • pathIn:要讀取檔案的路徑。

    • outputPath:要寫入的檔案路徑。

    完整範例程式碼,請參見Spark對接OSS

通過PySpark讀寫OSS資料

本樣本為您展示,PySpark如何以免AccessKey方式讀取OSS中資料,並將處理完的資料寫回至OSS。

  1. 通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點

  2. 執行以下命令,進入PySpark互動式環境。

    pyspark
  3. 根據實際情況修改下面代碼中的參數後,在PySpark環境中運行代碼。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
    pathIn = "oss://<yourBucket>/path/to/read"
    df = spark.read.text(pathIn)
    cnt = df.count()
    print(cnt)
    outputPath = "oss://<yourBucket>/path/to/write"
    df.write.format("parquet").mode('overwrite').save(outputPath)
    

    本文樣本中以下參數請根據您的實際情況替換:

    • yourBucket:OSS Bucket的名稱。

    • pathIn:要讀取檔案的路徑。

    • outputPath:要寫入的檔案路徑。

通過Spark-SQL建立CSV表寫入OSS

本樣本為您展示Spark-SQL如何建立CSV表,並存放至OSS,操作步驟如下。

  1. 通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點

  2. 執行以下命令,進入Spark SQL命令列。

    spark-sql
  3. 根據實際情況修改下面代碼中的參數後,在Spark SQL環境中運行代碼。

    CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db";
    USE test_db;
    CREATE TABLE student (id INT, name STRING, age INT)
        USING CSV options ("delimiter"=";",  "header"="true");
    INSERT INTO student VALUES(1, "ab", 12);
    SELECT * FROM student;     

    命令中涉及的參數如下:

    • yourBucket:OSS Bucket的名稱。

    • delimiter:指定CSV檔案中資料的分隔字元。

    • header:指定CSV檔案中第一行是否是表頭,可設定true表示是,false表示否。

    執行完以上命令,返回以下資訊。

    1    ab    12     
  4. 查看儲存在OSS上的CSV檔案。

    CSV檔案的第一行包含表頭,並使用分號(;)作為欄位的分隔字元。樣本內容如下。

    id;name;age
    1;ab;12

顯式AccessKey方式讀寫OSS

本樣本將為您展示叢集如何顯式指定訪問OSS的AccessKey,操作步驟如下。

  1. 刪除免密配置。

    EMR叢集預設是通過免密配置訪問OSS。如果要取消免密訪問,您需要刪除Hadoop-Common服務core-site.xml中的fs.oss.credentials.provider配置項。

  2. 執行以下命令驗證是否可免密訪問。

    hadoop fs -ls oss://<yourBucket>/test_db

    返回以下資訊,發現無法訪問OSS。

    ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.
  3. 顯式指定訪問OSS的AccessKey。

    去掉免密配置後,使用AccessKey訪問,您需要在Hadoop-Common服務的core-site.xml中增加下面AccessKey的配置項。

    Key

    Value

    描述

    fs.oss.accessKeyId

    yourAccessKeyID

    阿里雲帳號或RAM使用者的AccessKey ID。

    fs.oss.accessKeySecret

    yourAccessKeySecret

    阿里雲帳號或RAM使用者的AccessKey Secret。

  4. 顯式指定訪問OSS的AccessKey後,執行以下命令驗證是否生效。

    hadoop fs -ls oss://<yourBucket>/test_db

    返回以下資訊,則能夠查看OSS檔案目錄。

    drwxrwxrwx   - root root          0 2025-02-24 11:45 oss://<yourBucket>/test_db/student
  5. 重啟Spark相關服務,待Spark相關服務正常後即可通過Spark RDD、PySpark、Spark-SQL等方式讀寫OSS資料。

常見問題

讀取和寫入的OSS Bucket不同,要如何通過Spark讀寫OSS?

  • 如果讀取和寫入的Bucket要使用不同的credential,您可以配置Bucket層級的Credential Provider,例如,您可以配置fs.oss.bucket.<BucketName>.credentials.provider,其中<BucketName>為要配置的OSS Bucket名稱。具體可參考按Bucket配置OSS/OSS-HDFS Credential Provider

  • 如果讀取和寫入的Bucket屬於不同的Region,您可以通過oss://<BucketName>.<外網訪問Endpoint>/方式實現Spark讀寫OSS。該方式會產生流量費用,同時會存在穩定性問題,使用時需注意。

Amazon S3 SDK如何訪問OSS?

Object Storage Service提供了相容Amazon S3的API。當您將資料從Amazon S3遷移到OSS後,只需簡單的配置修改,即可讓您的用戶端應用相容OSS服務。具體請參見使用Amazon S3 SDK訪問OSS