通過整合Spark與OSS,阿里雲EMR實現了對雲端資料湖的高效處理與分析。EMR支援通過免AccessKey和顯式AccessKey兩種方式讀寫OSS資料。本文主要介紹Spark如何處理和分析OSS中的資料。
免AccessKey方式讀寫OSS
通過Spark RDD讀寫OSS資料
本樣本為您展示,Spark如何以免AccessKey方式讀取OSS中資料,並將處理完的資料寫回至OSS。
通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點。
執行以下命令,啟動Spark Shell。
spark-shell根據實際情況修改下面代碼中的參數後,在Spark Shell中運行以下Scala代碼讀寫OSS資料。
import org.apache.spark.{SparkConf, SparkContext} val conf = new SparkConf().setAppName("Test OSS") val sc = new SparkContext(conf) val pathIn = "oss://<yourBucket>/path/to/read" val inputData = sc.textFile(pathIn) val cnt = inputData.count println(s"count: $cnt") val outputPath = "oss://<yourBucket>/path/to/write" val outputData = inputData.map(e => s"$e has been processed.") outputData.saveAsTextFile(outputPath)本文樣本中以下參數請根據您的實際情況替換:
yourBucket:OSS Bucket的名稱。pathIn:要讀取檔案的路徑。outputPath:要寫入的檔案路徑。
完整範例程式碼,請參見Spark對接OSS。
通過PySpark讀寫OSS資料
本樣本為您展示,PySpark如何以免AccessKey方式讀取OSS中資料,並將處理完的資料寫回至OSS。
通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點。
執行以下命令,進入PySpark互動式環境。
pyspark根據實際情況修改下面代碼中的參數後,在PySpark環境中運行代碼。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate() pathIn = "oss://<yourBucket>/path/to/read" df = spark.read.text(pathIn) cnt = df.count() print(cnt) outputPath = "oss://<yourBucket>/path/to/write" df.write.format("parquet").mode('overwrite').save(outputPath)本文樣本中以下參數請根據您的實際情況替換:
yourBucket:OSS Bucket的名稱。pathIn:要讀取檔案的路徑。outputPath:要寫入的檔案路徑。
通過Spark-SQL建立CSV表寫入OSS
本樣本為您展示Spark-SQL如何建立CSV表,並存放至OSS,操作步驟如下。
通過SSH方式串連叢集的Master節點,具體操作請參見登入叢集Master節點。
執行以下命令,進入Spark SQL命令列。
spark-sql根據實際情況修改下面代碼中的參數後,在Spark SQL環境中運行代碼。
CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db"; USE test_db; CREATE TABLE student (id INT, name STRING, age INT) USING CSV options ("delimiter"=";", "header"="true"); INSERT INTO student VALUES(1, "ab", 12); SELECT * FROM student;命令中涉及的參數如下:
yourBucket:OSS Bucket的名稱。delimiter:指定CSV檔案中資料的分隔字元。header:指定CSV檔案中第一行是否是表頭,可設定true表示是,false表示否。
執行完以上命令,返回以下資訊。
1 ab 12查看儲存在OSS上的CSV檔案。
CSV檔案的第一行包含表頭,並使用分號(;)作為欄位的分隔字元。樣本內容如下。
id;name;age 1;ab;12
顯式AccessKey方式讀寫OSS
本樣本將為您展示叢集如何顯式指定訪問OSS的AccessKey,操作步驟如下。
刪除免密配置。
EMR叢集預設是通過免密配置訪問OSS。如果要取消免密訪問,您需要刪除Hadoop-Common服務core-site.xml中的fs.oss.credentials.provider配置項。
執行以下命令驗證是否可免密訪問。
hadoop fs -ls oss://<yourBucket>/test_db返回以下資訊,發現無法訪問OSS。
ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.顯式指定訪問OSS的AccessKey。
去掉免密配置後,使用AccessKey訪問,您需要在Hadoop-Common服務的core-site.xml中增加下面AccessKey的配置項。
Key
Value
描述
fs.oss.accessKeyId
yourAccessKeyID
阿里雲帳號或RAM使用者的AccessKey ID。
fs.oss.accessKeySecret
yourAccessKeySecret
阿里雲帳號或RAM使用者的AccessKey Secret。
顯式指定訪問OSS的AccessKey後,執行以下命令驗證是否生效。
hadoop fs -ls oss://<yourBucket>/test_db返回以下資訊,則能夠查看OSS檔案目錄。
drwxrwxrwx - root root 0 2025-02-24 11:45 oss://<yourBucket>/test_db/student重啟Spark相關服務,待Spark相關服務正常後即可通過Spark RDD、PySpark、Spark-SQL等方式讀寫OSS資料。