すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して OSS にアクセスする

最終更新日:Apr 03, 2025

Spark と Object Storage Service (OSS) を統合することで、Alibaba Cloud E-MapReduce (EMR) はクラウドベースのデータレイクのデータの効率的な処理と分析を実現します。EMR では、AccessKey ペアを指定せずに、または AccessKey ペアを明示的に指定することで、OSS からデータを読み取り、OSS にデータを書き込むことができます。このトピックでは、Spark を使用して OSS データを処理および分析する方法について説明します。

AccessKey ペアを指定せずに OSS からデータを読み取り、OSS にデータを書き込む

Spark Resilient Distributed Dataset (RDD) を使用して OSS からデータを読み取り、OSS にデータを書き込む

次の例は、AccessKey ペアを指定せずに、Spark を使用して OSS からデータを読み取り、処理済みデータを OSS に書き戻す方法を示しています。

  1. SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタのマスターノードにログオンする」をご参照ください。

  2. 次のコマンドを実行して、Spark Shell を起動します。

    spark-shell
  3. ビジネス要件に基づいて次のコードのパラメーターを変更した後、Spark Shell で次の Scala コードを実行して、OSS からデータを読み取り、OSS にデータを書き込みます。

    import org.apache.spark.{SparkConf, SparkContext}
    val conf = new SparkConf().setAppName("Test OSS")
    val sc = new SparkContext(conf)
    val pathIn = "oss://<yourBucket>/path/to/read"
    val inputData = sc.textFile(pathIn)
    val cnt = inputData.count
    println(s"count: $cnt")
    val outputPath = "oss://<yourBucket>/path/to/write"
    val outputData = inputData.map(e => s"$e has been processed.")
    outputData.saveAsTextFile(outputPath)

    この例では、ビジネス要件に基づいて次のパラメーターを置き換えます。

    • yourBucket: OSS バケットの名前を指定します。

    • pathIn: データを読み取るファイルのパスを指定します。

    • outputPath: データを書き込むファイルのパスを指定します。

    完全なサンプルコードについては、GitHub にアクセスしてください。

PySpark を使用して OSS からデータを読み取り、OSS にデータを書き込む

次の例は、PySpark を使用して OSS からデータを読み取り、処理済みデータを AccessKey ペアを指定せずに OSS に書き戻す方法を示しています。

  1. SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタのマスターノードにログオンする」をご参照ください。

  2. 次のコマンドを実行して、PySpark のインタラクティブ環境に移動します。

    pyspark
  3. ビジネス要件に基づいて次のコードのパラメーターを変更した後、PySpark でコードを実行します。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
    pathIn = "oss://<yourBucket>/path/to/read"
    df = spark.read.text(pathIn)
    cnt = df.count()
    print(cnt)
    outputPath = "oss://<yourBucket>/path/to/write"
    df.write.format("parquet").mode('overwrite').save(outputPath)
    

    この例では、ビジネス要件に基づいて次のパラメーターを置き換えます。

    • yourBucket: OSS バケットの名前を指定します。

    • pathIn: データを読み取るファイルのパスを指定します。

    • outputPath: データを書き込むファイルのパスを指定します。

Spark SQL ステートメントを実行して CSV ファイルを作成し、OSS に保存する

次の例は、Spark SQL ステートメントを実行して CSV ファイルを作成し、OSS に保存する方法を示しています。

  1. SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタのマスターノードにログオンする」をご参照ください。

  2. 次のコマンドを実行して、Spark SQL CLI を開きます。

    spark-sql
  3. ビジネス要件に基づいて次のコードのパラメーターを変更した後、Spark SQL でコードを実行します。

    CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db";
    USE test_db;
    CREATE TABLE student (id INT, name STRING, age INT)
        USING CSV options ("delimiter"=";",  "header"="true");
    INSERT INTO student VALUES(1, "ab", 12);
    SELECT * FROM student;     

    コードのパラメーター:

    • yourBucket: OSS バケットの名前を指定します。

    • delimiter: CSV ファイルのデータを区切るために使用するデリミタを指定します。

    • header: CSV ファイルの最初の行がテーブルヘッダーであるかどうかを指定します。true 値は、CSV ファイルの最初の行がテーブルヘッダーであることを示します。false 値は、CSV ファイルの最初の行がテーブルヘッダーではないことを示します。

    Spark SQL ステートメントが実行されると、次の結果が返されます。

    1    ab    12     
  4. OSS に保存されている CSV ファイルを表示します。

    次のコードは、CSV ファイルに含まれる情報を提供します。最初の行は、セミコロン (;) で区切られたテーブルヘッダーです。サンプルコード:

    id;name;age
    1;ab;12

