すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:データセンターでSparkを使用してMaxComputeからデータを読み書きするプロセスをシミュレートする

最終更新日:Jul 09, 2025

既存のデータレイクハウスアーキテクチャでは、MaxComputeはHadoopクラスターからデータを読み書きするハブとして機能します。 データセンターがデプロイされているシナリオでは、クラスター情報をインターネットに公開したくない場合、開発者はオンプレミスのHadoopクラスターからクラウド内のデータにアクセスする必要があります。 このトピックでは、Alibaba Cloud E-MapReduce (EMR) クラスターをオンプレミスHadoopクラスターとして使用して、MaxComputeからデータを読み書きする方法について説明します。

背景情報

次の図は、このプラクティスで使用されるデータセンターアーキテクチャを示しています。 Simulated data center architecture

開発環境の準備

  • EMRクラスターを準備します。

    1. EMRクラスターを購入します。

      詳細については、「EMRの使い方」をご参照ください。

    2. EMRクラスターにログインします。

      説明

      EMRクラスターへのログイン方法の詳細については、「クラスターへのログイン」をご参照ください。

      この方法では、EMRクラスターのElastic Compute Service (ECS) インスタンスにログインする必要があります。 ECSインスタンスへの接続方法の詳細については、「ECSインスタンスへの接続」をご参照ください。

  • ScalaでIntelliJ IDEAプロジェクトを準備します。

    1. IntelliJ IDEAをインストールします。

      このプラクティスではIntelliJ IDEAが使用されます。 IntelliJ IDEAのインストール方法の詳細については、「IntelliJ IDEAのインストール」をご参照ください。

    2. Maven をインストールします。

      詳細については、「Apache Mavenのインストール」をご参照ください。

    3. Scalaプロジェクトを作成します。

      1. Scalaプラグインをダウンロードします。

        IntelliJ IDEAを起動し、ファイル > 設定を選択します。 [設定] ダイアログボックスで、左側のナビゲーションウィンドウで [プラグイン] をクリックし、Scalaプラグインカードで [インストール] をクリックします。 Scala

      2. Scala Java Development Kit (JDK) をインストールします。

        詳細については、「コンピューターへのScalaのインストール」をご参照ください。

      3. Scalaプロジェクトを作成します。

        IntelliJ IDEAで、Scala > IDEAを選択してScalaプロジェクトを作成します。 Scala project

  • MaxComputeデータを準備します。

    1. MaxComputeプロジェクトを作成します。

      MaxComputeプロジェクトの作成方法の詳細については、「MaxCompute プロジェクトを作成する」をご参照ください。

    2. AccessKeyペアを取得します。

      AccessKey IDとAccessKeyシークレットは、AccessKeyページから取得できます。

    3. MaxComputeプロジェクトのエンドポイントを取得します。

      MaxComputeプロジェクトのエンドポイントを取得します。 MaxComputeプロジェクトのエンドポイントは、MaxComputeプロジェクトの作成時に選択したリージョンとネットワーク接続方法によって異なります。 異なるリージョンとネットワーク接続方法に対応するエンドポイントの詳細については、「エンドポイント」をご参照ください。

    4. テーブルを作成します。

      この方法では、テスト用にパーティションテーブルと非パーティションテーブルを準備する必要があります。 テーブルの作成方法の詳細については、「テーブルの作成」をご参照ください。

