このトピックでは、Spark を使用して MaxCompute からデータを読み書きする方法について説明します。
手順
OdpsOps オブジェクトを初期化します。
Spark では、OdpsOps クラスを使用して MaxCompute のデータを管理します。
import com.aliyun.odps.TableSchema import com.aliyun.odps.data.Record import org.apache.spark.aliyun.odps.OdpsOps import org.apache.spark.{SparkContext, SparkConf} object Sample { def main(args: Array[String]): Unit = { // == ステップ 1 == val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 内部 URL を例として使用します。 val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") val conf = new SparkConf().setAppName("Test Odps") val sc = new SparkContext(conf) val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1)) // 呼び出しのコード: // == ステップ 2 == ... // == ステップ 3 == ... } }説明サンプルコードを実行する前に、環境変数を構成する必要があります。環境変数の構成方法の詳細については、このトピックの環境変数を構成するセクションをご参照ください。
MaxCompute から Spark にテーブルデータを読み込みます。
OdpsOps オブジェクトの readTable メソッドを使用して、MaxCompute から Spark にテーブルデータを読み込みます。
// == ステップ 2 == val project = <odps-project> val table = <odps-table> val numPartitions = 2 val inputData = odpsOps.readTable(project, table, read, numPartitions) inputData.top(10).foreach(println) // == ステップ 3 == ...上記のコードでは、MaxCompute のテーブルデータを解析および前処理するための read 関数を定義する必要があります。read 関数を定義するコード:
def read(record: Record, schema: TableSchema): String = { record.getString(0) }Spark の結果データを MaxCompute テーブルに保存します。
OdpsOps オブジェクトの saveToTable メソッドを使用して、Spark の結果データを MaxCompute テーブルに保存します。
val resultData = inputData.map(e => s"$e has been processed.") odpsOps.saveToTable(project, table, dataRDD, write)上記のコードでは、データを前処理するための write 関数を定義する必要があります。write 関数を定義するコード:
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = { val r = emptyRecord r.set(0, s) }パーティションテーブルのパラメーターの形式に注意してください。
Spark を使用して MaxCompute のパーティションテーブルからデータを読み書きする場合、パーティションキー列名 = パーティション名形式でパーティションを指定する必要があります。複数のパーティションが関係する場合は、コンマ (,) で区切ります。
例 1: pt が 1 であるパーティションからデータを読み取るには、pt='1' を使用します。
例 2: pt が 1 であるパーティションと ps が 2 であるパーティションからデータを読み取るには、pt='1', ps='2' を使用します。
付録
完全なサンプルコードについては、GitHub にアクセスしてください。