Spark と Object Storage Service (OSS) を統合することで、Alibaba Cloud E-MapReduce (EMR) はクラウドベースのデータレイクのデータの効率的な処理と分析を実現します。EMR では、AccessKey ペアを指定せずに、または AccessKey ペアを明示的に指定することで、OSS からデータを読み取り、OSS にデータを書き込むことができます。このトピックでは、Spark を使用して OSS データを処理および分析する方法について説明します。
AccessKey ペアを指定せずに OSS からデータを読み取り、OSS にデータを書き込む
Spark Resilient Distributed Dataset (RDD) を使用して OSS からデータを読み取り、OSS にデータを書き込む
次の例は、AccessKey ペアを指定せずに、Spark を使用して OSS からデータを読み取り、処理済みデータを OSS に書き戻す方法を示しています。
SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタのマスターノードにログオンする」をご参照ください。
次のコマンドを実行して、Spark Shell を起動します。
spark-shellビジネス要件に基づいて次のコードのパラメーターを変更した後、Spark Shell で次の Scala コードを実行して、OSS からデータを読み取り、OSS にデータを書き込みます。
import org.apache.spark.{SparkConf, SparkContext} val conf = new SparkConf().setAppName("Test OSS") val sc = new SparkContext(conf) val pathIn = "oss://<yourBucket>/path/to/read" val inputData = sc.textFile(pathIn) val cnt = inputData.count println(s"count: $cnt") val outputPath = "oss://<yourBucket>/path/to/write" val outputData = inputData.map(e => s"$e has been processed.") outputData.saveAsTextFile(outputPath)この例では、ビジネス要件に基づいて次のパラメーターを置き換えます。
yourBucket: OSS バケットの名前を指定します。pathIn: データを読み取るファイルのパスを指定します。outputPath: データを書き込むファイルのパスを指定します。
完全なサンプルコードについては、GitHub にアクセスしてください。
PySpark を使用して OSS からデータを読み取り、OSS にデータを書き込む
次の例は、PySpark を使用して OSS からデータを読み取り、処理済みデータを AccessKey ペアを指定せずに OSS に書き戻す方法を示しています。
SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタのマスターノードにログオンする」をご参照ください。
次のコマンドを実行して、PySpark のインタラクティブ環境に移動します。
pysparkビジネス要件に基づいて次のコードのパラメーターを変更した後、PySpark でコードを実行します。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate() pathIn = "oss://<yourBucket>/path/to/read" df = spark.read.text(pathIn) cnt = df.count() print(cnt) outputPath = "oss://<yourBucket>/path/to/write" df.write.format("parquet").mode('overwrite').save(outputPath)この例では、ビジネス要件に基づいて次のパラメーターを置き換えます。
yourBucket: OSS バケットの名前を指定します。pathIn: データを読み取るファイルのパスを指定します。outputPath: データを書き込むファイルのパスを指定します。
Spark SQL ステートメントを実行して CSV ファイルを作成し、OSS に保存する
次の例は、Spark SQL ステートメントを実行して CSV ファイルを作成し、OSS に保存する方法を示しています。
SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタのマスターノードにログオンする」をご参照ください。
次のコマンドを実行して、Spark SQL CLI を開きます。
spark-sqlビジネス要件に基づいて次のコードのパラメーターを変更した後、Spark SQL でコードを実行します。
CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db"; USE test_db; CREATE TABLE student (id INT, name STRING, age INT) USING CSV options ("delimiter"=";", "header"="true"); INSERT INTO student VALUES(1, "ab", 12); SELECT * FROM student;コードのパラメーター:
yourBucket: OSS バケットの名前を指定します。delimiter: CSV ファイルのデータを区切るために使用するデリミタを指定します。header: CSV ファイルの最初の行がテーブルヘッダーであるかどうかを指定します。true値は、CSV ファイルの最初の行がテーブルヘッダーであることを示します。false値は、CSV ファイルの最初の行がテーブルヘッダーではないことを示します。
Spark SQL ステートメントが実行されると、次の結果が返されます。
1 ab 12OSS に保存されている CSV ファイルを表示します。
次のコードは、CSV ファイルに含まれる情報を提供します。最初の行は、セミコロン (;) で区切られたテーブルヘッダーです。サンプルコード:
id;name;age 1;ab;12
AccessKey ペアを明示的に指定して OSS からデータを読み取り、OSS にデータを書き込む
次の例は、OSS にアクセスするために EMR クラスタの AccessKey ペアを明示的に指定する方法を示しています。
パスワードなしの構成を削除します。
パスワードを入力せずに EMR クラスタから OSS にアクセスできます。パスワードなしの構成を削除する場合は、Hadoop-Common サービスの core-site.xml ファイルから fs.oss.credentials.provider パラメーターの設定を削除する必要があります。
次のコマンドを実行して、パスワードなしのアクセスが利用可能かどうかを確認します。
hadoop fs -ls oss://<yourBucket>/test_db次の情報が返されます。これは、fs.oss.credentials.provider パラメーターの設定を削除した後に OSS にアクセスできないことを示しています。
ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.OSS にアクセスするために EMR クラスタの AccessKey ペアを明示的に指定します。
パスワードなしの構成を削除した後に AccessKey ペアを使用して OSS にアクセスするには、Hadoop-Common サービスの core-site.xml ファイルに AccessKey ペアに関連する次のパラメーターを追加する必要があります。
キー
値
説明
fs.oss.accessKeyId
LTAI5tM85Z4sc****
AccessKey ID。
fs.oss.accessKeySecret
HF7P1L8PS6Eqf****
AccessKey シークレット。
OSS にアクセスするために使用する AccessKey ペアを明示的に指定した後、次のコマンドを実行して AccessKey ペアが有効になっているかどうかを確認します。
hadoop fs -ls oss://<yourBucket>/test_db次の情報が返された場合、OSS ファイルのパスを表示できます。
drwxrwxrwx - root root 0 2025-02-24 11:45 oss://<yourBucket>/test_db/studentSpark 関連サービスを再起動します。Spark 関連サービスが正常に実行を開始したら、Spark RDD、PySpark、または Spark SQL を使用して OSS からデータを読み取り、OSS にデータを書き込むことができます。