すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して MaxCompute にアクセスする

最終更新日:Jan 11, 2025

このトピックでは、Spark を使用して MaxCompute からデータを読み書きする方法について説明します。

手順

  1. OdpsOps オブジェクトを初期化します。

    Spark では、OdpsOps クラスを使用して MaxCompute のデータを管理します。

    import com.aliyun.odps.TableSchema
         import com.aliyun.odps.data.Record
         import org.apache.spark.aliyun.odps.OdpsOps
         import org.apache.spark.{SparkContext, SparkConf}
         object Sample {
           def main(args: Array[String]): Unit = {    
             // == ステップ 1 ==
             val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
             val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
             // 内部 URL を例として使用します。
             val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
             val conf = new SparkConf().setAppName("Test Odps")
             val sc = new SparkContext(conf)
             val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
             // 呼び出しのコード:
             // == ステップ 2 ==
             ...
             // == ステップ 3 ==
             ...
           }
         }
    説明

    サンプルコードを実行する前に、環境変数を構成する必要があります。環境変数の構成方法の詳細については、このトピックの環境変数を構成するセクションをご参照ください。

  2. MaxCompute から Spark にテーブルデータを読み込みます。

    OdpsOps オブジェクトの readTable メソッドを使用して、MaxCompute から Spark にテーブルデータを読み込みます。

    // == ステップ 2 ==
             val project = <odps-project>
             val table = <odps-table>
             val numPartitions = 2
             val inputData = odpsOps.readTable(project, table, read, numPartitions)
             inputData.top(10).foreach(println)
             // == ステップ 3 ==
             ...

    上記のコードでは、MaxCompute のテーブルデータを解析および前処理するための read 関数を定義する必要があります。read 関数を定義するコード:

    def read(record: Record, schema: TableSchema): String = {
               record.getString(0)
             }
  3. Spark の結果データを MaxCompute テーブルに保存します。

    OdpsOps オブジェクトの saveToTable メソッドを使用して、Spark の結果データを MaxCompute テーブルに保存します。

    val resultData = inputData.map(e => s"$e has been processed.")
             odpsOps.saveToTable(project, table, dataRDD, write)

    上記のコードでは、データを前処理するための write 関数を定義する必要があります。write 関数を定義するコード:

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
               val r = emptyRecord
               r.set(0, s)
             }
  4. パーティションテーブルのパラメーターの形式に注意してください。

    Spark を使用して MaxCompute のパーティションテーブルからデータを読み書きする場合、パーティションキー列名 = パーティション名形式でパーティションを指定する必要があります。複数のパーティションが関係する場合は、コンマ (,) で区切ります。

    • 例 1: pt が 1 であるパーティションからデータを読み取るには、pt='1' を使用します。

    • 例 2: pt が 1 であるパーティションと ps が 2 であるパーティションからデータを読み取るには、pt='1', ps='2' を使用します。

付録

完全なサンプルコードについては、GitHub にアクセスしてください。