既存のデータレイクハウスアーキテクチャでは、MaxComputeはHadoopクラスターからデータを読み書きするハブとして機能します。 データセンターがデプロイされているシナリオでは、クラスター情報をインターネットに公開したくない場合、開発者はオンプレミスのHadoopクラスターからクラウド内のデータにアクセスする必要があります。 このトピックでは、Alibaba Cloud E-MapReduce (EMR) クラスターをオンプレミスHadoopクラスターとして使用して、MaxComputeからデータを読み書きする方法について説明します。
背景情報
次の図は、このプラクティスで使用されるデータセンターアーキテクチャを示しています。 
開発環境の準備
EMRクラスターを準備します。
EMRクラスターを購入します。
詳細については、「EMRの使い方」をご参照ください。
EMRクラスターにログインします。
説明EMRクラスターへのログイン方法の詳細については、「クラスターへのログイン」をご参照ください。
この方法では、EMRクラスターのElastic Compute Service (ECS) インスタンスにログインする必要があります。 ECSインスタンスへの接続方法の詳細については、「ECSインスタンスへの接続」をご参照ください。
ScalaでIntelliJ IDEAプロジェクトを準備します。
IntelliJ IDEAをインストールします。
このプラクティスではIntelliJ IDEAが使用されます。 IntelliJ IDEAのインストール方法の詳細については、「IntelliJ IDEAのインストール」をご参照ください。
Maven をインストールします。
詳細については、「Apache Mavenのインストール」をご参照ください。
Scalaプロジェクトを作成します。
Scalaプラグインをダウンロードします。
IntelliJ IDEAを起動し、ファイル > 設定を選択します。 [設定] ダイアログボックスで、左側のナビゲーションウィンドウで [プラグイン] をクリックし、Scalaプラグインカードで [インストール] をクリックします。

Scala Java Development Kit (JDK) をインストールします。
詳細については、「コンピューターへのScalaのインストール」をご参照ください。
Scalaプロジェクトを作成します。
IntelliJ IDEAで、Scala > IDEAを選択してScalaプロジェクトを作成します。