AccessKey ペアを明示的に指定して OSS からデータを読み取り、OSS にデータを書き込む

次の例は、OSS にアクセスするために EMR クラスタの AccessKey ペアを明示的に指定する方法を示しています。

  1. パスワードなしの構成を削除します。

    パスワードを入力せずに EMR クラスタから OSS にアクセスできます。パスワードなしの構成を削除する場合は、Hadoop-Common サービスの core-site.xml ファイルから fs.oss.credentials.provider パラメーターの設定を削除する必要があります。

  2. 次のコマンドを実行して、パスワードなしのアクセスが利用可能かどうかを確認します。

    hadoop fs -ls oss://<yourBucket>/test_db

    次の情報が返されます。これは、fs.oss.credentials.provider パラメーターの設定を削除した後に OSS にアクセスできないことを示しています。

    ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.
  3. OSS にアクセスするために EMR クラスタの AccessKey ペアを明示的に指定します。

    パスワードなしの構成を削除した後に AccessKey ペアを使用して OSS にアクセスするには、Hadoop-Common サービスの core-site.xml ファイルに AccessKey ペアに関連する次のパラメーターを追加する必要があります。

    キー

    説明

    fs.oss.accessKeyId

    LTAI5tM85Z4sc****

    AccessKey ID。

    fs.oss.accessKeySecret

    HF7P1L8PS6Eqf****

    AccessKey シークレット。

  4. OSS にアクセスするために使用する AccessKey ペアを明示的に指定した後、次のコマンドを実行して AccessKey ペアが有効になっているかどうかを確認します。

    hadoop fs -ls oss://<yourBucket>/test_db

    次の情報が返された場合、OSS ファイルのパスを表示できます。

    drwxrwxrwx   - root root          0 2025-02-24 11:45 oss://<yourBucket>/test_db/student
  5. Spark 関連サービスを再起動します。Spark 関連サービスが正常に実行を開始したら、Spark RDD、PySpark、または Spark SQL を使用して OSS からデータを読み取り、OSS にデータを書き込むことができます。

よくある質問

Spark を使用してバケット A からデータを読み取り、バケット B にデータを書き込むにはどうすればよいですか?

  • バケット A とバケット B で異なる認証情報を使用する場合は、バケットレベルの認証情報プロバイダーを構成できます。たとえば、fs.oss.bucket.<BucketName>.credentials.provider パラメーターを構成できます。<BucketName> は、構成する OSS バケットの名前を指定します。詳細については、「OSS または OSS-HDFS の認証情報プロバイダーをバケットごとに構成する」をご参照ください。

  • バケット A とバケット B が異なるリージョンにある場合は、oss://<BucketName>.<バケットのパブリックエンドポイント>/ を使用して、Spark が OSS からデータを読み取り、OSS にデータを書き込むことができるようにします。この方法を使用すると、トラフィック料金が発生し、安定性の問題が発生します。

Amazon S3 SDK を使用して OSS にアクセスするにはどうすればよいですか?

OSS は、Amazon Simple Storage Service (Amazon S3) と互換性のある API 操作を提供します。データを Amazon S3 から OSS に移行した後、簡単な構成変更を行うことで、クライアント アプリケーションを OSS と互換性を持たせることができます。詳細については、「Amazon S3 SDK を使用して OSS にアクセスする」をご参照ください。