MaxComputeからのデータの読み取りとMaxComputeへのデータの書き込み

  1. コードを書き込みます。

    次のサンプルコードは、パーティション分割されていないテーブルからデータを読み取るために使用されます。

    説明

    パーティションテーブルからのデータの読み取り、非パーティションテーブルへのデータの書き込み、およびパーティションテーブルへのデータの書き込みに使用されるサンプルコードについては、「PartitionDataReaderTest.scala」、「DataWriterTest.scala」、および「PartitionDataWriterTest.scala」をご参照ください。 ビジネス要件に基づいてコードを作成できます。

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * @author renxiang
      * @date 2021-12-20
      */
    object DataReaderTest {
    
      val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource"
      val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
    
    
      def main(args: Array[String]): Unit = {
        val odpsProject = args(0)
        val odpsAkId = args(1)
        val odpsAkKey = args(2)
        val odpsTable = args(3)
    
        val spark = SparkSession
          .builder()
          .appName("odps-datasource-reader")
          .getOrCreate()
    
        import spark._
    
        val df = spark.read.format(ODPS_DATA_SOURCE)
          .option("spark.hadoop.odps.project.name", odpsProject)
          .option("spark.hadoop.odps.access.id", odpsAkId)
          .option("spark.hadoop.odps.access.key", odpsAkKey)
          .option("spark.hadoop.odps.end.point", ODPS_ENDPOINT)
          .option("spark.hadoop.odps.table.name", odpsTable)
          .load()
    
        df.createOrReplaceTempView("odps_table")
    
        println("select * from odps_table")
        val dfFullScan = sql("select * from odps_table")
        println(dfFullScan.count)
        dfFullScan.show(10)
    
        Thread.sleep(72*3600*1000)
      }
    }
  2. コードをパッケージ化してアップロードします。

    1. Mavenを使用してコードをパッケージ化します。

      1. IntelliJ IDEAのコード開発ページの右側のウィンドウで、[Maven] をクリックします。

      2. [Maven] ダイアログボックスで、[Lifecycle] ディレクトリの [package] をダブルクリックします。

    2. オンプレミスマシンでJARファイルをコンパイルします。

      1. プロジェクトディレクトリに移動します。

        Windowsコマンドプロンプトなど、使用するオペレーティングシステムのコマンドラインウィンドウで次のコマンドを実行します。

        cd ${project.dir}/spark-datasource-v3.1
      2. 次のmvnコマンドを実行して、ソースコードをコンパイルし、JARファイルにパッケージ化します。

        mvn clean package jar:test-jar
      3. dependencies.jarおよびtests.jarファイルがtargetディレクトリに格納されているかどうかを確認します。 target directory

    3. JARファイルをサーバーにアップロードします。

      1. scpコマンドを実行して、2つのJARファイルをサーバーにアップロードします。 コマンド構文:

        scp <Directory of the JAR files on the on-premises machine> root@<Public IP address of the ECS instance>:<Directory of the JAR files on the server>

        サンプルコマンド:

        scp D:\Project\emr_mc_1\spark-datasource-v3.1\target\spark-datasource-1.0-SNAPSHOT-tests.jar root@8.xx.xx.xx:/root/emr_mc

        Upload the JAR files

      2. JARファイルを表示します。

        JARファイルを表示するには、サーバーのemr_mcディレクトリでllコマンドを実行します。 View the JAR files

      3. 次のコマンドを実行して、JARファイルを各ECSインスタンスにアップロードします。

        scp -r [Directory of the JAR files on the source server] root@Private IP address of the ECS instance:[Address of the JAR files on the destination server]
  3. コードを実行します。

    • 実行モード

      • ローカルモード

        • ローカルモードでコードを実行するためのコマンド構文

          ./bin/spark-submit \
              --master local \
              --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
              --class DataReaderTest \
              ${jar-path} \
              ${maxcompute-project-name} \
              ${aliyun-access-key-id} \
              ${aliyun-access-key-secret} \
              ${maxcompute-table-name}
        • パラメーター

          パラメーター

          説明

          master

          実行モード。 有効な値:

          • ローカル: このモードでコードを実行すると、現在のECSインスタンス上のコンピューティングリソースのみが呼び出されます。

          • Yarn: このモードでコードを実行すると、EMRクラスター内のすべてのECSインスタンスのコンピューティングリソースが呼び出されます。 Yarnモードのコード実行効率は、ローカルモードのコード実行効率よりも高くなります。

          依存関係を含むJARファイルのディレクトリ。

          class

          実行するJARファイルのクラスの名前。

          jarパス

          実行するJARファイルのディレクトリ。

          maxcompute-project-name

          MaxCompute プロジェクトの名前を設定します。

          aliyun-access-key-id

          Alibaba CloudアカウントまたはAlibaba CloudアカウントのRAMユーザーのAccessKey ID。

          AccessKeyペアページからAccessKey IDを取得できます。

          aliyun-access-key-secret

          AccessKey IDに対応するAccessKeyシークレット。

          AccessKeyペアページからAccessKey secretを取得できます。

          maxcompute-table-name

          読み取りまたは書き込み操作を実行するMaxComputeテーブルの名前。

      • 糸モード

        • Yarnモードでコードを実行するためのコマンド構文

          val ODPS_ENDPOINT = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api"
          
          ./bin/spark-submit \
              --master yarn \
              --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
              --class DataReaderTest \
              ${jar-path} \
              ${maxcompute-project-name} \
              ${aliyun-access-key-id} \
              ${aliyun-access-key-secret} \
              ${maxcompute-table-name}
        • パラメーター

          パラメーター

          説明

          master

          実行モード。 有効な値:

          • ローカル: このモードでコードを実行すると、現在のECSインスタンス上のコンピューティングリソースのみが呼び出されます。

          • Yarn: このモードでコードを実行すると、EMRクラスター内のすべてのECSインスタンスのコンピューティングリソースが呼び出されます。 Yarnモードのコード実行効率は、ローカルモードのコード実行効率よりも高くなります。

          依存関係を含むJARファイルのディレクトリ。

          class

          実行するJARファイルのクラスの名前。

          jarパス

          実行するJARファイルのディレクトリ。

          maxcompute-project-name

          MaxCompute プロジェクトの名前を設定します。

          aliyun-access-key-id

          Alibaba CloudアカウントまたはAlibaba CloudアカウントのRAMユーザーのAccessKey ID。

          AccessKeyペアページからAccessKey IDを取得できます。

          aliyun-access-key-secret

          AccessKey IDに対応するAccessKeyシークレット。

          AccessKeyペアページからAccessKey secretを取得できます。

          maxcompute-table-name

          読み取りまたは書き込み操作を実行するMaxComputeテーブルの名前。

    • 例1: MaxComputeのパーティション分割されていないテーブルからデータを読み取ります。

      • コマンド構文

        -- Go to the Spark directory.
        cd /usr/lib/spark-current
        
        -- Submit a task.
        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class DataReaderTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name}
      • 次のサンプルコードは、操作の実行方法を示しています。 Read data from a non-partitioned table

      • 実行結果を次の図に示します。 Execution result

    • 例2: MaxComputeのパーティションテーブルからデータを読み取ります。

      • コマンド構文

        -- Go to the Spark directory.
        cd /usr/lib/spark-current
        
        -- Submit a task.
        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class PartitionDataReaderTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name} \
            ${partition-descripion}
      • 次のサンプルコードは、操作の実行方法を示しています。 Read data from a partitioned table

      • 実行結果を次の図に示します。 Execution result

    • 例3: MaxComputeのパーティション分割されていないテーブルにデータを書き込みます。

      • コマンド構文

        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class DataWriterTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name}
      • 次のサンプルコードは、操作の実行方法を示しています。 Write data to a non-partitioned table

      • 実行結果を次の図に示します。 Execution result

    • 例4: MaxComputeのパーティションテーブルへのデータの書き込み

      • コマンド構文

        ./bin/spark-submit \
            --master local \
            --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
            --class PartitionDataWriterTest \
            ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
            ${maxcompute-project-name} \
            ${aliyun-access-key-id} \
            ${aliyun-access-key-secret} \
            ${maxcompute-table-name} \
            ${partition-descripion}
      • 次のサンプルコードは、操作の実行方法を示しています。 Write data to a partitioned table

      • 実行結果を次の図に示します。 Execution result