MaxComputeデータを準備します。
MaxComputeプロジェクトを作成します。
MaxComputeプロジェクトの作成方法の詳細については、「MaxCompute プロジェクトを作成する」をご参照ください。
AccessKeyペアを取得します。
AccessKey IDとAccessKeyシークレットは、AccessKeyページから取得できます。
MaxComputeプロジェクトのエンドポイントを取得します。
MaxComputeプロジェクトのエンドポイントを取得します。 MaxComputeプロジェクトのエンドポイントは、MaxComputeプロジェクトの作成時に選択したリージョンとネットワーク接続方法によって異なります。 異なるリージョンとネットワーク接続方法に対応するエンドポイントの詳細については、「エンドポイント」をご参照ください。
テーブルを作成します。
この方法では、テスト用にパーティションテーブルと非パーティションテーブルを準備する必要があります。 テーブルの作成方法の詳細については、「テーブルの作成」をご参照ください。
MaxComputeからのデータの読み取りとMaxComputeへのデータの書き込み
コードを書き込みます。
次のサンプルコードは、パーティション分割されていないテーブルからデータを読み取るために使用されます。
説明パーティションテーブルからのデータの読み取り、非パーティションテーブルへのデータの書き込み、およびパーティションテーブルへのデータの書き込みに使用されるサンプルコードについては、「PartitionDataReaderTest.scala」、「DataWriterTest.scala」、および「PartitionDataWriterTest.scala」をご参照ください。 ビジネス要件に基づいてコードを作成できます。
/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.spark.sql.SparkSession /** * @author renxiang * @date 2021-12-20 */ object DataReaderTest { val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource" val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api" def main(args: Array[String]): Unit = { val odpsProject = args(0) val odpsAkId = args(1) val odpsAkKey = args(2) val odpsTable = args(3) val spark = SparkSession .builder() .appName("odps-datasource-reader") .getOrCreate() import spark._ val df = spark.read.format(ODPS_DATA_SOURCE) .option("spark.hadoop.odps.project.name", odpsProject) .option("spark.hadoop.odps.access.id", odpsAkId) .option("spark.hadoop.odps.access.key", odpsAkKey) .option("spark.hadoop.odps.end.point", ODPS_ENDPOINT) .option("spark.hadoop.odps.table.name", odpsTable) .load() df.createOrReplaceTempView("odps_table") println("select * from odps_table") val dfFullScan = sql("select * from odps_table") println(dfFullScan.count) dfFullScan.show(10) Thread.sleep(72*3600*1000) } }コードをパッケージ化してアップロードします。
Mavenを使用してコードをパッケージ化します。
IntelliJ IDEAのコード開発ページの右側のウィンドウで、[Maven] をクリックします。
[Maven] ダイアログボックスで、[Lifecycle] ディレクトリの [package] をダブルクリックします。
オンプレミスマシンでJARファイルをコンパイルします。
プロジェクトディレクトリに移動します。
Windowsコマンドプロンプトなど、使用するオペレーティングシステムのコマンドラインウィンドウで次のコマンドを実行します。
cd ${project.dir}/spark-datasource-v3.1次の
mvnコマンドを実行して、ソースコードをコンパイルし、JARファイルにパッケージ化します。mvn clean package jar:test-jardependencies.jarおよびtests.jarファイルが
targetディレクトリに格納されているかどうかを確認します。
JARファイルをサーバーにアップロードします。
scpコマンドを実行して、2つのJARファイルをサーバーにアップロードします。 コマンド構文:scp <Directory of the JAR files on the on-premises machine> root@<Public IP address of the ECS instance>:<Directory of the JAR files on the server>サンプルコマンド:
scp D:\Project\emr_mc_1\spark-datasource-v3.1\target\spark-datasource-1.0-SNAPSHOT-tests.jar root@8.xx.xx.xx:/root/emr_mc
JARファイルを表示します。
JARファイルを表示するには、サーバーの
emr_mcディレクトリでllコマンドを実行します。
次のコマンドを実行して、JARファイルを各ECSインスタンスにアップロードします。
scp -r [Directory of the JAR files on the source server] root@Private IP address of the ECS instance:[Address of the JAR files on the destination server]
コードを実行します。
実行モード
ローカルモード
ローカルモードでコードを実行するためのコマンド構文
./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataReaderTest \ ${jar-path} \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name}パラメーター
パラメーター
説明
master
実行モード。 有効な値:
ローカル: このモードでコードを実行すると、現在のECSインスタンス上のコンピューティングリソースのみが呼び出されます。
Yarn: このモードでコードを実行すると、EMRクラスター内のすべてのECSインスタンスのコンピューティングリソースが呼び出されます。 Yarnモードのコード実行効率は、ローカルモードのコード実行効率よりも高くなります。
瓶
依存関係を含むJARファイルのディレクトリ。
class
実行するJARファイルのクラスの名前。
jarパス
実行するJARファイルのディレクトリ。
maxcompute-project-name
MaxCompute プロジェクトの名前を設定します。
aliyun-access-key-id
Alibaba CloudアカウントまたはAlibaba CloudアカウントのRAMユーザーのAccessKey ID。
AccessKeyペアページからAccessKey IDを取得できます。
aliyun-access-key-secret
AccessKey IDに対応するAccessKeyシークレット。
AccessKeyペアページからAccessKey secretを取得できます。
maxcompute-table-name
読み取りまたは書き込み操作を実行するMaxComputeテーブルの名前。
糸モード
Yarnモードでコードを実行するためのコマンド構文
val ODPS_ENDPOINT = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api" ./bin/spark-submit \ --master yarn \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataReaderTest \ ${jar-path} \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name}パラメーター
パラメーター
説明
master
実行モード。 有効な値:
ローカル: このモードでコードを実行すると、現在のECSインスタンス上のコンピューティングリソースのみが呼び出されます。
Yarn: このモードでコードを実行すると、EMRクラスター内のすべてのECSインスタンスのコンピューティングリソースが呼び出されます。 Yarnモードのコード実行効率は、ローカルモードのコード実行効率よりも高くなります。
瓶
依存関係を含むJARファイルのディレクトリ。
class
実行するJARファイルのクラスの名前。
jarパス
実行するJARファイルのディレクトリ。
maxcompute-project-name
MaxCompute プロジェクトの名前を設定します。
aliyun-access-key-id
Alibaba CloudアカウントまたはAlibaba CloudアカウントのRAMユーザーのAccessKey ID。
AccessKeyペアページからAccessKey IDを取得できます。
aliyun-access-key-secret
AccessKey IDに対応するAccessKeyシークレット。
AccessKeyペアページからAccessKey secretを取得できます。
maxcompute-table-name
読み取りまたは書き込み操作を実行するMaxComputeテーブルの名前。
例1: MaxComputeのパーティション分割されていないテーブルからデータを読み取ります。
コマンド構文
-- Go to the Spark directory. cd /usr/lib/spark-current -- Submit a task. ./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataReaderTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name}次のサンプルコードは、操作の実行方法を示しています。

実行結果を次の図に示します。

例2: MaxComputeのパーティションテーブルからデータを読み取ります。
コマンド構文
-- Go to the Spark directory. cd /usr/lib/spark-current -- Submit a task. ./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class PartitionDataReaderTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} \ ${partition-descripion}次のサンプルコードは、操作の実行方法を示しています。

実行結果を次の図に示します。

例3: MaxComputeのパーティション分割されていないテーブルにデータを書き込みます。
コマンド構文
./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class DataWriterTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name}次のサンプルコードは、操作の実行方法を示しています。

実行結果を次の図に示します。

例4: MaxComputeのパーティションテーブルへのデータの書き込み
コマンド構文
./bin/spark-submit \ --master local \ --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \ --class PartitionDataWriterTest \ ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \ ${maxcompute-project-name} \ ${aliyun-access-key-id} \ ${aliyun-access-key-secret} \ ${maxcompute-table-name} \ ${partition-descripion}次のサンプルコードは、操作の実行方法を示しています。

実行結果を次の図に示します。

パフォーマンステストの実行
このプラクティスでは、クラウド相互接続にEMRとMaxComputeが使用されます。 データセンターがMaxComputeに接続されている場合、読み書きのパフォーマンスは、Tunnelリソースまたは物理接続の帯域幅に依存します。
インスタンス仕様
インスタンス
リソースプランの仕様
EMRクラスター
マスターノード数: 2。
ECSインスタンスの仕様: ecs.c6.2xlarge、8 vCPU、16 GiBのメモリ、2.5 Gbit/s。
システムディスク: 120 GiBのメモリを備えた1つの拡張SSD (ESSD) 。
データディスク: 80 GiBのメモリを備えた1台のESSD。
コアノード数: 2。
ECSインスタンスの仕様: ecs.c6.2xlarge、8 vCPU、16 GiBのメモリ、2.5 Gbit/s。
システムディスク: 120 GiBのメモリを備えた1台のESSD。
データディスク: 4つのESSD (各ディスクに80 GiBのメモリ) 。
MaxCompute
従量課金スタンダードエディション。
大きなテーブルからデータを読み取るテストを実行します。
次の表に、大きなテーブルに関する情報を示します。
項目
説明
テーブル名
dwd_product_movie_basic_info
説明このテーブルは、MAXCOMPUTE_PUBLIC_DATAプロジェクトのパブリックデータセットのテーブルです。 パブリックデータセットの詳細については、「パブリックデータセット」をご参照ください。
テーブルサイズ
4829258484バイト
パーティションの数
593
データが読み取られるパーティションの名前
20170422
以下の図にテスト結果を示します。
操作には0.850871秒かかります。 大きなテーブルにデータを書き込むテストを実行します。
数万のデータレコードをパーティションに書き込みます。
操作には2.533892秒かかります。 数十万のデータレコードをパーティションに書き込みます。
操作には8.441193秒かかります。 数百万のデータレコードをパーティションに書き込みます。
操作には73.28秒かかります。