パフォーマンステストの実行

このプラクティスでは、クラウド相互接続にEMRとMaxComputeが使用されます。 データセンターがMaxComputeに接続されている場合、読み書きのパフォーマンスは、Tunnelリソースまたは物理接続の帯域幅に依存します。

  • インスタンス仕様

    インスタンス

    リソースプランの仕様

    EMRクラスター

    • マスターノード数: 2。

      • ECSインスタンスの仕様: ecs.c6.2xlarge、8 vCPU、16 GiBのメモリ、2.5 Gbit/s。

      • システムディスク: 120 GiBのメモリを備えた1つの拡張SSD (ESSD) 。

      • データディスク: 80 GiBのメモリを備えた1台のESSD。

    • コアノード数: 2。

      • ECSインスタンスの仕様: ecs.c6.2xlarge、8 vCPU、16 GiBのメモリ、2.5 Gbit/s。

      • システムディスク: 120 GiBのメモリを備えた1台のESSD。

      • データディスク: 4つのESSD (各ディスクに80 GiBのメモリ) 。

    MaxCompute

    従量課金スタンダードエディション。

  • 大きなテーブルからデータを読み取るテストを実行します。

    次の表に、大きなテーブルに関する情報を示します。

    項目

    説明

    テーブル名

    dwd_product_movie_basic_info

    説明

    このテーブルは、MAXCOMPUTE_PUBLIC_DATAプロジェクトのパブリックデータセットのテーブルです。 パブリックデータセットの詳細については、「パブリックデータセット」をご参照ください。

    テーブルサイズ

    4829258484バイト

    パーティションの数

    593

    データが読み取られるパーティションの名前

    20170422

    以下の図にテスト結果を示します。 Test result操作には0.850871秒かかります。

  • 大きなテーブルにデータを書き込むテストを実行します。

    • 数万のデータレコードをパーティションに書き込みます。 Data write operation 1操作には2.533892秒かかります。

    • 数十万のデータレコードをパーティションに書き込みます。 Data write operation 2操作には8.441193秒かかります。

    • 数百万のデータレコードをパーティションに書き込みます。 Data write operation 3操作には73.28秒かかります。